# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
from libc.stdint cimport uintptr_t, int64_t
from cpython.pycapsule cimport PyCapsule_GetPointer
from nanoarrow_c cimport (
ArrowArray,
ArrowArrayStream,
ArrowArrayStreamGetNext,
ArrowArrayStreamGetSchema,
ArrowArrayStreamMove,
ArrowBasicArrayStreamInit,
ArrowBasicArrayStreamSetArray,
ArrowBasicArrayStreamValidate,
ArrowBufferAppendInt64,
ArrowResolveChunk64,
ArrowType,
)
from nanoarrow cimport _types
from nanoarrow._array cimport CArray
from nanoarrow._buffer cimport CBuffer
from nanoarrow._schema cimport CSchema, assert_type_equal
from nanoarrow._utils cimport (
alloc_c_array_stream,
c_array_shallow_copy,
Error
)
from typing import Iterable, List, Tuple
from nanoarrow import _repr_utils
cdef class CArrayStream:
    """Low-level ArrowArrayStream wrapper

    This object is a literal wrapper around an ArrowArrayStream. It provides methods
    that wrap the underlying C callbacks and handles the C Data interface lifecycle
    (i.e., initialized ArrowArrayStream structures are always released).

    See `nanoarrow.c_array_stream()` for construction and usage examples.
    """

    # Python object that keeps the memory behind _ptr alive (e.g. the object
    # returned by alloc_c_array_stream() or an imported PyCapsule).
    cdef object _base
    cdef ArrowArrayStream* _ptr
    # Schema fetched lazily by _get_cached_schema(); used to allocate the
    # CArray objects returned by get_next().
    cdef object _cached_schema

    def __cinit__(self, object base, uintptr_t addr):
        self._base = base
        self._ptr = <ArrowArrayStream*>addr
        self._cached_schema = None

    @staticmethod
    def allocate() -> CArrayStream:
        """Allocate a released ArrowArrayStream"""
        cdef ArrowArrayStream* c_array_stream_out
        base = alloc_c_array_stream(&c_array_stream_out)
        return CArrayStream(base, <uintptr_t>c_array_stream_out)

    @staticmethod
    def from_c_arrays(arrays: List[CArray], CSchema schema, move=False, validate=True) -> CArrayStream:
        """Create an ArrowArrayStream from an existing set of arrays

        Given a previously resolved list of arrays, create an ArrowArrayStream
        representation of the sequence of chunks.

        Parameters
        ----------
        arrays : List[CArray]
            A list of arrays to use as batches.
        schema : CSchema
            The schema that will be returned. Must be type equal with the schema
            of each array (this is checked if validate is ``True``)
        move : bool, optional
            If True, transfer ownership from each array (and from ``schema``)
            instead of creating a shallow copy. This is only safe if the caller
            knows the origin of the arrays and knows that they will not be
            accessed after this stream has been created.
        validate : bool, optional
            If True, enforce type equality between the provided schema and the schema
            of each array.

        Returns
        -------
        CArrayStream
            A stream yielding the provided arrays in order.
        """
        cdef ArrowArrayStream* c_array_stream_out
        base = alloc_c_array_stream(&c_array_stream_out)

        # ArrowBasicArrayStreamInit() takes ownership of out_schema. Don't create
        # more copies than we have to, but make sure a schema that is still valid
        # after that call exists for validation if requested.
        cdef CSchema out_schema = schema
        if validate and not move:
            validate_schema = schema
            out_schema = schema.__deepcopy__()
        elif validate:
            validate_schema = schema.__deepcopy__()
            out_schema = schema
        elif not move:
            out_schema = schema.__deepcopy__()

        cdef int code = ArrowBasicArrayStreamInit(c_array_stream_out, out_schema._ptr, len(arrays))
        Error.raise_error_not_ok("ArrowBasicArrayStreamInit()", code)

        cdef ArrowArray tmp
        cdef CArray array
        for i in range(len(arrays)):
            array = arrays[i]
            if validate:
                assert_type_equal(array.schema, validate_schema, False)

            if not move:
                # Hand the stream a shallow copy so the caller's array stays usable
                c_array_shallow_copy(array._base, array._ptr, &tmp)
                ArrowBasicArrayStreamSetArray(c_array_stream_out, i, &tmp)
            else:
                ArrowBasicArrayStreamSetArray(c_array_stream_out, i, array._ptr)

        cdef Error error = Error()
        if validate:
            code = ArrowBasicArrayStreamValidate(c_array_stream_out, &error.c_error)
            error.raise_message_not_ok("ArrowBasicArrayStreamValidate()", code)

        return CArrayStream(base, <uintptr_t>c_array_stream_out)

    def release(self):
        """Explicitly call the release callback of this stream"""
        if self.is_valid():
            self._ptr.release(self._ptr)

    @staticmethod
    def _import_from_c_capsule(stream_capsule) -> CArrayStream:
        """Import from a ArrowArrayStream PyCapsule.

        Parameters
        ----------
        stream_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_array_stream' containing an
            ArrowArrayStream pointer.
        """
        return CArrayStream(
            stream_capsule,
            <uintptr_t>PyCapsule_GetPointer(stream_capsule, 'arrow_array_stream')
        )

    def __arrow_c_stream__(self, requested_schema=None):
        """
        Export the stream as an Arrow C stream PyCapsule.

        The underlying ArrowArrayStream is moved into the returned capsule, so
        this object no longer holds a valid stream afterwards.

        Parameters
        ----------
        requested_schema : PyCapsule | None
            A PyCapsule containing a C ArrowSchema representation of a requested
            schema. Not supported.

        Returns
        -------
        PyCapsule

        Raises
        ------
        RuntimeError
            If the underlying stream pointer is NULL or already released.
        NotImplementedError
            If ``requested_schema`` is not None.
        """
        self._assert_valid()

        if requested_schema is not None:
            raise NotImplementedError("requested_schema")

        cdef:
            ArrowArrayStream* c_array_stream_out

        array_stream_capsule = alloc_c_array_stream(&c_array_stream_out)
        ArrowArrayStreamMove(self._ptr, c_array_stream_out)
        return array_stream_capsule

    def _addr(self) -> int:
        """Return the address of the wrapped ArrowArrayStream as an integer"""
        return <uintptr_t>self._ptr

    def is_valid(self) -> bool:
        """Check for a non-null and non-released underlying ArrowArrayStream"""
        return self._ptr != NULL and self._ptr.release != NULL

    def _assert_valid(self):
        """Raise RuntimeError if the stream pointer is NULL or released"""
        if self._ptr == NULL:
            raise RuntimeError("array stream pointer is NULL")
        if self._ptr.release == NULL:
            raise RuntimeError("array stream is released")

    def _get_schema(self, CSchema schema):
        """Populate ``schema`` by invoking the stream's get_schema callback"""
        self._assert_valid()
        cdef Error error = Error()
        cdef int code = ArrowArrayStreamGetSchema(self._ptr, schema._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowArrayStream::get_schema()", code)

    def _get_cached_schema(self):
        """Return the stream's schema, calling get_schema only on first success"""
        if self._cached_schema is None:
            # Populate a local first and only cache it once get_schema has
            # succeeded: caching before the call would leave an unpopulated
            # CSchema behind if the callback raised, and every later call
            # would silently reuse it.
            schema = CSchema.allocate()
            self._get_schema(schema)
            self._cached_schema = schema

        return self._cached_schema

    def get_schema(self) -> CSchema:
        """Get the schema associated with this stream

        Calling this method will always issue a call to the underlying stream's
        get_schema callback.
        """
        out = CSchema.allocate()
        self._get_schema(out)
        return out

    def get_next(self) -> CArray:
        """Get the next Array from this stream

        Raises StopIteration when there are no more arrays in this stream.
        """
        self._assert_valid()

        # We return a reference to the same Python object for each
        # Array that is returned. This is independent of get_schema(),
        # which is guaranteed to call the C object's callback and
        # faithfully pass on the returned value.

        cdef Error error = Error()
        cdef CArray array = CArray.allocate(self._get_cached_schema())
        cdef int code = ArrowArrayStreamGetNext(self._ptr, array._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowArrayStream::get_next()", code)

        # A released (invalid) array signals the end of the stream
        if not array.is_valid():
            raise StopIteration()
        else:
            return array

    def __iter__(self):
        return self

    def __next__(self):
        return self.get_next()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.release()

    def __repr__(self):
        return _repr_utils.array_stream_repr(self)
cdef class CMaterializedArrayStream:
"""Optimized representation of a fully consumed ArrowArrayStream
This class provides a data structure similar to pyarrow's ChunkedArray
where each consumed array is a referenced-counted shared array. This
class wraps the utilities provided by the nanoarrow C library to iterate
over and facilitate log(n) random access to items in this container.
"""
cdef CSchema _schema
cdef CBuffer _array_ends
cdef list _arrays
cdef int64_t _total_length
def __cinit__(self):
self._arrays = []
self._total_length = 0
self._schema = CSchema.allocate()
self._array_ends = CBuffer.empty()
cdef int code = ArrowBufferAppendInt64(self._array_ends._ptr, 0)
Error.raise_error_not_ok("ArrowBufferAppendInt64()", code)
cdef _finalize(self):
self._array_ends._set_data_type(<ArrowType>_types.INT64)
@property
def schema(self) -> CSchema:
return self._schema
def __getitem__(self, k) -> Tuple[CArray, int]:
cdef int64_t kint
cdef int array_i
cdef const int64_t* sorted_offsets = <int64_t*>self._array_ends._ptr.data
if isinstance(k, slice):
raise NotImplementedError("index with slice")
kint = k
if kint < 0:
kint += self._total_length
if kint < 0 or kint >= self._total_length:
raise IndexError(f"Index {kint} is out of range")
array_i = ArrowResolveChunk64(kint, sorted_offsets, 0, len(self._arrays))
kint -= sorted_offsets[array_i]
return self._arrays[array_i], kint
def __len__(self) -> int:
return self._array_ends[len(self._arrays)]
def __iter__(self) -> Iterable[Tuple[CArray, int]]:
for c_array in self._arrays:
for item_i in range(len(c_array)):
yield c_array, item_i
def array(self, int64_t i) -> CArray:
return self._arrays[i]
@property
def n_arrays(self) -> int:
return len(self._arrays)
@property
def arrays(self) -> Iterable[CArray]:
return iter(self._arrays)
def __arrow_c_stream__(self, requested_schema=None):
# When an array stream from iterable is supported, that could be used here
# to avoid unnessary shallow copies.
stream = CArrayStream.from_c_arrays(
self._arrays,
self._schema,
move=False,
validate=False
)
return stream.__arrow_c_stream__(requested_schema=requested_schema)
def child(self, int64_t i) -> CMaterializedArrayStream:
cdef CMaterializedArrayStream out = CMaterializedArrayStream()
cdef int code
out._schema = self._schema.child(i)
out._arrays = [chunk.child(i) for chunk in self._arrays]
for child_chunk in out._arrays:
out._total_length += len(child_chunk)
Loading ...