Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / nanoarrow   python

Repository URL to install this package:

/ src / nanoarrow / _schema.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from libc.stdint cimport int32_t, int64_t, uintptr_t
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AsString, PyBytes_Size
from cpython.pycapsule cimport PyCapsule_GetPointer

from nanoarrow_c cimport (
    ARROW_FLAG_DICTIONARY_ORDERED,
    ARROW_FLAG_MAP_KEYS_SORTED,
    ARROW_FLAG_NULLABLE,
    ArrowFree,
    ArrowLayout,
    ArrowMalloc,
    ArrowMetadataBuilderAppend,
    ArrowMetadataBuilderInit,
    ArrowMetadataReaderInit,
    ArrowMetadataReaderRead,
    ArrowSchema,
    ArrowSchemaAllocateChildren,
    ArrowSchemaAllocateDictionary,
    ArrowSchemaDeepCopy,
    ArrowSchemaInit,
    ArrowSchemaMove,
    ArrowSchemaRelease,
    ArrowSchemaSetMetadata,
    ArrowSchemaSetType,
    ArrowSchemaSetTypeDateTime,
    ArrowSchemaSetTypeDecimal,
    ArrowSchemaSetTypeFixedSize,
    ArrowSchemaSetFormat,
    ArrowSchemaSetName,
    ArrowSchemaToString,
    ArrowSchemaViewInit,
    ArrowStringView,
    ArrowTimeUnit,
    ArrowTimeUnitString,
    ArrowType,
    ArrowTypeString,
    NANOARROW_BUFFER_TYPE_NONE,
    NANOARROW_MAX_FIXED_BUFFERS,
    NANOARROW_TIME_UNIT_SECOND,
    NANOARROW_TIME_UNIT_MILLI,
    NANOARROW_TIME_UNIT_MICRO,
    NANOARROW_TIME_UNIT_NANO,
)

from nanoarrow cimport _types
from nanoarrow._buffer cimport CBuffer
from nanoarrow._utils cimport alloc_c_schema, Error

from typing import Iterable, List, Mapping, Tuple, Union

from nanoarrow import _repr_utils


# This is likely a better fit for a dedicated testing module; however, we need
# it here to produce nice error messages when ensuring that one or
# more arrays conform to a given or inferred schema.
cpdef assert_type_equal(actual, expected, bint check_nullability):
    """Assert that two schemas describe the same data type

    Both arguments must be CSchema objects. They are compared with
    ``actual.type_equals(expected, check_nullability=check_nullability)``
    (i.e., an array with schema ``actual`` contains elements with the same
    logical meaning as an array with schema ``expected``). Notably, metadata
    is excluded from the comparison for all nodes in the schema.

    Parameters
    ----------
    actual : CSchema
        The schema to be tested for equality
    expected : CSchema
        The schema against which to test
    check_nullability : bool
        If True, actual and expected will be considered equal only if their
        data type information and marked nullability are identical.

    Raises
    ------
    TypeError
        If ``actual`` or ``expected`` is not a CSchema.
    ValueError
        If the schemas are not type-equal. The message contains a recursive
        string representation of both schemas, each limited to 80 characters.
    """
    if not isinstance(actual, CSchema):
        raise TypeError(f"actual is {type(actual).__name__}, not CSchema")
    if not isinstance(expected, CSchema):
        raise TypeError(f"expected is {type(expected).__name__}, not CSchema")

    # Nothing to report when the types match
    if actual.type_equals(expected, check_nullability=check_nullability):
        return

    # Keep the labels short so the error message stays readable
    actual_label = actual._to_string(max_chars=80, recursive=True)
    expected_label = expected._to_string(max_chars=80, recursive=True)
    message = (
        f"Expected schema\n  '{expected_label}'"
        f"\nbut got\n  '{actual_label}'"
    )
    raise ValueError(message)


cdef class CArrowTimeUnit:
    """
    Wrapper around ArrowTimeUnit that gives Python implementations access
    to the values.

    Each class attribute holds the ``NANOARROW_TIME_UNIT_*`` constant of the
    corresponding name cimported from ``nanoarrow_c``.
    """

    # One attribute per NANOARROW_TIME_UNIT_* constant
    SECOND = NANOARROW_TIME_UNIT_SECOND
    MILLI = NANOARROW_TIME_UNIT_MILLI
    MICRO = NANOARROW_TIME_UNIT_MICRO
    NANO = NANOARROW_TIME_UNIT_NANO


cdef class CLayout:
    """Abstract buffer information for Arrow types

    Provides accessors for buffer counts, types, and attributes. Wraps a
    borrowed ``ArrowLayout*``; ``base`` is stored alongside the pointer
    (presumably the object that owns the layout storage and must outlive
    this wrapper -- confirm against callers).

    Parameters
    ----------
    base : object
        Object stored on the instance together with the pointer.
    ptr : int
        Address of an ``ArrowLayout`` structure.
    """

    def __cinit__(self, base, uintptr_t ptr):
        self._base = base
        self._layout = <ArrowLayout*>ptr

        # The number of buffers is the index of the first slot whose type is
        # NANOARROW_BUFFER_TYPE_NONE, or the maximum if every slot is used.
        self._n_buffers = NANOARROW_MAX_FIXED_BUFFERS
        for i in range(NANOARROW_MAX_FIXED_BUFFERS):
            if self._layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE:
                self._n_buffers = i
                break

    @property
    def n_buffers(self) -> int:
        """Number of leading buffer slots whose type is not NONE"""
        return self._n_buffers

    @property
    def buffer_data_type_id(self) -> Tuple[int, ...]:
        """Tuple of ``buffer_data_type`` values, one per buffer"""
        return tuple(self._layout.buffer_data_type[i] for i in range(self._n_buffers))

    @property
    def element_size_bits(self) -> Tuple[int, ...]:
        """Tuple of ``element_size_bits`` values, one per buffer"""
        return tuple(self._layout.element_size_bits[i] for i in range(self._n_buffers))

    @property
    def child_size_elements(self) -> int:
        """The ``child_size_elements`` member of the wrapped layout"""
        return self._layout.child_size_elements


cdef class SchemaMetadata:
    """Dictionary-like wrapper around a lazily-parsed CSchema.metadata string

    The Arrow C Data interface encodes key/value metadata as a bytes-to-bytes
    mapping using a specific packed binary encoding. This class maintains a
    reference to the underlying storage and parses it as required. Note that
    unlike a Python dictionary, ``SchemaMetadata`` can contain duplicate
    keys.

    Parameters
    ----------
    base : object
        Object stored alongside the pointer (the "underlying storage"
        reference mentioned above).
    ptr : int
        Address of the encoded metadata, or 0 for no metadata.
    """

    def __cinit__(self, object base, uintptr_t ptr):
        self._base = base
        self._metadata = <const char*>ptr

    @staticmethod
    def empty():
        """Create an empty SchemaMetadata with no keys or values"""
        return SchemaMetadata(None, 0)

    cdef _init_reader(self):
        # (Re)position the shared reader at the start of the metadata
        cdef int code = ArrowMetadataReaderInit(&self._reader, self._metadata)
        Error.raise_error_not_ok("ArrowMetadataReaderInit()", code)

    def __len__(self):
        self._init_reader()
        return self._reader.remaining_keys

    def __contains__(self, item):
        for key, _ in self.items():
            if item == key:
                return True

        return False

    def __getitem__(self, k) -> bytes:
        """Get the value associated with a unique key

        Retrieves the unique value associated with k. Raises KeyError if
        k does not point to exactly one value in the metadata.
        """
        out = None

        for key, value in self.items():
            if k == key:
                if out is None:
                    out = value
                else:
                    raise KeyError(f"key {k} matches more than one value in metadata")

        if out is None:
            raise KeyError(f"Key {k} not found")

        return out

    def __iter__(self):
        for key, _ in self.items():
            yield key

    def keys(self) -> List[bytes]:
        """List metadata keys

        The result may contain duplicate keys if they exist in the metadata.
        """
        return list(self)

    def values(self) -> List[bytes]:
        """List metadata values"""
        return [value for _, value in self.items()]

    def items(self) -> Iterable[Tuple[bytes, bytes]]:
        """Iterate over key/value pairs

        The result may contain duplicate keys if they exist in the metadata.

        Raises an error via ``Error.raise_error_not_ok()`` if the reader
        cannot be initialized or a key/value pair cannot be read.
        """
        cdef ArrowStringView key
        cdef ArrowStringView value
        cdef int code

        # The reader is stored on self and shared by every method. Read all
        # pairs before the first yield so that nested use while iterating
        # (e.g., ``for k in metadata: metadata[k]`` or ``len(metadata)``)
        # cannot reset or exhaust the reader underneath this generator.
        self._init_reader()
        pairs = []
        while self._reader.remaining_keys > 0:
            code = ArrowMetadataReaderRead(&self._reader, &key, &value)
            Error.raise_error_not_ok("ArrowMetadataReaderRead()", code)
            key_obj = PyBytes_FromStringAndSize(key.data, key.size_bytes)
            value_obj = PyBytes_FromStringAndSize(value.data, value.size_bytes)
            pairs.append((key_obj, value_obj))

        for pair in pairs:
            yield pair

    def __repr__(self) -> str:
        lines = [
            f"<{_repr_utils.make_class_label(self)}>",
            _repr_utils.metadata_repr(self)
        ]
        return "\n".join(lines)


cdef class CSchema:
    """Low-level ArrowSchema wrapper

    This object is a literal wrapper around a read-only ArrowSchema. It provides field accessors
    that return Python objects and handles the C Data interface lifecycle (i.e., initialized
    ArrowSchema structures are always released).

    See ``nanoarrow.c_schema()`` for construction and usage examples.
    """

    @staticmethod
    def allocate() -> CSchema:
        """Allocate a released CSchema

        The object returned by ``alloc_c_schema()`` becomes the base of the
        new CSchema and the ArrowSchema pointer it fills in becomes the
        wrapped pointer.
        """
        cdef ArrowSchema* schema_ptr
        holder = alloc_c_schema(&schema_ptr)
        return CSchema(holder, <uintptr_t>schema_ptr)

    def __cinit__(self, object base, uintptr_t addr):
        # addr is the integer address of the ArrowSchema being wrapped
        self._ptr = <ArrowSchema*>addr
        # base is stored with the pointer (presumably to keep the owner of
        # the ArrowSchema alive -- confirm against callers)
        self._base = base

    def __deepcopy__(self, memo=None) -> CSchema:
        """Return a new CSchema holding a deep copy of this schema

        ``memo`` is accepted for compatibility with ``copy.deepcopy()`` but
        is not used.

        Raises RuntimeError if this schema is NULL or released, and raises
        via ``Error.raise_error_not_ok()`` if ArrowSchemaDeepCopy() fails.
        """
        # Refuse to hand a NULL or released schema to the C copy routine,
        # matching the check performed by __arrow_c_schema__()
        self._assert_valid()

        cdef CSchema out = CSchema.allocate()
        cdef int code = ArrowSchemaDeepCopy(self._ptr, out._ptr)
        Error.raise_error_not_ok("ArrowSchemaDeepCopy()", code)

        return out

    @staticmethod
    def _import_from_c_capsule(schema_capsule) -> CSchema:
        """Wrap the ArrowSchema held by a PyCapsule

        The capsule itself becomes the base of the returned CSchema.

        Parameters
        ----------
        schema_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_schema' containing an
            ArrowSchema pointer.
        """
        cdef uintptr_t schema_addr = <uintptr_t>PyCapsule_GetPointer(
            schema_capsule, "arrow_schema"
        )
        return CSchema(schema_capsule, schema_addr)

    def __arrow_c_schema__(self):
        """
        Export to an ArrowSchema PyCapsule

        A deep copy of the wrapped ArrowSchema is written into a newly
        allocated capsule, which is returned. Raises RuntimeError if this
        schema is NULL or released.
        """
        self._assert_valid()

        cdef ArrowSchema* exported_ptr
        exported_capsule = alloc_c_schema(&exported_ptr)

        cdef int status = ArrowSchemaDeepCopy(self._ptr, exported_ptr)
        Error.raise_error_not_ok("ArrowSchemaDeepCopy", status)
        return exported_capsule

    @property
    def _capsule(self):
        """
        Returns the capsule backing this CSchema or None if it does not exist
        or points to a parent ArrowSchema.
        """
        cdef ArrowSchema* maybe_capsule_ptr

        # PyCapsule_GetPointer() raises ValueError when the base is not a
        # PyCapsule named 'arrow_schema'; in that case there is no backing
        # capsule to return.
        try:
            maybe_capsule_ptr = <ArrowSchema*>PyCapsule_GetPointer(self._base, 'arrow_schema')
        except ValueError:
            return None

        # The pointers differ (and None is returned) if this is a child
        # CSchema whose capsule holds the parent ArrowSchema
        if maybe_capsule_ptr == self._ptr:
            return self._base

        return None

    def _addr(self) -> int:
        """Return the address of the wrapped ArrowSchema as an integer"""
        cdef uintptr_t address = <uintptr_t>self._ptr
        return address

    def is_valid(self) -> bool:
        """Check for a non-null and non-released underlying ArrowSchema"""
        # A NULL pointer can never be valid; only look at the release
        # callback once the pointer is known to be safe to dereference
        if self._ptr == NULL:
            return False
        return self._ptr.release != NULL

    def _assert_valid(self):
        """Raise RuntimeError if the wrapped ArrowSchema is NULL or released"""
        if self._ptr == NULL:
            raise RuntimeError("schema is NULL")
        elif self._ptr.release == NULL:
            # A NULL release callback marks a released schema
            raise RuntimeError("schema is released")

    def _to_string(self, int64_t max_chars=0, recursive=False) -> str:
        """Format this schema using ArrowSchemaToString()

        Parameters
        ----------
        max_chars : int
            Size limit passed to ArrowSchemaToString() (the buffer holds
            ``max_chars`` bytes plus a terminator). If 0, the required size
            is queried first so that the result is not truncated.
        recursive : bool
            Passed through to ArrowSchemaToString().

        Raises
        ------
        ValueError
            If ``max_chars`` is negative.
        MemoryError
            If the output buffer cannot be allocated.
        """
        cdef int64_t n_chars
        cdef char* out

        # A negative limit would produce an undersized (or zero-sized)
        # allocation that is then read as a C string
        if max_chars < 0:
            raise ValueError("max_chars must be greater than or equal to zero")

        if max_chars == 0:
            # With a NULL output, the return value is used as the size needed
            n_chars = ArrowSchemaToString(self._ptr, NULL, 0, recursive)
        else:
            n_chars = max_chars

        out = <char*>ArrowMalloc(n_chars + 1)
        if not out:
            raise MemoryError()

        # Free the buffer even if decoding fails.
        # NOTE(review): truncation at max_chars may split a multi-byte UTF-8
        # sequence, in which case decode() raises UnicodeDecodeError.
        try:
            ArrowSchemaToString(self._ptr, out, n_chars + 1, recursive)
            return out.decode("UTF-8")
        finally:
            ArrowFree(out)
Loading ...