Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ tests / parquet / test_dataset.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import inspect
import os
import pathlib
import sys

try:
    import numpy as np
except ImportError:
    np = None
import pytest
import unittest.mock as mock

import pyarrow as pa
import pyarrow.compute as pc
from pyarrow.fs import (FileSelector, FileSystem, LocalFileSystem,
                        PyFileSystem, SubTreeFileSystem, FSSpecHandler)
from pyarrow.tests import util
from pyarrow.util import guid

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import (
        _read_table, _test_dataframe, _write_table)
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

except ImportError:
    pd = tm = None


# Module-level pytest marks: every test in this module is tagged with both
# the `parquet` and the `dataset` marker.
# Deselect them with e.g. `pytest ... -m 'not parquet'`.
pytestmark = [pytest.mark.parquet, pytest.mark.dataset]


def test_filesystem_uri(tempdir):
    """Read one Parquet file via a filesystem object and via a URI."""
    expected = pa.table({"a": [1, 2, 3]})

    data_dir = tempdir / "data_dir"
    data_dir.mkdir()
    parquet_file = data_dir / "data.parquet"
    pq.write_table(expected, str(parquet_file))

    # Explicit filesystem instance combined with the full path.
    from_fs_object = pq.read_table(parquet_file, filesystem=LocalFileSystem())
    assert from_fs_object.equals(expected)

    # Filesystem passed as the URI that util._filesystem_uri builds from
    # tempdir; the file path is given relative to it.
    from_fs_uri = pq.read_table(
        "data_dir/data.parquet", filesystem=util._filesystem_uri(tempdir))
    assert from_fs_uri.equals(expected)


@pytest.mark.pandas
def test_read_partitioned_directory(tempdir):
    """Run the shared partitioned-directory check on a local filesystem."""
    _partition_test_for_filesystem(LocalFileSystem(), tempdir)


@pytest.mark.pandas
def test_read_partitioned_columns_selection(tempdir):
    """Selecting columns must not pull partition columns into the result.

    See ARROW-3861: when the `columns` keyword is passed without the
    partition columns, the resulting table must only contain the requested
    columns.
    """
    # Populate tempdir with the shared partitioned layout first.
    _partition_test_for_filesystem(LocalFileSystem(), tempdir)

    selected = pq.ParquetDataset(tempdir).read(columns=["values"])
    assert selected.column_names == ["values"]


@pytest.mark.pandas
def test_filters_equivalency(tempdir):
    """Check that tuple-based `filters` select the expected rows.

    Three forms are exercised against a dataset partitioned on an integer,
    a string and a boolean key:

    * a flat list of predicates that are AND-ed together,
    * predicates in disjunctive normal form (a list of AND-groups),
    * equality against values with an embedded NUL character, which must
      match no rows.
    """
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    # 30 rows in total: each column repeats/tiles its keys to that length.
    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5), 3),
        'values': np.arange(30),
    })

    _generate_partition_directories(local, base_path, partition_spec, df)

    # Old filters syntax:
    #  integer == 1 AND string != b AND boolean == True
    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[('integer', '=', 1), ('string', '!=', 'b'),
                 ('boolean', '==', 'True')],
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'b' not in result_df['string'].values
    # The boolean partition key is read back as a string (see the TODO
    # below), so the string form is the check that can actually fail; the
    # bool form is kept for when real booleans are reconstructed.
    assert 'False' not in result_df['boolean'].values
    assert False not in result_df['boolean'].values

    # filters in disjunctive normal form:
    #  (integer == 1 AND string != b AND boolean == True) OR
    #  (integer == 0 AND boolean == False)
    # TODO(ARROW-3388): boolean columns are reconstructed as string
    filters = [
        [
            ('integer', '=', 1),
            ('string', '!=', 'b'),
            ('boolean', '==', 'True')
        ],
        [('integer', '=', 0), ('boolean', '==', 'False')]
    ]
    dataset = pq.ParquetDataset(
        base_path, filesystem=local, filters=filters)
    table = dataset.read()
    result_df = table.to_pandas().reset_index(drop=True)

    # Check that all rows in the DF fulfill the filter: both groups must
    # contribute rows, and together they must account for every row.
    df_filter_1 = (result_df['integer'] == 1) \
        & (result_df['string'] != 'b') \
        & (result_df['boolean'] == 'True')
    df_filter_2 = (np.array(result_df['integer']) == 0) \
        & (result_df['boolean'] == 'False')
    assert df_filter_1.sum() > 0
    assert df_filter_2.sum() > 0
    assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())

    # Values with an embedded NUL (as bytes and as str) must match nothing.
    for nul_filters in [[[('string', '==', b'1\0a')]],
                        [[('string', '==', '1\0a')]]]:
        dataset = pq.ParquetDataset(
            base_path, filesystem=local, filters=nul_filters)
        assert dataset.read().num_rows == 0


@pytest.mark.pandas
def test_filters_cutoff_exclusive_integer(tempdir):
    """Strict `<` / `>` filters on an integer partition drop both bounds."""
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]

    # One row per integer key; 'index' provides a stable order for the
    # comparison at the end.
    df = pd.DataFrame({
        'index': np.arange(len(integer_keys)),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(local, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[
            ('integers', '<', 4),
            ('integers', '>', 1),
        ],
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    # Normalise to plain Python ints before comparing against the literal.
    result_list = list(map(int, result_df['integers'].values))
    assert result_list == [2, 3]


@pytest.mark.xfail(
    # different error with use_legacy_datasets because result_df is no longer
    # categorical
    raises=(TypeError, AssertionError),
    reason='Loss of type information in creation of categoricals.'
)
@pytest.mark.pandas
def test_filters_cutoff_exclusive_datetime(tempdir):
    """Strict `<` / `>` filters on a date partition (expected to fail).

    Marked xfail: the test is expected to raise TypeError or AssertionError,
    with the stated reason being loss of type information when categoricals
    are created.
    """
    local = LocalFileSystem()
    base_path = tempdir

    # Five consecutive days used as the partition keys.
    date_keys = [
        datetime.date(2018, 4, 9),
        datetime.date(2018, 4, 10),
        datetime.date(2018, 4, 11),
        datetime.date(2018, 4, 12),
        datetime.date(2018, 4, 13)
    ]
    partition_spec = [
        ['dates', date_keys]
    ]
    N = 5

    # One row per date key; 'index' provides a stable sort order below.
    df = pd.DataFrame({
        'index': np.arange(N),
        'dates': np.array(date_keys, dtype='datetime64'),
    }, columns=['index', 'dates'])

    _generate_partition_directories(local, base_path, partition_spec, df)

    # Bounds are passed as ISO date strings and are both exclusive.
    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[
            ('dates', '<', "2018-04-12"),
            ('dates', '>', "2018-04-10")
        ],
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                      .sort_values(by='index')
                      .reset_index(drop=True))

    # Only 2018-04-11 lies strictly between the two bounds.
    expected = pd.Categorical(
        np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
        categories=np.array(date_keys, dtype='datetime64'))

    assert result_df['dates'].values == expected


@pytest.mark.pandas
def test_filters_inclusive_datetime(tempdir):
    """An inclusive `<=` filter on a timestamp column keeps the bound itself.

    See ARROW-11480. The file is written with
    `use_deprecated_int96_timestamps=True`.
    """
    path = tempdir / 'timestamps.parquet'

    frame = pd.DataFrame({
        "dates": pd.date_range("2020-01-01", periods=10, freq="D"),
        "id": range(10)
    })
    frame.to_parquet(path, use_deprecated_int96_timestamps=True)

    cutoff = datetime.datetime(2020, 1, 5)
    table = pq.read_table(path, filters=[("dates", "<=", cutoff)])

    # Ten consecutive days starting 2020-01-01: ids 0..4 fall on or before
    # the cutoff.
    assert table.column('id').to_pylist() == [0, 1, 2, 3, 4]


@pytest.mark.pandas
def test_filters_inclusive_integer(tempdir):
    """Inclusive `<=` / `>=` filters on an integer partition keep the bounds."""
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1, 2, 3, 4]
    partition_spec = [
        ['integers', integer_keys],
    ]

    # One row per integer key; 'index' provides a stable order for the
    # comparison at the end.
    df = pd.DataFrame({
        'index': np.arange(len(integer_keys)),
        'integers': np.array(integer_keys, dtype='i4'),
    }, columns=['index', 'integers'])

    _generate_partition_directories(local, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[
            ('integers', '<=', 3),
            ('integers', '>=', 2),
        ],
    )
    table = dataset.read()
    result_df = (table.to_pandas()
                 .sort_values(by='index')
                 .reset_index(drop=True))

    # A single int() conversion is enough to get plain Python ints.
    result_list = list(map(int, result_df['integers'].values))
    assert result_list == [2, 3]


@pytest.mark.pandas
def test_filters_inclusive_set(tempdir):
    local = LocalFileSystem()
    base_path = tempdir

    integer_keys = [0, 1]
    string_keys = ['a', 'b', 'c']
    boolean_keys = [True, False]
    partition_spec = [
        ['integer', integer_keys],
        ['string', string_keys],
        ['boolean', boolean_keys]
    ]

    df = pd.DataFrame({
        'integer': np.array(integer_keys, dtype='i4').repeat(15),
        'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
        'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5), 3),
        'values': np.arange(30),
    })

    _generate_partition_directories(local, base_path, partition_spec, df)

    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[('string', 'in', 'ab')],
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 'a' in result_df['string'].values
    assert 'b' in result_df['string'].values
    assert 'c' not in result_df['string'].values

    dataset = pq.ParquetDataset(
        base_path, filesystem=local,
        filters=[('integer', 'in', [1]), ('string', 'in', ('a', 'b')),
                 ('boolean', 'not in', {'False'})],
    )
    table = dataset.read()
    result_df = (table.to_pandas().reset_index(drop=True))

    assert 0 not in result_df['integer'].values
    assert 'c' not in result_df['string'].values
Loading ...