Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / nanoarrow   python

Repository URL to install this package:

Version: 0.7.0.dev132 

/ src / nanoarrow / c_array_stream.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from nanoarrow._array_stream import CArrayStream
from nanoarrow._utils import obj_is_capsule
from nanoarrow.c_array import c_array
from nanoarrow.c_schema import c_schema


def c_array_stream(obj=None, schema=None) -> CArrayStream:
    """ArrowArrayStream wrapper

    This class provides a user-facing interface to access the fields of
    an ArrowArrayStream as defined in the Arrow C Stream interface.
    These objects are usually created using `nanoarrow.c_array_stream()`.

    Examples
    --------

    >>> import pyarrow as pa
    >>> import nanoarrow as na
    >>> pa_column = pa.array([1, 2, 3], pa.int32())
    >>> pa_batch = pa.record_batch([pa_column], names=["col1"])
    >>> pa_reader = pa.RecordBatchReader.from_batches(pa_batch.schema, [pa_batch])
    >>> array_stream = na.c_array_stream(pa_reader)
    >>> array_stream.get_schema()
    <nanoarrow.c_schema.CSchema struct>
    - format: '+s'
    - name: ''
    - flags: 0
    - metadata: NULL
    - dictionary: NULL
    - children[1]:
      'col1': <nanoarrow.c_schema.CSchema int32>
        - format: 'i'
        - name: 'col1'
        - flags: 2
        - metadata: NULL
        - dictionary: NULL
        - children[0]:
    >>> array_stream.get_next().length
    3
    >>> array_stream.get_next() is None
    Traceback (most recent call last):
      ...
    StopIteration
    """

    if schema is not None:
        schema = c_schema(schema)

    if isinstance(obj, CArrayStream) and schema is None:
        return obj

    # Try capsule protocol
    if hasattr(obj, "__arrow_c_stream__"):
        schema_capsule = None if schema is None else schema.__arrow_c_schema__()
        return CArrayStream._import_from_c_capsule(
            obj.__arrow_c_stream__(requested_schema=schema_capsule)
        )

    # Try import of bare capsule
    if obj_is_capsule(obj, "arrow_array_stream"):
        if schema is not None:
            raise TypeError(
                "Can't import c_array_stream from capsule with requested schema"
            )
        return CArrayStream._import_from_c_capsule(obj)

    # Try _export_to_c for RecordBatchReader objects if pyarrow < 14.0
    if _obj_is_pyarrow_record_batch_reader(obj):
        out = CArrayStream.allocate()
        obj._export_to_c(out._addr())
        return out

    try:
        array = c_array(obj, schema=schema)
        return CArrayStream.from_c_arrays([array], array.schema, validate=False)
    except Exception as e:
        raise TypeError(
            f"An error occurred whilst converting {type(obj).__name__} "
            f"to nanoarrow.c_array_stream or nanoarrow.c_array: \n {e}"
        ) from e


def allocate_c_array_stream() -> CArrayStream:
    """Allocate an uninitialized ArrowArrayStream wrapper

    Examples
    --------

    >>> import pyarrow as pa
    >>> from nanoarrow.c_array_stream import allocate_c_array_stream
    >>> pa_column = pa.array([1, 2, 3], pa.int32())
    >>> pa_batch = pa.record_batch([pa_column], names=["col1"])
    >>> pa_reader = pa.RecordBatchReader.from_batches(pa_batch.schema, [pa_batch])
    >>> array_stream = allocate_c_array_stream()
    >>> pa_reader._export_to_c(array_stream._addr())
    """
    return CArrayStream.allocate()


def _obj_is_pyarrow_record_batch_reader(obj):
    obj_type = type(obj)
    if not obj_type.__module__.startswith("pyarrow"):
        return False

    if not obj_type.__name__.endswith("RecordBatchReader"):
        return False

    return hasattr(obj, "_export_to_c")