Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

/ ipc.pxi

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from cpython.pycapsule cimport PyCapsule_CheckExact, PyCapsule_GetPointer, PyCapsule_New

from collections import namedtuple
import warnings
from cython import sizeof

# Enumeration of IPC metadata versions, declared with `cpdef` so that it is
# usable from both Cython and Python code.  Each member's value is the
# matching C-level CMetadataVersion_* constant cast to `char`.
cpdef enum MetadataVersion:
    V1 = <char> CMetadataVersion_V1
    V2 = <char> CMetadataVersion_V2
    V3 = <char> CMetadataVersion_V3
    V4 = <char> CMetadataVersion_V4
    V5 = <char> CMetadataVersion_V5


cdef object _wrap_metadata_version(CMetadataVersion version):
    """Return the MetadataVersion member for a C-level metadata version."""
    # Members of MetadataVersion are defined from the `char` value of the
    # C constants, so look the member up by that same `char` value.
    cdef char raw_value = <char> version
    return MetadataVersion(raw_value)


cdef CMetadataVersion _unwrap_metadata_version(
        MetadataVersion version) except *:
    """
    Translate a MetadataVersion member into its C-level constant.

    Raises ValueError when `version` matches none of V1..V5.
    """
    if version == MetadataVersion.V1:
        return CMetadataVersion_V1
    if version == MetadataVersion.V2:
        return CMetadataVersion_V2
    if version == MetadataVersion.V3:
        return CMetadataVersion_V3
    if version == MetadataVersion.V4:
        return CMetadataVersion_V4
    if version == MetadataVersion.V5:
        return CMetadataVersion_V5
    # No known member matched.
    raise ValueError("Not a metadata version: " + repr(version))


# Field order matters: instances are built positionally from the C struct.
_WriteStats = namedtuple(
    'WriteStats',
    'num_messages num_record_batches num_dictionary_batches '
    'num_dictionary_deltas num_replaced_dictionaries')


class WriteStats(_WriteStats):
    """Statistics about an IPC write operation.

    Parameters
    ----------
    num_messages : int
        Number of messages.
    num_record_batches : int
        Number of record batches.
    num_dictionary_batches : int
        Number of dictionary batches.
    num_dictionary_deltas : int
        Number of dictionary deltas.
    num_replaced_dictionaries : int
        Number of replaced dictionaries.
    """
    # Keep instances as plain tuples, without a per-instance __dict__.
    __slots__ = ()


@staticmethod
cdef _wrap_write_stats(CIpcWriteStats c):
    """Copy the counters of a C-level CIpcWriteStats into a WriteStats."""
    # NOTE(review): `@staticmethod` on a module-level cdef function looks
    # redundant -- confirm it has no effect before removing it.
    return WriteStats(
        num_messages=c.num_messages,
        num_record_batches=c.num_record_batches,
        num_dictionary_batches=c.num_dictionary_batches,
        num_dictionary_deltas=c.num_dictionary_deltas,
        num_replaced_dictionaries=c.num_replaced_dictionaries)


# Field order matters: instances are built positionally from the C struct.
_ReadStats = namedtuple(
    'ReadStats',
    'num_messages num_record_batches num_dictionary_batches '
    'num_dictionary_deltas num_replaced_dictionaries')


class ReadStats(_ReadStats):
    """Statistics about an IPC read operation.

    Parameters
    ----------
    num_messages : int
        Number of messages.
    num_record_batches : int
        Number of record batches.
    num_dictionary_batches : int
        Number of dictionary batches.
    num_dictionary_deltas : int
        Number of dictionary deltas.
    num_replaced_dictionaries : int
        Number of replaced dictionaries.
    """
    # Keep instances as plain tuples, without a per-instance __dict__.
    __slots__ = ()


@staticmethod
cdef _wrap_read_stats(CIpcReadStats c):
    """Copy the counters of a C-level CIpcReadStats into a ReadStats."""
    # NOTE(review): `@staticmethod` on a module-level cdef function looks
    # redundant -- confirm it has no effect before removing it.
    return ReadStats(
        num_messages=c.num_messages,
        num_record_batches=c.num_record_batches,
        num_dictionary_batches=c.num_dictionary_batches,
        num_dictionary_deltas=c.num_dictionary_deltas,
        num_replaced_dictionaries=c.num_replaced_dictionaries)


cdef class IpcReadOptions(_Weakrefable):
    """
    Options controlling how data in the IPC format is deserialized.

    Parameters
    ----------
    ensure_native_endian : bool, default True
        Whether incoming data should be converted to platform-native
        endianness.
    use_threads : bool, default True
        Whether to use the global CPU thread pool to parallelize any
        computational tasks like decompression.
    included_fields : list, optional
        Indices, on the top-level schema, of the fields to read.
        If empty (the default), all deserialized fields are returned.
    """
    __slots__ = ()

    # cdef block is in lib.pxd

    def __init__(self, *, bint ensure_native_endian=True,
                 bint use_threads=True, list included_fields=None):
        # Start from the C-level defaults, then apply the overrides.
        self.c_options = CIpcReadOptions.Defaults()
        self.use_threads = use_threads
        self.ensure_native_endian = ensure_native_endian
        if included_fields is None:
            # Keep whatever field selection the defaults provide.
            return
        self.included_fields = included_fields

    @property
    def ensure_native_endian(self):
        """Whether incoming data is converted to native endianness."""
        return self.c_options.ensure_native_endian

    @ensure_native_endian.setter
    def ensure_native_endian(self, bint value):
        self.c_options.ensure_native_endian = value

    @property
    def use_threads(self):
        """Whether the global CPU thread pool is used."""
        return self.c_options.use_threads

    @use_threads.setter
    def use_threads(self, bint value):
        self.c_options.use_threads = value

    @property
    def included_fields(self):
        """Indices of the top-level schema fields to read."""
        return self.c_options.included_fields

    @included_fields.setter
    def included_fields(self, list value not None):
        self.c_options.included_fields = value


cdef class IpcWriteOptions(_Weakrefable):
    """
    Options controlling how data is serialized to the IPC format.

    Parameters
    ----------
    metadata_version : MetadataVersion, default MetadataVersion.V5
        Metadata version to write.  V5 is the current and latest version;
        V4 is the pre-1.0 metadata version (with incompatible Union layout).
    allow_64bit : bool, default False
        If true, permit field lengths that do not fit in a signed 32-bit
        int.
    use_legacy_format : bool, default False
        Whether to use the pre-Arrow 0.15 IPC format.
    compression : str, Codec, or None
        Compression codec to use for record batch buffers.
        Must be "lz4", "zstd" or None; if None (the default), batch
        buffers are left uncompressed.
        To specify a compression_level use `pyarrow.Codec`.
    use_threads : bool, default True
        Whether to use the global CPU thread pool to parallelize any
        computational tasks like compression.
    emit_dictionary_deltas : bool, default False
        Whether to emit dictionary deltas.  Off by default for maximum
        stream compatibility.
    unify_dictionaries : bool, default False
        If true then calls to write_table will attempt to unify dictionaries
        across all batches in the table.  This can help avoid the need for
        replacement dictionaries (which the file format does not support)
        but requires computing the unified dictionary and then remapping
        the indices arrays.

        This parameter is ignored when writing to the IPC stream format as
        the IPC stream format can support replacement dictionaries.
    """
    __slots__ = ()

    # cdef block is in lib.pxd

    def __init__(self, *, metadata_version=MetadataVersion.V5,
                 bint allow_64bit=False, use_legacy_format=False,
                 compression=None, bint use_threads=True,
                 bint emit_dictionary_deltas=False,
                 bint unify_dictionaries=False):
        # Start from the C-level defaults, then route every argument
        # through its property setter so validation lives in one place.
        self.c_options = CIpcWriteOptions.Defaults()
        self.allow_64bit = allow_64bit
        # The next three setters convert or validate their input; their
        # relative order decides which error surfaces first.
        self.use_legacy_format = use_legacy_format
        self.metadata_version = metadata_version
        if compression is not None:
            self.compression = compression
        self.unify_dictionaries = unify_dictionaries
        self.emit_dictionary_deltas = emit_dictionary_deltas
        self.use_threads = use_threads

    @property
    def allow_64bit(self):
        """Whether field lengths beyond a signed 32-bit int are allowed."""
        return self.c_options.allow_64bit

    @allow_64bit.setter
    def allow_64bit(self, bint value):
        self.c_options.allow_64bit = value

    @property
    def use_legacy_format(self):
        """Whether the pre-Arrow 0.15 IPC format is used."""
        return self.c_options.write_legacy_ipc_format

    @use_legacy_format.setter
    def use_legacy_format(self, bint value):
        self.c_options.write_legacy_ipc_format = value

    @property
    def metadata_version(self):
        """The metadata version to write, as a MetadataVersion member."""
        return _wrap_metadata_version(self.c_options.metadata_version)

    @metadata_version.setter
    def metadata_version(self, value):
        self.c_options.metadata_version = _unwrap_metadata_version(value)

    @property
    def compression(self):
        """Name of the configured codec, or None when no codec is set."""
        if self.c_options.codec == nullptr:
            return None
        return frombytes(self.c_options.codec.get().name())

    @compression.setter
    def compression(self, value):
        # None clears any previously configured codec.
        if value is None:
            self.c_options.codec.reset()
            return

        # A codec given by name: resolve it, then create a fresh codec.
        if isinstance(value, str):
            codec_type = _ensure_compression(value)
            if not (codec_type == CCompressionType_ZSTD
                    or codec_type == CCompressionType_LZ4_FRAME):
                raise ValueError("Compression type must be lz4, zstd or None")
            self.c_options.codec = shared_ptr[CCodec](GetResultValue(
                CCodec.Create(codec_type)).release())
            return

        # An existing Codec object: reuse its wrapped C-level codec.
        if isinstance(value, Codec):
            codec_name = value.name
            if not (codec_name == "lz4" or codec_name == "zstd"):
                raise ValueError("Compression type must be lz4, zstd or None")
            self.c_options.codec = (<Codec>value).wrapped
            return

        raise TypeError(
            "Property `compression` must be None, str, or pyarrow.Codec")

    @property
    def use_threads(self):
        """Whether the global CPU thread pool is used."""
        return self.c_options.use_threads

    @use_threads.setter
    def use_threads(self, bint value):
        self.c_options.use_threads = value

    @property
    def emit_dictionary_deltas(self):
        """Whether dictionary deltas are emitted."""
        return self.c_options.emit_dictionary_deltas

    @emit_dictionary_deltas.setter
    def emit_dictionary_deltas(self, bint value):
        self.c_options.emit_dictionary_deltas = value

    @property
    def unify_dictionaries(self):
        """Whether dictionaries are unified across batches."""
        return self.c_options.unify_dictionaries

    @unify_dictionaries.setter
    def unify_dictionaries(self, bint value):
        self.c_options.unify_dictionaries = value


cdef class Message(_Weakrefable):
    """
    Container for an Arrow IPC message with metadata and optional body
    """

    def __cinit__(self):
        # No work is done at C-level allocation time.
        pass

    def __init__(self):
        """Always raise TypeError: direct construction is not supported."""
        # Report the concrete class name so subclasses get a correct message.
        cls_name = self.__class__.__name__
        raise TypeError("Do not call {}'s constructor directly, use "
                        "`pyarrow.ipc.read_message` function instead."
                        .format(cls_name))

    @property
    def type(self):
        """
        The message type, formatted by FormatMessageType and converted
        with frombytes.
        """
        return frombytes(FormatMessageType(self.message.get().type()))

    @property
    def metadata(self):
        """
        The message metadata, wrapped with pyarrow_wrap_buffer.
        """
        return pyarrow_wrap_buffer(self.message.get().metadata())

    @property
    def metadata_version(self):
        """
        The metadata version of this message, as a MetadataVersion member.
        """
        return _wrap_metadata_version(self.message.get().metadata_version())

    @property
    def body(self):
        """
        The message body wrapped as a buffer, or None if there is no body.
        """
        cdef shared_ptr[CBuffer] body = self.message.get().body()
        # A null buffer pointer means the message carries no body.
        if body.get() == NULL:
            return None
        return pyarrow_wrap_buffer(body)

    def equals(self, Message other not None):
        """
        Returns True if the message contents (metadata and body) are identical

        Parameters
        ----------
        other : Message
            The message to compare against.  Must not be None.

        Returns
        -------
        are_equal : bool

        Raises
        ------
        TypeError
            If `other` is None or is not a Message.
        """
        # `not None` in the signature matters: a typed extension-class
        # argument otherwise accepts None, and the C-level attribute access
        # below would then read from an object that is not a Message.
        cdef c_bool result
        # The comparison is done by the C++ Equals call, with the GIL released.
        with nogil:
            result = self.message.get().Equals(deref(other.message.get()))
        return result

    def serialize_to(self, NativeFile sink, alignment=8, memory_pool=None):
Loading ...