Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / nanoarrow   python

Repository URL to install this package:

/ src / nanoarrow / iterator.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import warnings
from functools import cached_property
from itertools import islice, repeat
from typing import Iterable, Tuple

from nanoarrow._array import CArrayView
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema, c_schema_view
from nanoarrow.schema import Schema

from nanoarrow import _types


def iter_py(obj, schema=None) -> Iterable:
    """Iterate over items in zero or more arrays

    Builds an iterator over an array stream in which every item is the
    Python representation of the next element. The work is delegated to
    :meth:`PyIterator.get_iterator`.

    Parameters
    ----------
    obj : array stream-like
        An array-like or array stream-like object as sanitized by
        :func:`c_array_stream`.
    schema : schema-like, optional
        An optional schema, passed to :func:`c_array_stream`.

    Examples
    --------

    >>> import nanoarrow as na
    >>> from nanoarrow import iterator
    >>> array = na.c_array([1, 2, 3], na.int32())
    >>> list(iterator.iter_py(array))
    [1, 2, 3]
    """
    items = PyIterator.get_iterator(obj, schema=schema)
    return items


def iter_tuples(obj, schema=None) -> Iterable[Tuple]:
    """Iterate over rows in zero or more struct arrays

    Builds an iterator over an array stream of struct arrays (i.e.,
    record batches) in which every item is a tuple holding the values of
    one row. Contrast with :func:`iter_py`, which encodes struct columns
    as dictionaries. The work is delegated to
    ``RowTupleIterator.get_iterator``.

    Parameters
    ----------
    obj : array stream-like
        An array-like or array stream-like object as sanitized by
        :func:`c_array_stream`.
    schema : schema-like, optional
        An optional schema, passed to :func:`c_array_stream`.

    Examples
    --------

    >>> import nanoarrow as na
    >>> from nanoarrow import iterator
    >>> import pyarrow as pa
    >>> array = pa.record_batch([pa.array([1, 2, 3])], names=["col1"])
    >>> list(iterator.iter_tuples(array))
    [(1,), (2,), (3,)]
    """
    rows = RowTupleIterator.get_iterator(obj, schema=schema)
    return rows


def iter_array_views(obj, schema=None) -> Iterable[CArrayView]:
    """Iterate over prepared views of each array

    Generator that yields a :func:`c_array_view` for each chunk in
    ``obj``. The stream is opened as a context manager and stays open
    while chunks are being consumed.

    Parameters
    ----------
    obj : array stream-like
        An array-like or array stream-like object as sanitized by
        :func:`c_array_stream`.
    schema : schema-like, optional
        An optional schema, passed to :func:`c_array_stream`.

    Examples
    --------

    >>> import nanoarrow as na
    >>> from nanoarrow import iterator
    >>> array = na.c_array([1, 2, 3], na.int32())
    >>> list(iterator.iter_array_views(array))
    [<nanoarrow.c_array.CArrayView>
    - storage_type: 'int32'
    - length: 3
    - offset: 0
    - null_count: 0
    - buffers[2]:
      - validity <bool[0 b] >
      - data <int32[12 b] 1 2 3>
    - dictionary: NULL
    - children[0]:]
    """
    with c_array_stream(obj, schema) as chunks:
        for chunk in chunks:
            chunk_view = chunk.view()
            yield chunk_view


class InvalidArrayWarning(UserWarning):
    """Warning category whose name indicates an invalid array was encountered."""

    # NOTE(review): the code that emits this category is not visible in this
    # part of the file -- confirm the exact trigger before relying on it.


class LossyConversionWarning(UserWarning):
    """Warning category whose name indicates a conversion that loses information."""

    # NOTE(review): the code that emits this category is not visible in this
    # part of the file -- confirm the exact trigger before relying on it.


class UnregisteredExtensionWarning(UserWarning):
    """Warning category used when an extension type is converted as its storage type.

    ``PyIterator._iter_chunk`` issues it whenever the schema carries an
    extension name.
    """


class ArrayViewBaseIterator:
    """Shared base for iterators and visitors that convert the contents of an
    internal ArrowArrayView into Python objects. Intended for internal use.
    """

    def __init__(self, schema, *, array_view=None):
        """Store the sanitized schema, its view, and the array view to read from.

        Parameters
        ----------
        schema : schema-like
            Passed to :func:`c_schema` and :func:`c_schema_view`.
        array_view : CArrayView, optional
            An existing view to reuse; when omitted a new one is created
            from the sanitized schema.
        """
        self._schema = c_schema(schema)
        self._schema_view = c_schema_view(schema)
        self._array_view = (
            CArrayView.from_schema(self._schema) if array_view is None else array_view
        )

    @cached_property
    def schema(self) -> Schema:
        """The schema of this iterator wrapped as a :class:`Schema` (cached)."""
        wrapped = Schema(self._schema)
        return wrapped

    @cached_property
    def _object_label(self):
        """Short label (field name and type) used as the prefix of warnings."""
        name = self._schema.name
        if not name:
            return f"<unnamed {self._schema_view.type}>"
        return f"{name} <{self._schema_view.type}>"

    def _contains_nulls(self):
        """Truthy only when the schema is nullable and the view's null count
        is not zero."""
        nullable = self._schema_view.nullable
        return nullable and self._array_view.null_count != 0

    def _set_array(self, array):
        """Point the internal array view at ``array`` and return ``self``."""
        view = self._array_view
        view._set_array(array)
        return self

    def _warn(self, message, category):
        """Issue ``message`` as a warning of ``category``, prefixed by the label."""
        text = f"{self._object_label}: {message}"
        warnings.warn(text, category)


class PyIterator(ArrayViewBaseIterator):
    """Iterate over the Python object version of values in an ArrowArrayView.
    Intended for internal use.
    """

    @classmethod
    def get_iterator(cls, obj, schema=None):
        """Yield converted items from every chunk of ``obj``.

        ``obj`` (and the optional ``schema``) are sanitized by
        :func:`c_array_stream`. A single instance of ``cls`` is created from
        the stream's schema and re-pointed at each chunk in turn.
        """
        with c_array_stream(obj, schema=schema) as stream:
            converter = cls(stream._get_cached_schema())
            for chunk in stream:
                # Reuse the same converter (and its array view) for every chunk.
                converter._set_array(chunk)
                yield from converter

    def __init__(self, schema, *, array_view=None):
        """Set up this iterator plus one iterator per child and for the
        dictionary (when the schema has one).

        Child and dictionary iterators are created with :meth:`_make_child`
        and share the corresponding child/dictionary views of this
        iterator's array view.
        """
        super().__init__(schema, array_view=array_view)

        child_pairs = zip(self._schema.children, self._array_view.children)
        self._children = [
            self._make_child(child_schema, child_view)
            for child_schema, child_view in child_pairs
        ]

        dictionary_schema = self._schema.dictionary
        self._dictionary = (
            None
            if dictionary_schema is None
            else self._make_child(dictionary_schema, self._array_view.dictionary)
        )

    def _make_child(self, schema, array_view):
        return type(self)(schema, array_view=array_view)

    @cached_property
    def _child_names(self):
        return [child.name for child in self._schema.children]

    def __iter__(self):
        """Iterate over all elements in the current chunk"""
        return self._iter_chunk(0, len(self._array_view))

    def _iter_chunk(self, offset, length):
        """Iterate over ``length`` elements of the current chunk starting at
        ``offset``.

        The iterator method is chosen by looking up the schema's type id in
        ``_ITEMS_ITER_LOOKUP``.

        Raises
        ------
        KeyError
            If the type id has no entry in ``_ITEMS_ITER_LOOKUP``.
        """
        schema_view = self._schema_view

        # Extension types are not visible through type_id, so they are checked
        # first. For now the storage is iterated as-is and a warning is issued.
        extension_name = schema_view.extension_name
        if extension_name:
            message = (
                f"Converting unregistered extension '{extension_name}' "
                "as storage type"
            )
            self._warn(message, UnregisteredExtensionWarning)

        type_id = schema_view.type_id
        if type_id in _ITEMS_ITER_LOOKUP:
            method_name = _ITEMS_ITER_LOOKUP[type_id]
            make_iter = getattr(self, method_name)
            return make_iter(offset, length)

        raise KeyError(f"Can't resolve iterator for type '{schema_view.type}'")

    def _dictionary_iter(self, offset, length):
        dictionary = list(
            self._dictionary._iter_chunk(0, len(self._dictionary._array_view))
        )
        for dict_index in self._primitive_iter(offset, length):
            yield None if dict_index is None else dictionary[dict_index]

    def _wrap_iter_nullable(self, validity, items):
        for is_valid, item in zip(validity, items):
            yield item if is_valid else None

    def _struct_tuple_iter(self, offset, length):
        view = self._array_view
        offset += view.offset
        items = zip(*(child._iter_chunk(offset, length) for child in self._children))

        if self._contains_nulls():
            validity = view.buffer(0).elements(offset, length)
            return self._wrap_iter_nullable(validity, items)
        else:
            return items

    def _struct_iter(self, offset, length):
        names = self._child_names
        for item in self._struct_tuple_iter(offset, length):
            yield None if item is None else {key: val for key, val in zip(names, item)}

    def _list_iter(self, offset, length):
        view = self._array_view
        offset += view.offset

        offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)]
        starts = offsets[:-1]
        ends = offsets[1:]
        child = self._children[0]
        child_iter = child._iter_chunk(starts[0], ends[-1] - starts[0])

        if self._contains_nulls():
            validity = view.buffer(0).elements(offset, length)
            for is_valid, start, end in zip(validity, starts, ends):
                item = list(islice(child_iter, end - start))
                yield item if is_valid else None
        else:
            for start, end in zip(starts, ends):
                yield list(islice(child_iter, end - start))

    def _fixed_size_list_iter(self, offset, length):
        view = self._array_view
        offset += view.offset
        child = self._children[0]
        fixed_size = view.layout.child_size_elements
        child_iter = child._iter_chunk(offset * fixed_size, length * fixed_size)

        if self._contains_nulls():
            validity = view.buffer(0).elements(offset, length)
            for is_valid in validity:
                item = list(islice(child_iter, fixed_size))
                yield item if is_valid else None
        else:
            for _ in range(length):
                yield list(islice(child_iter, fixed_size))

    def _string_iter(self, offset, length):
        view = self._array_view
        offset += view.offset
        offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)]
        starts = offsets[:-1]
        ends = offsets[1:]
        data = memoryview(view.buffer(2))

        if self._contains_nulls():
            validity = view.buffer(0).elements(offset, length)
            for is_valid, start, end in zip(validity, starts, ends):
                yield str(data[start:end], "UTF-8") if is_valid else None
        else:
            for start, end in zip(starts, ends):
                yield str(data[start:end], "UTF-8")

    def _binary_iter(self, offset, length):
        view = self._array_view
        offset += view.offset
        offsets = memoryview(view.buffer(1))[offset : (offset + length + 1)]
        starts = offsets[:-1]
        ends = offsets[1:]
        data = memoryview(view.buffer(2))

        if self._contains_nulls():
            validity = view.buffer(0).elements(offset, length)
            for is_valid, start, end in zip(validity, starts, ends):
                yield bytes(data[start:end]) if is_valid else None
        else:
            for start, end in zip(starts, ends):
                yield bytes(data[start:end])

    def _decimal_iter(self, offset, length):
        from decimal import Context, Decimal
        from sys import byteorder

        storage = self._primitive_iter(offset, length)
        precision = self._schema_view.decimal_precision

        # The approach here is to use Decimal(<integer>).scaleb(-scale),
        # which is a balance between simplicity, performance, and
        # safety (ensuring that we stay independent from the global precision).
        # We cache the scaleb and context to avoid doing so in the loop (the
        # argument to scaleb is transformed to a decimal by the .scaleb()
        # implementation).
        #
        # It would probably be fastest to go straight from binary
        # to string to decimal, since creating a decimal from a string
        # appears to be the fastest constructor.
        scaleb = Decimal(-self._schema_view.decimal_scale)
        context = Context(prec=precision)

        for item in storage:
Loading ...