# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Tests for the pyarrow.cuda module.

UNTESTED:
read_message
"""
import sys
import sysconfig
import pytest
import pyarrow as pa
try:
import numpy as np
except ImportError:
pytestmark = pytest.mark.numpy
# Skip every test in this module when pyarrow.cuda cannot be imported.
cuda = pytest.importorskip("pyarrow.cuda")

# Platform string used to decide whether the CUDA IPC tests may run.
platform = sysconfig.get_platform()
# TODO: enable ppc64 when Arrow C++ supports IPC in ppc64 systems:
has_ipc_support = platform == 'linux-x86_64'  # or 'ppc64' in platform
# Marker for tests that need CUDA IPC: they are skipped, with the platform
# named in the reason, whenever has_ipc_support is false.
cuda_ipc = pytest.mark.skipif(
    not has_ipc_support,
    reason='CUDA IPC not supported in platform `%s`' % (platform))

# Module-level placeholders; setup_module() rebinds both to cuda.Context
# instances before any test runs.
global_context = None  # for flake8
global_context1 = None  # for flake8
def setup_module(module):
    """Create the CUDA contexts shared by the tests of this module.

    ``global_context`` is bound to device 0 and ``global_context1`` to the
    highest device number reported by ``cuda.Context.get_num_devices()``.
    """
    setattr(module, 'global_context', cuda.Context(0))
    last_device = cuda.Context.get_num_devices() - 1
    setattr(module, 'global_context1', cuda.Context(last_device))
def teardown_module(module):
del module.global_context
def test_Context():
    """Check device numbering, memory manager and device of the contexts.

    Also verifies that asking for a device number equal to the device
    count is rejected with a ``ValueError``.
    """
    num_devices = cuda.Context.get_num_devices()
    assert num_devices > 0
    assert global_context.device_number == 0
    assert global_context1.device_number == num_devices - 1

    manager = global_context.memory_manager
    assert not manager.is_cpu
    assert "<pyarrow.MemoryManager device: CudaDevice" in repr(manager)

    device = global_context.device
    assert device == manager.device
    assert not device.is_cpu
    assert device.device_id == 0
    assert device.device_type == pa.DeviceAllocationType.CUDA

    # Valid device numbers are 0 .. num_devices - 1, so this one is invalid.
    expected_message = ("device_number argument must "
                        "be non-negative less than")
    with pytest.raises(ValueError, match=expected_message):
        cuda.Context(num_devices)
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_manage_allocate_free_host(size):
    """Write through one NumPy view of a CUDA host buffer, read via another.

    The middle half of the buffer is set to 1 through the first view; a
    second, independently created view must show the same contents.
    """
    host_buf = cuda.new_host_buffer(size)
    first_view = np.frombuffer(host_buf, dtype=np.uint8)

    start, stop = size // 4, 3 * size // 4
    first_view[start:stop] = 1
    snapshot = first_view.copy()

    second_view = np.frombuffer(host_buf, dtype=np.uint8)
    np.testing.assert_equal(second_view, snapshot)
    assert host_buf.size == size
def test_context_allocate_del():
    """Check that ``bytes_allocated`` follows a buffer's creation and release."""
    nbytes = 128
    baseline = global_context.bytes_allocated

    device_buf = global_context.new_buffer(nbytes)
    assert global_context.bytes_allocated - baseline == nbytes

    # Dropping the only reference must return the counter to its baseline.
    del device_buf
    assert global_context.bytes_allocated == baseline
def make_random_buffer(size, target='host'):
    """Return an ``(array, buffer)`` pair filled with the same random bytes.

    Parameters
    ----------
    size : int
        Number of bytes to allocate; must be non-negative.
    target : str
        ``'host'`` returns a buffer from ``pa.allocate_buffer``;
        ``'device'`` returns a buffer allocated on ``global_context`` that
        holds a copy of the host data.

    Returns
    -------
    tuple
        The uint8 NumPy array viewing the host data, and the host or
        device buffer.

    Raises
    ------
    ValueError
        If ``target`` is neither ``'host'`` nor ``'device'``.
    """
    if target == 'device':
        # Build the data on the host first, then copy it to the device.
        host_arr, host_buf = make_random_buffer(size, target='host')
        device_buf = global_context.new_buffer(size)
        assert device_buf.size == size
        device_buf.copy_from_host(host_buf, position=0, nbytes=size)
        return host_arr, device_buf

    if target != 'host':
        raise ValueError('invalid target value')

    assert size >= 0
    host_buf = pa.allocate_buffer(size)
    assert host_buf.size == size

    host_arr = np.frombuffer(host_buf, dtype=np.uint8)
    assert host_arr.size == size
    host_arr[:] = np.random.randint(
        low=1, high=255, size=size, dtype=np.uint8)
    assert host_arr.sum() > 0 or size == 0

    # A fresh view of the same buffer must see what was just written.
    recheck = np.frombuffer(host_buf, dtype=np.uint8)
    np.testing.assert_equal(host_arr, recheck)
    return host_arr, host_buf
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_device_buffer(size):
    """Build device buffers from each kind of source and verify contents.

    Sources covered: host buffers, NumPy arrays, bytes, other device
    buffers (as views and as copies), CUDA host buffers, and offset/size
    slices of these. Each device buffer is copied back to the host and
    compared with the random reference array.
    """
    ctx = global_context

    def to_host(device_buf):
        # View the host copy of a device buffer as uint8 values.
        return np.frombuffer(device_buf.copy_to_host(), dtype=np.uint8)

    def check(device_buf, expected, nbytes):
        # Size is asserted first, then the contents copied back to the host.
        assert device_buf.size == nbytes
        np.testing.assert_equal(expected, to_host(device_buf))

    ref, host_buf = make_random_buffer(size)
    half = size // 2
    quarter = size // 4

    # Creating device buffer from host buffer;
    dbuf = ctx.buffer_from_data(host_buf)
    check(dbuf, ref, size)

    # CudaBuffer does not support buffer protocol
    with pytest.raises(BufferError):
        memoryview(dbuf)

    # Creating device buffer from array:
    dbuf = ctx.buffer_from_data(ref)
    check(dbuf, ref, size)

    # Creating device buffer from bytes:
    dbuf = ctx.buffer_from_data(ref.tobytes())
    check(dbuf, ref, size)

    # Creating a device buffer from another device buffer, view:
    # writes made through the view must be visible in the parent buffer.
    view = dbuf.slice(0, dbuf.size)
    check(view, ref, size)
    if size > 1:
        view.copy_from_host(ref[half:])
        np.testing.assert_equal(
            np.concatenate((ref[half:], ref[half:])), to_host(dbuf))
        view.copy_from_host(ref[:half])  # restoring the reference data

    # Creating a device buffer from another device buffer, copy:
    # writes made to the copy must leave the source buffer unchanged.
    duplicate = ctx.buffer_from_data(dbuf)
    check(duplicate, ref, size)
    duplicate.copy_from_host(ref[half:])
    np.testing.assert_equal(ref, to_host(dbuf))

    # Slice of a device buffer; a length past the end is cut to fit.
    sliced = dbuf.slice(0, dbuf.size + 10)
    check(sliced, ref, size)
    sliced = dbuf.slice(quarter, size + 10)
    check(sliced, ref[quarter:], size - quarter)

    soffset = quarter
    ssize = 2 * size // 4
    window = ref[soffset:soffset + ssize]

    # Creating a device buffer from a slice of host buffer
    dbuf = ctx.buffer_from_data(host_buf, offset=soffset, size=ssize)
    check(dbuf, window, ssize)
    host_slice = host_buf.slice(offset=soffset, length=ssize)
    dbuf = ctx.buffer_from_data(host_slice)
    check(dbuf, window, ssize)

    # Creating a device buffer from a slice of an array
    dbuf = ctx.buffer_from_data(ref, offset=soffset, size=ssize)
    check(dbuf, window, ssize)
    dbuf = ctx.buffer_from_data(ref[soffset:soffset + ssize])
    check(dbuf, window, ssize)

    # Creating a device buffer from a slice of bytes
    dbuf = ctx.buffer_from_data(ref.tobytes(), offset=soffset, size=ssize)
    check(dbuf, window, ssize)

    # Creating a device buffer from size
    dbuf = ctx.new_buffer(size)
    assert dbuf.size == size

    # Creating device buffer from a slice of another device buffer:
    dbuf = ctx.buffer_from_data(ref)
    sliced = dbuf.slice(soffset, ssize)
    check(sliced, window, ssize)

    # Creating device buffer from HostBuffer
    cuda_host_buf = cuda.new_host_buffer(size)
    np.frombuffer(cuda_host_buf, dtype=np.uint8)[:] = ref
    dbuf = ctx.buffer_from_data(cuda_host_buf)
    check(dbuf, ref, size)

    # Creating device buffer from HostBuffer slice
    dbuf = ctx.buffer_from_data(cuda_host_buf, offset=soffset, size=ssize)
    check(dbuf, window, ssize)
    cuda_host_slice = cuda_host_buf.slice(offset=soffset, length=ssize)
    dbuf = ctx.buffer_from_data(cuda_host_slice)
    check(dbuf, window, ssize)
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_from_object(size):
    """Check ``buffer_from_object`` on accepted and rejected inputs.

    CUDA host buffers and device buffers are accepted and must yield the
    reference data; a plain ``pa.Buffer`` and a NumPy array must raise
    ``pa.ArrowTypeError``.
    """
    context = global_context
    ref, device_buf = make_random_buffer(size, target='device')
    dtype = ref.dtype

    def to_host(buf):
        # View the host copy of a device buffer with the reference dtype.
        return np.frombuffer(buf.copy_to_host(), dtype=dtype)

    # Creating device buffer from a CUDA host buffer
    cuda_host_buf = cuda.new_host_buffer(size * dtype.itemsize)
    np.frombuffer(cuda_host_buf, dtype=dtype)[:] = ref
    from_host = context.buffer_from_object(cuda_host_buf)
    assert from_host.size == device_buf.size
    np.testing.assert_equal(ref, to_host(from_host))

    # Creating device buffer from a device buffer
    from_device = context.buffer_from_object(from_host)
    assert from_device.size == device_buf.size
    np.testing.assert_equal(ref, to_host(from_device))

    # Trying to create a device buffer from a Buffer
    with pytest.raises(pa.ArrowTypeError,
                       match=('buffer is not backed by a CudaBuffer')):
        context.buffer_from_object(pa.py_buffer(b"123"))

    # Trying to create a device buffer from numpy.array
    with pytest.raises(pa.ArrowTypeError,
                       match=("cannot create device buffer view from "
                              ".* \'numpy.ndarray\'")):
        context.buffer_from_object(np.array([1, 2, 3]))
def test_foreign_buffer():
    """Check how a foreign buffer interacts with its host buffer's lifetime.

    Three scenarios are exercised: passing the host buffer as the third
    argument of ``foreign_buffer`` adds exactly one reference to it; with
    that reference held, the memory stays readable after the host buffer
    name is deleted; without it, reading raises ``pa.ArrowIOError``.
    """
    ctx = global_context
    dtype = np.dtype(np.uint8)
    size = 10
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)

    # test host buffer memory reference counting
    # NOTE: these assertions compare exact refcounts, so no extra references
    # to hbuf may be created between the getrefcount calls.
    rc = sys.getrefcount(hbuf)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    assert sys.getrefcount(hbuf) == rc + 1
    del fbuf
    assert sys.getrefcount(hbuf) == rc

    # test postponed deallocation of host buffer memory
    # (hbuf was passed as the third argument, so the copy below must succeed
    # even after the local name is gone)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    del hbuf
    fbuf.copy_to_host()

    # test deallocating the host buffer memory making it inaccessible
    # (no third argument is passed here, and the copy is expected to fail)
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size)
    del hbuf
    with pytest.raises(pa.ArrowIOError,
                       match=('Cuda error ')):
        fbuf.copy_to_host()
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_CudaBuffer(size):
    """Check size, address, indexing, slicing and parent of a CudaBuffer.

    Also verifies that calling the ``CudaBuffer`` constructor directly
    raises ``TypeError``.
    """
    ref, host_buf = make_random_buffer(size)
    expected_bytes = ref.tobytes()
    assert expected_bytes == host_buf.to_pybytes()

    device_buf = global_context.buffer_from_data(host_buf)
    assert device_buf.size == size
    assert not device_buf.is_cpu
    assert expected_bytes == device_buf.to_pybytes()
    if size > 0:
        assert device_buf.address > 0

    # Element-wise indexing must agree with the reference array.
    for index, value in enumerate(ref):
        assert device_buf[index] == value

    # Slice indexing must agree as well.
    for window in (slice(None), slice(size // 4, size // 2)):
        assert device_buf[window].to_pybytes() == ref[window].tobytes()

    sliced = device_buf.slice(size // 4, size // 2)
    assert sliced.parent == device_buf

    with pytest.raises(TypeError,
                       match="Do not call CudaBuffer's constructor directly"):
        cuda.CudaBuffer()
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_HostBuffer(size):
    """Check size, CPU flag and contents of a filled CUDA host buffer.

    The host buffer is filled from a random reference array through a
    NumPy view and must then report the same bytes as that array.
    """
    arr, buf = make_random_buffer(size)
    assert arr.tobytes() == buf.to_pybytes()
    hbuf = cuda.new_host_buffer(size)
    # Fill the CUDA host buffer in place through a uint8 view of it.
    np.frombuffer(hbuf, dtype=np.uint8)[:] = arr
    assert hbuf.size == size
    assert hbuf.is_cpu
    assert arr.tobytes() == hbuf.to_pybytes()
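# NOTE: a "Loading ..." page placeholder stood here; it is not Python code, and the remainder of the module may be missing.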