# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import inspect
import os
import pathlib
import sys

try:
    import numpy as np
except ImportError:
    np = None

import pytest
import unittest.mock as mock
import pyarrow as pa
import pyarrow.compute as pc
from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem,
                        PyFileSystem, SubTreeFileSystem, FSSpecHandler)
from pyarrow.tests import util
from pyarrow.util import guid

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import (
        _read_table, _test_dataframe, _write_table)
except ImportError:
    pq = None

try:
    import pandas as pd
    import pandas.testing as tm
except ImportError:
    pd = tm = None

# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not parquet'
pytestmark = [pytest.mark.parquet, pytest.mark.dataset]
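
# Note: the partitioned dataset tests below rely on the
# _partition_test_for_filesystem and _generate_partition_directories helper
# functions, which are defined elsewhere in this module.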


def test_filesystem_uri(tempdir):
    table = pa.table({"a": [1, 2, 3]})

    directory = tempdir / "data_dir"
    directory.mkdir()
    path = directory / "data.parquet"
    pq.write_table(table, str(path))

    # filesystem object
    result = pq.read_table(
        path, filesystem=LocalFileSystem())
    assert result.equals(table)

    # filesystem URI
    result = pq.read_table(
        "data_dir/data.parquet", filesystem=util._filesystem_uri(tempdir))
    assert result.equals(table)


@pytest.mark.pandas
def test_read_partitioned_directory(tempdir):
    local = LocalFileSystem()
    _partition_test_for_filesystem(local, tempdir)


@pytest.mark.pandas
def test_read_partitioned_columns_selection(tempdir):
    # ARROW-3861 - do not include partition columns in resulting table when
    # `columns` keyword was passed without those columns
    local = LocalFileSystem()
    base_path = tempdir
    _partition_test_for_filesystem(local, base_path)

    dataset = pq.ParquetDataset(base_path)
    result = dataset.read(columns=["values"])
    assert result.column_names == ["values"]


@pytest.mark.pandas
def test_filters_equivalency(tempdir):
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
                           3),
        'values': np.arange(30),
    })

    _generate_partition_directories(local, base_path, partition_spec, df)

    # Old filters syntax:
    # integer == 1 AND string != b AND boolean == True
    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[('integer', '=', 1), ('string', '!=', 'b'),
                 ('boolean', '==', 'True')],
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'b' not in result_df['string'].values
    assert False not in result_df['boolean'].values

    # filters in disjunctive normal form:
    # (integer == 1 AND string != b AND boolean == True) OR
    # (integer == 0 AND boolean == False)
    # TODO(ARROW-3388): boolean columns are reconstructed as string
    filters = [
        [
            ('integer', '=', 1),
            ('string', '!=', 'b'),
            ('boolean', '==', 'True')
        ],
        [('integer', '=', 0), ('boolean', '==', 'False')]
    ]
    dataset = pq.ParquetDataset(
        base_path, filesystem=local, filters=filters)
    table = dataset.read()
    result_df = table.to_pandas().reset_index(drop=True)

    # Check that all rows in the DF fulfill the filter
    df_filter_1 = (result_df['integer'] == 1) \
        & (result_df['string'] != 'b') \
        & (result_df['boolean'] == 'True')
    df_filter_2 = (np.array(result_df['integer']) == 0) \
        & (result_df['boolean'] == 'False')
    assert df_filter_1.sum() > 0
    assert df_filter_2.sum() > 0
    assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())
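
    # String filter values containing an embedded null byte (given either as
    # bytes or as str) should match no rows here, since the partition values
    # are only 'a', 'b' and 'c'.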
    for filters in [[[('string', '==', b'1\0a')]],
                    [[('string', '==', '1\0a')]]]:
        dataset = pq.ParquetDataset(
            base_path, filesystem=local, filters=filters)
        assert dataset.read().num_rows == 0


@pytest.mark.pandas
def test_filters_cutoff_exclusive_integer(tempdir):
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(local, base_path, partition_spec, df)
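
    # Both bounds are exclusive, so only the partitions 2 and 3 should remain.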
    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[
            ('integers', '<', 4),
            ('integers', '>', 1),
        ],
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    result_list = [x for x in map(int, result_df['integers'].values)]
    assert result_list == [2, 3]


@pytest.mark.xfail(
    # different error with use_legacy_datasets because result_df is no longer
    # categorical
    raises=(TypeError, AssertionError),
    reason='Loss of type information in creation of categoricals.'
)
@pytest.mark.pandas
def test_filters_cutoff_exclusive_datetime(tempdir):
    local = LocalFileSystem()
    base_path = tempdir

    date_keys = [
        datetime.date(2018, 4, 9),
        datetime.date(2018, 4, 10),
        datetime.date(2018, 4, 11),
        datetime.date(2018, 4, 12),
        datetime.date(2018, 4, 13)
    ]
    partition_spec = [
        ['dates', date_keys]
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'dates': np.array(date_keys, dtype='datetime64'),
    }, columns=['index', 'dates'])

    _generate_partition_directories(local, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[
            ('dates', '<', "2018-04-12"),
            ('dates', '>', "2018-04-10")
        ],
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    expected = pd.Categorical(
        np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
        categories=np.array(date_keys, dtype='datetime64'))

    assert result_df['dates'].values == expected


@pytest.mark.pandas
def test_filters_inclusive_datetime(tempdir):
    # ARROW-11480
    path = tempdir / 'timestamps.parquet'

    pd.DataFrame({
        "dates": pd.date_range("2020-01-01", periods=10, freq="D"),
        "id": range(10)
    }).to_parquet(path, use_deprecated_int96_timestamps=True)
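
    # The '<=' filter should also keep the row whose timestamp equals the
    # cutoff (2020-01-05), i.e. ids 0 through 4.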
    table = pq.read_table(path, filters=[
        ("dates", "<=", datetime.datetime(2020, 1, 5))
    ])

    assert table.column('id').to_pylist() == [0, 1, 2, 3, 4]


@pytest.mark.pandas
def test_filters_inclusive_integer(tempdir):
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]
    N = 5

    df = pd.DataFrame({
        'index': np.arange(N),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(local, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[
            ('integers', '<=', 3),
            ('integers', '>=', 2),
        ],
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    result_list = [int(x) for x in map(int, result_df['integers'].values)]
    assert result_list == [2, 3]


@pytest.mark.pandas
def test_filters_inclusive_set(tempdir):
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
                           3),
        'values': np.arange(30),
    })

    _generate_partition_directories(local, base_path, partition_spec, df)
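
    # 'in' / 'not in' accept any container of values; per the assertions
    # below, the string 'ab' here acts as the two values 'a' and 'b'.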
    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[('string', 'in', 'ab')],
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 'a' in result_df['string'].values
    assert 'b' in result_df['string'].values
    assert 'c' not in result_df['string'].values

    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')),
                 ('boolean', 'not in', {'False'})],
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'c' not in result_df['string'].values