Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ tests / test_cuda.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Tests for the pyarrow.cuda module.

UNTESTED:
read_message
"""

import sys
import sysconfig

import pytest

import pyarrow as pa
try:
    import numpy as np
except ImportError:
    pytestmark = pytest.mark.numpy


# Skip every test in this module when pyarrow.cuda cannot be imported.
cuda = pytest.importorskip("pyarrow.cuda")

platform = sysconfig.get_platform()
# TODO: enable ppc64 when Arrow C++ supports IPC in ppc64 systems:
has_ipc_support = (platform == 'linux-x86_64')  # or 'ppc64' in platform

# Marker for IPC tests: skips them when the platform check above fails.
cuda_ipc = pytest.mark.skipif(
    not has_ipc_support,
    reason=f'CUDA IPC not supported in platform `{platform}`')

# Placeholders only; setup_module() rebinds both names before tests run.
global_context = None  # for flake8
global_context1 = None  # for flake8


def setup_module(module):
    """Create the CUDA contexts shared by the tests of this module.

    ``global_context`` is created for device 0 and ``global_context1`` for
    the highest device number reported by ``Context.get_num_devices()``.
    Both are stored as attributes of ``module``.
    """
    setattr(module, "global_context", cuda.Context(0))
    last_device = cuda.Context.get_num_devices() - 1
    setattr(module, "global_context1", cuda.Context(last_device))


def teardown_module(module):
    del module.global_context


def test_Context():
    """Check device numbering and the device/memory-manager accessors."""
    num_devices = cuda.Context.get_num_devices()
    assert num_devices > 0
    assert global_context.device_number == 0
    assert global_context1.device_number == num_devices - 1

    memory_manager = global_context.memory_manager
    assert not memory_manager.is_cpu
    assert "<pyarrow.MemoryManager device: CudaDevice" in repr(memory_manager)

    device = global_context.device
    assert device == memory_manager.device

    assert not device.is_cpu
    assert device.device_id == 0
    assert device.device_type == pa.DeviceAllocationType.CUDA

    # A device number equal to the device count must be rejected.
    expected_error = ("device_number argument must "
                      "be non-negative less than")
    with pytest.raises(ValueError, match=expected_error):
        cuda.Context(num_devices)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_manage_allocate_free_host(size):
    """Write through one NumPy view of a host buffer, read via another."""
    host_buf = cuda.new_host_buffer(size)
    writer = np.frombuffer(host_buf, dtype=np.uint8)
    start, stop = size // 4, 3 * size // 4
    writer[start:stop] = 1
    snapshot = writer.copy()
    # A second view over the same buffer must see what the first one wrote.
    reader = np.frombuffer(host_buf, dtype=np.uint8)
    np.testing.assert_equal(reader, snapshot)
    assert host_buf.size == size


def test_context_allocate_del():
    """Check that ``bytes_allocated`` follows a device buffer's lifetime."""
    nbytes = 128
    baseline = global_context.bytes_allocated
    device_buf = global_context.new_buffer(nbytes)
    assert global_context.bytes_allocated == baseline + nbytes
    # Once the only reference is gone the counter must return to baseline.
    del device_buf
    assert global_context.bytes_allocated == baseline


def make_random_buffer(size, target='host'):
    """Build ``size`` random bytes and return them as ``(array, buffer)``.

    The array is always a host-side uint8 NumPy array holding the data.
    With ``target='host'`` the buffer is the host buffer backing that array;
    with ``target='device'`` it is a buffer allocated on ``global_context``
    into which the host data has been copied.

    Raises ValueError for any other ``target``.
    """
    if target not in ('host', 'device'):
        raise ValueError('invalid target value')

    if target == 'device':
        host_arr, host_buf = make_random_buffer(size, target='host')
        device_buf = global_context.new_buffer(size)
        assert device_buf.size == size
        device_buf.copy_from_host(host_buf, position=0, nbytes=size)
        return host_arr, device_buf

    # Host target: allocate, fill with random bytes, then sanity-check.
    assert size >= 0
    host_buf = pa.allocate_buffer(size)
    assert host_buf.size == size
    data = np.frombuffer(host_buf, dtype=np.uint8)
    assert data.size == size
    data[:] = np.random.randint(low=1, high=255, size=size, dtype=np.uint8)
    assert size == 0 or data.sum() > 0
    # A fresh view over the same buffer must expose the same bytes.
    recheck = np.frombuffer(host_buf, dtype=np.uint8)
    np.testing.assert_equal(data, recheck)
    return data, host_buf


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_device_buffer(size):
    """Round-trip data through ``buffer_from_data`` and ``CudaBuffer.slice``.

    Device buffers are created from host buffers, NumPy arrays, bytes, other
    device buffers and CUDA host buffers (whole and sliced); each one is
    copied back to the host and compared with the source data.
    """
    ctx = global_context

    def to_host(device_buf):
        # Copy the device memory to the host and view it as uint8.
        return np.frombuffer(device_buf.copy_to_host(), dtype=np.uint8)

    def check(device_buf, expected):
        np.testing.assert_equal(expected, to_host(device_buf))

    half = size // 2
    quarter = size // 4

    # Device buffer from a host buffer
    arr, buf = make_random_buffer(size)
    dbuf = ctx.buffer_from_data(buf)
    assert dbuf.size == size
    check(dbuf, arr)

    # CudaBuffer does not support buffer protocol
    with pytest.raises(BufferError):
        memoryview(dbuf)

    # Device buffer from an array
    dbuf = ctx.buffer_from_data(arr)
    assert dbuf.size == size
    check(dbuf, arr)

    # Device buffer from bytes
    dbuf = ctx.buffer_from_data(arr.tobytes())
    assert dbuf.size == size
    check(dbuf, arr)

    # Full-length slice of a device buffer: a view on the same memory
    dbuf2 = dbuf.slice(0, dbuf.size)
    assert dbuf2.size == size
    check(dbuf2, arr)

    if size > 1:
        # Writing through the view must be visible in the parent buffer.
        upper = arr[half:]
        dbuf2.copy_from_host(upper)
        check(dbuf, np.concatenate((upper, upper)))
        dbuf2.copy_from_host(arr[:half])  # restoring arr

    # Device buffer from another device buffer: an independent copy
    dbuf2 = ctx.buffer_from_data(dbuf)
    assert dbuf2.size == size
    check(dbuf2, arr)

    # Writing into the copy must leave the source buffer untouched.
    dbuf2.copy_from_host(arr[half:])
    check(dbuf, arr)

    # Slices whose requested length runs past the end of the buffer
    dbuf2 = dbuf.slice(0, dbuf.size + 10)
    assert dbuf2.size == size
    check(dbuf2, arr)

    dbuf2 = dbuf.slice(quarter, size + 10)
    assert dbuf2.size == size - quarter
    check(dbuf2, arr[quarter:])

    # Window used by all the partial-copy checks below
    soffset = quarter
    ssize = 2 * size // 4
    window = arr[soffset:soffset + ssize]

    # Device buffer from a slice of a host buffer
    dbuf = ctx.buffer_from_data(buf, offset=soffset, size=ssize)
    assert dbuf.size == ssize
    check(dbuf, window)

    dbuf = ctx.buffer_from_data(buf.slice(offset=soffset, length=ssize))
    assert dbuf.size == ssize
    check(dbuf, window)

    # Device buffer from a slice of an array
    dbuf = ctx.buffer_from_data(arr, offset=soffset, size=ssize)
    assert dbuf.size == ssize
    check(dbuf, window)

    dbuf = ctx.buffer_from_data(arr[soffset:soffset + ssize])
    assert dbuf.size == ssize
    check(dbuf, window)

    # Device buffer from a slice of bytes
    dbuf = ctx.buffer_from_data(arr.tobytes(), offset=soffset, size=ssize)
    assert dbuf.size == ssize
    check(dbuf, window)

    # Device buffer from a size only
    dbuf = ctx.new_buffer(size)
    assert dbuf.size == size

    # Slice of another device buffer
    dbuf = ctx.buffer_from_data(arr)
    dbuf2 = dbuf.slice(soffset, ssize)
    assert dbuf2.size == ssize
    check(dbuf2, window)

    # Device buffer from a HostBuffer
    buf = cuda.new_host_buffer(size)
    host_view = np.frombuffer(buf, dtype=np.uint8)
    host_view[:] = arr
    dbuf = ctx.buffer_from_data(buf)
    assert dbuf.size == size
    check(dbuf, arr)

    # Device buffer from a HostBuffer slice
    dbuf = ctx.buffer_from_data(buf, offset=soffset, size=ssize)
    assert dbuf.size == ssize
    check(dbuf, window)

    dbuf = ctx.buffer_from_data(buf.slice(offset=soffset, length=ssize))
    assert dbuf.size == ssize
    check(dbuf, window)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_from_object(size):
    """Check what ``Context.buffer_from_object`` accepts and rejects."""
    arr, cbuf = make_random_buffer(size, target='device')
    elem_type = arr.dtype

    def to_host(device_buf):
        # Copy the device memory to the host, viewed with the source dtype.
        return np.frombuffer(device_buf.copy_to_host(), dtype=elem_type)

    # A CUDA host buffer is accepted
    hbuf = cuda.new_host_buffer(size * elem_type.itemsize)
    np.frombuffer(hbuf, dtype=elem_type)[:] = arr
    cbuf2 = global_context.buffer_from_object(hbuf)
    assert cbuf2.size == cbuf.size
    np.testing.assert_equal(arr, to_host(cbuf2))

    # A device buffer is accepted
    cbuf2 = global_context.buffer_from_object(cbuf2)
    assert cbuf2.size == cbuf.size
    np.testing.assert_equal(arr, to_host(cbuf2))

    # A plain Buffer is rejected
    with pytest.raises(pa.ArrowTypeError,
                       match=('buffer is not backed by a CudaBuffer')):
        global_context.buffer_from_object(pa.py_buffer(b"123"))

    # A numpy.array is rejected
    with pytest.raises(pa.ArrowTypeError,
                       match=("cannot create device buffer view from "
                              ".* \'numpy.ndarray\'")):
        global_context.buffer_from_object(np.array([1, 2, 3]))


def test_foreign_buffer():
    """Check how ``Context.foreign_buffer`` treats the object passed with it.

    Three scenarios are covered, in order:

    1. Passing ``hbuf`` as the third argument raises its reference count by
       exactly one while the foreign buffer exists.
    2. With ``hbuf`` passed that way, the foreign buffer can still be copied
       to the host after the local name ``hbuf`` has been deleted.
    3. Without the third argument, deleting ``hbuf`` makes a later
       ``copy_to_host`` fail with ``pa.ArrowIOError``.

    NOTE(review): the assertions depend on exact ``sys.getrefcount`` values,
    so adding any extra reference to ``hbuf`` in this function would change
    the outcome.
    """
    ctx = global_context
    dtype = np.dtype(np.uint8)
    size = 10
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)

    # test host buffer memory reference counting
    # Record the count first, then compare after creating and after deleting
    # the foreign buffer.
    rc = sys.getrefcount(hbuf)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    assert sys.getrefcount(hbuf) == rc + 1
    del fbuf
    assert sys.getrefcount(hbuf) == rc

    # test postponed deallocation of host buffer memory
    # Passing only if this copy does not raise; its result is not inspected.
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    del hbuf
    fbuf.copy_to_host()

    # test deallocating the host buffer memory making it inaccessible
    # No third argument here, so fbuf is created from the address and size
    # alone.
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size)
    del hbuf
    with pytest.raises(pa.ArrowIOError,
                       match=('Cuda error ')):
        fbuf.copy_to_host()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_CudaBuffer(size):
    """Check size, contents, indexing and slicing of a ``CudaBuffer``."""
    host_arr, host_buf = make_random_buffer(size)
    expected_bytes = host_arr.tobytes()
    assert expected_bytes == host_buf.to_pybytes()

    device_buf = global_context.buffer_from_data(host_buf)
    assert device_buf.size == size
    assert not device_buf.is_cpu
    assert expected_bytes == device_buf.to_pybytes()
    if size > 0:
        assert device_buf.address > 0

    # Element-wise indexing must agree with the host array.
    for index, value in enumerate(host_arr):
        assert device_buf[index] == value

    # Slice indexing: the whole buffer and an interior window.
    windows = (slice(None), slice(size // 4, size // 2))
    for window in windows:
        assert device_buf[window].to_pybytes() == host_arr[window].tobytes()

    # A slice refers back to the buffer it was taken from.
    sliced = device_buf.slice(size // 4, size // 2)
    assert sliced.parent == device_buf

    with pytest.raises(TypeError,
                       match="Do not call CudaBuffer's constructor directly"):
        cuda.CudaBuffer()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_HostBuffer(size):
    arr, buf = make_random_buffer(size)
    assert arr.tobytes() == buf.to_pybytes()
    hbuf = cuda.new_host_buffer(size)
    np.frombuffer(hbuf, dtype=np.uint8)[:] = arr
    assert hbuf.size == size
    assert hbuf.is_cpu
    assert arr.tobytes() == hbuf.to_pybytes()
Loading ...