Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ tests / test_feather.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import os
import sys
import tempfile
import pytest
import hypothesis as h
import hypothesis.strategies as st

try:
    import numpy as np
except ImportError:
    np = None

import pyarrow as pa
import pyarrow.tests.strategies as past
from pyarrow.feather import (read_feather, write_feather, read_table,
                             FeatherDataset)

try:
    from pandas.testing import assert_frame_equal
    import pandas as pd
    import pyarrow.pandas_compat
except ImportError:
    pass


@pytest.fixture(scope='module')
def datadir(base_datadir):
    """Return the ``feather`` subdirectory of ``base_datadir``."""
    # NOTE(review): base_datadir is a fixture defined outside this file;
    # it must support the ``/`` operator (presumably a pathlib.Path).
    feather_dir = base_datadir / 'feather'
    return feather_dir


def random_path(prefix='feather_'):
    """Return a path in the system temp directory that does not exist yet.

    The file itself is NOT created; callers in this module write to the
    returned path and append it to ``TEST_FILES`` for later removal.

    ``tempfile.mktemp`` is documented as deprecated, so the name is built
    from ``tempfile.gettempdir()`` plus a random UUID instead.

    Parameters
    ----------
    prefix : str, default 'feather_'
        Leading part of the generated file name.

    Returns
    -------
    str
        ``<tempdir>/<prefix><32 hex chars>``, checked not to exist at the
        time of the call.
    """
    import uuid  # function-scope import: the module does not import uuid

    tmpdir = tempfile.gettempdir()
    while True:
        candidate = os.path.join(tmpdir, prefix + uuid.uuid4().hex)
        # Same guarantee mktemp gave: never hand back an existing path.
        if not os.path.exists(candidate):
            return candidate


@pytest.fixture(scope="module", params=[1, 2])
def version(request):
    """Parametrized fixture yielding each value of ``params`` (1, then 2).

    Tests in this module pass the value on as the ``version`` argument of
    ``write_feather``.
    """
    feather_version = request.param
    yield feather_version


@pytest.fixture(scope="module", params=[None, "uncompressed", "lz4", "zstd"])
def compression(request):
    """Parametrized fixture yielding a compression setting.

    For the 'lz4' and 'zstd' parameters the test is skipped when
    ``pa.Codec.is_available`` reports the codec as unavailable.
    """
    codec = request.param
    needs_codec = codec in ('lz4', 'zstd')
    if needs_codec and not pa.Codec.is_available(codec):
        pytest.skip(f'{codec} is not available')
    yield codec


# Paths of temporary files produced by this module's tests. Rebound to a
# fresh list in setup_module(), appended to by the helpers and tests, and
# each entry is removed (best effort) in teardown_module().
TEST_FILES = None


def setup_module(module):
    """Reset the module-global ``TEST_FILES`` registry to a new empty list.

    The ``module`` argument is accepted but not used.
    """
    global TEST_FILES
    TEST_FILES = list()


def teardown_module(module):
    """Delete every path recorded in ``TEST_FILES``, ignoring failures.

    The ``module`` argument is accepted but not used.
    """
    for leftover in TEST_FILES:
        try:
            os.remove(leftover)
        except OSError:
            # Best-effort cleanup: some registered paths are never written,
            # so a failed removal is not an error here.
            pass


@pytest.mark.pandas
def test_file_not_exist():
    """``read_feather`` on 'test_invalid_file' must raise ArrowIOError."""
    missing = 'test_invalid_file'
    with pytest.raises(pa.ArrowIOError):
        read_feather(missing)


def _check_pandas_roundtrip(df, expected=None, path=None,
                            columns=None, use_threads=False,
                            version=None, compression=None,
                            compression_level=None):
    """Write ``df`` with ``write_feather`` and compare what is read back.

    Parameters
    ----------
    df : object
        Data handed to ``write_feather``.
    expected : optional
        Frame the result is compared against; defaults to ``df``.
    path : optional
        Destination path; a new ``random_path()`` is used when None.
    columns : optional
        Passed positionally to ``read_feather`` as its second argument.
    use_threads : bool, default False
        Forwarded to ``read_feather``.
    version : optional
        Forwarded to ``write_feather``; None is replaced by 2.
    compression, compression_level : optional
        Forwarded to ``write_feather``.

    Raises
    ------
    Exception
        If no file exists at the destination after writing.
    AssertionError
        From ``assert_frame_equal`` when the frames differ.
    """
    target = random_path() if path is None else path
    feather_version = 2 if version is None else version

    # Register before writing so the file is cleaned up even on failure.
    TEST_FILES.append(target)
    write_feather(df, target, compression=compression,
                  compression_level=compression_level,
                  version=feather_version)

    if not os.path.exists(target):
        raise Exception('file not written')

    roundtripped = read_feather(target, columns, use_threads=use_threads)

    reference = df if expected is None else expected
    assert_frame_equal(roundtripped, reference)


def _check_arrow_roundtrip(table, path=None, compression=None):
    """Write ``table`` with ``write_feather`` and assert ``read_table``
    returns an equal table.

    ``path`` defaults to a new ``random_path()``; ``compression`` is
    forwarded to ``write_feather``. Raises ``Exception`` if no file exists
    at the destination after writing.
    """
    target = random_path() if path is None else path

    # Register before writing so the file is cleaned up even on failure.
    TEST_FILES.append(target)
    write_feather(table, target, compression=compression)
    if not os.path.exists(target):
        raise Exception('file not written')

    roundtripped = read_table(target)
    assert roundtripped.equals(table)


def _assert_error_on_write(df, exc, path=None, version=2):
    """Assert that ``write_feather(df, path, version=version)`` raises
    ``exc``.

    ``path`` defaults to a new ``random_path()``; it is registered in
    ``TEST_FILES`` before the write is attempted.
    """
    target = random_path() if path is None else path

    TEST_FILES.append(target)

    # The exception is expected at write time.
    with pytest.raises(exc):
        write_feather(df, target, version=version)


@pytest.mark.numpy
def test_dataset(version):
    """Split a 100x100 float table into 5 files of 20 rows each and check
    that ``FeatherDataset(paths).read_table()`` equals the original table.
    """
    num_rows, num_cols = 100, 100
    num_files = 5
    paths = [random_path() for _ in range(num_files)]

    columns = {}
    for col in range(num_cols):
        columns["col_" + str(col)] = np.random.randn(num_rows)
    table = pa.table(columns)

    TEST_FILES.extend(paths)
    rows_per_file = num_rows // num_files
    for index, path in enumerate(paths):
        # Consecutive, non-overlapping row ranges: one per file.
        start = index * rows_per_file
        stop = start + rows_per_file
        write_feather(table[start:stop], path, version=version)

    combined = FeatherDataset(paths).read_table()
    assert combined.equals(table)


@pytest.mark.pandas
def test_float_no_nulls(version):
    """Round-trip a DataFrame with one 'f4' and one 'f8' column."""
    num_values = 100

    data = {
        dtype: np.random.randn(num_values).astype(dtype)
        for dtype in ('f4', 'f8')
    }

    frame = pd.DataFrame(data)
    _check_pandas_roundtrip(frame, version=version)


@pytest.mark.pandas
def test_read_table(version):
    """Write a 100-column integer table and read it back three ways:
    ``read_table`` with defaults, ``read_table(memory_map=False)`` and
    ``read_feather(memory_map=False)``.
    """
    n_arrays, n_rows = 100, 100
    target = random_path()
    TEST_FILES.append(target)

    # Each row of the 2-D array becomes one column of the table.
    values = np.random.randint(0, 100, size=(n_arrays, n_rows))
    names = ['col_' + str(idx) for idx in range(n_arrays)]
    table = pa.Table.from_arrays(values, names)

    write_feather(table, target, version=version)

    default_read = read_table(target)
    assert default_read.equals(table)

    # Test without memory mapping
    unmapped_read = read_table(target, memory_map=False)
    assert unmapped_read.equals(table)

    frame = read_feather(target, memory_map=False)
    assert_frame_equal(table.to_pandas(), frame)


@pytest.mark.pandas
def test_use_threads(version):
    """Read a 10-column table with default threading and with
    ``use_threads=False``, via both ``read_feather`` and ``read_table``.
    """
    # ARROW-14470
    n_arrays, n_rows = 10, 10
    target = random_path()
    TEST_FILES.append(target)

    # Each row of the 2-D array becomes one column of the table.
    values = np.random.randint(0, 10, size=(n_arrays, n_rows))
    names = ['col_' + str(idx) for idx in range(n_arrays)]
    table = pa.Table.from_arrays(values, names)

    write_feather(table, target, version=version)

    frame = read_feather(target)
    assert_frame_equal(table.to_pandas(), frame)

    # Test read_feather with use_threads=False
    frame = read_feather(target, use_threads=False)
    assert_frame_equal(table.to_pandas(), frame)

    # Test read_table with use_threads=False
    single_threaded = read_table(target, use_threads=False)
    assert single_threaded.equals(table)


@pytest.mark.pandas
def test_float_nulls(version):
    """Round-trip 'f4' and 'f8' columns that contain nulls.

    The table is checked through ``_check_arrow_roundtrip`` and its pandas
    conversion through ``_check_pandas_roundtrip`` with the parametrized
    ``version``.
    """
    num_values = 100
    dtypes = ['f4', 'f8']

    # One shared mask: True marks a slot that becomes null.
    null_mask = np.random.randint(0, 10, size=num_values) < 3

    # No path is generated here: both helpers create and register their
    # own temporary file.
    arrays = [
        pa.array(np.random.randn(num_values).astype(name), mask=null_mask)
        for name in dtypes
    ]

    table = pa.table(arrays, names=dtypes)
    _check_arrow_roundtrip(table)

    df = table.to_pandas()
    _check_pandas_roundtrip(df, version=version)


@pytest.mark.pandas
def test_integer_no_nulls(version):
    """Round-trip null-free signed and unsigned integer columns of 1, 2,
    4 and 8 bytes, both as a DataFrame and as an Arrow table.
    """
    numpy_dtypes = ['i1', 'i2', 'i4', 'i8',
                    'u1', 'u2', 'u4', 'u8']
    num_values = 100

    data = {}
    arrays = []
    for dtype in numpy_dtypes:
        raw = np.random.randint(0, 100, size=num_values)
        data[dtype] = raw.astype(dtype)
        arrays.append(raw.astype(dtype))

    frame = pd.DataFrame(data)
    _check_pandas_roundtrip(frame, version=version)

    table = pa.table(arrays, names=numpy_dtypes)
    _check_arrow_roundtrip(table)


@pytest.mark.pandas
def test_platform_numpy_integers(version):
    """Round-trip a DataFrame whose column uses NumPy's 'longlong' dtype."""
    numpy_dtypes = ['longlong']
    num_values = 100

    data = {
        dtype: np.random.randint(0, 100, size=num_values).astype(dtype)
        for dtype in numpy_dtypes
    }

    frame = pd.DataFrame(data)
    _check_pandas_roundtrip(frame, version=version)


@pytest.mark.pandas
def test_integer_with_nulls(version):
    """Round-trip integer columns of every width that contain nulls.

    Each column is cast to the dtype it is named after before the Arrow
    array is built, so the eight columns really differ in integer type.
    The table is checked through ``_check_arrow_roundtrip`` and its pandas
    conversion through ``_check_pandas_roundtrip`` with the parametrized
    ``version``.
    """
    # pandas requires upcast to float dtype
    int_dtypes = ['i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8']
    num_values = 100

    # One shared mask: True marks a slot that becomes null.
    null_mask = np.random.randint(0, 10, size=num_values) < 3

    arrays = []
    for name in int_dtypes:
        # Values lie in [0, 100), which fits every dtype in the list.
        values = np.random.randint(0, 100, size=num_values).astype(name)
        arrays.append(pa.array(values, mask=null_mask))

    # No path is generated here: both helpers create and register their
    # own temporary file.
    table = pa.table(arrays, names=int_dtypes)
    _check_arrow_roundtrip(table)

    df = table.to_pandas()
    _check_pandas_roundtrip(df, version=version)


@pytest.mark.pandas
def test_boolean_no_nulls(version):
    """Round-trip a DataFrame with a single null-free boolean column."""
    num_values = 100

    # Fixed seed, set before the only random draw.
    np.random.seed(0)
    flags = np.random.randn(num_values) > 0

    frame = pd.DataFrame({'bools': flags})
    _check_pandas_roundtrip(frame, version=version)


@pytest.mark.pandas
def test_boolean_nulls(version):
    """Round-trip a boolean column that contains nulls.

    The table is checked through ``_check_arrow_roundtrip`` and its pandas
    conversion through ``_check_pandas_roundtrip`` with the parametrized
    ``version``.
    """
    # pandas requires upcast to object dtype
    num_values = 100
    np.random.seed(0)

    # Draw order matters under the fixed seed: mask first, then values.
    mask = np.random.randint(0, 10, size=num_values) < 3
    values = np.random.randint(0, 10, size=num_values) < 5

    # No path is generated here: both helpers create and register their
    # own temporary file.
    table = pa.table([pa.array(values, mask=mask)], names=['bools'])
    _check_arrow_roundtrip(table)

    df = table.to_pandas()
    _check_pandas_roundtrip(df, version=version)
Loading ...