# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io
import json

try:
    import numpy as np
except ImportError:
    np = None

import pytest

import pyarrow as pa
from pyarrow.fs import LocalFileSystem, SubTreeFileSystem
from pyarrow.util import guid
from pyarrow.vendored.version import Version

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
                                              _write_table)
except ImportError:
    pq = None

try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import (_roundtrip_pandas_dataframe,
                                              alltypes_sample)
except ImportError:
    pd = tm = None


# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not parquet'
pytestmark = pytest.mark.parquet


@pytest.mark.pandas
def test_pandas_parquet_custom_metadata(tempdir):
df = alltypes_sample(size=10000)
filename = tempdir / 'pandas_roundtrip.parquet'
arrow_table = pa.Table.from_pandas(df)
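    # Table.from_pandas records pandas metadata under the b'pandas' schema key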
assert b'pandas' in arrow_table.schema.metadata
_write_table(arrow_table, filename)
metadata = pq.read_metadata(filename).metadata
assert b'pandas' in metadata
js = json.loads(metadata[b'pandas'].decode('utf8'))
assert js['index_columns'] == [{'kind': 'range',
'name': None,
'start': 0, 'stop': 10000,
'step': 1}]


@pytest.mark.pandas
def test_merging_parquet_tables_with_different_pandas_metadata(tempdir):
# ARROW-3728: Merging Parquet Files - Pandas Meta in Schema Mismatch
schema = pa.schema([
pa.field('int', pa.int16()),
pa.field('float', pa.float32()),
pa.field('string', pa.string())
])
df1 = pd.DataFrame({
'int': np.arange(3, dtype=np.uint8),
'float': np.arange(3, dtype=np.float32),
'string': ['ABBA', 'EDDA', 'ACDC']
})
df2 = pd.DataFrame({
'int': [4, 5],
'float': [1.1, None],
'string': [None, None]
})
table1 = pa.Table.from_pandas(df1, schema=schema, preserve_index=False)
table2 = pa.Table.from_pandas(df2, schema=schema, preserve_index=False)
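    # The schemas differ only in their pandas metadata, so they compare equal
    # unless check_metadata=True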
assert not table1.schema.equals(table2.schema, check_metadata=True)
assert table1.schema.equals(table2.schema)
    # Use a context manager so the writer is closed and the footer written
    with pq.ParquetWriter(tempdir / 'merged.parquet', schema=schema) as writer:
        writer.write_table(table1)
        writer.write_table(table2)


@pytest.mark.pandas
def test_pandas_parquet_column_multiindex(tempdir):
df = alltypes_sample(size=10)
df.columns = pd.MultiIndex.from_tuples(
list(zip(df.columns, df.columns[::-1])),
names=['level_1', 'level_2']
)
filename = tempdir / 'pandas_roundtrip.parquet'
arrow_table = pa.Table.from_pandas(df)
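    # The MultiIndex column labels are recorded in the pandas metadata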
assert arrow_table.schema.pandas_metadata is not None
_write_table(arrow_table, filename)
table_read = pq.read_pandas(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_parquet_2_0_roundtrip_read_pandas_no_index_written(tempdir):
df = alltypes_sample(size=10000)
filename = tempdir / 'pandas_roundtrip.parquet'
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
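    # preserve_index=False leaves no index information in the pandas metadata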
js = arrow_table.schema.pandas_metadata
assert not js['index_columns']
# ARROW-2170
    # While index_columns should be empty, 'columns' still needs to be filled.
assert js['columns']
_write_table(arrow_table, filename)
table_read = pq.read_pandas(filename)
js = table_read.schema.pandas_metadata
assert not js['index_columns']
read_metadata = table_read.schema.metadata
assert arrow_table.schema.metadata == read_metadata
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_parquet_native_file_roundtrip():
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
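    # Round-trip through an in-memory Arrow NativeFile instead of a file path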
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version='2.6')
buf = imos.getvalue()
reader = pa.BufferReader(buf)
df_read = _read_table(reader).to_pandas()
tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_read_pandas_column_subset():
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version='2.6')
buf = imos.getvalue()
reader = pa.BufferReader(buf)
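    # Only the requested columns should be read and converted back to pandas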
df_read = pq.read_pandas(
reader, columns=['strings', 'uint8'],
).to_pandas()
tm.assert_frame_equal(df[['strings', 'uint8']], df_read)


@pytest.mark.pandas
def test_pandas_parquet_empty_roundtrip():
df = _test_dataframe(0)
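    # A zero-row table must round-trip with its schema and dtypes intact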
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version='2.6')
buf = imos.getvalue()
reader = pa.BufferReader(buf)
df_read = _read_table(reader).to_pandas()
tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_can_write_nested_data():
data = {
"agg_col": [
{"page_type": 1},
{"record_type": 1},
{"non_consecutive_home": 0},
],
"uid_first": "1001"
}
df = pd.DataFrame(data=data)
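    # The dict column maps to an Arrow struct column; pandas broadcasts the
    # scalar 'uid_first' to every row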
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
    # Writing nested (struct) data succeeds with format version 2 (the default)
_write_table(arrow_table, imos)


@pytest.mark.pandas
def test_pandas_parquet_pyfile_roundtrip(tempdir):
filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
size = 5
df = pd.DataFrame({
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
'strings': ['foo', 'bar', None, 'baz', 'qux']
})
arrow_table = pa.Table.from_pandas(df)
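    # Write through a plain Python file object, then read back via io.BytesIO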
with filename.open('wb') as f:
_write_table(arrow_table, f, version="2.6")
data = io.BytesIO(filename.read_bytes())
table_read = _read_table(data)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_pandas_parquet_configuration_options(tempdir):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
'uint8': np.arange(size, dtype=np.uint8),
'uint16': np.arange(size, dtype=np.uint16),
'uint32': np.arange(size, dtype=np.uint32),
'uint64': np.arange(size, dtype=np.uint64),
        'int8': np.arange(size, dtype=np.int8),
'int16': np.arange(size, dtype=np.int16),
'int32': np.arange(size, dtype=np.int32),
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0
})
filename = tempdir / 'pandas_roundtrip.parquet'
arrow_table = pa.Table.from_pandas(df)
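    # Dictionary encoding toggled on and off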
for use_dictionary in [True, False]:
_write_table(arrow_table, filename, version='2.6',
use_dictionary=use_dictionary)
table_read = _read_table(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
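    # Statistics toggled on and off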
for write_statistics in [True, False]:
_write_table(arrow_table, filename, version='2.6',
write_statistics=write_statistics)
table_read = _read_table(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
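    # Each compression codec, skipping any not available in this build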
for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']:
        if (compression != 'NONE' and
                not pa.Codec.is_available(compression)):
            continue
_write_table(arrow_table, filename, version='2.6',
compression=compression)
table_read = _read_table(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
@pytest.mark.filterwarnings("ignore:Parquet format '2.0':FutureWarning")
def test_spark_flavor_preserves_pandas_metadata():
df = _test_dataframe(size=100)
df.index = np.arange(0, 10 * len(df), 10)
df.index.name = 'foo'
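    # The sanitized Spark schema must still carry the pandas metadata,
    # including the named index 'foo'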
result = _roundtrip_pandas_dataframe(df, {'version': '2.0',
'flavor': 'spark'})
tm.assert_frame_equal(result, df)


@pytest.mark.pandas
def test_index_column_name_duplicate(tempdir):
data = {
'close': {
pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
pd.Timestamp('2017-06-30 01:32:00'): 154.99958999999998,
},
'time': {
pd.Timestamp('2017-06-30 01:31:00'): pd.Timestamp(
'2017-06-30 01:31:00'
),
pd.Timestamp('2017-06-30 01:32:00'): pd.Timestamp(
'2017-06-30 01:32:00'
),
}
}
path = str(tempdir / 'data.parquet')
# Pandas v2 defaults to [ns], but Arrow defaults to [us] time units
# so we need to cast the pandas dtype. Pandas v1 will always silently
# coerce to [ns] due to lack of non-[ns] support.
dfx = pd.DataFrame(data, dtype='datetime64[us]').set_index('time', drop=False)
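    # 'time' is now both a regular column and the index (drop=False)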
tdfx = pa.Table.from_pandas(dfx)
_write_table(tdfx, path)
arrow_table = _read_table(path)
result_df = arrow_table.to_pandas()
tm.assert_frame_equal(result_df, dfx)


@pytest.mark.pandas
def test_multiindex_duplicate_values(tempdir):
num_rows = 3
numbers = list(range(num_rows))
index = pd.MultiIndex.from_arrays(
[['foo', 'foo', 'bar'], numbers],
names=['foobar', 'some_numbers'],
)
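    # The first index level holds duplicate values ('foo' appears twice)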
df = pd.DataFrame({'numbers': numbers}, index=index)
table = pa.Table.from_pandas(df)
filename = tempdir / 'dup_multi_index_levels.parquet'
_write_table(table, filename)
result_table = _read_table(filename)
assert table.equals(result_table)
result_df = result_table.to_pandas()
tm.assert_frame_equal(result_df, df)


@pytest.mark.pandas
def test_backwards_compatible_index_naming(datadir):
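    # Files written by old pyarrow versions used a different index naming
    # convention; reading them must still yield the expected frame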
    expected_string = b"""\
carat        cut  color  clarity  depth  table  price     x     y     z
 0.23      Ideal      E      SI2   61.5   55.0    326  3.95  3.98  2.43
 0.21    Premium      E      SI1   59.8   61.0    326  3.89  3.84  2.31
 0.23       Good      E      VS1   56.9   65.0    327  4.05  4.07  2.31
 0.29    Premium      I      VS2   62.4   58.0    334  4.20  4.23  2.63
 0.31       Good      J      SI2   63.3   58.0    335  4.34  4.35  2.75
 0.24  Very Good      J     VVS2   62.8   57.0    336  3.94  3.96  2.48