# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from nanoarrow._array_stream import CArrayStream
from nanoarrow._utils import obj_is_capsule
from nanoarrow.c_array import c_array
from nanoarrow.c_schema import c_schema
def c_array_stream(obj=None, schema=None) -> CArrayStream:
    """Create a CArrayStream from a stream-like or array-like object

    The following conversions are attempted in order; the first one that
    applies is used:

    1. ``obj`` is already a ``CArrayStream`` and no ``schema`` was
       requested: ``obj`` itself is returned.
    2. ``obj`` has an ``__arrow_c_stream__`` attribute: it is called with
       the requested schema (as a schema capsule, or ``None``) and the
       resulting capsule is imported.
    3. ``obj`` is a bare ``"arrow_array_stream"`` capsule: it is imported
       directly. A requested ``schema`` is not supported here.
    4. ``obj`` looks like a pyarrow ``RecordBatchReader`` exposing
       ``_export_to_c()`` (pyarrow < 14.0): the reader is exported into a
       freshly allocated ``CArrayStream``.
    5. Otherwise ``obj`` is converted with ``c_array()`` and wrapped as a
       stream containing that single array.

    Parameters
    ----------
    obj : object
        The object to convert.
    schema : schema-like, optional
        The requested schema. When given, it is first sanitized
        using ``c_schema()``.

    Returns
    -------
    CArrayStream

    Raises
    ------
    TypeError
        If a ``schema`` is requested when importing a bare capsule, or if
        the final ``c_array()``-based conversion fails (the original
        exception is chained as the cause).

    Examples
    --------

    >>> import pyarrow as pa
    >>> import nanoarrow as na
    >>> pa_column = pa.array([1, 2, 3], pa.int32())
    >>> pa_batch = pa.record_batch([pa_column], names=["col1"])
    >>> pa_reader = pa.RecordBatchReader.from_batches(pa_batch.schema, [pa_batch])
    >>> array_stream = na.c_array_stream(pa_reader)
    >>> array_stream.get_schema()
    <nanoarrow.c_schema.CSchema struct>
    - format: '+s'
    - name: ''
    - flags: 0
    - metadata: NULL
    - dictionary: NULL
    - children[1]:
      'col1': <nanoarrow.c_schema.CSchema int32>
        - format: 'i'
        - name: 'col1'
        - flags: 2
        - metadata: NULL
        - dictionary: NULL
        - children[0]:
    >>> array_stream.get_next().length
    3
    >>> array_stream.get_next() is None
    Traceback (most recent call last):
      ...
    StopIteration
    """
    # Normalize the requested schema (if any) before dispatching on obj
    if schema is not None:
        schema = c_schema(schema)

    no_schema_requested = schema is None

    # An existing stream can only be passed through untouched when the
    # caller did not ask for a specific schema
    if no_schema_requested and isinstance(obj, CArrayStream):
        return obj

    # Capsule protocol: let the producer handle the requested schema
    if hasattr(obj, "__arrow_c_stream__"):
        if no_schema_requested:
            requested = None
        else:
            requested = schema.__arrow_c_schema__()

        stream_capsule = obj.__arrow_c_stream__(requested_schema=requested)
        return CArrayStream._import_from_c_capsule(stream_capsule)

    # Bare capsule: nothing to forward a requested schema to
    if obj_is_capsule(obj, "arrow_array_stream"):
        if no_schema_requested:
            return CArrayStream._import_from_c_capsule(obj)

        raise TypeError(
            "Can't import c_array_stream from capsule with requested schema"
        )

    # Legacy export for RecordBatchReader objects if pyarrow < 14.0.
    # NOTE: a requested schema is not consulted on this path.
    if _obj_is_pyarrow_record_batch_reader(obj):
        stream = CArrayStream.allocate()
        obj._export_to_c(stream._addr())
        return stream

    # Last resort: treat obj as a single array and wrap it in a stream
    try:
        single = c_array(obj, schema=schema)
        return CArrayStream.from_c_arrays([single], single.schema, validate=False)
    except Exception as err:
        message = (
            f"An error occurred whilst converting {type(obj).__name__} "
            f"to nanoarrow.c_array_stream or nanoarrow.c_array: \n {err}"
        )
        raise TypeError(message) from err
def allocate_c_array_stream() -> CArrayStream:
    """Return a freshly allocated, uninitialized CArrayStream

    The result of ``CArrayStream.allocate()`` is returned as-is. As in the
    example below, its address can be handed to a producer that fills in
    the stream.

    Examples
    --------

    >>> import pyarrow as pa
    >>> from nanoarrow.c_array_stream import allocate_c_array_stream
    >>> pa_column = pa.array([1, 2, 3], pa.int32())
    >>> pa_batch = pa.record_batch([pa_column], names=["col1"])
    >>> pa_reader = pa.RecordBatchReader.from_batches(pa_batch.schema, [pa_batch])
    >>> array_stream = allocate_c_array_stream()
    >>> pa_reader._export_to_c(array_stream._addr())
    """
    empty_stream = CArrayStream.allocate()
    return empty_stream
def _obj_is_pyarrow_record_batch_reader(obj):
obj_type = type(obj)
if not obj_type.__module__.startswith("pyarrow"):
return False
if not obj_type.__name__.endswith("RecordBatchReader"):
return False
return hasattr(obj, "_export_to_c")