Repository URL to install this package:
|
Version:
2.0.0rc1 ▾
|
import numpy as np
import pandas as pd
import pytest
from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
from ray.air.util.tensor_extensions.pandas import TensorArray
def test_tensor_array_ops():
    """Check that operators applied to a TensorArray column agree with NumPy.

    Each expression is evaluated once on the raw ndarray and once on the
    DataFrame column built from ``TensorArray(ndarray)``; the two results
    must be equal.
    """
    dims = (3,) + (2, 2, 2)
    ndarray = np.arange(np.prod(dims)).reshape(dims)
    frame = pd.DataFrame({"one": [1, 2, 3], "two": TensorArray(ndarray)})

    # Arithmetic, comparison and bitwise expressions, in that order.
    expressions = (
        lambda x: 2 * (x + 1) / 3,
        lambda x: x % 2 == 0,
        lambda x: x & (3 * x) | (5 * x),
    )

    # NumPy's result on the plain ndarray is the ground truth.
    for expression in expressions:
        np.testing.assert_equal(expression(ndarray), expression(frame["two"]))
def test_tensor_array_array_protocol():
    """Check ``np.asarray`` conversion of a TensorArray and of one element.

    Both conversions request ``float32`` and are compared against the
    source ndarray cast to the same dtype.
    """
    dims = (3,) + (2, 2, 2)
    ndarray = np.arange(np.prod(dims)).reshape(dims)
    tensors = TensorArray(ndarray)

    # Whole-array conversion with an explicit dtype.
    whole = np.asarray(tensors, dtype=np.float32)
    np.testing.assert_array_equal(whole, ndarray.astype(np.float32))

    # Conversion of a single element obtained by integer indexing.
    first = tensors[0]
    first_as_f32 = np.asarray(first, dtype=np.float32)
    np.testing.assert_array_equal(first_as_f32, ndarray[0].astype(np.float32))
def test_tensor_array_dataframe_repr():
    """Check the printed form of a DataFrame holding a TensorArray column.

    The rendered frame is compared with ``expected_repr`` row by row after
    collapsing every run of whitespace to a single space. This pins the
    header, the index labels and the tensor values without depending on the
    amount of padding pandas inserts to align columns.
    """
    outer_dim = 3
    inner_shape = (2, 2)
    shape = (outer_dim,) + inner_shape
    num_items = np.prod(np.array(shape))
    arr = np.arange(num_items).reshape(shape)
    t_arr = TensorArray(arr)
    df = pd.DataFrame({"a": t_arr})

    def collapse_whitespace(text):
        # One entry per rendered row, with alignment padding squeezed out.
        return [" ".join(row.split()) for row in text.splitlines()]

    expected_repr = """ a
0 [[ 0, 1], [ 2, 3]]
1 [[ 4, 5], [ 6, 7]]
2 [[ 8, 9], [10, 11]]"""
    # NOTE(review): the literal carries single-space separators, so an exact
    # string comparison against the column-aligned repr would not match;
    # compare the normalised rows instead.
    assert collapse_whitespace(repr(df)) == collapse_whitespace(expected_repr)
def test_tensor_array_scalar_cast():
    """Check that single-value tensors can be cast to a Python float.

    Two cases are covered: each element of a TensorArray whose elements
    hold exactly one value, and a whole TensorArray that holds exactly one
    value. The reference value is taken from the source ndarray.
    """
    outer_dim = 3
    inner_shape = (1,)
    shape = (outer_dim,) + inner_shape
    num_items = np.prod(np.array(shape))
    arr = np.arange(num_items).reshape(shape)
    t_arr = TensorArray(arr)

    for t_arr_elem, arr_elem in zip(t_arr, arr):
        # The reference side uses ndarray.item() to pull out the lone value:
        # float() on an ndarray with ndim > 0 is deprecated since NumPy 1.25.
        # The float() call on the tensor element is the behaviour under test.
        assert float(t_arr_elem) == float(arr_elem.item())

    # A whole TensorArray containing a single value casts the same way.
    arr = np.arange(1).reshape((1, 1, 1))
    t_arr = TensorArray(arr)
    assert float(t_arr) == float(arr.item())
def test_tensor_array_reductions():
    """Check every supported reducer on a TensorArray column against NumPy.

    For each ``(name, reducer)`` pair in ``TensorArray.SUPPORTED_REDUCERS``
    the column is aggregated by name and compared with the reducer applied
    to the source ndarray along axis 0.
    """
    n_rows = 3
    dims = (n_rows,) + (2, 2, 2)
    ndarray = np.arange(np.prod(dims)).reshape(dims)
    frame = pd.DataFrame(
        {"one": list(range(n_rows)), "two": TensorArray(ndarray)}
    )

    # NumPy's reduction of the plain ndarray is the ground truth.
    for name, reducer in TensorArray.SUPPORTED_REDUCERS.items():
        # Pandas defaults to ddof=1 for std/var while NumPy defaults to 0,
        # so pass ddof=1 to NumPy to make the two calculations equivalent.
        numpy_kwargs = {"ddof": 1} if name in ("std", "var") else {}
        aggregated = frame["two"].agg(name)
        reference = reducer(ndarray, axis=0, **numpy_kwargs)
        np.testing.assert_equal(aggregated, reference)
def test_arrow_tensor_array_getitem():
    """Check element access on an ArrowTensorArray built from an ndarray.

    Covers integer indexing, iteration, ``to_pylist``, and indexing into a
    slice of the array; every result is compared with the source ndarray.
    """
    dims = (3,) + (2, 2, 2)
    ndarray = np.arange(np.prod(dims)).reshape(dims)
    tensors = ArrowTensorArray.from_numpy(ndarray)

    # Integer indexing.
    for position, expected in enumerate(ndarray):
        np.testing.assert_array_equal(tensors[position], expected)

    # Iteration via __iter__.
    for tensor, expected in zip(tensors, ndarray):
        np.testing.assert_array_equal(tensor, expected)

    # Conversion via to_pylist.
    np.testing.assert_array_equal(tensors.to_pylist(), list(ndarray))

    # Slicing, then indexing into the slice.
    tail = tensors[1:]
    np.testing.assert_array_equal(tail.to_numpy(), ndarray[1:])
    for position, expected in enumerate(ndarray[1:]):
        np.testing.assert_array_equal(tail[position], expected)
@pytest.mark.parametrize(
    "test_arr,dtype",
    [
        ([[1, 2], [3, 4], [5, 6], [7, 8]], None),
        ([[1, 2], [3, 4], [5, 6], [7, 8]], np.int32),
        ([[1, 2], [3, 4], [5, 6], [7, 8]], np.int16),
        ([[1, 2], [3, 4], [5, 6], [7, 8]], np.longlong),
        ([[1.5, 2.5], [3.3, 4.2], [5.2, 6.9], [7.6, 8.1]], None),
        ([[1.5, 2.5], [3.3, 4.2], [5.2, 6.9], [7.6, 8.1]], np.float32),
        ([[1.5, 2.5], [3.3, 4.2], [5.2, 6.9], [7.6, 8.1]], np.float16),
        ([[False, True], [True, False], [True, True], [False, False]], None),
    ],
)
def test_arrow_tensor_array_slice(test_arr, dtype):
    """Check ``ArrowTensorArray.slice`` against ordinary ndarray slicing.

    Args:
        test_arr: Nested list with four rows used to build the ndarray.
        dtype: NumPy dtype for the ndarray, or None for NumPy's default.
    """
    reference = np.array(test_arr, dtype=dtype)
    tensors = ArrowTensorArray.from_numpy(reference)
    np.testing.assert_array_equal(tensors.to_numpy(), reference)

    # Each window is (offset, length): the first two rows, then the last two.
    for offset, length in ((0, 2), (2, 2)):
        window = tensors.slice(offset, length)
        np.testing.assert_array_equal(
            window.to_numpy(), reference[offset : offset + length]
        )
        # Index 1 inside the window is row offset + 1 of the full array.
        np.testing.assert_array_equal(window[1], reference[offset + 1])
# Allow running this test module directly as a script.
if __name__ == "__main__":
    import sys

    # -v: verbose output; -x: stop at the first failing test.
    pytest_args = ["-v", "-x", __file__]
    exit_status = pytest.main(pytest_args)
    sys.exit(exit_status)