# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import ctypes
import gc
import pyarrow as pa
try:
from pyarrow.cffi import ffi
except ImportError:
ffi = None
import pytest
try:
import pandas as pd
import pandas.testing as tm
except ImportError:
pd = tm = None
# Skip marker for tests that allocate C structs through cffi.
needs_cffi = pytest.mark.skipif(
    ffi is None,
    reason="test needs cffi package installed",
)

# Reusable ``with`` guards asserting that importing from an already
# consumed (released) C struct is rejected with a ValueError.
assert_schema_released = pytest.raises(
    ValueError,
    match="Cannot import released ArrowSchema",
)
assert_array_released = pytest.raises(
    ValueError,
    match="Cannot import released ArrowArray",
)
assert_stream_released = pytest.raises(
    ValueError,
    match="Cannot import released Arrow Stream",
)
def PyCapsule_IsValid(capsule, name):
    """Return True if *capsule* is a valid PyCapsule carrying *name*.

    Thin wrapper around the CPython C-API function ``PyCapsule_IsValid``,
    which never raises: it returns False for non-capsules and for capsules
    whose name does not match.

    Parameters
    ----------
    capsule : object
        Object to test; it does not have to be a capsule.
    name : bytes, str or None
        Expected capsule name. A ``str`` is encoded as UTF-8 before the
        call. ``None`` matches only capsules created without a name.

    Returns
    -------
    bool
    """
    if isinstance(name, str):
        # ctypes passes a bare str as wchar_t*, which the C function would
        # compare byte-wise against the char* capsule name and mismatch.
        name = name.encode("utf-8")
    result = ctypes.pythonapi.PyCapsule_IsValid(
        ctypes.py_object(capsule), ctypes.c_char_p(name))
    return result == 1
@contextlib.contextmanager
def registered_extension_type(ext_type):
    """Temporarily register *ext_type* with pyarrow.

    The type is unregistered by its extension name on exit, whether or not
    the ``with`` body raised.
    """
    pa.register_extension_type(ext_type)
    with contextlib.ExitStack() as cleanup:
        # The lambda defers reading extension_name until exit time.
        cleanup.callback(
            lambda: pa.unregister_extension_type(ext_type.extension_name))
        yield
class ParamExtType(pa.ExtensionType):
    """Extension type over ``binary(width)``, parameterized by *width*.

    The width round-trips through extension (de)serialization as its
    string form, encoded to bytes.
    """

    def __init__(self, width):
        self._width = width
        storage_type = pa.binary(width)
        extension_name = "pyarrow.tests.test_cffi.ParamExtType"
        super().__init__(storage_type, extension_name)

    @property
    def width(self):
        """The width this type was constructed with."""
        return self._width

    def __arrow_ext_serialize__(self):
        # e.g. width 3 -> b"3"
        return f"{self.width}".encode()

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        # Inverse of __arrow_ext_serialize__; storage_type is unused since
        # __init__ rebuilds it from the width.
        return cls(int(serialized.decode()))
def make_schema():
    """Return a one-column schema ``ints: list<int32>`` with metadata."""
    ints_field = pa.field('ints', pa.list_(pa.int32()))
    return pa.schema([ints_field], metadata={b'key1': b'value1'})
def make_extension_schema():
    """Return a one-column schema whose ``ext`` column is ParamExtType(3)."""
    ext_field = pa.field('ext', ParamExtType(3))
    return pa.schema([ext_field], metadata={b'key1': b'value1'})
def make_extension_storage_schema():
    """Return make_extension_schema() with extension types replaced by storage.

    This is the schema expected when an extension-typed schema is imported
    while its extension type is not registered. It is derived from
    make_extension_schema() rather than written out by hand, so the two
    cannot drift apart. Names, nullability, field metadata and schema
    metadata are preserved. Only top-level field types are unwrapped;
    extension types nested inside other types are left as-is.
    """
    ext_schema = make_extension_schema()
    storage_fields = [
        field.with_type(field.type.storage_type)
        if isinstance(field.type, pa.BaseExtensionType) else field
        for field in ext_schema
    ]
    return pa.schema(storage_fields, metadata=ext_schema.metadata)
def make_batch():
    """Return a two-row record batch conforming to make_schema()."""
    ints_column = [[1], [2, 42]]
    return pa.record_batch([ints_column], make_schema())
def make_extension_batch():
    """Return a two-row record batch conforming to make_extension_schema()."""
    schema = make_extension_schema()
    ext_type = schema.field(0).type
    # Build the storage with the extension type's own storage type, which
    # is binary(3) for ParamExtType(3).
    storage = pa.array([b"foo", b"bar"], type=ext_type.storage_type)
    return pa.record_batch([ext_type.wrap_array(storage)], schema)
def make_batches():
    """Return two record batches sharing make_schema().

    The second batch includes a null entry and an empty list.
    """
    schema = make_schema()
    column_per_batch = (
        [[1], [2, 42]],
        [None, [], [5, 6]],
    )
    return [pa.record_batch([column], schema) for column in column_per_batch]
def make_serialized(schema, batches):
    """Write *batches* as an Arrow IPC stream with *schema*.

    Returns the serialized stream as a pyarrow Buffer.
    """
    with pa.BufferOutputStream() as sink:
        # Closing the writer (leaving this block) finishes the stream
        # before the buffer is taken.
        with pa.ipc.new_stream(sink, schema) as writer:
            for record_batch in batches:
                writer.write(record_batch)
        return sink.getvalue()
@needs_cffi
def test_export_import_type():
    """Round-trip a DataType through an ArrowSchema C struct.

    Also checks allocation accounting, that a consumed struct cannot be
    imported twice, and that an invalid format string is rejected (and the
    struct still released) on import.
    """
    # cffi-allocated ArrowSchema and its address as a plain integer, which
    # is what _export_to_c / _import_from_c take.
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    typ = pa.list_(pa.int32())
    typ._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del typ
    # The export stays allocated even without the Python object
    assert pa.total_allocated_bytes() > old_allocated
    typ_new = pa.DataType._import_from_c(ptr_schema)
    assert typ_new == pa.list_(pa.int32())
    # Importing gave back everything the export allocated
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.DataType._import_from_c(ptr_schema)

    # Invalid format string
    pa.int32()._export_to_c(ptr_schema)
    # Overwrite the exported format with an unknown one. bad_format is kept
    # in a local, presumably so cffi keeps the buffer alive during import.
    bad_format = ffi.new("char[]", b"zzz")
    c_schema.format = bad_format
    with pytest.raises(ValueError,
                       match="Invalid or unsupported format string"):
        pa.DataType._import_from_c(ptr_schema)
    # Now released (a failed import also consumes the struct)
    with assert_schema_released:
        pa.DataType._import_from_c(ptr_schema)
@needs_cffi
def test_export_import_field():
    """Round-trip a Field through an ArrowSchema C struct.

    Also checks allocation accounting and that the struct is released after
    a successful import.
    """
    # cffi-allocated ArrowSchema and its address as a plain integer
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    field = pa.field("test", pa.list_(pa.int32()), nullable=True)
    field._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del field
    # The export stays allocated even without the Python object
    assert pa.total_allocated_bytes() > old_allocated

    field_new = pa.Field._import_from_c(ptr_schema)
    assert field_new == pa.field("test", pa.list_(pa.int32()), nullable=True)
    assert pa.total_allocated_bytes() == old_allocated

    # Now released
    with assert_schema_released:
        pa.Field._import_from_c(ptr_schema)
def check_export_import_array(array_type, exporter, importer):
    """Round-trip a ``list<int32>`` array through a C array struct.

    Two scenarios are checked:

    - The type is passed to the importer as a pyarrow type.
    - The type travels alongside the array in an ArrowSchema struct.

    In each scenario, ``pa.total_allocated_bytes()`` must return to its
    starting value once all objects are dropped. A second import from the
    consumed struct(s) must also fail.

    Parameters
    ----------
    array_type : str
        C struct name to allocate with cffi (interpolated into
        ``"struct {array_type}*"``), e.g. ``"ArrowArray"`` or
        ``"ArrowDeviceArray"``.
    exporter : callable
        Called as ``exporter(arr, ptr_array)`` and
        ``exporter(arr, ptr_array, ptr_schema)``.
    importer : callable
        Called as ``importer(ptr_array, type)`` and
        ``importer(ptr_array, ptr_schema)``, returning the imported array.
    """
    # cffi-allocated C structs and their addresses as plain integers
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
    c_array = ffi.new(f"struct {array_type}*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    # Type is known up front
    typ = pa.list_(pa.int32())
    arr = pa.array([[1], [2, 42]], type=typ)
    py_value = arr.to_pylist()
    exporter(arr, ptr_array)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del arr
    arr_new = importer(ptr_array, typ)
    assert arr_new.to_pylist() == py_value
    assert arr_new.type == pa.list_(pa.int32())
    # Memory stays allocated while the imported array is alive
    assert pa.total_allocated_bytes() > old_allocated
    del arr_new, typ
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_array_released:
        importer(ptr_array, pa.list_(pa.int32()))

    # Type is exported and imported at the same time
    arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
    py_value = arr.to_pylist()
    exporter(arr, ptr_array, ptr_schema)
    # Delete and recreate C++ objects from exported pointers
    del arr
    arr_new = importer(ptr_array, ptr_schema)
    assert arr_new.to_pylist() == py_value
    assert arr_new.type == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() > old_allocated
    del arr_new
    assert pa.total_allocated_bytes() == old_allocated
    # Now released: both structs were consumed, and the released-schema
    # error is the one this check expects
    with assert_schema_released:
        importer(ptr_array, ptr_schema)
@needs_cffi
def test_export_import_array():
    """Array round-trip through the plain ArrowArray C struct."""
    check_export_import_array(
        array_type="ArrowArray",
        exporter=pa.Array._export_to_c,
        importer=pa.Array._import_from_c,
    )
@needs_cffi
def test_export_import_device_array():
    """Array round-trip through ArrowDeviceArray, plus struct field checks.

    After the generic round-trip, a fresh export is inspected field by
    field. It is then imported back so the exported struct is released
    instead of leaked.
    """
    check_export_import_array(
        "ArrowDeviceArray",
        pa.Array._export_to_c_device,
        pa.Array._import_from_c_device,
    )

    # verify exported struct
    c_array = ffi.new("struct ArrowDeviceArray*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))
    arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
    arr._export_to_c_device(ptr_array)

    assert c_array.device_type == 1  # ARROW_DEVICE_CPU 1
    assert c_array.device_id == -1
    assert c_array.array.length == 2

    # Importing consumes (releases) the exported struct. Without this, the
    # export and the data it keeps allocated would never be freed.
    arr_new = pa.Array._import_from_c_device(ptr_array, arr.type)
    assert arr_new.to_pylist() == arr.to_pylist()
def check_export_import_schema(schema_factory, expected_schema_factory=None):
    """Round-trip a schema through an ArrowSchema C struct.

    Also checks that the struct is released after import. It then checks
    that importing a non-struct type as a Schema fails and still releases
    the struct.

    Parameters
    ----------
    schema_factory : callable
        Zero-argument callable returning the schema to export.
    expected_schema_factory : callable, optional
        Zero-argument callable returning the schema the import should
        produce. Defaults to ``schema_factory``, i.e. an exact round-trip.
    """
    if expected_schema_factory is None:
        expected_schema_factory = schema_factory

    # cffi-allocated ArrowSchema and its address as a plain integer
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    schema_factory()._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated
    # The exported schema was a temporary; recreate a C++ object from the
    # exported pointer
    schema_new = pa.Schema._import_from_c(ptr_schema)
    assert schema_new == expected_schema_factory()
    # Already back to baseline while schema_new is alive: the import freed
    # the export's allocation and the imported schema adds nothing tracked
    assert pa.total_allocated_bytes() == old_allocated
    del schema_new
    assert pa.total_allocated_bytes() == old_allocated

    # Now released
    with assert_schema_released:
        pa.Schema._import_from_c(ptr_schema)

    # Not a struct type
    pa.int32()._export_to_c(ptr_schema)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.Schema._import_from_c(ptr_schema)
    # Now released (a failed import also consumes the struct)
    with assert_schema_released:
        pa.Schema._import_from_c(ptr_schema)
@needs_cffi
def test_export_import_schema():
    """A plain (non-extension) schema round-trips unchanged."""
    check_export_import_schema(schema_factory=make_schema)
@needs_cffi
def test_export_import_schema_with_extension():
    """An extension-typed schema imports as storage or extension type.

    Which one comes back depends on whether the extension type is
    registered at import time.
    """
    # Not registered: the storage type is imported
    check_export_import_schema(
        make_extension_schema,
        expected_schema_factory=make_extension_storage_schema,
    )

    # Registered: the extension type itself is imported
    with registered_extension_type(ParamExtType(1)):
        check_export_import_schema(make_extension_schema)
@needs_cffi
def test_export_import_schema_float_pointer():
    """Pointers passed as floats still work, but emit a UserWarning.

    Previous versions of the R Arrow library used to pass pointer values
    as a double.
    """
    c_schema = ffi.new("struct ArrowSchema*")
    float_ptr = float(int(ffi.cast("uintptr_t", c_schema)))
    expected_warning = "Passing a pointer value as a float is unsafe"

    with pytest.warns(UserWarning, match=expected_warning):
        make_schema()._export_to_c(float_ptr)
    with pytest.warns(UserWarning, match=expected_warning):
        imported = pa.Schema._import_from_c(float_ptr)

    assert imported == make_schema()
def check_export_import_batch(array_type, exporter, importer, batch_factory):
    """Round-trip a record batch through a C array struct.

    Parameters
    ----------
    array_type : str
        C struct name to allocate with cffi (interpolated into
        ``"struct {array_type}*"``).
    exporter : callable
        Called as ``exporter(batch, ptr_array)``.
    importer : callable
        Called as ``importer(ptr_array, schema)``, returning the imported
        record batch.
    batch_factory : callable
        Zero-argument callable returning the record batch to export.
    """
    # cffi-allocated C structs and their addresses as plain integers
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
    c_array = ffi.new(f"struct {array_type}*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    # Schema is known up front
    batch = batch_factory()
    schema = batch.schema
    py_value = batch.to_pydict()
    exporter(batch, ptr_array)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del batch
    batch_new = importer(ptr_array, schema)
    assert batch_new.to_pydict() == py_value
    assert batch_new.schema == schema
    # Memory stays allocated while the imported batch is alive
    assert pa.total_allocated_bytes() > old_allocated
    del batch_new, schema
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_array_released:
        # The array struct was consumed by the import above, so this is
        # expected to fail with the released-array error. The schema passed
        # here need not match the batch's.
        importer(ptr_array, make_schema())
Loading ...