# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from pyarrow.lib cimport *
from pyarrow.includes.libarrow_cuda cimport *
from pyarrow.lib import allocate_buffer, as_buffer, ArrowTypeError
from pyarrow.util import get_contiguous_span
cimport cpython as cp
cdef class Context(_Weakrefable):
    """
    CUDA driver context.

    Holds a ``shared_ptr[CCudaContext]`` for a single GPU device and
    exposes allocation, view, copy and IPC helpers bound to it.
    """

    def __init__(self, *args, **kwargs):
        """
        Create a CUDA driver context for a particular device.

        If a CUDA context handle is passed, it is wrapped, otherwise
        a default CUDA context for the given device is requested.

        Parameters
        ----------
        device_number : int (default 0)
            Specify the GPU device for which the CUDA driver context is
            requested.
        handle : int, optional
            Specify CUDA handle for a shared context that has been created
            by another library.
        """
        # Intentionally empty: this method exists only because autodoc
        # doesn't pick up __cinit__. All initialization happens there.

    def __cinit__(self, int device_number=0, uintptr_t handle=0):
        cdef CCudaDeviceManager* manager
        manager = GetResultValue(CCudaDeviceManager.Instance())
        cdef int n = manager.num_devices()
        if device_number >= n or device_number < 0:
            # Leave the wrapped context empty before failing construction.
            self.context.reset()
            raise ValueError('device_number argument must be '
                             'non-negative less than %s' % (n))
        if handle == 0:
            # No external handle given: request the device's default context.
            self.context = GetResultValue(manager.GetContext(device_number))
        else:
            # Wrap an existing context handle created by another library.
            self.context = GetResultValue(manager.GetSharedContext(
                device_number, <void*>handle))
        self.device_number = device_number

    @staticmethod
    def from_numba(context=None):
        """
        Create a Context instance from a Numba CUDA context.

        Parameters
        ----------
        context : {numba.cuda.cudadrv.driver.Context, None}
            A Numba CUDA context instance.
            If None, the current Numba context is used.

        Returns
        -------
        shared_context : pyarrow.cuda.Context
            Context instance.
        """
        if context is None:
            # Imported lazily so numba is only required when actually used.
            import numba.cuda
            context = numba.cuda.current_context()
        return Context(device_number=context.device.id,
                       handle=context.handle.value)

    def to_numba(self):
        """
        Convert Context to a Numba CUDA context.

        Returns
        -------
        context : numba.cuda.cudadrv.driver.Context
            Numba CUDA context instance.
        """
        import ctypes
        import numba.cuda
        device = numba.cuda.gpus[self.device_number]
        handle = ctypes.c_void_p(self.handle)
        context = numba.cuda.cudadrv.driver.Context(device, handle)

        class DummyPendingDeallocs(object):
            # Context is managed by pyarrow: any deallocation Numba queues
            # on this context is dropped instead of being performed.
            def add_item(self, *args, **kwargs):
                pass

        context.deallocations = DummyPendingDeallocs()
        return context

    @staticmethod
    def get_num_devices():
        """ Return the number of GPU devices.
        """
        cdef CCudaDeviceManager* manager
        manager = GetResultValue(CCudaDeviceManager.Instance())
        return manager.num_devices()

    @property
    def device_number(self):
        """ Return context device number.
        """
        return self.device_number

    @property
    def handle(self):
        """ Return pointer to context handle.
        """
        return <uintptr_t>self.context.get().handle()

    cdef void init(self, const shared_ptr[CCudaContext]& ctx):
        # C-level hook used to wrap an already existing C++ context.
        self.context = ctx

    def synchronize(self):
        """Blocks until the device has completed all preceding requested
        tasks.
        """
        check_status(self.context.get().Synchronize())

    @property
    def bytes_allocated(self):
        """Return the number of allocated bytes.
        """
        return self.context.get().bytes_allocated()

    def get_device_address(self, uintptr_t address):
        """Return the device address that is reachable from kernels running in
        the context

        Parameters
        ----------
        address : int
            Specify memory address value

        Returns
        -------
        device_address : int
            Device address accessible from device context

        Notes
        -----
        The device address is defined as a memory address accessible
        by the device. It is often a device memory address, but it can
        also be a host memory address, for instance, when the memory
        is allocated as host memory (using cudaMallocHost or
        cudaHostAlloc), as managed memory (using cudaMallocManaged),
        or when the host memory is page-locked (using cudaHostRegister).
        """
        return GetResultValue(self.context.get().GetDeviceAddress(address))

    def new_buffer(self, int64_t nbytes):
        """Return new device buffer.

        Parameters
        ----------
        nbytes : int
            Specify the number of bytes to be allocated.

        Returns
        -------
        buf : CudaBuffer
            Allocated buffer.
        """
        cdef:
            shared_ptr[CCudaBuffer] cudabuf
        # Release the GIL while the device allocation runs.
        with nogil:
            cudabuf = GetResultValue(self.context.get().Allocate(nbytes))
        return pyarrow_wrap_cudabuffer(cudabuf)

    def foreign_buffer(self, address, size, base=None):
        """
        Create device buffer from address and size as a view.

        The caller is responsible for allocating and freeing the
        memory. When `address==size==0` then a new zero-sized buffer
        is returned.

        Parameters
        ----------
        address : int
            Specify the starting address of the buffer. The address can
            refer to both device or host memory but it must be
            accessible from device after mapping it with
            `get_device_address` method.
        size : int
            Specify the size of device buffer in bytes.
        base : {None, object}
            Specify object that owns the referenced memory.

        Returns
        -------
        cbuf : CudaBuffer
            Device buffer as a view of device reachable memory.
        """
        if not address and size == 0:
            # Null, empty view: hand out a fresh zero-sized allocation.
            return self.new_buffer(0)
        cdef:
            # Translate the (possibly host) address into one usable
            # from the device.
            uintptr_t c_addr = self.get_device_address(address)
            int64_t c_size = size
            shared_ptr[CCudaBuffer] cudabuf

        cudabuf = GetResultValue(self.context.get().View(
            <uint8_t*>c_addr, c_size))
        # `base` is attached to the wrapper so the owner of the memory
        # stays referenced by the returned view.
        return pyarrow_wrap_cudabuffer_base(cudabuf, base)

    def open_ipc_buffer(self, ipc_handle):
        """ Open existing CUDA IPC memory handle

        Parameters
        ----------
        ipc_handle : IpcMemHandle
            Specify opaque pointer to CUipcMemHandle (driver API).

        Returns
        -------
        buf : CudaBuffer
            referencing device buffer
        """
        handle = pyarrow_unwrap_cudaipcmemhandle(ipc_handle)
        cdef shared_ptr[CCudaBuffer] cudabuf
        # Release the GIL while the IPC handle is opened.
        with nogil:
            cudabuf = GetResultValue(
                self.context.get().OpenIpcBuffer(handle.get()[0]))
        return pyarrow_wrap_cudabuffer(cudabuf)

    def buffer_from_data(self, object data, int64_t offset=0, int64_t size=-1):
        """Create device buffer and initialize with data.

        Parameters
        ----------
        data : {CudaBuffer, HostBuffer, Buffer, array-like}
            Specify data to be copied to device buffer.
        offset : int
            Specify the offset of input buffer for device data
            buffering. Default: 0.
        size : int
            Specify the size of device buffer in bytes. Default: all
            (starting from input offset)

        Returns
        -------
        cbuf : CudaBuffer
            Device buffer with copied data.
        """
        is_host_data = not pyarrow_is_cudabuffer(data)
        buf = as_buffer(data) if is_host_data else data

        bsize = buf.size
        # An offset equal to the size is only accepted for empty input.
        if offset < 0 or (bsize and offset >= bsize):
            raise ValueError('offset argument is out-of-range')
        if size < 0:
            # Negative size means "everything from offset to the end".
            size = bsize - offset
        elif offset + size > bsize:
            raise ValueError(
                'requested larger slice than available in device buffer')

        if offset != 0 or size != bsize:
            buf = buf.slice(offset, size)

        result = self.new_buffer(size)
        # Choose the copy direction from where the source data lives.
        if is_host_data:
            result.copy_from_host(buf, position=0, nbytes=size)
        else:
            result.copy_from_device(buf, position=0, nbytes=size)
        return result

    def buffer_from_object(self, obj):
        """Create device buffer view of arbitrary object that references
        device accessible memory.

        When the object contains a non-contiguous view of device
        accessible memory then the returned device buffer will contain
        contiguous view of the memory, that is, including the
        intermediate data that is otherwise invisible to the input
        object.

        Parameters
        ----------
        obj : {object, Buffer, HostBuffer, CudaBuffer, ...}
            Specify an object that holds (device or host) address that
            can be accessed from device. This includes objects with
            types defined in pyarrow.cuda as well as arbitrary objects
            that implement the CUDA array interface as defined by numba.

        Returns
        -------
        cbuf : CudaBuffer
            Device buffer as a view of device accessible memory.
        """
        if isinstance(obj, HostBuffer):
            return self.foreign_buffer(obj.address, obj.size, base=obj)
        elif isinstance(obj, CudaBuffer):
            # Tested before the generic Buffer branch: CudaBuffer is
            # declared as a Buffer subclass in pyarrow, so placing it
            # after Buffer made this branch unreachable.
            return obj
        elif isinstance(obj, Buffer):
            return CudaBuffer.from_buffer(obj)
        elif hasattr(obj, '__cuda_array_interface__'):
            desc = obj.__cuda_array_interface__
            addr = desc['data'][0]
            if addr is None:
                # A null data pointer describes an empty array.
                return self.new_buffer(0)
            import numpy as np
            # Compute the byte span covering every element, so strided
            # (non-contiguous) arrays map to one contiguous view.
            start, end = get_contiguous_span(
                desc['shape'], desc.get('strides'),
                np.dtype(desc['typestr']).itemsize)
            return self.foreign_buffer(addr + start, end - start, base=obj)
        raise ArrowTypeError('cannot create device buffer view from'
                             ' `%s` object' % (type(obj)))
cdef class IpcMemHandle(_Weakrefable):
"""A serializable container for a CUDA IPC handle.
"""
cdef void init(self, shared_ptr[CCudaIpcMemHandle]& h):
self.handle = h
@staticmethod
def from_buffer(Buffer opaque_handle):
"""Create IpcMemHandle from opaque buffer (e.g. from another
process)
Parameters
----------
opaque_handle :
a CUipcMemHandle as a const void*
Loading ...