# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
from nanoarrow._array_stream import CArrayStream
from nanoarrow._ipc_lib import (
CIpcInputStream,
CIpcOutputStream,
CIpcWriter,
init_array_stream,
)
from nanoarrow._utils import obj_is_buffer
from nanoarrow.array import c_array
from nanoarrow.array_stream import c_array_stream
from nanoarrow.iterator import ArrayViewBaseIterator
from nanoarrow import _repr_utils
class InputStream:
    """Stream of serialized Arrow data

    Reads file paths or otherwise readable file objects that contain
    serialized Arrow data. Arrow documentation typically refers to this format
    as "Arrow IPC" because its origin was as a means to transmit tables between
    processes; however, this format can also be written to and read from files
    or URLs and is essentially a high-performance equivalent of a CSV file that
    does a better job maintaining type fidelity.

    Use :meth:`from_readable`, :meth:`from_path`, or :meth:`from_url` to
    construct these streams.

    Examples
    --------
    >>> import nanoarrow as na
    >>> from nanoarrow.ipc import InputStream
    >>> with InputStream.example() as inp, na.c_array_stream(inp) as stream:
    ...     stream
    <nanoarrow.c_array_stream.CArrayStream>
    - get_schema(): struct<some_col: int32>
    """

    def __init__(self):
        # Underlying CIpcInputStream; attached by the from_* constructors.
        self._stream = None
        # Description of the data source, shown by __repr__().
        self._desc = None

    def _is_valid(self) -> bool:
        # Valid only while a stream is attached and that stream reports itself
        # valid (presumably False once released or exported via
        # __arrow_c_stream__() -- confirm against CIpcInputStream).
        return self._stream is not None and self._stream.is_valid()

    def __arrow_c_stream__(self, requested_schema=None):
        """Export this stream as an ArrowArrayStream

        Implements the Arrow PyCapsule interface by transferring ownership of this
        input stream to an ArrowArrayStream wrapped by a PyCapsule.

        Parameters
        ----------
        requested_schema : optional
            Forwarded unchanged to the exported array stream's
            ``__arrow_c_stream__()``.

        Raises
        ------
        RuntimeError
            If this stream is not valid (nothing attached, or the attached
            stream reports itself invalid).
        """
        if not self._is_valid():
            raise RuntimeError("nanoarrow.ipc.InputStream is no longer valid")

        with CArrayStream.allocate() as array_stream:
            init_array_stream(self._stream, array_stream._addr())
            # NOTE(review): presumably reads the schema eagerly so a malformed
            # stream fails here rather than on first use -- confirm.
            array_stream._get_cached_schema()
            return array_stream.__arrow_c_stream__(requested_schema=requested_schema)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        # Releasing the stream also closes any file object it owns (see
        # from_path/from_url).
        if self._stream is not None:
            self._stream.release()

    @staticmethod
    def from_readable(obj):
        """Wrap an open readable file or buffer as an Arrow IPC stream

        Wraps a readable object (specifically, an object that implements a
        ``readinto()`` method) as a non-owning InputStream. Closing ``obj`` remains
        the caller's responsibility: neither this stream nor the resulting array
        stream will call ``obj.close()``.

        Parameters
        ----------
        obj : readable file-like or buffer
            An object implementing the Python buffer protocol or ``readinto()``.
            Buffers are wrapped in an :class:`io.BytesIO` that this stream owns.

        Examples
        --------
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import InputStream
        >>> ipc_stream = InputStream.from_readable(InputStream.example_bytes())
        >>> na.c_array_stream(ipc_stream)
        <nanoarrow.c_array_stream.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        if not hasattr(obj, "readinto") and obj_is_buffer(obj):
            # The BytesIO wrapper is created here, so this stream owns it.
            close_obj = True
            obj = io.BytesIO(obj)
        else:
            close_obj = False

        out = InputStream()
        out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
        out._desc = repr(obj)
        return out

    @staticmethod
    def from_path(obj, *args, **kwargs):
        """Wrap a local file as an IPC stream

        Wraps a pathlike object (specifically, one that can be passed to ``open()``)
        as an owning InputStream. The file will be opened in binary mode and will be
        closed when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : path-like
            A string or path-like object that can be passed to ``open()``
        *args, **kwargs
            Passed to ``open()`` after the ``"rb"`` mode argument.

        Examples
        --------
        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import InputStream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(InputStream.example_bytes())
        ...
        ...     with InputStream.from_path(path) as inp, na.c_array_stream(inp) as s:
        ...         s
        <nanoarrow.c_array_stream.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        desc = repr(obj)
        return InputStream._from_owned_readable(
            open(obj, "rb", *args, **kwargs), desc
        )

    @staticmethod
    def from_url(obj, *args, **kwargs):
        """Wrap a URL as an IPC stream

        Wraps a URL (specifically, one that can be passed to
        ``urllib.request.urlopen()``) as an owning InputStream. The URL will be
        closed when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : str
            A URL that can be passed to ``urllib.request.urlopen()``
        *args, **kwargs
            Passed to ``urllib.request.urlopen()``.

        Examples
        --------
        >>> import pathlib
        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import InputStream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(InputStream.example_bytes())
        ...
        ...     uri = pathlib.Path(path).as_uri()
        ...     with InputStream.from_url(uri) as inp, na.c_array_stream(inp) as stream:
        ...         stream
        <nanoarrow.c_array_stream.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        import urllib.request

        desc = repr(obj)
        return InputStream._from_owned_readable(
            urllib.request.urlopen(obj, *args, **kwargs), desc
        )

    @staticmethod
    def _from_owned_readable(readable, desc):
        # Wrap a readable that the new stream will own (close_obj=True).
        # If wrapping fails, ownership was never handed over, so close the
        # readable here rather than leaking the file handle or connection.
        # Closing file objects twice is harmless, so this is also safe if the
        # wrapper had already closed it.
        try:
            stream = CIpcInputStream.from_readable(readable, close_obj=True)
        except BaseException:
            readable.close()
            raise

        out = InputStream()
        out._stream = stream
        out._desc = desc
        return out

    @staticmethod
    def example():
        """Example IPC InputStream

        A self-contained example whose value is the serialized version of
        ``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
        and documentation without first having to write any data.

        Examples
        --------
        >>> from nanoarrow.ipc import InputStream
        >>> InputStream.example()
        <nanoarrow.ipc.InputStream <_io.BytesIO object at ...>>
        """
        return InputStream.from_readable(InputStream.example_bytes())

    @staticmethod
    def example_bytes():
        """Example stream bytes

        The underlying bytes of the :meth:`example` InputStream. This is useful
        for writing files or creating other types of test input.

        Examples
        --------
        >>> import os
        >>> import tempfile
        >>> from nanoarrow.ipc import InputStream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         f.write(InputStream.example_bytes())
        440
        """
        return _EXAMPLE_IPC_SCHEMA + _EXAMPLE_IPC_BATCH

    def __repr__(self) -> str:
        class_label = _repr_utils.make_class_label(self)
        if self._is_valid():
            return f"<{class_label} {self._desc}>"
        else:
            return f"<{class_label} <invalid>>"
class StreamWriter:
"""Write streams of serialized Arrow data
Provides various ways of writing Arrow schemas and record batches as
binary data serialized using the Arrow IPC streaming format.
Use :meth:`from_writable` or :meth:`from_path` to construct a writer.
Examples
--------
>>> import io
>>> import nanoarrow as na
>>> from nanoarrow.ipc import StreamWriter
>>>
>>> out = io.BytesIO()
>>> array = na.c_array_from_buffers(
... na.struct({"some_col": na.int32()}),
... length=3,
... buffers=[],
... children=[na.c_array([1, 2, 3], na.int32())]
... )
>>>
>>> with StreamWriter.from_writable(out) as writer:
... writer.write_stream(array)
>>>
>>> na.ArrayStream.from_readable(out.getvalue()).read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
"""
def __init__(self):
    # Every slot starts out empty; a usable writer is presumably attached
    # afterwards by one of the ``from_*`` constructors.
    self._writer, self._desc, self._iterator = None, None, None
def _is_valid(self) -> bool:
    # A writer counts as usable only if one is attached and it reports
    # itself valid.
    writer = self._writer
    if writer is None:
        return False
    return writer.is_valid()
def __enter__(self):
    """Enter a ``with`` block; the bound value is this writer itself."""
    writer = self
    return writer
def __exit__(self, *exc_info, **kwargs):
    """Leave a ``with`` block by closing the writer.

    Delegates to ``close()`` whether or not the block raised. Returns None,
    so any exception raised in the block is not suppressed.
    """
    self.close()
def release(self):
    """Close stream without writing the end-of-stream marker

    Releases the underlying writer and detaches it. Does nothing if there
    is no valid writer.
    """
    if self._is_valid():
        self._writer.release()
        self._writer = None
def close(self):
    """Close stream and write end-of-stream marker

    Writes the end-of-stream marker and then releases the underlying writer.
    The writer is released even if writing the marker raises; in that case
    the exception still propagates to the caller. Calling this when there is
    no valid writer (e.g., after ``close()`` or ``release()``) does nothing.
    """
    if not self._is_valid():
        return

    try:
        self._writer.write_end_of_stream()
    finally:
        # Always release so a failed marker write does not leak the writer.
        self.release()
def write_array(self, obj, schema=None, *, write_schema=None):
    """Interpret obj as an array and write to stream

    Parameters
    ----------
    obj : array-like
        An array-like object as sanitized by :func:`c_array`.
    schema : schema-like, optional
        An optional schema, passed to :func:`c_array`.
    write_schema : bool, optional
        See :meth:`write_stream`.

    Returns
    -------
    The return value of :meth:`write_stream`.
    """
    # Apply ``schema`` when building the array, as documented. The resulting
    # array already carries that schema, so it is not forwarded again.
    array = c_array(obj, schema)
    return self.write_stream(array, write_schema=write_schema)
def write_stream(self, obj, schema=None, *, write_schema=None):
"""Interpret obj as a stream of arrays and write to stream
Writes all arrays from obj to the output stream.
Parameters
----------
obj : array stream-like
An array-like or array stream-like object as sanitized by
:func:`c_array_stream`.
schema : schema-like, optional
An optional schema, passed to :func:`c_array_stream`.
write_schema : bool, optional
If True, the schema will always be written to the output stream; if False,
the schema will never be written to the output stream. If omitted, the
schema will be written if nothing has yet been written to the output.
"""
if not self._is_valid():
raise ValueError("Can't write to released StreamWriter")
Loading ...