# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
from libc.stdint cimport uintptr_t, int8_t, uint8_t, uint16_t, int64_t
from libc.string cimport memcpy
from libc.stdio cimport snprintf
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer, PyCapsule_IsValid
from cpython cimport (
Py_buffer,
PyObject_GetBuffer,
PyBuffer_Release,
PyBuffer_ToContiguous,
PyBuffer_FillInfo,
PyBUF_ANY_CONTIGUOUS,
PyBUF_FORMAT,
PyBUF_WRITABLE,
PyErr_WriteUnraisable,
)
from cpython.ref cimport Py_INCREF, Py_DECREF
from nanoarrow_c cimport (
NANOARROW_OK,
ArrowMalloc,
ArrowFree,
ArrowType,
ArrowTypeString,
ArrowBitGet,
ArrowBitsUnpackInt8,
ArrowBufferReserve,
ArrowBufferAppendFill,
ArrowBufferAppendInt8,
ArrowBitmapInit,
ArrowBitmapReset,
ArrowBitmap,
ArrowBitmapReserve,
ArrowBitmapAppend,
ArrowBitmapAppendUnsafe,
ArrowBuffer,
ArrowBufferMove,
)
from nanoarrow_device_c cimport (
ARROW_DEVICE_CPU,
ARROW_DEVICE_CUDA,
ArrowDevice,
)
from nanoarrow_dlpack cimport (
DLDataType,
DLDevice,
DLDeviceType,
DLManagedTensor,
DLTensor,
kDLCPU,
kDLFloat,
kDLInt,
kDLUInt
)
from nanoarrow cimport _utils
from nanoarrow cimport _types
from nanoarrow._device cimport CSharedSyncEvent, Device
from struct import unpack_from, iter_unpack, calcsize, Struct
from nanoarrow import _repr_utils
from nanoarrow._device import DEVICE_CPU
cdef void pycapsule_dlpack_deleter(object dltensor) noexcept:
    """PyCapsule destructor for capsules created with the name 'dltensor'.

    A capsule that is valid under the name "used_dltensor" is treated as
    already consumed and is left untouched. Otherwise the wrapped
    DLManagedTensor is fetched and its own deleter (if any) is invoked.
    """
    cdef DLManagedTensor* managed

    # Only act when the capsule has not been consumed
    if not PyCapsule_IsValid(dltensor, "used_dltensor"):
        managed = <DLManagedTensor*>PyCapsule_GetPointer(dltensor, 'dltensor')
        if managed == NULL:
            # Nothing can be raised from a destructor; report and move on
            PyErr_WriteUnraisable(dltensor)
        elif managed.deleter:
            # A NULL deleter is allowed when the producer had no reasonable
            # destructor to provide, hence the check before calling it
            managed.deleter(managed)
cdef void view_dlpack_deleter(DLManagedTensor* tensor) noexcept with gil:
    """DLManagedTensor deleter for tensors whose manager_ctx is a CBufferView.

    Drops the reference to the CBufferView stored in ``manager_ctx``, clears
    the field, and frees the tensor struct with ArrowFree(). A tensor whose
    ``manager_ctx`` is already NULL is left alone.
    """
    if tensor.manager_ctx is not NULL:
        # Release the view reference held through manager_ctx
        Py_DECREF(<CBufferView>tensor.manager_ctx)
        tensor.manager_ctx = NULL
        ArrowFree(tensor)
cdef DLDataType view_to_dlpack_data_type(CBufferView view):
    """Build the DLDataType describing the elements of a buffer view.

    The type code is chosen from the view's Arrow type id (unsigned integer,
    signed integer or floating point); ``bits`` is taken from the view's
    element size in bits and ``lanes`` is always 1.

    Raises ValueError for the bit-packed boolean type and for any other type
    that is not an integer or floating point type.
    """
    cdef DLDataType result
    type_id = view.data_type_id

    # Pick the DLPack type code for the Arrow type
    if _types.is_unsigned_integer(type_id):
        result.code = kDLUInt
    elif _types.is_signed_integer(type_id):
        result.code = kDLInt
    elif _types.is_floating_point(type_id):
        result.code = kDLFloat
    elif _types.equal(type_id, _types.BOOL):
        raise ValueError('Bit-packed boolean data type not supported by DLPack.')
    else:
        raise ValueError('DataType is not compatible with DLPack spec: ' + view.data_type)

    result.bits = <uint8_t>(view._element_size_bits)
    result.lanes = <uint16_t>1
    return result
cdef int dlpack_data_type_to_arrow(DLDataType dtype):
    """Map a DLDataType (type code + bit width) to an Arrow type id.

    Supported combinations are signed and unsigned integers of 8, 16, 32 and
    64 bits and floats of 16, 32 and 64 bits. Only ``code`` and ``bits`` are
    inspected. Raises ValueError for every other combination.
    """
    cdef uint8_t bits = dtype.bits

    # Signed integers
    if dtype.code == kDLInt and bits == 8:
        return _types.INT8
    if dtype.code == kDLInt and bits == 16:
        return _types.INT16
    if dtype.code == kDLInt and bits == 32:
        return _types.INT32
    if dtype.code == kDLInt and bits == 64:
        return _types.INT64

    # Unsigned integers
    if dtype.code == kDLUInt and bits == 8:
        return _types.UINT8
    if dtype.code == kDLUInt and bits == 16:
        return _types.UINT16
    if dtype.code == kDLUInt and bits == 32:
        return _types.UINT32
    if dtype.code == kDLUInt and bits == 64:
        return _types.UINT64

    # Floating point
    if dtype.code == kDLFloat and bits == 16:
        return _types.HALF_FLOAT
    if dtype.code == kDLFloat and bits == 32:
        return _types.FLOAT
    if dtype.code == kDLFloat and bits == 64:
        return _types.DOUBLE

    raise ValueError("Can't convert dlpack data type to Arrow type")
cdef object view_to_dlpack(CBufferView view, stream=None):
    """Export a buffer view as a PyCapsule named 'dltensor'.

    The capsule wraps a newly allocated DLManagedTensor describing the view
    as a one-dimensional tensor. The tensor holds a reference to ``view``
    (through ``manager_ctx``) that is released by view_dlpack_deleter().

    Parameters
    ----------
    view : CBufferView
        The buffer to export.
    stream : optional
        DLPack stream argument. For the CPU device it must be None or -1.
        For CUDA, 0 is rejected, -1 skips synchronization, 1 and 2
        synchronize with a NULL stream, and any other value is interpreted
        as a stream handle passed as an integer.

    Raises
    ------
    ValueError
        If the data type is not supported by DLPack or ``stream`` is not
        valid for the view's device.
    MemoryError
        If the DLManagedTensor cannot be allocated.
    """
    # Define DLDevice and DLDataType struct and
    # with that check for data type support first
    cdef DLDevice device = view_to_dlpack_device(view)
    cdef DLDataType dtype = view_to_dlpack_data_type(view)
    cdef DLManagedTensor* dlm_tensor
    cdef DLTensor* dl_tensor
    # nanoarrow_device needs a CUstream* (i.e., a CUstream_st**), but dlpack
    # gives us a CUstream_st*.
    cdef void* cuda_pstream

    # stream has a DLPack + device specific interpretation. It is validated
    # (and synchronized) before anything is allocated so that an error raised
    # here cannot leak the tensor or a reference to the view.
    if view._event.device is DEVICE_CPU:
        if stream is not None and stream != -1:
            raise ValueError("dlpack stream must be None or -1 for the CPU device")
    elif view._event.device.device_type_id == ARROW_DEVICE_CUDA:
        if stream == 0:
            raise ValueError("dlpack stream value of 0 is not permitted for CUDA")
        elif stream == -1:
            # Sentinel for "do not synchronize"
            pass
        elif stream in (1, 2):
            # Technically we are mixing the per-thread and legacy default streams here;
            # however, the nanoarrow_device API currently has no mechanism to expose
            # a pointer to these streams specifically.
            cuda_pstream = <void*>0
            view._event.synchronize_stream(<uintptr_t>&cuda_pstream)
        else:
            # Otherwise, stream is a CUstream (i.e., a CUstream_st*) given as an
            # integer; its address is the CUstream* that nanoarrow_device needs
            cuda_pstream = <void*><uintptr_t>stream
            view._event.synchronize_stream(<uintptr_t>&cuda_pstream)

    # Allocate memory for DLManagedTensor
    dlm_tensor = <DLManagedTensor*>ArrowMalloc(sizeof(DLManagedTensor))
    if dlm_tensor == NULL:
        raise MemoryError("Failed to allocate DLManagedTensor")

    # Define DLManagedTensor struct
    dl_tensor = &dlm_tensor.dl_tensor
    dl_tensor.data = <void*>view._ptr.data.data
    dl_tensor.ndim = 1
    # shape points at a field of the view, which the reference taken below
    # keeps alive for as long as the tensor exists
    dl_tensor.shape = &view._n_elements
    dl_tensor.strides = NULL
    dl_tensor.byte_offset = 0
    dl_tensor.device = device
    dl_tensor.dtype = dtype

    dlm_tensor.manager_ctx = <void*>view
    Py_INCREF(view)
    dlm_tensor.deleter = view_dlpack_deleter

    try:
        return PyCapsule_New(dlm_tensor, 'dltensor', pycapsule_dlpack_deleter)
    except BaseException:
        # No capsule took ownership of the tensor, so release it here
        view_dlpack_deleter(dlm_tensor)
        raise
cdef DLDevice view_to_dlpack_device(CBufferView view):
    """Build the DLDevice describing where a buffer view's data lives.

    The view's data type is validated first so that unsupported types are
    rejected before a device is reported. For the CPU device the DLPack
    device is (kDLCPU, 0); for any other device the Arrow device type and
    device id are copied through unchanged.

    Raises ValueError for the bit-packed boolean type and for any other type
    that is not an integer or floating point type.
    """
    cdef DLDevice device
    cdef ArrowDevice* arrow_device

    # Check data type support
    if _types.equal(view.data_type_id, _types.BOOL):
        raise ValueError('Bit-packed boolean data type not supported by DLPack.')
    elif (
        not _types.is_unsigned_integer(view.data_type_id)
        and not _types.is_signed_integer(view.data_type_id)
        and not _types.is_floating_point(view.data_type_id)
    ):
        raise ValueError('DataType is not compatible with DLPack spec: ' + view.data_type)

    # Define DLDevice struct
    arrow_device = view._event.device._ptr
    # Compare by value: the device type is an integer, not an object identity
    if arrow_device.device_type == ARROW_DEVICE_CPU:
        # DLPack uses 0 for the CPU device id where Arrow uses -1
        device.device_type = kDLCPU
        device.device_id = 0
    else:
        # Otherwise, Arrow's device identifiers and types are intentionally
        # identical to DLPack
        device.device_type = <DLDeviceType>arrow_device.device_type
        device.device_id = arrow_device.device_id

    return device
cdef bint dlpack_strides_are_contiguous(DLTensor* dl_tensor):
    """Return True if a DLTensor's strides describe contiguous data.

    A NULL strides pointer is reported as contiguous. Otherwise only
    one-dimensional tensors are handled: the tensor is contiguous when its
    single stride equals 1. Raises NotImplementedError when strides are
    present and ndim != 1.
    """
    if dl_tensor.strides != NULL:
        if dl_tensor.ndim != 1:
            raise NotImplementedError("Contiguous stride check not implemented for ndim != 1")

        # DLTensor strides are in elements, not bytes
        return dl_tensor.strides[0] == 1

    return True
cdef class CBufferView:
"""Wrapper for Array buffer content
This object is a Python wrapper around a buffer held by an Array.
It implements the Python buffer protocol and is best accessed through
another implementor (e.g., `np.array(array_view.buffers[1])`). Note that
this buffer content does not apply any parent offset.
"""
def __cinit__(self, object base, uintptr_t addr, int64_t size_bytes,
              ArrowType data_type,
              Py_ssize_t element_size_bits, CSharedSyncEvent event):
    """Initialize the view from a raw address, byte size and Arrow type.

    Stores ``base`` and ``event`` on the instance, records the address and
    size, derives the format string and element size from the data type,
    and computes the item stride, the number of items and the number of
    logical elements (eight per item for the boolean type).
    """
    self._base = base
    self._event = event
    self._data_type = data_type

    self._ptr.data.data = <void*>addr
    self._ptr.size_bytes = size_bytes

    # Start from an empty format string; to_format() fills it in and
    # returns the element size in bits
    self._format[0] = 0
    self._element_size_bits = _types.to_format(
        self._data_type,
        element_size_bits,
        sizeof(self._format),
        self._format
    )

    self._strides = self._item_size()
    self._shape = self._ptr.size_bytes // self._strides

    # Boolean items each account for 8 elements
    is_bool = _types.equal(self._data_type, _types.BOOL)
    self._n_elements = self._shape * 8 if is_bool else self._shape
def _addr(self):
    """Return the address of the underlying data as an integer."""
    cdef uintptr_t address = <uintptr_t>self._ptr.data.data
    return address
@property
def device(self):
    """The device of the sync event associated with this buffer."""
    return self._event.device
@property
def element_size_bits(self):
    """The element size in bits, as computed when the view was created."""
    return self._element_size_bits
@property
def size_bytes(self):
    """The size of the buffer in bytes."""
    return self._ptr.size_bytes
@property
def data_type_id(self):
    """The Arrow type identifier of the buffer's elements."""
    return self._data_type
@property
def data_type(self):
    """The name of the buffer's Arrow type, as reported by ArrowTypeString()."""
    return ArrowTypeString(self._data_type).decode("UTF-8")
@property
def format(self):
    """The format string describing one item of the buffer, as a str."""
    return self._format.decode("UTF-8")
@property
def itemsize(self):
    """The size of one item; this is the stride between consecutive items."""
    return self._strides
def __len__(self):
    """Return the number of items (buffer size divided by the item size)."""
    return self._shape
def __getitem__(self, int64_t i):
    """Return the item at index ``i``.

    The item is unpacked from the buffer using the view's format string. A
    format that yields a single value returns that value; otherwise the
    tuple of unpacked values is returned. Raises IndexError when ``i`` is
    negative or not less than the number of items.
    """
    cdef int64_t byte_offset

    if not (0 <= i < self._shape):
        raise IndexError(f"Index {i} out of range")

    byte_offset = self._strides * i
    unpacked = unpack_from(self.format, buffer=self, offset=byte_offset)
    return unpacked[0] if len(unpacked) == 1 else unpacked
def __iter__(self):
    """Iterate over every item of the buffer, from the first to the last."""
    n_items = len(self)
    return self._iter_dispatch(0, n_items)
def _iter_dispatch(self, int64_t offset, int64_t length):
if offset < 0 or length < 0 or (offset + length) > len(self):
raise IndexError(
f"offset {offset} and length {length} do not describe a valid slice "
f"of buffer with length {len(self)}"
)
# memoryview's implementation is very fast but not always possible (half float, fixed-size binary, interval)
if _types.one_of(
self._data_type,
(
_types.HALF_FLOAT,
_types.INTERVAL_DAY_TIME,
_types.INTERVAL_MONTH_DAY_NANO,
_types.DECIMAL128,
_types.DECIMAL256
)
) or (
_types.equal(self._data_type, _types.BINARY) and self._element_size_bits != 0
):
Loading ...