# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
from libc.stdint cimport int32_t, int64_t, uintptr_t
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AsString, PyBytes_Size
from cpython.pycapsule cimport PyCapsule_GetPointer
from nanoarrow_c cimport (
ARROW_FLAG_DICTIONARY_ORDERED,
ARROW_FLAG_MAP_KEYS_SORTED,
ARROW_FLAG_NULLABLE,
ArrowFree,
ArrowLayout,
ArrowMalloc,
ArrowMetadataBuilderAppend,
ArrowMetadataBuilderInit,
ArrowMetadataReaderInit,
ArrowMetadataReaderRead,
ArrowSchema,
ArrowSchemaAllocateChildren,
ArrowSchemaAllocateDictionary,
ArrowSchemaDeepCopy,
ArrowSchemaInit,
ArrowSchemaMove,
ArrowSchemaRelease,
ArrowSchemaSetMetadata,
ArrowSchemaSetType,
ArrowSchemaSetTypeDateTime,
ArrowSchemaSetTypeDecimal,
ArrowSchemaSetTypeFixedSize,
ArrowSchemaSetFormat,
ArrowSchemaSetName,
ArrowSchemaToString,
ArrowSchemaViewInit,
ArrowStringView,
ArrowTimeUnit,
ArrowTimeUnitString,
ArrowType,
ArrowTypeString,
NANOARROW_BUFFER_TYPE_NONE,
NANOARROW_MAX_FIXED_BUFFERS,
NANOARROW_TIME_UNIT_SECOND,
NANOARROW_TIME_UNIT_MILLI,
NANOARROW_TIME_UNIT_MICRO,
NANOARROW_TIME_UNIT_NANO,
)
from nanoarrow cimport _types
from nanoarrow._buffer cimport CBuffer
from nanoarrow._utils cimport alloc_c_schema, Error
from typing import Iterable, List, Mapping, Tuple, Union
from nanoarrow import _repr_utils
# This is likely a better fit for a dedicated testing module; however, we need
# it here to produce nice error messages when ensuring that one or
# more arrays conform to a given or inferred schema.
cpdef assert_type_equal(actual, expected, bint check_nullability):
    """Raise unless two schemas describe the same data type

    Compares two CSchema objects for type equality (i.e., whether an array
    with schema ``actual`` contains elements with the same logical meaning as
    an array with schema ``expected``). Metadata on any node of either schema
    is not part of the comparison.

    Parameters
    ----------
    actual : CSchema
        The schema under test.
    expected : CSchema
        The reference schema to compare against.
    check_nullability : bool
        Forwarded to ``CSchema.type_equals()``. If True, the marked
        nullability must match in addition to the data type information.

    Raises
    ------
    TypeError
        If ``actual`` or ``expected`` is not a CSchema.
    ValueError
        If the two schemas are not type-equal. The message contains a
        (possibly truncated) recursive string form of both schemas.
    """
    if not isinstance(actual, CSchema):
        raise TypeError(f"actual is {type(actual).__name__}, not CSchema")

    if not isinstance(expected, CSchema):
        raise TypeError(f"expected is {type(expected).__name__}, not CSchema")

    # Nothing to report when the types agree
    if actual.type_equals(expected, check_nullability=check_nullability):
        return

    # Cap each label at 80 characters so the error message stays readable
    actual_label = actual._to_string(max_chars=80, recursive=True)
    expected_label = expected._to_string(max_chars=80, recursive=True)
    message = (
        f"Expected schema\n '{expected_label}'"
        f"\nbut got\n '{actual_label}'"
    )
    raise ValueError(message)
cdef class CArrowTimeUnit:
    """
    Wrapper around ArrowTimeUnit that gives Python-level implementations
    access to the time unit values.

    Each class attribute mirrors the ``NANOARROW_TIME_UNIT_*`` constant of
    the same name cimported from ``nanoarrow_c``.
    """
    SECOND = NANOARROW_TIME_UNIT_SECOND
    MILLI = NANOARROW_TIME_UNIT_MILLI
    MICRO = NANOARROW_TIME_UNIT_MICRO
    NANO = NANOARROW_TIME_UNIT_NANO
cdef class CLayout:
    """Abstract buffer information for Arrow types

    Provides accessors for buffer counts, types, and attributes. The
    per-buffer accessors only report the first ``n_buffers`` entries of the
    wrapped ``ArrowLayout``.
    """

    def __cinit__(self, base, uintptr_t ptr):
        """Wrap the ArrowLayout located at address ``ptr``

        Parameters
        ----------
        base : object
            Object stored alongside the pointer (presumably the owner of the
            memory at ``ptr``, held so that it outlives this wrapper).
        ptr : int
            Address of an ``ArrowLayout`` struct.
        """
        self._base = base
        self._layout = <ArrowLayout*>ptr

        # The number of buffers is the index of the first slot whose type is
        # NANOARROW_BUFFER_TYPE_NONE, or the maximum if every slot is used.
        self._n_buffers = NANOARROW_MAX_FIXED_BUFFERS
        for i in range(NANOARROW_MAX_FIXED_BUFFERS):
            if self._layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE:
                self._n_buffers = i
                break

    @property
    def n_buffers(self) -> int:
        """Number of leading buffer slots whose type is not NONE"""
        return self._n_buffers

    @property
    def buffer_data_type_id(self) -> Tuple[int, ...]:
        """``buffer_data_type`` of each buffer, one entry per buffer"""
        return tuple(self._layout.buffer_data_type[i] for i in range(self._n_buffers))

    @property
    def element_size_bits(self) -> Tuple[int, ...]:
        """``element_size_bits`` of each buffer, one entry per buffer"""
        return tuple(self._layout.element_size_bits[i] for i in range(self._n_buffers))

    @property
    def child_size_elements(self) -> int:
        """The ``child_size_elements`` field of the wrapped layout"""
        return self._layout.child_size_elements
cdef class SchemaMetadata:
    """Dictionary-like wrapper around a lazily-parsed CSchema.metadata string

    The Arrow C Data interface encodes key/value metadata as a bytes-to-bytes
    mapping using a specific packed binary encoding. This class maintains a
    reference to the underlying storage and parses it as required. Note that
    unlike a Python dictionary, ``SchemaMetadata`` can contain duplicate
    keys.
    """

    def __cinit__(self, object base, uintptr_t ptr):
        """Wrap the encoded metadata located at address ``ptr``

        Parameters
        ----------
        base : object
            Object stored alongside the pointer (presumably the owner of the
            metadata storage, held so that it outlives this wrapper).
        ptr : int
            Address of the encoded metadata, or 0 for no metadata.
        """
        self._base = base
        self._metadata = <const char*>ptr

    @staticmethod
    def empty():
        """Create an empty SchemaMetadata with no keys or values"""
        return SchemaMetadata(None, 0)

    cdef _init_reader(self):
        """Reset the shared reader to the start of the metadata"""
        cdef int code = ArrowMetadataReaderInit(&self._reader, self._metadata)
        Error.raise_error_not_ok("ArrowMetadataReaderInit()", code)

    def __len__(self):
        """Number of key/value pairs (duplicate keys are counted)"""
        self._init_reader()
        return self._reader.remaining_keys

    def __contains__(self, item):
        """Check whether ``item`` equals at least one key"""
        return any(item == key for key, _ in self.items())

    def __getitem__(self, k) -> bytes:
        """Get the value associated with a unique key

        Retrieves the unique value associated with k. Raises KeyError if
        k does not point to exactly one value in the metadata.
        """
        out = None

        for key, value in self.items():
            if k == key:
                if out is None:
                    out = value
                else:
                    raise KeyError(f"key {k} matches more than one value in metadata")

        if out is None:
            raise KeyError(f"Key {k} not found")

        return out

    def __iter__(self):
        """Iterate over keys (duplicates included)"""
        for key, _ in self.items():
            yield key

    def keys(self) -> List[bytes]:
        """List metadata keys

        The result may contain duplicate keys if they exist in the metadata.
        """
        return list(self)

    def values(self) -> List[bytes]:
        """List metadata values"""
        return [value for _, value in self.items()]

    def items(self) -> Iterable[Tuple[bytes, bytes]]:
        """Iterate over key/value pairs

        The result may contain duplicate keys if they exist in the metadata.

        All pairs are read before the iterator is returned. The reader state
        lives on ``self`` and is shared by every method, so reading lazily
        would let a nested access (e.g., ``metadata[key]`` inside
        ``for key in metadata``) reset and exhaust the reader in the middle
        of the outer iteration.
        """
        cdef ArrowStringView key
        cdef ArrowStringView value
        cdef int code

        pairs = []
        self._init_reader()
        while self._reader.remaining_keys > 0:
            code = ArrowMetadataReaderRead(&self._reader, &key, &value)
            Error.raise_error_not_ok("ArrowMetadataReaderRead()", code)
            # Copy out of the underlying storage so the returned objects do
            # not alias the metadata buffer
            pairs.append((
                PyBytes_FromStringAndSize(key.data, key.size_bytes),
                PyBytes_FromStringAndSize(value.data, value.size_bytes),
            ))

        return iter(pairs)

    def __repr__(self) -> str:
        lines = [
            f"<{_repr_utils.make_class_label(self)}>",
            _repr_utils.metadata_repr(self)
        ]
        return "\n".join(lines)
cdef class CSchema:
"""Low-level ArrowSchema wrapper
This object is a literal wrapper around a read-only ArrowSchema. It provides field accessors
that return Python objects and handles the C Data interface lifecycle (i.e., initialized
ArrowSchema structures are always released).
See ``nanoarrow.c_schema()`` for construction and usage examples.
"""
@staticmethod
def allocate() -> CSchema:
    """Create a CSchema wrapping a newly allocated, released ArrowSchema

    The object returned by ``alloc_c_schema()`` becomes the base of the new
    CSchema and the pointer it filled in becomes the wrapped address.
    """
    cdef ArrowSchema* raw_schema
    holder = alloc_c_schema(&raw_schema)
    cdef uintptr_t address = <uintptr_t>(raw_schema)
    return CSchema(holder, address)
def __cinit__(self, object base, uintptr_t addr):
    # addr is the integer address of the ArrowSchema struct to wrap
    self._ptr = <ArrowSchema*>addr
    # base is stored next to the pointer (presumably to keep the owner of
    # that struct alive for as long as this wrapper exists)
    self._base = base
def __deepcopy__(self, memo=None) -> CSchema:
    """Return an independent copy of this schema

    Parameters
    ----------
    memo : dict, optional
        Accepted for compatibility with ``copy.deepcopy()``; not used.

    Raises
    ------
    RuntimeError
        If the underlying ArrowSchema is NULL or has been released.
    """
    cdef CSchema out
    cdef int code

    # Refuse to hand a NULL or released schema to ArrowSchemaDeepCopy(),
    # matching the check performed by __arrow_c_schema__()
    self._assert_valid()

    out = CSchema.allocate()
    code = ArrowSchemaDeepCopy(self._ptr, out._ptr)
    Error.raise_error_not_ok("ArrowSchemaDeepCopy()", code)
    return out
@staticmethod
def _import_from_c_capsule(schema_capsule) -> CSchema:
    """Wrap the ArrowSchema held by a PyCapsule

    The capsule itself becomes the base of the returned CSchema; no copy of
    the ArrowSchema is made.

    Parameters
    ----------
    schema_capsule : PyCapsule
        A valid PyCapsule with name 'arrow_schema' containing an
        ArrowSchema pointer.
    """
    cdef uintptr_t address = <uintptr_t>PyCapsule_GetPointer(
        schema_capsule, "arrow_schema"
    )
    return CSchema(schema_capsule, address)
def __arrow_c_schema__(self):
    """
    Export a deep copy of this schema as an ArrowSchema PyCapsule

    Raises RuntimeError if the underlying ArrowSchema is NULL or released.
    """
    cdef ArrowSchema* exported
    cdef int status

    self._assert_valid()

    # The new capsule receives its own copy of the wrapped schema
    capsule = alloc_c_schema(&exported)
    status = ArrowSchemaDeepCopy(self._ptr, exported)
    Error.raise_error_not_ok("ArrowSchemaDeepCopy", status)
    return capsule
@property
def _capsule(self):
    """
    The capsule backing this CSchema, or None when the capsule held as the
    base points at a different ArrowSchema (e.g., that of a parent).
    """
    cdef ArrowSchema* capsule_schema = <ArrowSchema*>PyCapsule_GetPointer(
        self._base, 'arrow_schema'
    )

    # A child CSchema shares its parent's capsule, so the capsule pointer
    # will not match the child's own pointer; report None in that case
    if capsule_schema != self._ptr:
        return None

    return self._base
def _addr(self) -> int:
    """Address of the wrapped ArrowSchema as a Python integer"""
    cdef uintptr_t address = <uintptr_t>self._ptr
    return address
def is_valid(self) -> bool:
    """Report whether the wrapped ArrowSchema is neither NULL nor released"""
    if self._ptr == NULL:
        return False

    # A released ArrowSchema has its release callback set to NULL
    return self._ptr.release != NULL
def _assert_valid(self):
    """Raise RuntimeError if the wrapped ArrowSchema is NULL or released"""
    problem = None

    if self._ptr == NULL:
        problem = "schema is NULL"
    elif self._ptr.release == NULL:
        problem = "schema is released"

    if problem is not None:
        raise RuntimeError(problem)
def _to_string(self, int64_t max_chars=0, recursive=False) -> str:
    """Format this schema using ArrowSchemaToString()

    Parameters
    ----------
    max_chars : int, default 0
        Maximum number of characters (bytes) to write. If 0, the required
        size is queried from ArrowSchemaToString() first so that the
        output is not truncated.
    recursive : bool, default False
        Passed through to ArrowSchemaToString().

    Raises
    ------
    ValueError
        If ``max_chars`` is negative.
    MemoryError
        If the output buffer cannot be allocated.
    """
    cdef int64_t n_chars
    cdef char* out

    # A negative size would request an empty or wrapped-around allocation
    if max_chars < 0:
        raise ValueError("max_chars must be >= 0")

    if max_chars == 0:
        # With a NULL output buffer, only the required length is computed
        n_chars = ArrowSchemaToString(self._ptr, NULL, 0, recursive)
    else:
        n_chars = max_chars

    # One extra byte for the null terminator
    out = <char*>ArrowMalloc(n_chars + 1)
    if not out:
        raise MemoryError()

    try:
        ArrowSchemaToString(self._ptr, out, n_chars + 1, recursive)
        # Decoding can raise (e.g., when truncation splits a multi-byte
        # UTF-8 sequence), so the buffer is released in the finally clause
        return out.decode("UTF-8")
    finally:
        ArrowFree(out)
Loading ...