Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ tests / parquet / test_pandas.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import json

try:
    import numpy as np
except ImportError:
    np = None
import pytest

import pyarrow as pa
from pyarrow.fs import LocalFileSystem, SubTreeFileSystem
from pyarrow.util import guid
from pyarrow.vendored.version import Version

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
                                              _write_table)
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import (_roundtrip_pandas_dataframe,
                                              alltypes_sample)
except ImportError:
    pd = tm = None


# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not parquet'
pytestmark = pytest.mark.parquet


@pytest.mark.pandas
def test_pandas_parquet_custom_metadata(tempdir):
    df = alltypes_sample(size=10000)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    assert b'pandas' in arrow_table.schema.metadata

    _write_table(arrow_table, filename)

    metadata = pq.read_metadata(filename).metadata
    assert b'pandas' in metadata

    js = json.loads(metadata[b'pandas'].decode('utf8'))
    assert js['index_columns'] == [{'kind': 'range',
                                    'name': None,
                                    'start': 0, 'stop': 10000,
                                    'step': 1}]


@pytest.mark.pandas
def test_merging_parquet_tables_with_different_pandas_metadata(tempdir):
    # ARROW-3728: Merging Parquet Files - Pandas Meta in Schema Mismatch
    schema = pa.schema([
        pa.field('int', pa.int16()),
        pa.field('float', pa.float32()),
        pa.field('string', pa.string())
    ])
    df1 = pd.DataFrame({
        'int': np.arange(3, dtype=np.uint8),
        'float': np.arange(3, dtype=np.float32),
        'string': ['ABBA', 'EDDA', 'ACDC']
    })
    df2 = pd.DataFrame({
        'int': [4, 5],
        'float': [1.1, None],
        'string': [None, None]
    })
    table1 = pa.Table.from_pandas(df1, schema=schema, preserve_index=False)
    table2 = pa.Table.from_pandas(df2, schema=schema, preserve_index=False)

    assert not table1.schema.equals(table2.schema, check_metadata=True)
    assert table1.schema.equals(table2.schema)

    writer = pq.ParquetWriter(tempdir / 'merged.parquet', schema=schema)
    writer.write_table(table1)
    writer.write_table(table2)


@pytest.mark.pandas
def test_pandas_parquet_column_multiindex(tempdir):
    df = alltypes_sample(size=10)
    df.columns = pd.MultiIndex.from_tuples(
        list(zip(df.columns, df.columns[::-1])),
        names=['level_1', 'level_2']
    )

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)
    assert arrow_table.schema.pandas_metadata is not None

    _write_table(arrow_table, filename)

    table_read = pq.read_pandas(filename)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir):
    df = alltypes_sample(size=10000)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
    js = arrow_table.schema.pandas_metadata
    assert not js['index_columns']
    # ARROW-2170
    # While index_columns should be empty, columns needs to be filled still.
    assert js['columns']

    _write_table(arrow_table, filename)
    table_read = pq.read_pandas(filename)

    js = table_read.schema.pandas_metadata
    assert not js['index_columns']

    read_metadata = table_read.schema.metadata
    assert arrow_table.schema.metadata == read_metadata

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_parquet_native_file_roundtrip():
    df = _test_dataframe(10000)
    arrow_table = pa.Table.from_pandas(df)
    imos = pa.BufferOutputStream()
    _write_table(arrow_table, imos, version='2.6')
    buf = imos.getvalue()
    reader = pa.BufferReader(buf)
    df_read = _read_table(reader).to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_read_pandas_column_subset():
    df = _test_dataframe(10000)
    arrow_table = pa.Table.from_pandas(df)
    imos = pa.BufferOutputStream()
    _write_table(arrow_table, imos, version='2.6')
    buf = imos.getvalue()
    reader = pa.BufferReader(buf)
    df_read = pq.read_pandas(
        reader, columns=['strings', 'uint8'],
    ).to_pandas()
    tm.assert_frame_equal(df[['strings', 'uint8']], df_read)


@pytest.mark.pandas
def test_pandas_parquet_empty_roundtrip():
    df = _test_dataframe(0)
    arrow_table = pa.Table.from_pandas(df)
    imos = pa.BufferOutputStream()
    _write_table(arrow_table, imos, version='2.6')
    buf = imos.getvalue()
    reader = pa.BufferReader(buf)
    df_read = _read_table(reader).to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_can_write_nested_data():
    data = {
        "agg_col": [
            {"page_type": 1},
            {"record_type": 1},
            {"non_consecutive_home": 0},
        ],
        "uid_first": "1001"
    }
    df = pd.DataFrame(data=data)
    arrow_table = pa.Table.from_pandas(df)
    imos = pa.BufferOutputStream()
    # This succeeds under V2
    _write_table(arrow_table, imos)


@pytest.mark.pandas
def test_pandas_parquet_pyfile_roundtrip(tempdir):
    filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
    size = 5
    df = pd.DataFrame({
        'int64': np.arange(size, dtype=np.int64),
        'float32': np.arange(size, dtype=np.float32),
        'float64': np.arange(size, dtype=np.float64),
        'bool': np.random.randn(size) > 0,
        'strings': ['foo', 'bar', None, 'baz', 'qux']
    })

    arrow_table = pa.Table.from_pandas(df)

    with filename.open('wb') as f:
        _write_table(arrow_table, f, version="2.6")

    data = io.BytesIO(filename.read_bytes())

    table_read = _read_table(data)
    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_parquet_configuration_options(tempdir):
    size = 10000
    np.random.seed(0)
    df = pd.DataFrame({
        'uint8': np.arange(size, dtype=np.uint8),
        'uint16': np.arange(size, dtype=np.uint16),
        'uint32': np.arange(size, dtype=np.uint32),
        'uint64': np.arange(size, dtype=np.uint64),
        'int8': np.arange(size, dtype=np.int16),
        'int16': np.arange(size, dtype=np.int16),
        'int32': np.arange(size, dtype=np.int32),
        'int64': np.arange(size, dtype=np.int64),
        'float32': np.arange(size, dtype=np.float32),
        'float64': np.arange(size, dtype=np.float64),
        'bool': np.random.randn(size) > 0
    })
    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df)

    for use_dictionary in [True, False]:
        _write_table(arrow_table, filename, version='2.6',
                     use_dictionary=use_dictionary)
        table_read = _read_table(filename)
        df_read = table_read.to_pandas()
        tm.assert_frame_equal(df, df_read)

    for write_statistics in [True, False]:
        _write_table(arrow_table, filename, version='2.6',
                     write_statistics=write_statistics)
        table_read = _read_table(filename)
        df_read = table_read.to_pandas()
        tm.assert_frame_equal(df, df_read)

    for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']:
        if (compression != 'NONE' and
                not pa.lib.Codec.is_available(compression)):
            continue
        _write_table(arrow_table, filename, version='2.6',
                     compression=compression)
        table_read = _read_table(filename)
        df_read = table_read.to_pandas()
        tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
@pytest.mark.filterwarnings("ignore:Parquet format '2.0':FutureWarning")
def test_spark_flavor_preserves_pandas_metadata():
    df = _test_dataframe(size=100)
    df.index = np.arange(0, 10 * len(df), 10)
    df.index.name = 'foo'

    result = _roundtrip_pandas_dataframe(df, {'version': '2.0',
                                              'flavor': 'spark'})
    tm.assert_frame_equal(result, df)


@pytest.mark.pandas
def test_index_column_name_duplicate(tempdir):
    data = {
        'close': {
            pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
            pd.Timestamp('2017-06-30 01:32:00'): 154.99958999999998,
        },
        'time': {
            pd.Timestamp('2017-06-30 01:31:00'): pd.Timestamp(
                '2017-06-30 01:31:00'
            ),
            pd.Timestamp('2017-06-30 01:32:00'): pd.Timestamp(
                '2017-06-30 01:32:00'
            ),
        }
    }
    path = str(tempdir / 'data.parquet')

    # Pandas v2 defaults to [ns], but Arrow defaults to [us] time units
    # so we need to cast the pandas dtype. Pandas v1 will always silently
    # coerce to [ns] due to lack of non-[ns] support.
    dfx = pd.DataFrame(data, dtype='datetime64[us]').set_index('time', drop=False)

    tdfx = pa.Table.from_pandas(dfx)
    _write_table(tdfx, path)
    arrow_table = _read_table(path)
    result_df = arrow_table.to_pandas()
    tm.assert_frame_equal(result_df, dfx)


@pytest.mark.pandas
def test_multiindex_duplicate_values(tempdir):
    num_rows = 3
    numbers = list(range(num_rows))
    index = pd.MultiIndex.from_arrays(
        [['foo', 'foo', 'bar'], numbers],
        names=['foobar', 'some_numbers'],
    )

    df = pd.DataFrame({'numbers': numbers}, index=index)
    table = pa.Table.from_pandas(df)

    filename = tempdir / 'dup_multi_index_levels.parquet'

    _write_table(table, filename)
    result_table = _read_table(filename)
    assert table.equals(result_table)

    result_df = result_table.to_pandas()
    tm.assert_frame_equal(result_df, df)


@pytest.mark.pandas
def test_backwards_compatible_index_naming(datadir):
    expected_string = b"""\
carat        cut  color  clarity  depth  table  price     x     y     z
 0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
 0.21    Premium      E      SI1   59.8   61.0    326  3.89  3.84  2.31
 0.23       Good      E      VS1   56.9   65.0    327  4.05  4.07  2.31
 0.29    Premium      I      VS2   62.4   58.0    334  4.20  4.23  2.63
 0.31       Good      J      SI2   63.3   58.0    335  4.34  4.35  2.75
 0.24  Very Good      J     VVS2   62.8   57.0    336  3.94  3.96  2.48
Loading ...