Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / nanoarrow   python

Repository URL to install this package:

/ src / nanoarrow / _ipc_lib.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3
# cython: linetrace=True

from libc.stdint cimport uint8_t, int64_t, uintptr_t
from libc.errno cimport EIO, EAGAIN
from libc.stdio cimport snprintf
from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
from cpython cimport Py_buffer, PyBuffer_FillInfo

from nanoarrow_c cimport (
    ArrowArrayStream,
    ArrowArrayView,
    ArrowSchema,
    ArrowErrorCode,
    ArrowError,
    NANOARROW_OK,
)

from nanoarrow._schema cimport CSchema
from nanoarrow._array cimport CArrayView
from nanoarrow._utils cimport Error


cdef extern from "nanoarrow_ipc.h" nogil:
    struct ArrowIpcInputStream:
        ArrowErrorCode (*read)(ArrowIpcInputStream* stream, uint8_t* buf,
                               int64_t buf_size_bytes, int64_t* size_read_out,
                               ArrowError* error)
        void (*release)(ArrowIpcInputStream* stream)
        void* private_data

    struct ArrowIpcArrayStreamReaderOptions:
        int64_t field_index
        int use_shared_buffers

    ArrowErrorCode ArrowIpcArrayStreamReaderInit(
        ArrowArrayStream* out, ArrowIpcInputStream* input_stream,
        ArrowIpcArrayStreamReaderOptions* options)

    struct ArrowIpcOutputStream:
        ArrowErrorCode (*write)(ArrowIpcOutputStream* stream, const void* buf,
                                int64_t buf_size_bytes, int64_t* size_written_out,
                                ArrowError* error)
        void (*release)(ArrowIpcOutputStream* stream)
        void* private_data

    struct ArrowIpcWriter:
        void* private_data

    ArrowErrorCode ArrowIpcWriterInit(ArrowIpcWriter* writer,
                                      ArrowIpcOutputStream* output_stream)
    void ArrowIpcWriterReset(ArrowIpcWriter* writer)
    ArrowErrorCode ArrowIpcWriterWriteSchema(ArrowIpcWriter* writer,
                                             const ArrowSchema* in_,
                                             ArrowError* error)
    ArrowErrorCode ArrowIpcWriterWriteArrayView(ArrowIpcWriter* writer,
                                                const ArrowArrayView* in_,
                                                ArrowError* error)

    ArrowErrorCode ArrowIpcWriterWriteArrayStream(ArrowIpcWriter* writer,
                                                  ArrowArrayStream* in_,
                                                  ArrowError* error)

cdef class PyStreamPrivate:
    cdef object _obj
    cdef bint _close_obj
    cdef void* _addr
    cdef Py_ssize_t _size_bytes
    cdef bint _buffer_readonly

    def __cinit__(self, obj, bint buffer_readonly, bint close_obj=False):
        self._obj = obj
        self._close_obj = close_obj
        self._addr = NULL
        self._size_bytes = 0
        self._buffer_readonly = buffer_readonly

    @property
    def obj(self):
        return self._obj

    @property
    def close_obj(self):
        return self._close_obj

    def set_buffer(self, uintptr_t addr, Py_ssize_t size_bytes):
        self._addr = <void*>addr
        self._size_bytes = size_bytes

    # Needed for at least some implementations of readinto()
    def __len__(self):
        return self._size_bytes

    # Implement the buffer protocol so that this object can be used as
    # the argument to xxx.readinto() or xxx.write(). This ensures that
    # no extra copies (beyond any buffering done by the upstream file-like object)
    # are held since the upstream object has access to the preallocated output buffer.
    # In the read case, the preallocation is done by the ArrowArrayStream
    # implementation before issuing each read call (two per message, with
    # an extra call for a RecordBatch message to get the actual buffer data).
    # In the write case, this will be a view of whatever information was provided to
    # the write callback.
    def __getbuffer__(self, Py_buffer* buffer, int flags):
        PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes, self._buffer_readonly, flags)

    def __releasebuffer__(self, Py_buffer* buffer):
        pass


cdef ArrowErrorCode py_input_stream_read(ArrowIpcInputStream* stream, uint8_t* buf,
                                         int64_t buf_size_bytes, int64_t* size_read_out,
                                         ArrowError* error) noexcept nogil:

    with gil:
        stream_private = <object>stream.private_data
        stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)

        try:
            # Non-blocking streams may return None here, or buffered
            # wrappers of them may raise BufferedIOError
            read_result = stream_private.obj.readinto(stream_private)

            if read_result is None:
                size_read_out[0] = 0
                return EAGAIN
            else:
                size_read_out[0] = read_result
                return NANOARROW_OK
        except Exception as e:
            cls = type(e).__name__.encode()
            msg = str(e).encode()
            snprintf(
                error.message,
                sizeof(error.message),
                "%s: %s",
                <const char*>cls,
                <const char*>msg
            )
            return EIO

cdef void py_input_stream_release(ArrowIpcInputStream* stream) noexcept nogil:
    with gil:
        stream_private = <object>stream.private_data
        if stream_private.close_obj:
            stream_private.obj.close()

        Py_DECREF(stream_private)

    stream.private_data = NULL
    stream.release = NULL



cdef ArrowErrorCode py_output_stream_write(ArrowIpcOutputStream* stream, const void* buf,
                                           int64_t buf_size_bytes, int64_t* size_written_out,
                                           ArrowError* error) noexcept nogil:

    with gil:
        stream_private = <object>stream.private_data
        stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)

        try:
            # Non-blocking streams may return None here, or buffered
            # wrappers of them may raise BufferedIOError
            write_result = stream_private.obj.write(stream_private)

            # Non-blocking streams may return None here
            if write_result is None:
                size_written_out[0] = 0
                return EAGAIN
            else:
                size_written_out[0] = write_result
                return NANOARROW_OK
        except Exception as e:
            cls = type(e).__name__.encode()
            msg = str(e).encode()
            snprintf(
                error.message,
                sizeof(error.message),
                "%s: %s",
                <const char*>cls,
                <const char*>msg
            )
            return EIO

cdef void py_output_stream_release(ArrowIpcOutputStream* stream) noexcept nogil:
    with gil:
        stream_private = <object>stream.private_data
        if stream_private.close_obj:
            stream_private.obj.close()

        Py_DECREF(stream_private)

    stream.private_data = NULL
    stream.release = NULL


cdef class CIpcInputStream:
    cdef ArrowIpcInputStream _stream

    def __cinit__(self):
        self._stream.release = NULL

    def is_valid(self):
        return self._stream.release != NULL

    def __dealloc__(self):
        # Duplicating release() to avoid Python API calls in the deallocator
        if self._stream.release != NULL:
            self._stream.release(&self._stream)

    def release(self):
        if self._stream.release != NULL:
            self._stream.release(&self._stream)
            return True
        else:
            return False

    @staticmethod
    def from_readable(obj, close_obj=False):
        cdef CIpcInputStream stream = CIpcInputStream()
        cdef PyStreamPrivate private_data = PyStreamPrivate(
            obj,
            buffer_readonly=False,
            close_obj=close_obj
        )

        stream._stream.private_data = <PyObject*>private_data
        Py_INCREF(private_data)
        stream._stream.read = &py_input_stream_read
        stream._stream.release = &py_input_stream_release
        return stream


def init_array_stream(CIpcInputStream input_stream, uintptr_t out):
    cdef ArrowArrayStream* out_ptr = <ArrowArrayStream*>out

    # There are some options here that could be exposed at some point
    cdef int code = ArrowIpcArrayStreamReaderInit(out_ptr, &input_stream._stream, NULL)
    if code != NANOARROW_OK:
        raise RuntimeError(f"ArrowIpcArrayStreamReaderInit() failed with code [{code}]")


cdef class CIpcOutputStream:
    cdef ArrowIpcOutputStream _stream

    def __cinit__(self):
        self._stream.release = NULL

    def is_valid(self):
        return self._stream.release != NULL

    def __dealloc__(self):
        # Duplicating release() to avoid Python API calls in the deallocator
        if self._stream.release != NULL:
            self._stream.release(&self._stream)

    def release(self):
        if self._stream.release != NULL:
            self._stream.release(&self._stream)
            return True
        else:
            return False

    @staticmethod
    def from_writable(obj, close_obj=False):
        cdef CIpcOutputStream stream = CIpcOutputStream()
        cdef PyStreamPrivate private_data = PyStreamPrivate(
            obj,
            buffer_readonly=True,
            close_obj=close_obj
        )

        stream._stream.private_data = <PyObject*>private_data
        Py_INCREF(private_data)
        stream._stream.write = &py_output_stream_write
        stream._stream.release = &py_output_stream_release
        return stream


cdef class CIpcWriter:
    cdef ArrowIpcWriter _writer

    def __cinit__(self, CIpcOutputStream stream):
        self._writer.private_data = NULL
        if not stream.is_valid():
            raise ValueError("Can't create writer from released stream")

        cdef int code = ArrowIpcWriterInit(&self._writer, &stream._stream)
        Error.raise_error_not_ok("ArrowIpcWriterInit()", code)

    def is_valid(self):
        return self._writer.private_data != NULL

    def __dealloc__(self):
        if self._writer.private_data != NULL:
            ArrowIpcWriterReset(&self._writer)

    def release(self):
        if self._writer.private_data != NULL:
            ArrowIpcWriterReset(&self._writer)

    def write_schema(self, CSchema schema):
        cdef Error error = Error()
        cdef int code = ArrowIpcWriterWriteSchema(&self._writer, schema._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteSchema()", code)

    def write_array_view(self, CArrayView array_view):
        cdef Error error = Error()
        cdef int code = ArrowIpcWriterWriteArrayView(&self._writer, array_view._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)

    def write_array_stream(self, uintptr_t stream_addr):
        cdef ArrowArrayStream* array_stream = <ArrowArrayStream*>stream_addr
        cdef Error error = Error()
        cdef int code = ArrowIpcWriterWriteArrayStream(&self._writer, array_stream, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteArrayStream()", code)

    def write_end_of_stream(self):
        cdef Error error = Error()
        cdef int code = ArrowIpcWriterWriteArrayView(&self._writer, NULL, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)