# src/nanoarrow/_buffer.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from libc.stdint cimport uintptr_t, int8_t, uint8_t, uint16_t, int64_t
from libc.string cimport memcpy
from libc.stdio cimport snprintf
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer, PyCapsule_IsValid
from cpython cimport (
    Py_buffer,
    PyObject_GetBuffer,
    PyBuffer_Release,
    PyBuffer_ToContiguous,
    PyBuffer_FillInfo,
    PyBUF_ANY_CONTIGUOUS,
    PyBUF_FORMAT,
    PyBUF_WRITABLE,
    PyErr_WriteUnraisable,
)
from cpython.ref cimport Py_INCREF, Py_DECREF

from nanoarrow_c cimport (
    NANOARROW_OK,
    ArrowMalloc,
    ArrowFree,
    ArrowType,
    ArrowTypeString,
    ArrowBitGet,
    ArrowBitsUnpackInt8,
    ArrowBufferReserve,
    ArrowBufferAppendFill,
    ArrowBufferAppendInt8,
    ArrowBitmapInit,
    ArrowBitmapReset,
    ArrowBitmap,
    ArrowBitmapReserve,
    ArrowBitmapAppend,
    ArrowBitmapAppendUnsafe,
    ArrowBuffer,
    ArrowBufferMove,
)

from nanoarrow_device_c cimport (
    ARROW_DEVICE_CPU,
    ARROW_DEVICE_CUDA,
    ArrowDevice,
)

from nanoarrow_dlpack cimport (
    DLDataType,
    DLDevice,
    DLDeviceType,
    DLManagedTensor,
    DLTensor,
    kDLCPU,
    kDLFloat,
    kDLInt,
    kDLUInt
)

from nanoarrow cimport _utils
from nanoarrow cimport _types
from nanoarrow._device cimport CSharedSyncEvent, Device

from struct import unpack_from, iter_unpack, calcsize, Struct

from nanoarrow import _repr_utils
from nanoarrow._device import DEVICE_CPU


cdef void pycapsule_dlpack_deleter(object dltensor) noexcept:
    cdef DLManagedTensor* dlm_tensor

    # Do nothing if the capsule has been consumed
    if PyCapsule_IsValid(dltensor, "used_dltensor"):
        return

    dlm_tensor = <DLManagedTensor*>PyCapsule_GetPointer(dltensor, 'dltensor')
    if dlm_tensor == NULL:
        PyErr_WriteUnraisable(dltensor)
    # The deleter can be NULL if there is no way for the caller
    # to provide a reasonable destructor
    elif dlm_tensor.deleter:
        dlm_tensor.deleter(dlm_tensor)


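# Deleter attached to DLManagedTensor structs created by view_to_dlpack():
# releases the reference to the owning CBufferView stored in manager_ctx and
# frees the tensor struct allocated with ArrowMalloc().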
cdef void view_dlpack_deleter(DLManagedTensor* tensor) noexcept with gil:
    if tensor.manager_ctx is NULL:
        return
    Py_DECREF(<CBufferView>tensor.manager_ctx)
    tensor.manager_ctx = NULL
    ArrowFree(tensor)


cdef DLDataType view_to_dlpack_data_type(CBufferView view):
    cdef DLDataType dtype
    # Define DLDataType struct
    if _types.is_unsigned_integer(view.data_type_id):
        dtype.code = kDLUInt
    elif _types.is_signed_integer(view.data_type_id):
        dtype.code = kDLInt
    elif _types.is_floating_point(view.data_type_id):
        dtype.code = kDLFloat
    elif _types.equal(view.data_type_id, _types.BOOL):
        raise ValueError('Bit-packed boolean data type not supported by DLPack.')
    else:
        raise ValueError('DataType is not compatible with DLPack spec: ' + view.data_type)
    dtype.lanes = <uint16_t>1
    dtype.bits = <uint8_t>(view._element_size_bits)

    return dtype

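# Map a DLPack data type to the corresponding nanoarrow type identifier,
# raising ValueError for code/bit-width combinations without an Arrow
# equivalent.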
cdef int dlpack_data_type_to_arrow(DLDataType dtype):
    if dtype.code == kDLInt:
        if dtype.bits == 8:
            return _types.INT8
        elif dtype.bits == 16:
            return _types.INT16
        elif dtype.bits == 32:
            return _types.INT32
        elif dtype.bits == 64:
            return _types.INT64
    elif dtype.code == kDLUInt:
        if dtype.bits == 8:
            return _types.UINT8
        elif dtype.bits == 16:
            return _types.UINT16
        elif dtype.bits == 32:
            return _types.UINT32
        elif dtype.bits == 64:
            return _types.UINT64
    elif dtype.code == kDLFloat:
        if dtype.bits == 16:
            return _types.HALF_FLOAT
        elif dtype.bits == 32:
            return _types.FLOAT
        elif dtype.bits == 64:
            return _types.DOUBLE

    raise ValueError("Can't convert dlpack data type to Arrow type")

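# Export a CBufferView as a DLPack capsule. The returned PyCapsule owns a
# DLManagedTensor whose manager_ctx holds a reference to the view, so the
# underlying buffer stays alive until the capsule (or its consumer) is done
# with it.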
cdef object view_to_dlpack(CBufferView view, stream=None):
    # Define the DLDevice and DLDataType structs; constructing them also
    # checks that the data type is supported by DLPack
    cdef DLDevice device = view_to_dlpack_device(view)
    cdef DLDataType dtype = view_to_dlpack_data_type(view)

    # Allocate memory for DLManagedTensor
    cdef DLManagedTensor* dlm_tensor = <DLManagedTensor*>ArrowMalloc(sizeof(DLManagedTensor))
    # Define DLManagedTensor struct
    cdef DLTensor* dl_tensor = &dlm_tensor.dl_tensor
    dl_tensor.data = <void*>view._ptr.data.data
    dl_tensor.ndim = 1

    dl_tensor.shape = &view._n_elements
    dl_tensor.strides = NULL
    dl_tensor.byte_offset = 0

    dl_tensor.device = device
    dl_tensor.dtype = dtype

    dlm_tensor.manager_ctx = <void*>view
    Py_INCREF(view)
    dlm_tensor.deleter = view_dlpack_deleter

    # The stream argument has a DLPack- and device-specific interpretation:
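    # (per the DLPack __dlpack__ protocol for CUDA: None and 1 refer to the
    # legacy default stream, 2 to the per-thread default stream, 0 is
    # disallowed, -1 means "do not synchronize", and any other value is the
    # address of an existing stream)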

    # nanoarrow_device needs a CUstream* (i.e., a CUstream_st**), but dlpack
    # gives us a CUstream_st*.
    cdef void* cuda_pstream

    if view._event.device is DEVICE_CPU:
        if stream is not None and stream != -1:
            raise ValueError("dlpack stream must be None or -1 for the CPU device")
    elif view._event.device.device_type_id == ARROW_DEVICE_CUDA:
        if stream == 0:
            raise ValueError("dlpack stream value of 0 is not permitted for CUDA")
        elif stream == -1:
            # Sentinel for "do not synchronize"
            pass
        elif stream in (1, 2):
            # Technically we are mixing the per-thread and legacy default streams here;
            # however, the nanoarrow_device API currently has no mechanism to expose
            # a pointer to these streams specifically.
            cuda_pstream = <void*>0
            view._event.synchronize_stream(<uintptr_t>&cuda_pstream)
        else:
            # Otherwise, this is a CUstream (i.e., a CUstream_st*)
            cuda_pstream = <void*><uintptr_t>stream
            view._event.synchronize_stream(<uintptr_t>&cuda_pstream)

    return PyCapsule_New(dlm_tensor, 'dltensor', pycapsule_dlpack_deleter)


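# Build the DLDevice struct for a CBufferView, checking first that the
# buffer's data type can be represented in DLPack at all.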
cdef DLDevice view_to_dlpack_device(CBufferView view):
    cdef DLDevice device

    # Check data type support
    if _types.equal(view.data_type_id, _types.BOOL):
        raise ValueError('Bit-packed boolean data type not supported by DLPack.')
    elif (
        not _types.is_unsigned_integer(view.data_type_id)
        and not _types.is_signed_integer(view.data_type_id)
        and not _types.is_floating_point(view.data_type_id)
    ):
        raise ValueError('DataType is not compatible with DLPack spec: ' + view.data_type)

    # Define DLDevice struct
    cdef ArrowDevice* arrow_device = view._event.device._ptr
    if arrow_device.device_type is ARROW_DEVICE_CPU:
        # DLPack uses 0 for the CPU device id where Arrow uses -1
        device.device_type = kDLCPU
        device.device_id = 0
    else:
        # Otherwise, Arrow's device identifiers and types are intentionally
        # identical to DLPack
        device.device_type = <DLDeviceType>arrow_device.device_type
        device.device_id = arrow_device.device_id

    return device


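# A NULL strides pointer in DLPack means the tensor is compact and row-major;
# otherwise, for the 1-D case handled here, a stride of one element is
# equivalent to contiguous.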
cdef bint dlpack_strides_are_contiguous(DLTensor* dl_tensor):
    if dl_tensor.strides == NULL:
        return True

    if dl_tensor.ndim != 1:
        raise NotImplementedError("Contiguous stride check not implemented for ndim != 1")

    # DLTensor strides are in elements, not bytes
    return dl_tensor.strides[0] == 1


cdef class CBufferView:
    """Wrapper for Array buffer content

    This object is a Python wrapper around a buffer held by an Array.
    It implements the Python buffer protocol and is best accessed through
    another implementor (e.g., `np.array(array_view.buffers[1])`)). Note that
    this buffer content does not apply any parent offset.
    """

    def __cinit__(self, object base, uintptr_t addr, int64_t size_bytes,
                  ArrowType data_type,
                  Py_ssize_t element_size_bits, CSharedSyncEvent event):
        self._base = base
        self._ptr.data.data = <void*>addr
        self._ptr.size_bytes = size_bytes
        self._data_type = data_type
        self._event = event
        self._format[0] = 0
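        # _types.to_format() fills self._format with a struct-style format
        # string for this type and returns the element width in bits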
        self._element_size_bits = _types.to_format(
            self._data_type,
            element_size_bits,
            sizeof(self._format),
            self._format
        )
        self._strides = self._item_size()
        self._shape = self._ptr.size_bytes // self._strides
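        # Boolean buffers are bit-packed, so each byte holds 8 logical elements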
        if _types.equal(self._data_type, _types.BOOL):
            self._n_elements = self._shape * 8
        else:
            self._n_elements = self._shape

    def _addr(self):
        return <uintptr_t>self._ptr.data.data

    @property
    def device(self):
        return self._event.device

    @property
    def element_size_bits(self):
        return self._element_size_bits

    @property
    def size_bytes(self):
        return self._ptr.size_bytes

    @property
    def data_type_id(self):
        return self._data_type

    @property
    def data_type(self):
        return ArrowTypeString(self._data_type).decode("UTF-8")

    @property
    def format(self):
        return self._format.decode("UTF-8")

    @property
    def itemsize(self):
        return self._strides

    def __len__(self):
        return self._shape

    def __getitem__(self, int64_t i):
        if i < 0 or i >= self._shape:
            raise IndexError(f"Index {i} out of range")
        cdef int64_t offset = self._strides * i
        value = unpack_from(self.format, buffer=self, offset=offset)
        if len(value) == 1:
            return value[0]
        else:
            return value

    def __iter__(self):
        return self._iter_dispatch(0, len(self))

    def _iter_dispatch(self, int64_t offset, int64_t length):
        if offset < 0 or length < 0 or (offset + length) > len(self):
            raise IndexError(
                f"offset {offset} and length {length} do not describe a valid slice "
                f"of buffer with length {len(self)}"
            )
        # memoryview's implementation is very fast but not always possible
        # (half float, fixed-size binary, interval)
        if _types.one_of(
            self._data_type,
            (
                _types.HALF_FLOAT,
                _types.INTERVAL_DAY_TIME,
                _types.INTERVAL_MONTH_DAY_NANO,
                _types.DECIMAL128,
                _types.DECIMAL256
            )
        ) or (
            _types.equal(self._data_type, _types.BINARY) and self._element_size_bits != 0
        ):
        # ...