# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import os
import sys
import tempfile

import pytest
import hypothesis as h
import hypothesis.strategies as st

try:
    import numpy as np
except ImportError:
    np = None

import pyarrow as pa
import pyarrow.tests.strategies as past
from pyarrow.feather import (read_feather, write_feather, read_table,
                             FeatherDataset)

try:
    from pandas.testing import assert_frame_equal
    import pandas as pd
    import pyarrow.pandas_compat
except ImportError:
    pass


@pytest.fixture(scope='module')
def datadir(base_datadir):
    return base_datadir / 'feather'


def random_path(prefix='feather_'):
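    # tempfile.mktemp only generates a path name; nothing exists on disk
    # until write_feather creates the file.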
    return tempfile.mktemp(prefix=prefix)
@pytest.fixture(scope="module", params=[1, 2])
def version(request):
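    # Feather version 1 is the legacy format; version 2 is the Arrow IPC
    # file format.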
    yield request.param
@pytest.fixture(scope="module", params=[None, "uncompressed", "lz4", "zstd"])
def compression(request):
if request.param in ['lz4', 'zstd'] and not pa.Codec.is_available(
request.param):
pytest.skip(f'{request.param} is not available')
yield request.param


TEST_FILES = None
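# Temporary files written during the test run; removed in teardown_module.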


def setup_module(module):
    global TEST_FILES
    TEST_FILES = []


def teardown_module(module):
    for path in TEST_FILES:
        try:
            os.remove(path)
        except os.error:
            pass


@pytest.mark.pandas
def test_file_not_exist():
    with pytest.raises(pa.ArrowIOError):
        read_feather('test_invalid_file')


def _check_pandas_roundtrip(df, expected=None, path=None,
                            columns=None, use_threads=False,
                            version=None, compression=None,
                            compression_level=None):
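    # Write df to a temporary Feather file, read it back as a DataFrame and
    # compare it against `expected` (or the input itself).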
    if path is None:
        path = random_path()
    if version is None:
        version = 2

    TEST_FILES.append(path)

    write_feather(df, path, compression=compression,
                  compression_level=compression_level, version=version)

    if not os.path.exists(path):
        raise Exception('file not written')

    result = read_feather(path, columns, use_threads=use_threads)
    if expected is None:
        expected = df

    assert_frame_equal(result, expected)


def _check_arrow_roundtrip(table, path=None, compression=None):
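    # Same round trip at the Table level, compared with Table.equals.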
    if path is None:
        path = random_path()
    TEST_FILES.append(path)

    write_feather(table, path, compression=compression)
    if not os.path.exists(path):
        raise Exception('file not written')

    result = read_table(path)
    assert result.equals(table)


def _assert_error_on_write(df, exc, path=None, version=2):
    # check that we are raising the exception
    # on writing
    if path is None:
        path = random_path()

    TEST_FILES.append(path)

    with pytest.raises(exc):
        write_feather(df, path, version=version)


@pytest.mark.numpy
def test_dataset(version):
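    # Write contiguous row slices of one table to several files and read
    # them back together with FeatherDataset.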
    num_values = (100, 100)
    num_files = 5
    paths = [random_path() for i in range(num_files)]
    data = {
        "col_" + str(i): np.random.randn(num_values[0])
        for i in range(num_values[1])
    }
    table = pa.table(data)

    TEST_FILES.extend(paths)
    for index, path in enumerate(paths):
        rows = (
            index * (num_values[0] // num_files),
            (index + 1) * (num_values[0] // num_files),
        )
        write_feather(table[rows[0]: rows[1]], path, version=version)

    data = FeatherDataset(paths).read_table()
    assert data.equals(table)


@pytest.mark.pandas
def test_float_no_nulls(version):
    data = {}
    numpy_dtypes = ['f4', 'f8']
    num_values = 100

    for dtype in numpy_dtypes:
        values = np.random.randn(num_values)
        data[dtype] = values.astype(dtype)

    df = pd.DataFrame(data)
    _check_pandas_roundtrip(df, version=version)


@pytest.mark.pandas
def test_read_table(version):
    num_values = (100, 100)
    path = random_path()

    TEST_FILES.append(path)

    values = np.random.randint(0, 100, size=num_values)
    columns = ['col_' + str(i) for i in range(100)]
    table = pa.Table.from_arrays(values, columns)

    write_feather(table, path, version=version)

    result = read_table(path)
    assert result.equals(table)

    # Test without memory mapping
    result = read_table(path, memory_map=False)
    assert result.equals(table)

    result = read_feather(path, memory_map=False)
    assert_frame_equal(table.to_pandas(), result)


@pytest.mark.pandas
def test_use_threads(version):
    # ARROW-14470
    num_values = (10, 10)
    path = random_path()

    TEST_FILES.append(path)

    values = np.random.randint(0, 10, size=num_values)
    columns = ['col_' + str(i) for i in range(10)]
    table = pa.Table.from_arrays(values, columns)

    write_feather(table, path, version=version)

    result = read_feather(path)
    assert_frame_equal(table.to_pandas(), result)

    # Test read_feather with use_threads=False
    result = read_feather(path, use_threads=False)
    assert_frame_equal(table.to_pandas(), result)

    # Test read_table with use_threads=False
    result = read_table(path, use_threads=False)
    assert result.equals(table)


@pytest.mark.pandas
def test_float_nulls(version):
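    # Nulls come from an explicit validity mask; pandas renders them as NaN.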
    num_values = 100
    path = random_path()
    TEST_FILES.append(path)

    null_mask = np.random.randint(0, 10, size=num_values) < 3
    dtypes = ['f4', 'f8']
    expected_cols = []

    arrays = []
    for name in dtypes:
        values = np.random.randn(num_values).astype(name)
        arrays.append(pa.array(values, mask=null_mask))

        values[null_mask] = np.nan

        expected_cols.append(values)

    table = pa.table(arrays, names=dtypes)
    _check_arrow_roundtrip(table)

    df = table.to_pandas()
    _check_pandas_roundtrip(df, version=version)


@pytest.mark.pandas
def test_integer_no_nulls(version):
    data, arr = {}, []

    numpy_dtypes = ['i1', 'i2', 'i4', 'i8',
                    'u1', 'u2', 'u4', 'u8']
    num_values = 100

    for dtype in numpy_dtypes:
        values = np.random.randint(0, 100, size=num_values)
        data[dtype] = values.astype(dtype)
        arr.append(values.astype(dtype))

    df = pd.DataFrame(data)
    _check_pandas_roundtrip(df, version=version)

    table = pa.table(arr, names=numpy_dtypes)
    _check_arrow_roundtrip(table)


@pytest.mark.pandas
def test_platform_numpy_integers(version):
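    # 'longlong' is a platform-dependent NumPy integer alias rather than a
    # fixed-width dtype string.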
    data = {}

    numpy_dtypes = ['longlong']
    num_values = 100

    for dtype in numpy_dtypes:
        values = np.random.randint(0, 100, size=num_values)
        data[dtype] = values.astype(dtype)

    df = pd.DataFrame(data)
    _check_pandas_roundtrip(df, version=version)


@pytest.mark.pandas
def test_integer_with_nulls(version):
    # pandas requires upcast to float dtype
    path = random_path()
    TEST_FILES.append(path)

    int_dtypes = ['i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8']
    num_values = 100

    arrays = []
    null_mask = np.random.randint(0, 10, size=num_values) < 3
    expected_cols = []
    for name in int_dtypes:
        values = np.random.randint(0, 100, size=num_values)
        arrays.append(pa.array(values, mask=null_mask))

        expected = values.astype('f8')
        expected[null_mask] = np.nan

        expected_cols.append(expected)

    table = pa.table(arrays, names=int_dtypes)
    _check_arrow_roundtrip(table)

    df = table.to_pandas()
    _check_pandas_roundtrip(df, version=version)


@pytest.mark.pandas
def test_boolean_no_nulls(version):
    num_values = 100

    np.random.seed(0)

    df = pd.DataFrame({'bools': np.random.randn(num_values) > 0})
    _check_pandas_roundtrip(df, version=version)


@pytest.mark.pandas
def test_boolean_nulls(version):
    # pandas requires upcast to object dtype
    path = random_path()
    TEST_FILES.append(path)

    num_values = 100
    np.random.seed(0)

    mask = np.random.randint(0, 10, size=num_values) < 3
    values = np.random.randint(0, 10, size=num_values) < 5

    table = pa.table([pa.array(values, mask=mask)], names=['bools'])
    _check_arrow_roundtrip(table)

    df = table.to_pandas()
    _check_pandas_roundtrip(df, version=version)