# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import warnings
from functools import cached_property
from itertools import islice, repeat
from typing import Iterable, Tuple
from nanoarrow._array import CArrayView
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema, c_schema_view
from nanoarrow.schema import Schema
from nanoarrow import _types
def iter_py(obj, schema=None) -> Iterable:
    """Iterate over items in zero or more arrays

    Returns an iterator over an array stream where each item is a
    Python representation of the next element.

    Parameters
    ----------
    obj : array stream-like
        An array-like or array stream-like object as sanitized by
        :func:`c_array_stream`.
    schema : schema-like, optional
        An optional schema, passed to :func:`c_array_stream`.

    Examples
    --------
    >>> import nanoarrow as na
    >>> from nanoarrow import iterator
    >>> array = na.c_array([1, 2, 3], na.int32())
    >>> list(iterator.iter_py(array))
    [1, 2, 3]
    """
    return PyIterator.get_iterator(obj, schema=schema)
def iter_tuples(obj, schema=None) -> Iterable[Tuple]:
    """Iterate over rows in zero or more struct arrays

    Returns an iterator over an array stream of struct arrays (i.e.,
    record batches) where each item is a tuple of the items in each
    row. This is different than :func:`iter_py`, which encodes struct
    columns as dictionaries.

    Parameters
    ----------
    obj : array stream-like
        An array-like or array stream-like object as sanitized by
        :func:`c_array_stream`.
    schema : schema-like, optional
        An optional schema, passed to :func:`c_array_stream`.

    Examples
    --------
    >>> import nanoarrow as na
    >>> from nanoarrow import iterator
    >>> import pyarrow as pa
    >>> array = pa.record_batch([pa.array([1, 2, 3])], names=["col1"])
    >>> list(iterator.iter_tuples(array))
    [(1,), (2,), (3,)]
    """
    return RowTupleIterator.get_iterator(obj, schema=schema)
def iter_array_views(obj, schema=None) -> Iterable[CArrayView]:
    """Iterate over prepared views of each array

    Returns an iterator which yields a :func:`c_array_view`
    for each chunk in ``obj``.

    Parameters
    ----------
    obj : array stream-like
        An array-like or array stream-like object as sanitized by
        :func:`c_array_stream`.
    schema : schema-like, optional
        An optional schema, passed to :func:`c_array_stream`.

    Examples
    --------
    >>> import nanoarrow as na
    >>> from nanoarrow import iterator
    >>> array = na.c_array([1, 2, 3], na.int32())
    >>> list(iterator.iter_array_views(array))
    [<nanoarrow.c_array.CArrayView>
    - storage_type: 'int32'
    - length: 3
    - offset: 0
    - null_count: 0
    - buffers[2]:
    - validity <bool[0 b] >
    - data <int32[12 b] 1 2 3>
    - dictionary: NULL
    - children[0]:]
    """
    # The stream is closed when this generator is exhausted or closed;
    # each yielded view borrows from the array produced by the stream.
    with c_array_stream(obj, schema) as stream:
        for array in stream:
            yield array.view()
class InvalidArrayWarning(UserWarning):
    """Warning category for problems with an array's content.

    NOTE(review): not emitted in this portion of the file — presumably
    issued elsewhere via ``ArrayViewBaseIterator._warn()``; confirm.
    """

    pass
class LossyConversionWarning(UserWarning):
    """Warning category for conversions that may not round-trip exactly.

    NOTE(review): not emitted in this portion of the file — presumably
    issued elsewhere via ``ArrayViewBaseIterator._warn()``; confirm.
    """

    pass
class UnregisteredExtensionWarning(UserWarning):
    """Warning category emitted when an unregistered extension type is
    converted using its storage type (see ``PyIterator._iter_chunk()``)."""

    pass
class ArrayViewBaseIterator:
    """Common machinery for iterators and visitors backed by an internal
    ArrowArrayView used as the basis for conversion to Python objects.
    Intended for internal use.
    """

    def __init__(self, schema, *, array_view=None):
        self._schema = c_schema(schema)
        self._schema_view = c_schema_view(schema)
        # Reuse a caller-provided view when given; otherwise allocate a
        # fresh one matching the resolved schema.
        if array_view is not None:
            self._array_view = array_view
        else:
            self._array_view = CArrayView.from_schema(self._schema)

    @cached_property
    def schema(self) -> Schema:
        """High-level Schema wrapper for this iterator's schema."""
        return Schema(self._schema)

    @cached_property
    def _object_label(self):
        # Label used to prefix warnings: prefer the field name when one
        # is present, otherwise fall back to an anonymous type label.
        name = self._schema.name
        type_str = self._schema_view.type
        return f"{name} <{type_str}>" if name else f"<unnamed {type_str}>"

    def _contains_nulls(self):
        # A non-nullable field can never contain nulls, and a nullable
        # one may still happen to have none in the current array.
        return self._schema_view.nullable and self._array_view.null_count != 0

    def _set_array(self, array):
        # Point the reusable view at a new array; returns self for chaining.
        self._array_view._set_array(array)
        return self

    def _warn(self, message, category):
        warnings.warn(f"{self._object_label}: {message}", category)
class PyIterator(ArrayViewBaseIterator):
"""Iterate over the Python object version of values in an ArrowArrayView.
Intended for internal use.
"""
    @classmethod
    def get_iterator(cls, obj, schema=None):
        # Resolve ``obj`` into an array stream and lazily yield the
        # Python representation of every element of every chunk.  The
        # stream stays open for the lifetime of this generator because
        # the yields happen inside the ``with`` block.
        with c_array_stream(obj, schema=schema) as stream:
            iterator = cls(stream._get_cached_schema())
            for array in stream:
                # Re-point the single reusable iterator at each chunk in
                # turn rather than building a new iterator per chunk.
                iterator._set_array(array)
                yield from iterator
def __init__(self, schema, *, array_view=None):
super().__init__(schema, array_view=array_view)
self._children = list(
map(self._make_child, self._schema.children, self._array_view.children)
)
if self._schema.dictionary is None:
self._dictionary = None
else:
self._dictionary = self._make_child(
self._schema.dictionary, self._array_view.dictionary
)
def _make_child(self, schema, array_view):
return type(self)(schema, array_view=array_view)
@cached_property
def _child_names(self):
return [child.name for child in self._schema.children]
def __iter__(self):
"""Iterate over all elements in the current chunk"""
return self._iter_chunk(0, len(self._array_view))
def _iter_chunk(self, offset, length):
"""Iterate over all elements in a slice of the current chunk"""
# Check for an extension type first since this isn't reflected by
# self._schema_view.type_id. Currently we just return the storage
# iterator with a warning for extension types.
maybe_extension_name = self._schema_view.extension_name
if maybe_extension_name:
self._warn(
f"Converting unregistered extension '{maybe_extension_name}' "
"as storage type",
UnregisteredExtensionWarning,
)
type_id = self._schema_view.type_id
if type_id not in _ITEMS_ITER_LOOKUP:
raise KeyError(
f"Can't resolve iterator for type '{self._schema_view.type}'"
)
factory = getattr(self, _ITEMS_ITER_LOOKUP[type_id])
return factory(offset, length)
def _dictionary_iter(self, offset, length):
dictionary = list(
self._dictionary._iter_chunk(0, len(self._dictionary._array_view))
)
for dict_index in self._primitive_iter(offset, length):
yield None if dict_index is None else dictionary[dict_index]
def _wrap_iter_nullable(self, validity, items):
for is_valid, item in zip(validity, items):
yield item if is_valid else None
def _struct_tuple_iter(self, offset, length):
view = self._array_view
offset += view.offset
items = zip(*(child._iter_chunk(offset, length) for child in self._children))
if self._contains_nulls():
validity = view.buffer(0).elements(offset, length)
return self._wrap_iter_nullable(validity, items)
else:
return items
def _struct_iter(self, offset, length):
names = self._child_names
for item in self._struct_tuple_iter(offset, length):
yield None if item is None else {key: val for key, val in zip(names, item)}
    def _list_iter(self, offset, length):
        # Yield a Python list for each list element, or None for nulls.
        view = self._array_view
        offset += view.offset

        # length + 1 offsets delimit the `length` requested elements
        offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)]
        starts = offsets[:-1]
        ends = offsets[1:]
        child = self._children[0]
        # A single child iterator covers the whole child range; each list
        # element is materialized by consuming (end - start) items from it.
        child_iter = child._iter_chunk(starts[0], ends[-1] - starts[0])
        if self._contains_nulls():
            validity = view.buffer(0).elements(offset, length)
            for is_valid, start, end in zip(validity, starts, ends):
                # Consume the child values even for a null element so the
                # shared child iterator stays aligned with the offsets.
                item = list(islice(child_iter, end - start))
                yield item if is_valid else None
        else:
            for start, end in zip(starts, ends):
                yield list(islice(child_iter, end - start))
    def _fixed_size_list_iter(self, offset, length):
        # Yield a fixed-length Python list per element, or None for nulls.
        view = self._array_view
        offset += view.offset
        child = self._children[0]
        fixed_size = view.layout.child_size_elements
        # One child iterator covers all requested elements; each list is
        # materialized by consuming exactly `fixed_size` items from it.
        child_iter = child._iter_chunk(offset * fixed_size, length * fixed_size)
        if self._contains_nulls():
            validity = view.buffer(0).elements(offset, length)
            for is_valid in validity:
                # Consume the child values even for a null element so the
                # shared child iterator stays aligned.
                item = list(islice(child_iter, fixed_size))
                yield item if is_valid else None
        else:
            for _ in range(length):
                yield list(islice(child_iter, fixed_size))
def _string_iter(self, offset, length):
view = self._array_view
offset += view.offset
offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)]
starts = offsets[:-1]
ends = offsets[1:]
data = memoryview(view.buffer(2))
if self._contains_nulls():
validity = view.buffer(0).elements(offset, length)
for is_valid, start, end in zip(validity, starts, ends):
yield str(data[start:end], "UTF-8") if is_valid else None
else:
for start, end in zip(starts, ends):
yield str(data[start:end], "UTF-8")
def _binary_iter(self, offset, length):
view = self._array_view
offset += view.offset
offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)]
starts = offsets[:-1]
ends = offsets[1:]
data = memoryview(view.buffer(2))
if self._contains_nulls():
validity = view.buffer(0).elements(offset, length)
for is_valid, start, end in zip(validity, starts, ends):
yield bytes(data[start:end]) if is_valid else None
else:
for start, end in zip(starts, ends):
yield bytes(data[start:end])
def _decimal_iter(self, offset, length):
from decimal import Context, Decimal
from sys import byteorder
storage = self._primitive_iter(offset, length)
precision = self._schema_view.decimal_precision
# The approach here it to use Decimal(<integer>).scaleb(-scale),
# which is a balance between simplicity, performance, and
# safety (ensuring that we stay independent from the global precision).
# We cache the scaleb and context to avoid doing so in the loop (the
# argument to scaleb is transformed to a decimal by the .scaleb()
# implementation).
#
# It would probably be fastest to go straight from binary
# to string to decimal, since creating a decimal from a string
# appears to be the fastest constructor.
scaleb = Decimal(-self._schema_view.decimal_scale)
context = Context(prec=precision)
for item in storage:
Loading ...