# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import itertools
from functools import cached_property
from typing import Iterable, Tuple, Union
from nanoarrow._array import CArray, CArrayView
from nanoarrow._array_stream import CMaterializedArrayStream
from nanoarrow._buffer import CBufferView
from nanoarrow._device import DEVICE_CPU, Device
from nanoarrow.c_array import c_array, c_array_view
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema
from nanoarrow.iterator import iter_array_views, iter_py, iter_tuples
from nanoarrow.schema import Schema, _schema_repr
from nanoarrow.visitor import ArrayViewVisitable
from nanoarrow import _repr_utils
class Scalar:
    """Generic wrapper around an :class:`Array` element

    This class exists to provide a generic implementation of
    array-like indexing for the :class:`Array`. These objects
    can currently only be created by extracting an element from
    an :class:`Array`.

    Note that it is rarely efficient to iterate over Scalar objects:
    use the iterators in :mod:`nanoarrow.iterator` to more effectively
    iterate over an :class:`Array`.

    Examples
    --------

    >>> import nanoarrow as na
    >>> array = na.Array([1, 2, 3], na.int32())
    >>> array[0]
    Scalar<int32> 1
    >>> array[0].as_py()
    1
    >>> array[0].schema
    <Schema> int32
    """

    def __init__(self):
        # Private constructor: every field starts out unset and must be
        # filled in by whoever creates the instance.
        self._c_array = None
        self._offset = None
        self._schema = None
        self._device = None

    @property
    def device(self) -> Device:
        """Get the device recorded for this scalar"""
        return self._device

    @property
    def schema(self) -> Schema:
        """Get the schema (data type) of this scalar"""
        return self._schema

    def as_py(self):
        """Get the Python object representation of this scalar"""
        # iter_py() yields one Python object per element; a scalar only
        # needs the first one.
        return next(iter_py(self))

    def to_string(self, width_hint=80) -> str:
        """Create a one-line string representation of this scalar

        Parameters
        ----------
        width_hint : int, optional
            The approximate number of characters the result should occupy.
            One quarter of this value is passed on as the maximum width of
            the schema portion; the value portion is truncated (with a
            trailing ``...``) when it does not fit in what remains.
        """
        schema_repr = _schema_repr(
            self.schema,
            max_char_width=width_hint // 4,
            prefix="",
            include_metadata=False,
        )
        prefix = f"Scalar<{schema_repr}> "
        remaining = width_hint - len(prefix)

        value_repr = repr(self.as_py())
        if len(value_repr) > remaining:
            # Clamp at zero: when the prefix alone uses up the hint, a
            # negative bound would strip characters from the *end* of the
            # repr instead of truncating it, letting long values overflow.
            n_keep = max(remaining - 3, 0)
            value_repr = value_repr[:n_keep] + "..."

        return f"{prefix}{value_repr}"

    def __repr__(self) -> str:
        return self.to_string()

    def __arrow_c_array__(self, requested_schema=None):
        """Export this scalar as a length-one slice of its parent array"""
        start = self._offset
        array = self._c_array[start : (start + 1)]
        return array.__arrow_c_array__(requested_schema=requested_schema)
class Array(ArrayViewVisitable):
"""High-level in-memory Array representation
The Array is nanoarrow's high-level in-memory array representation whose
scope maps to that of a fully-consumed ArrowArrayStream in the Arrow C Data
interface.
The :class:`Array` class is nanoarrow's high-level in-memory array
representation, encompassing the role of PyArrow's ``Array``,
``ChunkedArray``, ``RecordBatch``, and ``Table``. This scope maps
to that of a fully-consumed ``ArrowArrayStream`` as represented by
the Arrow C Stream interface.
Note that an :class:`Array` is not necessarily contiguous in memory (i.e.,
it may consist of zero or more ``ArrowArray``s).
Parameters
----------
obj : array or array stream-like
An array-like or array stream-like object as sanitized by
:func:`c_array_stream`.
schema : schema-like, optional
An optional schema, passed to :func:`c_array_stream`.
device : Device, optional
The device associated with the buffers held by this Array.
Defaults to the CPU device.
Examples
--------
>>> import nanoarrow as na
>>> na.Array([1, 2, 3], na.int32())
nanoarrow.Array<int32>[3]
1
2
3
"""
def __init__(self, obj, schema=None, device=None) -> None:
    # Validate the device up front so that a bad argument fails before any
    # data is consumed; fall back to the CPU device when none was given.
    if device is not None and not isinstance(device, Device):
        raise TypeError("device must be Device")
    self._device = DEVICE_CPU if device is None else device

    # Without an explicit schema, inputs that are already materialized (or
    # a single CArray) are reused or wrapped directly instead of going
    # through c_array_stream().
    if schema is None:
        if isinstance(obj, CMaterializedArrayStream):
            self._data = obj
            return

        if isinstance(obj, Array):
            self._data = obj._data
            return

        if isinstance(obj, CArray):
            self._data = CMaterializedArrayStream.from_c_array(obj)
            return

    # General case: sanitize the input as a stream and materialize it.
    with c_array_stream(obj, schema=schema) as source:
        self._data = CMaterializedArrayStream.from_c_array_stream(source)
@staticmethod
def from_chunks(obj: Iterable, schema=None, validate: bool = True):
    """Create an Array with explicit chunks

    Builds an :class:`Array` whose chunk boundaries follow the items of an
    iterable, where each item can be converted by :func:`c_array`.

    Parameters
    ----------
    obj : iterable of array-like
        An iterable of objects that can be passed to :func:`c_array`.
    schema : schema-like, optional
        An optional schema. If present, will be passed to :func:`c_array`
        for each item in obj; if not present it will be inferred from the first
        chunk.
    validate : bool
        Use ``False`` to opt out of validation steps performed when constructing
        this array.

    Raises
    ------
    ValueError
        If ``schema`` is omitted and ``obj`` yields no first chunk.

    Examples
    --------
    >>> import nanoarrow as na
    >>> na.Array.from_chunks([[1, 2, 3], [4, 5, 6]], na.int32())
    nanoarrow.Array<int32>[6]
    1
    2
    3
    4
    5
    6
    """
    chunks = iter(obj)

    if schema is not None:
        out_schema = c_schema(schema)
    else:
        # Peek at the first chunk to learn the schema, then put the
        # (already converted) chunk back at the front of the iterator.
        head = next(chunks, None)
        if head is None:
            raise ValueError("Can't create empty Array from chunks without schema")

        head = c_array(head)
        out_schema = head.schema
        chunks = itertools.chain([head], chunks)

    converted = (c_array(chunk, schema) for chunk in chunks)
    materialized = CMaterializedArrayStream.from_c_arrays(
        converted, out_schema, validate=validate
    )
    return Array(materialized)
def _assert_one_chunk(self, op):
    """Raise ValueError unless this Array consists of exactly one chunk

    ``op`` is a short description of the attempted operation and is
    interpolated into the error message.
    """
    n_chunks = self._data.n_arrays
    if n_chunks == 1:
        return

    raise ValueError(f"Can't {op} with non-contiguous Array")
def _assert_cpu(self, op):
    """Raise ValueError if this Array's device is not the CPU device

    ``op`` is a short description of the attempted operation and is
    interpolated into the error message.
    """
    off_cpu = self._device != DEVICE_CPU
    if off_cpu:
        message = f"Can't {op} with Array on non-CPU device"
        raise ValueError(message)
def __arrow_c_stream__(self, requested_schema=None):
    """Export this Array as an array stream

    Checks that the Array is on the CPU device, then delegates to the
    ``__arrow_c_stream__()`` method of the underlying data, forwarding
    ``requested_schema`` unchanged.
    """
    self._assert_cpu("export ArrowArrayStream")
    materialized = self._data
    return materialized.__arrow_c_stream__(requested_schema=requested_schema)
def __arrow_c_array__(self, requested_schema=None):
    """Export this Array as a single array

    Only possible when the Array is on the CPU device and has at most one
    chunk: zero chunks export an empty array built from this Array's
    schema, one chunk exports that chunk, and anything else raises
    ``ValueError``.
    """
    self._assert_cpu("export ArrowArray")

    if self._data.n_arrays == 0:
        # No chunks: build an empty array with the right schema to export
        exportable = c_array([], schema=self._data.schema)
    else:
        # Raises for more than one chunk, so array(0) is the only chunk
        self._assert_one_chunk("export ArrowArray")
        exportable = self._data.array(0)

    return exportable.__arrow_c_array__(requested_schema=requested_schema)
@property
def device(self) -> Device:
    """The device associated with the buffers of this array

    This is the device recorded when the Array was constructed.

    Examples
    --------
    >>> import nanoarrow as na
    >>> array = na.Array([1, 2, 3], na.int32())
    >>> array.device
    <nanoarrow.device.Device>
    - device_type: CPU <1>
    - device_id: -1
    """
    array_device = self._device
    return array_device
@cached_property
def schema(self) -> Schema:
    """The schema (data type) of this Array

    The :class:`Schema` wrapper is created on first access and cached.
    """
    raw_schema = self._data.schema
    return Schema(raw_schema)
@property
def n_buffers(self) -> int:
    """The number of buffers in each chunk of this Array

    The value is read from the layout of this Array's schema.

    Examples
    --------
    >>> import nanoarrow as na
    >>> array = na.Array([1, 2, 3], na.int32())
    >>> array.n_buffers
    2
    """
    layout = self.schema._c_schema_view.layout
    return layout.n_buffers
@property
def offset(self) -> int:
    """The logical offset of a contiguous array

    An Array with no chunks reports an offset of zero; an Array with more
    than one chunk raises ``ValueError``.

    Examples
    --------
    >>> import nanoarrow as na
    >>> c_array = na.c_array([1, 2, 3], na.int32())
    >>> na.Array(c_array[1:]).offset
    1
    """
    if self._data.n_arrays != 0:
        self._assert_one_chunk("scalar offset")
        only_chunk = self._data.array(0)
        return only_chunk.offset

    return 0
def buffer(self, i: int) -> CBufferView:
    """Get one buffer of a contiguous array by position

    Equivalent to indexing :attr:`buffers` with ``i``.

    Examples
    --------
    >>> import nanoarrow as na
    >>> array = na.Array([1, 2, 3], na.int32())
    >>> array.buffer(1)
    nanoarrow.c_buffer.CBufferView(int32[12 b] 1 2 3)
    """
    all_buffers = self.buffers
    return all_buffers[i]
@cached_property
def buffers(self) -> Tuple[CBufferView, ...]:
    """The buffers of a contiguous array, as a tuple.

    The tuple is built from a :func:`c_array_view` of this Array on first
    access and cached afterwards.

    Examples
    --------
    >>> import nanoarrow as na
    >>> array = na.Array([1, 2, 3], na.int32())
    >>> for buffer in array.buffers:
    ...     print(buffer)
    nanoarrow.c_buffer.CBufferView(bool[0 b] )
    nanoarrow.c_buffer.CBufferView(int32[12 b] 1 2 3)
    """
    return tuple(c_array_view(self).buffers)
def iter_chunk_views(self) -> Iterable[CArrayView]:
    """Iterate over a prepared view of every chunk

    Thin wrapper around :func:`iter_array_views` applied to this Array.

    Examples
    --------
    >>> import nanoarrow as na
    >>> array = na.Array([1, 2, 3], na.int32())
    >>> for view in array.iter_chunk_views():
    ...     offset, length = view.offset, len(view)
    ...     validity, data = view.buffers
    ...     print(offset, length)
    ...     print(validity)
    ...     print(data)
    0 3
    nanoarrow.c_buffer.CBufferView(bool[0 b] )
    nanoarrow.c_buffer.CBufferView(int32[12 b] 1 2 3)
    """
    chunk_views = iter_array_views(self)
    return chunk_views
@property
def n_children(self) -> int:
Loading ...