# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import array
from datetime import date, datetime, timezone
import pytest
from nanoarrow._array import CArrayBuilder
from nanoarrow._utils import NanoarrowException
from nanoarrow.c_schema import c_schema_view
import nanoarrow as na
from nanoarrow import device
def test_c_array_from_c_array():
    """Check that na.c_array() accepts an object it previously returned.

    The result must report the same length and buffers as the input, and
    buffer 1 of the result must contain the original values.
    """
    c_array = na.c_array([1, 2, 3], na.int32())
    c_array_from_c_array = na.c_array(c_array)
    assert c_array_from_c_array.length == c_array.length
    assert c_array_from_c_array.buffers == c_array.buffers
    # Inspect the converted array (not the input) so that the content check
    # exercises the object under test.
    assert list(c_array_from_c_array.view().buffer(1)) == [1, 2, 3]
def test_c_array_from_capsule_protocol():
    """na.c_array() should accept any object exposing __arrow_c_array__()."""

    class CArrayWrapper:
        # Minimal stand-in whose only capability is forwarding the
        # __arrow_c_array__() call to the wrapped object.
        def __init__(self, obj):
            self.obj = obj

        def __arrow_c_array__(self, *args, **kwargs):
            return self.obj.__arrow_c_array__(*args, **kwargs)

    expected = na.c_array([1, 2, 3], na.int32())
    result = na.c_array(CArrayWrapper(expected))

    assert result.length == expected.length
    assert result.buffers == expected.buffers

    values = list(result.view().buffer(1))
    assert values == [1, 2, 3]
def test_c_array_from_old_pyarrow():
    """Check the fallback for pyarrow arrays lacking __arrow_c_array__().

    The mock below only offers ``_export_to_c()`` and reports "pyarrow.lib"
    as its module, imitating a legacy pyarrow Array. The test is skipped
    when pyarrow cannot be imported.
    """
    # Resolve the optional dependency first so that a skip happens before
    # any other setup is performed.
    pa = pytest.importorskip("pyarrow")

    # Simulate a pyarrow Array with no __arrow_c_array__
    class MockLegacyPyarrowArray:
        def __init__(self, obj):
            self.obj = obj

        def _export_to_c(self, *args):
            return self.obj._export_to_c(*args)

    MockLegacyPyarrowArray.__module__ = "pyarrow.lib"

    # Named legacy_array (not "array") to avoid shadowing the stdlib module
    # imported at the top of the file.
    legacy_array = MockLegacyPyarrowArray(pa.array([1, 2, 3], pa.int32()))
    c_array = na.c_array(legacy_array)
    assert c_array.length == 3
    assert c_array.schema.format == "i"
    assert list(c_array.view().buffer(1)) == [1, 2, 3]

    # Make sure that this heuristic won't result in trying to import
    # something else that has an _export_to_c method
    not_array = pa.int32()
    assert hasattr(not_array, "_export_to_c")

    # Only the call expected to fail lives inside the raises() block so an
    # unrelated TypeError from the setup above cannot satisfy the check.
    with pytest.raises(TypeError, match="Can't resolve ArrayBuilder"):
        na.c_array(not_array)
def test_c_array_from_bare_capsule():
    """na.c_array() should accept an array capsule, with or without a schema.

    Both variants must reproduce the length and buffers of the array that
    the capsules were exported from.
    """
    source = na.c_array([1, 2, 3], na.int32())

    # Check from bare capsule without supplying a schema
    schema_capsule, array_capsule = source.__arrow_c_array__()
    del schema_capsule
    imported = na.c_array(array_capsule)
    assert imported.length == source.length
    assert imported.buffers == source.buffers

    # Check from bare capsule supplying a schema
    schema_capsule, array_capsule = source.__arrow_c_array__()
    imported = na.c_array(array_capsule, schema_capsule)
    assert imported.length == source.length
    assert imported.buffers == source.buffers

    values = list(imported.view().buffer(1))
    assert values == [1, 2, 3]
def test_c_array_type_not_supported():
    """na.c_array(None) should raise a TypeError that names NoneType."""
    with pytest.raises(
        TypeError,
        match="Can't resolve ArrayBuilder for object of type NoneType",
    ):
        na.c_array(None)
def test_c_array_slice():
    """Check the offset and length reported by slices of a 3-element array.

    Covers a full slice, stop-only slices (positive and negative) and
    start-only slices (positive and negative).
    """
    # Named c_array (not "array") to avoid shadowing the stdlib module
    # imported at the top of the file.
    c_array = na.c_array([1, 2, 3], na.int32())
    assert c_array.offset == 0
    assert c_array.length == 3

    # A full slice must describe the same range and report the same buffers
    sliced = c_array[:]
    assert sliced.offset == 0
    assert sliced.length == 3
    assert c_array.buffers == sliced.buffers

    # Taking a slice must leave the original array unchanged
    assert c_array.offset == 0
    assert c_array.length == 3

    # Stop-only slices keep offset 0 and shorten the length
    sliced = c_array[:2]
    assert sliced.offset == 0
    assert sliced.length == 2

    sliced = c_array[:-1]
    assert sliced.offset == 0
    assert sliced.length == 2

    # Start-only slices advance the offset and shorten the length
    sliced = c_array[1:]
    assert sliced.offset == 1
    assert sliced.length == 2

    sliced = c_array[-2:]
    assert sliced.offset == 1
    assert sliced.length == 2
def test_c_array_slice_errors():
    """Invalid subscripts should be rejected with TypeError or IndexError."""
    c_array = na.c_array([1, 2, 3], na.int32())

    # None is not a valid subscript
    with pytest.raises(TypeError):
        c_array[None]

    # In order: start past the end, stop past the end, stop before start
    for bad_slice in (slice(4, None), slice(None, 4), slice(1, 0)):
        with pytest.raises(IndexError):
            c_array[bad_slice]
def test_c_array_shallow_copy():
    """Check buffer reference bookkeeping when child arrays are exported.

    Uses get_pyobject_buffer_count() to observe that building the struct
    with move=True leaves the count unchanged, that exporting each child
    through __arrow_c_array__() raises it by one, and that deleting the
    resulting capsules brings it back to the starting value. The test is
    skipped on PyPy.
    """
    import gc
    import platform

    from nanoarrow._utils import get_pyobject_buffer_count

    if platform.python_implementation() == "PyPy":
        pytest.skip(
            "Reference counting/garbage collection is non-deterministic on PyPy"
        )

    # Collect first so that garbage left by earlier tests does not disturb
    # the baseline count.
    gc.collect()
    initial_ref_count = get_pyobject_buffer_count()

    # Create an array with children. Each child is built with the type that
    # the struct schema declares for it (col1: int32, col2: int64).
    struct_array = na.c_array_from_buffers(
        na.struct({"col1": na.int32(), "col2": na.int64()}),
        3,
        [None],
        children=[
            na.c_array([1, 2, 3], na.int32()),
            na.c_array([4, 5, 6], na.int64()),
        ],
        move=True,
    )

    # The move=True should have prevented a shallow copy of the children
    # when constructing the array.
    assert get_pyobject_buffer_count() == initial_ref_count

    # Force a shallow copy via the array protocol and ensure we saved
    # references to two additional buffers.
    _, col1_capsule = struct_array.child(0).__arrow_c_array__()
    assert get_pyobject_buffer_count() == (initial_ref_count + 1)

    _, col2_capsule = struct_array.child(1).__arrow_c_array__()
    assert get_pyobject_buffer_count() == (initial_ref_count + 2)

    # Ensure that the references can be removed
    del col1_capsule
    assert get_pyobject_buffer_count() == (initial_ref_count + 1)

    del col2_capsule
    assert get_pyobject_buffer_count() == initial_ref_count
def test_c_array_builder_init():
    """A CArrayBuilder must be initialized once, and only once, before use."""
    not_initialized = "CArrayBuilder is not initialized"
    already_initialized = "CArrayBuilder is already initialized"
    int32_id = na.Type.INT32.value

    builder = CArrayBuilder.allocate()

    # Freshly allocated: querying it is an error
    with pytest.raises(RuntimeError, match=not_initialized):
        builder.is_empty()

    builder.init_from_type(int32_id)
    assert builder.is_empty()

    # Neither initializer may be called a second time
    with pytest.raises(RuntimeError, match=already_initialized):
        builder.init_from_type(int32_id)

    with pytest.raises(RuntimeError, match=already_initialized):
        builder.init_from_schema(na.c_schema(na.int32()))
def test_c_array_from_pybuffer_uint8():
    """A bytes object should convert to a uint8 array, one item per byte."""
    data = b"abcdefg"
    result = na.c_array(data)

    assert result.length == len(data)
    assert result.null_count == 0
    assert result.offset == 0

    schema_view = c_schema_view(result.schema)
    assert schema_view.type == "uint8"

    # Iterating bytes yields ints, which is what buffer 1 should yield too
    values = list(result.view().buffer(1))
    assert values == list(data)
def test_c_array_from_pybuffer_string():
    """A buffer whose format is set to "c" should convert to an int8 array."""
    data = b"abcdefg"
    char_buffer = na.c_buffer(data)._set_format("c")
    result = na.c_array(char_buffer)

    assert result.length == len(data)
    assert result.null_count == 0
    assert result.offset == 0

    schema_view = c_schema_view(result.schema)
    assert schema_view.type == "int8"

    values = list(result.view().buffer(1))
    assert values == list(data)
def test_c_array_from_pybuffer_fixed_size_binary():
    """A buffer with format "4s" should convert to fixed_size_binary(4)."""
    items = [b"abcd", b"efgh", b"ijkl"]
    # Concatenate the items into one contiguous bytes object
    packed = b"".join(items)
    fixed_buffer = na.c_buffer(packed)._set_format("4s")
    result = na.c_array(fixed_buffer)

    assert result.length == len(items)
    assert result.null_count == 0
    assert result.offset == 0

    schema_view = c_schema_view(result.schema)
    assert schema_view.type == "fixed_size_binary"
    assert schema_view.fixed_size == 4

    # Buffer 1 should iterate as the original 4-byte items
    values = list(result.view().buffer(1))
    assert values == items
def test_c_array_from_pybuffer_numpy():
    """A numpy int32 array should convert to an int32 array.

    The test is skipped when numpy cannot be imported.
    """
    np = pytest.importorskip("numpy")
    data = np.array([1, 2, 3], dtype=np.int32)
    result = na.c_array(data)

    assert result.length == len(data)
    assert result.null_count == 0
    assert result.offset == 0

    schema_view = c_schema_view(result.schema)
    assert schema_view.type == "int32"

    values = list(result.view().buffer(1))
    assert values == list(data)
def test_c_array_from_iterable_empty():
    """An empty iterable should give a zero-length string array.

    The array is still expected to report three buffers, each of length 0.
    """
    empty_string = na.c_array([], na.string())

    assert empty_string.length == 0
    assert empty_string.null_count == 0
    assert empty_string.offset == 0
    assert empty_string.n_buffers == 3

    view = empty_string.view()
    for buffer_index in range(3):
        assert len(view.buffer(buffer_index)) == 0
def test_c_array_from_iterable_string():
    """An iterable of str/None should build a string array with nulls.

    Also checks that a bytes item is rejected with ValueError.
    """
    result = na.c_array(["abc", None, "defg"], na.string())
    assert result.length == 3
    assert result.null_count == 1

    # Expected lengths of buffers 0, 1 and 2 (presumably validity, offsets
    # and data; verify against the Arrow string layout)
    view = result.view()
    assert len(view.buffer(0)) == 1
    assert len(view.buffer(1)) == 4
    assert len(view.buffer(2)) == 7

    # Check an item that is not a str()
    not_a_str = [b"1234"]
    with pytest.raises(ValueError):
        na.c_array(not_a_str, na.string())
def test_c_array_from_iterable_string_view():
    """An iterable of str/None should build a string_view array.

    Two inputs are checked: one containing a string longer than 12 bytes
    (four buffers expected) and one where every string is at most 12 bytes
    (three buffers expected).
    """
    long_value = "a string longer than 12 bytes"
    result = na.c_array(["abc", None, long_value], na.string_view())
    assert result.length == 3
    assert result.null_count == 1
    assert result.n_buffers == 4

    view = result.view()
    assert len(view.buffer(0)) == 1
    # Buffer 2 holds the long string's bytes; buffer 3 holds its length
    assert bytes(view.buffer(2)) == b"a string longer than 12 bytes"
    assert list(view.buffer(3)) == [len(long_value)]

    # Make sure this also works when all strings are inlined (i.e., no variadic buffers)
    result = na.c_array(["abc", None, "short string"], na.string_view())
    assert result.length == 3
    assert result.null_count == 1
    assert result.n_buffers == 3

    view = result.view()
    assert len(view.buffer(0)) == 1
    # Three items in buffer 1, occupying 16 bytes each
    assert len(view.buffer(1)) == 3
    assert len(bytes(view.buffer(1))) == 3 * 16
    assert list(view.buffer(2)) == []
def test_c_array_from_iterable_bytes():
    """An iterable of bytes/None should build a binary array with nulls.

    Also checks the ValueError raised for a str item, for a buffer whose
    itemsize is not 1, and (when numpy is importable) for a buffer with
    more than one dimension.
    """
    result = na.c_array([b"abc", None, b"defg"], na.binary())
    assert result.length == 3
    assert result.null_count == 1

    view = result.view()
    assert len(view.buffer(0)) == 1
    assert len(view.buffer(1)) == 4
    assert len(view.buffer(2)) == 7

    # A str item is not accepted for a binary array
    with pytest.raises(ValueError):
        na.c_array(["1234"], na.binary())

    # A buffer of int32 items has itemsize != 1
    int32_buffer = na.c_buffer([1, 2, 3], na.int32())
    itemsize_msg = "Can't append buffer with itemsize != 1"
    with pytest.raises(ValueError, match=itemsize_msg):
        na.c_array([int32_buffer], na.binary())

    # The remaining check needs numpy; the test is reported as skipped
    # from this point if it is unavailable.
    np = pytest.importorskip("numpy")
    two_dimensional = np.ones((2, 2))
    dimensions_msg = "Can't append buffer with dimensions != 1"
    with pytest.raises(ValueError, match=dimensions_msg):
        na.c_array([two_dimensional], na.binary())
def test_c_array_from_iterable_view():
string = na.c_array(
[b"abc", None, b"a string longer than 12 bytes"], na.binary_view()
)
assert string.length == 3
assert string.null_count == 1
Loading ...