Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / nanoarrow   python

Repository URL to install this package:

/ src / nanoarrow / ipc.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import io

from nanoarrow._array_stream import CArrayStream
from nanoarrow._ipc_lib import (
    CIpcInputStream,
    CIpcOutputStream,
    CIpcWriter,
    init_array_stream,
)
from nanoarrow._utils import obj_is_buffer
from nanoarrow.array import c_array
from nanoarrow.array_stream import c_array_stream
from nanoarrow.iterator import ArrayViewBaseIterator

from nanoarrow import _repr_utils


class InputStream:
    """Stream of serialized Arrow data

    Reads file paths or otherwise readable file objects that contain
    serialized Arrow data. Arrow documentation typically refers to this format
    as "Arrow IPC" because its origin was as a means to transmit tables between
    processes; however, this format can also be written to and read from files
    or URLs and is essentially a high-performance equivalent of a CSV file that
    does a better job maintaining type fidelity.

    Use :staticmethod:`from_readable`, :staticmethod:`from_path`, or
    :staticmethod:`from_url` to construct these streams.

    Examples
    --------

    >>> import nanoarrow as na
    >>> from nanoarrow.ipc import InputStream
    >>> with InputStream.example() as inp, na.c_array_stream(inp) as stream:
    ...     stream
    <nanoarrow.c_array_stream.CArrayStream>
    - get_schema(): struct<some_col: int32>
    """

    def __init__(self):
        self._stream = None
        self._desc = None

    def _is_valid(self) -> bool:
        return self._stream is not None and self._stream.is_valid()

    def __arrow_c_stream__(self, requested_schema=None):
        """Export this stream as an ArrowArrayStream

        Implements the Arrow PyCapsule interface by transferring ownership of this
        input stream to an ArrowArrayStream wrapped by a PyCapsule.
        """
        if not self._is_valid():
            raise RuntimeError("nanoarrow.ipc.InputStream is no longer valid")

        with CArrayStream.allocate() as array_stream:
            init_array_stream(self._stream, array_stream._addr())
            array_stream._get_cached_schema()
            return array_stream.__arrow_c_stream__(requested_schema=requested_schema)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        if self._stream is not None:
            self._stream.release()

    @staticmethod
    def from_readable(obj):
        """Wrap an open readable file or buffer as an Arrow IPC stream

        Wraps a readable object (specificially, an object that implements a
        ``readinto()`` method) as a non-owning InputStream. Closing ``obj`` remains
        the caller's responsibility: neither this stream nor the resulting array
        stream will call ``obj.close()``.

        Parameters
        ----------
        obj : readable file-like or buffer
            An object implementing the Python buffer protocol or ``readinto()``.

        Examples
        --------

        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import InputStream
        >>> ipc_stream = InputStream.from_readable(InputStream.example_bytes())
        >>> na.c_array_stream(ipc_stream)
        <nanoarrow.c_array_stream.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        if not hasattr(obj, "readinto") and obj_is_buffer(obj):
            close_obj = True
            obj = io.BytesIO(obj)
        else:
            close_obj = False

        out = InputStream()
        out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
        out._desc = repr(obj)
        return out

    @staticmethod
    def from_path(obj, *args, **kwargs):
        """Wrap a local file as an IPC stream

        Wraps a pathlike object (specificially, one that can be passed to ``open()``)
        as an owning InputStream. The file will be opened in binary mode and will be
        closed when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : path-like
            A string or path-like object that can be passed to ``open()``

        Examples
        --------

        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import InputStream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(InputStream.example_bytes())
        ...
        ...     with InputStream.from_path(path) as inp, na.c_array_stream(inp) as s:
        ...         s
        <nanoarrow.c_array_stream.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        out = InputStream()
        out._stream = CIpcInputStream.from_readable(
            open(obj, "rb", *args, **kwargs), close_obj=True
        )
        out._desc = repr(obj)
        return out

    @staticmethod
    def from_url(obj, *args, **kwargs):
        """Wrap a URL as an IPC stream

        Wraps a URL (specificially, one that can be passed to
        ``urllib.request.urlopen()``) as an owning InputStream. The URL will be
        closed when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : str
            A URL that can be passed to ``urllib.request.urlopen()``

        Examples
        --------

        >>> import pathlib
        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import InputStream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(InputStream.example_bytes())
        ...
        ...     uri = pathlib.Path(path).as_uri()
        ...     with InputStream.from_url(uri) as inp, na.c_array_stream(inp) as stream:
        ...         stream
        <nanoarrow.c_array_stream.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        import urllib.request

        out = InputStream()
        out._stream = CIpcInputStream.from_readable(
            urllib.request.urlopen(obj, *args, **kwargs), close_obj=True
        )
        out._desc = repr(obj)
        return out

    @staticmethod
    def example():
        """Example IPC InputStream

        A self-contained example whose value is the serialized version of
        ``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
        and documentation and is useful because nanoarrow does not implement
        a writer to generate test data.

        Examples
        --------

        >>> from nanoarrow.ipc import InputStream
        >>> InputStream.example()
        <nanoarrow.ipc.InputStream <_io.BytesIO object at ...>>
        """
        return InputStream.from_readable(InputStream.example_bytes())

    @staticmethod
    def example_bytes():
        """Example stream bytes

        The underlying bytes of the :staticmethod:`example` InputStream. This is useful
        for writing files or creating other types of test input.

        Examples
        --------

        >>> import os
        >>> import tempfile
        >>> from nanoarrow.ipc import InputStream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         f.write(InputStream.example_bytes())
        440
        """
        return _EXAMPLE_IPC_SCHEMA + _EXAMPLE_IPC_BATCH

    def __repr__(self) -> str:
        class_label = _repr_utils.make_class_label(self)
        if self._is_valid():
            return f"<{class_label} {self._desc}>"
        else:
            return f"<{class_label} <invalid>>"


class StreamWriter:
    """Write streams of serialized Arrow data

    Provides various ways of writing Arrow schemas and record batches as
    binary data serialized using the Arrow IPC streaming format.

    Use :staticmethod:`from_writeable` or :staticmethod:`from_path`, or
    to construct a writer.

    Examples
    --------

    >>> import io
    >>> import nanoarrow as na
    >>> from nanoarrow.ipc import StreamWriter
    >>>
    >>> out = io.BytesIO()
    >>> array = na.c_array_from_buffers(
    ...     na.struct({"some_col": na.int32()}),
    ...     length=3,
    ...     buffers=[],
    ...     children=[na.c_array([1, 2, 3], na.int32())]
    ... )
    >>>
    >>> with StreamWriter.from_writable(out) as writer:
    ...     writer.write_stream(array)
    >>>
    >>> na.ArrayStream.from_readable(out.getvalue()).read_all()
    nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
    {'some_col': 1}
    {'some_col': 2}
    {'some_col': 3}
    """

    def __init__(self):
        self._writer = None
        self._desc = None
        self._iterator = None

    def _is_valid(self) -> bool:
        return self._writer is not None and self._writer.is_valid()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def release(self):
        """Close stream without writing the end-of-stream marker"""
        if not self._is_valid():
            return

        self._writer.release()
        self._writer = None

    def close(self):
        """Close stream and write end-of-stream marker"""
        if not self._is_valid():
            return

        self._writer.write_end_of_stream()
        self.release()

    def write_array(self, obj, schema=None, *, write_schema=None):
        """Interpret obj as an array and write to stream

        Parameters
        ----------
        obj : array-like
            An array-like object as sanitized by :func:`c_array`.
        schema : schema-like, optional
            An optional schema, passed to :func:`c_array`.
        write_schema : bool, optional
            See :meth:`write_stream`.
        """
        obj = c_array(obj)
        return self.write_stream(obj, schema, write_schema=write_schema)

    def write_stream(self, obj, schema=None, *, write_schema=None):
        """Interpret obj as a stream of arrays and write to stream

        Writes all arrays from obj to the output stream.

        Parameters
        ----------
        obj : array stream-like
            An array-like or array stream-like object as sanitized by
            :func:`c_array_stream`.
        schema : schema-like, optional
            An optional schema, passed to :func:`c_array_stream`.
        write_schema : bool, optional
            If True, the schema will always be written to the output stream; if False,
            the schema will never be written to the output stream. If omitted, the
            schema will be written if nothing has yet been written to the output.
        """
        if not self._is_valid():
            raise ValueError("Can't write to released StreamWriter")
Loading ...