# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Callable, List, Sequence, Tuple, Union
from nanoarrow._array import CArrayView
from nanoarrow._buffer import CBuffer, CBufferBuilder
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema_view
from nanoarrow.iterator import ArrayViewBaseIterator, PyIterator
from nanoarrow.schema import Type
from nanoarrow import _types
class ArrayViewVisitable:
    """Mixin class providing conversion methods based on visitors

    Can be used with classes that implement ``__arrow_c_stream__()``
    or ``__arrow_c_array__()``. Each method hands ``self`` to the
    ``visit()`` classmethod of the corresponding converter.
    """

    def to_pylist(self) -> List:
        """Convert to a ``list`` of Python objects

        Computes an identical value to ``list(iter_py())`` but can be much
        faster.

        Examples
        --------

        >>> import nanoarrow as na
        >>> from nanoarrow import visitor
        >>> array = na.Array([1, 2, 3], na.int32())
        >>> array.to_pylist()
        [1, 2, 3]
        """
        return ToPyListConverter.visit(self)

    def to_columns_pysequence(
        self, *, handle_nulls: Union[Callable, None] = None
    ) -> Tuple[List[str], List[Sequence]]:
        """Convert to a ``list`` of contiguous sequences

        Experimentally converts a stream of struct arrays into a list of contiguous
        sequences using the same logic as :meth:`to_pysequence`.

        Parameters
        ----------
        handle_nulls : callable
            A function returning a sequence based on a validity bytemap and a
            contiguous buffer of values. If the array contains no nulls, the
            validity bytemap will be ``None``. Built-in handlers include
            :func:`nulls_as_sentinel`, :func:`nulls_forbid`, and
            :func:`nulls_separate`. The default value is :func:`nulls_forbid`.

        Returns
        -------
        tuple
            A ``(names, columns)`` pair: the column names and one converted
            sequence per column.

        Examples
        --------

        >>> import nanoarrow as na
        >>> import pyarrow as pa
        >>> batch = pa.record_batch({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
        >>> names, columns = na.Array(batch).to_columns_pysequence()
        >>> names
        ['col1', 'col2']
        >>> columns
        [nanoarrow.c_buffer.CBuffer(int64[24 b] 1 2 3), ['a', 'b', 'c']]
        """
        return ToColumnsPysequenceConverter.visit(self, handle_nulls=handle_nulls)

    def to_pysequence(self, *, handle_nulls: Union[Callable, None] = None) -> Sequence:
        """Convert to a contiguous sequence

        Experimentally converts a stream of arrays into a columnar representation
        such that each column is either a contiguous buffer or a ``list``.
        Integer, float, and interval arrays are currently converted to their
        contiguous buffer representation; other types are returned as a list
        of Python objects. The sequences returned by :meth:`to_pysequence` are
        designed to work as input to ``pandas.Series`` and/or ``numpy.array()``.
        The default conversions are subject to change based on initial user
        feedback.

        Parameters
        ----------
        handle_nulls : callable
            A function returning a sequence based on a validity bytemap and a
            contiguous buffer of values. If the array contains no nulls, the
            validity bytemap will be ``None``. Built-in handlers include
            :func:`nulls_as_sentinel`, :func:`nulls_forbid`, and
            :func:`nulls_separate`. The default value is :func:`nulls_forbid`.

        Examples
        --------

        >>> import nanoarrow as na
        >>> na.Array([1, 2, 3], na.int32()).to_pysequence()
        nanoarrow.c_buffer.CBuffer(int32[12 b] 1 2 3)
        """
        return ToPySequenceConverter.visit(self, handle_nulls=handle_nulls)
def nulls_forbid() -> Callable[[CBuffer, Sequence], Sequence]:
    """Erroring null handler

    Build a null handler that passes values through untouched when no
    validity bytemap is supplied and raises as soon as one is.

    Returns
    -------
    callable
        A function ``handle(is_valid, data)`` that returns ``data`` when
        ``is_valid`` is ``None`` and raises ``ValueError`` otherwise.

    Examples
    --------

    >>> import nanoarrow as na
    >>> na.Array([1, 2, 3], na.int32()).to_pysequence(handle_nulls=na.nulls_forbid())
    nanoarrow.c_buffer.CBuffer(int32[12 b] 1 2 3)
    >>> na.Array([1, None, 3], na.int32()).to_pysequence(handle_nulls=na.nulls_forbid())
    Traceback (most recent call last):
    ...
    ValueError: Null present with null_handler=nulls_forbid()
    """

    def _reject_nulls(is_valid, data):
        # A bytemap is only materialized once at least one null was seen,
        # so its absence means the values can be returned as-is.
        if is_valid is None:
            return data

        raise ValueError("Null present with null_handler=nulls_forbid()")

    return _reject_nulls
def nulls_as_sentinel(sentinel=None):
    """Sentinel null handler

    A null handler that assigns a sentinel to null values. This is
    done using numpy using the expression ``data[~is_valid] = sentinel``.
    The default sentinel value of ``None`` will result in float output and ``nan``
    assigned to null values for numeric and boolean inputs. This
    corresponds to numpy's handling of ``None`` in ``np.result_type()``
    and ``result[~is_valid] = None``.

    numpy is imported when this function is called, not when the module
    is imported.

    Parameters
    ----------
    sentinel : scalar, optional
        The value with which nulls should be replaced.

    Returns
    -------
    callable
        A function ``handle(is_valid, data)`` returning a numpy array. When
        ``is_valid`` is ``None`` the values are wrapped without replacement;
        otherwise a copy with the dtype ``np.result_type(data, sentinel)``
        is returned with the sentinel assigned where ``is_valid`` is false.

    Examples
    --------

    >>> import nanoarrow as na
    >>> na_array = na.Array([1, 2, 3], na.int32())
    >>> na_array.to_pysequence(handle_nulls=na.nulls_as_sentinel())
    array([1, 2, 3], dtype=int32)
    >>> na_array = na.Array([1, None, 3], na.int32())
    >>> na_array.to_pysequence(handle_nulls=na.nulls_as_sentinel())
    array([ 1., nan,  3.])
    >>> na_array.to_pysequence(handle_nulls=na.nulls_as_sentinel(-999))
    array([   1, -999,    3], dtype=int32)
    """
    import numpy as np

    def handle(is_valid, data):
        # np.asarray() only copies when it has to. np.array(..., copy=False)
        # has the same meaning on NumPy 1.x but raises ValueError on
        # NumPy >= 2.0 when a copy cannot be avoided (e.g., ``data`` is a list).
        data = np.asarray(data)
        if is_valid is None:
            return data

        is_valid = np.asarray(is_valid)

        # Promote so that the output dtype is able to hold the sentinel, then
        # copy explicitly so the caller's buffer is never written to.
        out_type = np.result_type(data, sentinel)
        data = np.array(data, dtype=out_type, copy=True)
        data[~is_valid] = sentinel
        return data

    return handle
def nulls_separate() -> Callable[[CBuffer, Sequence], Tuple[CBuffer, Sequence]]:
    """Return nulls as a tuple of is_valid, data

    Build a null handler that performs no null handling of its own: it
    hands back both of its inputs so that the caller can decide what to do.

    Returns
    -------
    callable
        A function ``handle(is_valid, data)`` returning the tuple
        ``(is_valid, data)`` unchanged.

    Examples
    --------

    >>> import nanoarrow as na
    >>> na_array = na.Array([1, 2, 3], na.int32())
    >>> na_array.to_pysequence(handle_nulls=na.nulls_separate())
    (None, nanoarrow.c_buffer.CBuffer(int32[12 b] 1 2 3))
    >>> na_array = na.Array([1, None, 3], na.int32())
    >>> result = na_array.to_pysequence(handle_nulls=na.nulls_separate())
    >>> result[0]
    nanoarrow.c_buffer.CBuffer(uint8[3 b] True False True)
    >>> result[1]
    nanoarrow.c_buffer.CBuffer(int32[12 b] 1 0 3)
    """

    def _passthrough(is_valid, data):
        components = (is_valid, data)
        return components

    return _passthrough
class ArrayViewVisitor(ArrayViewBaseIterator):
    """Compute a value from one or more arrays as ArrowArrayViews

    This class supports a (currently internal) pattern for building
    output from zero or more arrays in a stream. Subclasses override
    :meth:`begin`, :meth:`visit_chunk_view`, and :meth:`finish`;
    :meth:`visit` drives them.
    """

    @classmethod
    def visit(cls, obj, schema=None, total_elements=None, **kwargs):
        """Visit all chunks in ``obj`` as a :func:`c_array_stream`.

        Parameters
        ----------
        obj
            Object passed to :func:`c_array_stream`.
        schema : optional
            Forwarded to :func:`c_array_stream`.
        total_elements : int, optional
            Value handed to :meth:`begin`. When omitted and ``obj`` defines
            ``__len__``, ``len(obj)`` is used.
        **kwargs
            Forwarded to the visitor constructor.

        Returns
        -------
        The value returned by the visitor's :meth:`finish`.
        """
        if total_elements is None:
            if hasattr(obj, "__len__"):
                total_elements = len(obj)

        with c_array_stream(obj, schema=schema) as stream:
            instance = cls(stream._get_cached_schema(), **kwargs)
            instance.begin(total_elements)

            # Bind once outside the loop; the same array view object is
            # handed to visit_chunk_view() for every chunk after
            # _set_array() has been called with that chunk.
            load_chunk = instance._set_array
            handle_chunk = instance.visit_chunk_view
            view = instance._array_view

            for chunk in stream:
                load_chunk(chunk)
                handle_chunk(view)

        return instance.finish()

    def begin(self, total_elements: Union[int, None] = None):
        """Called after the schema has been resolved but before any
        chunks have been visited. If the total number of elements
        (i.e., the sum of all chunk lengths) is known, it is provided here.
        The base implementation does nothing.
        """

    def visit_chunk_view(self, array_view: CArrayView) -> None:
        """Called exactly once for each chunk seen. The base
        implementation does nothing.
        """

    def finish(self) -> Any:
        """Called exactly once after all chunks have been visited. The
        base implementation returns ``None``.
        """
        return None
class ToPySequenceConverter(ArrayViewVisitor):
    """Visitor that forwards every callback to an inner converter

    The inner converter class and its keyword arguments are chosen by
    ``_resolve_converter_cls()`` from this visitor's schema and the
    supplied ``handle_nulls``; it is constructed with this visitor's
    array view.
    """

    def __init__(self, schema, handle_nulls=None, *, array_view=None):
        super().__init__(schema, array_view=array_view)

        converter_cls, converter_kwargs = _resolve_converter_cls(
            self._schema, handle_nulls=handle_nulls
        )
        self._visitor = converter_cls(
            schema, **converter_kwargs, array_view=self._array_view
        )

    def begin(self, total_elements: Union[int, None] = None):
        """Forward ``total_elements`` to the inner converter."""
        inner = self._visitor
        inner.begin(total_elements)

    def visit_chunk_view(self, array_view: CArrayView) -> None:
        """Forward the chunk's array view to the inner converter."""
        inner = self._visitor
        inner.visit_chunk_view(array_view)

    def finish(self) -> Any:
        """Return whatever the inner converter's ``finish()`` returns."""
        result = self._visitor.finish()
        return result
class ToColumnsPysequenceConverter(ArrayViewVisitor):
    """Convert a stream of struct arrays into column names and columns

    One child visitor is resolved per struct field via
    ``_resolve_converter_cls()``; each chunk's children are forwarded to
    the matching child visitor and :meth:`finish` collects the results.

    Raises
    ------
    ValueError
        From the constructor if the schema is not a struct, and from
        :meth:`visit_chunk_view` if a chunk has ``null_count > 0``.
    """

    def __init__(self, schema, handle_nulls=None, *, array_view=None):
        super().__init__(schema, array_view=array_view)

        if self.schema.type != Type.STRUCT:
            raise ValueError(
                "ToColumnsPysequenceConverter can only be used on a struct array"
            )

        # Resolve the appropriate visitor for each column, pairing each child
        # schema with the child array view it will read from
        self._child_visitors = [
            self._resolve_child_visitor(child_schema, child_array_view, handle_nulls)
            for child_schema, child_array_view in zip(
                self._schema.children, self._array_view.children
            )
        ]

    def _resolve_child_visitor(self, child_schema, child_array_view, handle_nulls):
        """Instantiate the converter chosen for a single column."""
        cls, kwargs = _resolve_converter_cls(child_schema, handle_nulls)
        return cls(child_schema, **kwargs, array_view=child_array_view)

    def begin(self, total_elements: Union[int, None] = None) -> None:
        """Forward ``total_elements`` to every child visitor."""
        for child_visitor in self._child_visitors:
            child_visitor.begin(total_elements)

    def visit_chunk_view(self, array_view: CArrayView) -> None:
        """Forward each child of ``array_view`` to its child visitor."""
        # This visitor does not handle nulls because it has no way to propagate these
        # into the child columns. It is designed to be used on top-level record batch
        # arrays which typically are marked as non-nullable or do not contain nulls.
        if array_view.null_count > 0:
            raise ValueError(
                "null_count > 0 encountered in ToColumnsPysequenceConverter"
            )

        for child_visitor, child_array_view in zip(
            self._child_visitors, array_view.children
        ):
            child_visitor.visit_chunk_view(child_array_view)

    def finish(self) -> Tuple[List[str], List[Sequence]]:
        """Return ``(names, columns)`` with one entry per struct field."""
        names = [child.name for child in self._schema.children]
        columns = [v.finish() for v in self._child_visitors]
        return names, columns
class ToPyListConverter(ArrayViewVisitor):
    """Accumulate the elements of every chunk into a single ``list``

    Elements are produced by an ``iterator_cls`` instance (``PyIterator``
    by default) that is constructed with this visitor's array view.
    """

    def __init__(self, schema, *, iterator_cls=PyIterator, array_view=None):
        super().__init__(schema, array_view=array_view)

        # Hand our own array view to the iterator so that both objects
        # refer to the same view (self._iterator._array_view is
        # self._array_view).
        shared_view = self._array_view
        self._iterator = iterator_cls(schema, array_view=shared_view)

    def begin(self, total_elements: Union[int, None] = None):
        """Start a fresh output list; ``total_elements`` is not used."""
        self._lst = []

    def visit_chunk_view(self, array_view: CArrayView):
        """Append the elements yielded by the iterator to the output list."""
        # ``array_view`` is not read directly: because of how the constructor
        # wired things up, self._iterator._array_view is already populated
        # when self._set_array() is called.
        collected = self._lst
        collected.extend(self._iterator)

    def finish(self) -> List:
        """Return the accumulated list."""
        return self._lst
class ToPyBufferConverter(ArrayViewVisitor):
def begin(self, total_elements: Union[int, None]):
self._builder = CBufferBuilder()
self._builder.set_format(self._schema_view.buffer_format)
if total_elements is not None:
element_size_bits = self._schema_view.layout.element_size_bits[1]
element_size_bytes = element_size_bits // 8
self._builder.reserve_bytes(total_elements * element_size_bytes)
def visit_chunk_view(self, array_view: CArrayView) -> None:
builder = self._builder
Loading ...