# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime
import io
import warnings

try:
    import numpy as np
except ImportError:
    np = None
import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import _check_roundtrip

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
except ImportError:
    pd = tm = None


# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not parquet'
pytestmark = pytest.mark.parquet


@pytest.mark.pandas
def test_pandas_parquet_datetime_tz():
    # Pandas v2 defaults to [ns], but Arrow defaults to [us] time units
    # so we need to cast the pandas dtype. Pandas v1 will always silently
    # coerce to [ns] due to lack of non-[ns] support.
    s = pd.Series([datetime.datetime(2017, 9, 6)], dtype='datetime64[us]')
    s = s.dt.tz_localize('utc')
    s.index = s

    # Both a column and an index to hit both use cases
    df = pd.DataFrame({'tz_aware': s,
                       'tz_eastern': s.dt.tz_convert('US/Eastern')},
                      index=s)

    f = io.BytesIO()

    arrow_table = pa.Table.from_pandas(df)

    _write_table(arrow_table, f)
    f.seek(0)

    table_read = pq.read_pandas(f)

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_datetime_timezone_tzinfo():
    value = datetime.datetime(2018, 1, 1, 1, 23, 45,
                              tzinfo=datetime.timezone.utc)
    df = pd.DataFrame({'foo': [value]})

    _roundtrip_pandas_dataframe(df, write_kwargs={})
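

# For readers unfamiliar with the helper imported above: a minimal sketch of
# what a roundtrip helper like _roundtrip_pandas_dataframe is assumed to do
# (write to an in-memory buffer, read back, compare). Illustrative only, not
# the actual implementation in pyarrow.tests.parquet.common.
def _roundtrip_pandas_dataframe_sketch(df, write_kwargs):
    table = pa.Table.from_pandas(df)
    buf = io.BytesIO()
    _write_table(table, buf, **write_kwargs)
    buf.seek(0)
    df_read = _read_table(buf).to_pandas()
    tm.assert_frame_equal(df, df_read)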


@pytest.mark.pandas
def test_coerce_timestamps(tempdir):
    from collections import OrderedDict

    # ARROW-622
    arrays = OrderedDict()
    fields = [pa.field('datetime64',
                       pa.list_(pa.timestamp('ms')))]
    arrays['datetime64'] = [
        np.array(['2007-07-13T01:23:34.123456789',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
        None,
        None,
        np.array(['2007-07-13T02',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
    ]

    df = pd.DataFrame(arrays)
    schema = pa.schema(fields)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)

    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='us')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()

    df_expected = df.copy()
    for i, x in enumerate(df_expected['datetime64']):
        if isinstance(x, np.ndarray):
            df_expected.loc[i, 'datetime64'] = x.astype('M8[us]')

    tm.assert_frame_equal(df_expected, df_read)

    # only 'ms' and 'us' are valid coercion targets; anything else raises
    with pytest.raises(ValueError):
        _write_table(arrow_table, filename, version='2.6',
                     coerce_timestamps='unknown')


@pytest.mark.pandas
def test_coerce_timestamps_truncated(tempdir):
    """
    ARROW-2555: Test that we can truncate timestamps when coercing if
    explicitly allowed.
    """
    dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1, microsecond=1)
    dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1)

    fields_us = [pa.field('datetime64', pa.timestamp('us'))]
    arrays_us = {'datetime64': [dt_us, dt_ms]}

    df_us = pd.DataFrame(arrays_us)
    schema_us = pa.schema(fields_us)

    filename = tempdir / 'pandas_truncated.parquet'
    table_us = pa.Table.from_pandas(df_us, schema=schema_us)

    _write_table(table_us, filename, version='2.6', coerce_timestamps='ms',
                 allow_truncated_timestamps=True)
    table_ms = _read_table(filename)
    df_ms = table_ms.to_pandas()

    arrays_expected = {'datetime64': [dt_ms, dt_ms]}
    df_expected = pd.DataFrame(arrays_expected, dtype='datetime64[ms]')
    tm.assert_frame_equal(df_expected, df_ms)
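

# Companion sketch (not part of the original suite): without
# allow_truncated_timestamps, the same lossy 'us' -> 'ms' coercion is
# expected to raise, since the nonzero microseconds would be discarded.
@pytest.mark.pandas
def test_coerce_timestamps_truncation_disallowed_sketch(tempdir):
    dt_us = datetime.datetime(2017, 1, 1, 1, 1, 1, microsecond=1)
    df = pd.DataFrame({'datetime64': [dt_us]})
    schema = pa.schema([pa.field('datetime64', pa.timestamp('us'))])
    table = pa.Table.from_pandas(df, schema=schema)

    with pytest.raises(ValueError):
        _write_table(table, tempdir / 'truncation_disallowed.parquet',
                     version='2.6', coerce_timestamps='ms')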


@pytest.mark.pandas
def test_date_time_types(tempdir):
    t1 = pa.date32()
    # date32 values are days since the UNIX epoch
    data1 = np.array([17259, 17260, 17261], dtype='int32')
    a1 = pa.array(data1, type=t1)

    t2 = pa.date64()
    # date64 holds the same dates as milliseconds since the UNIX epoch
    data2 = data1.astype('int64') * 86400000
    a2 = pa.array(data2, type=t2)

    t3 = pa.timestamp('us')
    start = pd.Timestamp('2001-01-01').value / 1000
    data3 = np.array([start, start + 1, start + 2], dtype='int64')
    a3 = pa.array(data3, type=t3)

    t4 = pa.time32('ms')
    data4 = np.arange(3, dtype='i4')
    a4 = pa.array(data4, type=t4)

    t5 = pa.time64('us')
    a5 = pa.array(data4.astype('int64'), type=t5)

    t6 = pa.time32('s')
    a6 = pa.array(data4, type=t6)

    ex_t6 = pa.time32('ms')
    ex_a6 = pa.array(data4 * 1000, type=ex_t6)

    t7 = pa.timestamp('ns')
    start = pd.Timestamp('2001-01-01').value
    data7 = np.array([start, start + 1000, start + 2000],
                     dtype='int64')
    a7 = pa.array(data7, type=t7)

    table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
                                 ['date32', 'date64', 'timestamp[us]',
                                  'time32[s]', 'time64[us]',
                                  'time32_from64[s]',
                                  'timestamp[ns]'])

    # date64 is read back as date32, and time32[s] is coerced to
    # time32[ms] on write
    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
                                    ['date32', 'date64', 'timestamp[us]',
                                     'time32[s]', 'time64[us]',
                                     'time32_from64[s]',
                                     'timestamp[ns]'])

    _check_roundtrip(table, expected=expected, version='2.6')

    t0 = pa.timestamp('ms')
    data0 = np.arange(4, dtype='int64')
    a0 = pa.array(data0, type=t0)

    t1 = pa.timestamp('us')
    data1 = np.arange(4, dtype='int64')
    a1 = pa.array(data1, type=t1)

    t2 = pa.timestamp('ns')
    data2 = np.arange(4, dtype='int64')
    a2 = pa.array(data2, type=t2)

    table = pa.Table.from_arrays([a0, a1, a2],
                                 ['ts[ms]', 'ts[us]', 'ts[ns]'])
    expected = pa.Table.from_arrays([a0, a1, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])

    # all timestamp resolutions are stored as INT64 by default
    filename = tempdir / 'int64_timestamps.parquet'
    _write_table(table, filename, version='2.6')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT64'
    read_table = _read_table(filename)
    assert read_table.equals(expected)

    t0_ns = pa.timestamp('ns')
    data0_ns = np.array(data0 * 1000000, dtype='int64')
    a0_ns = pa.array(data0_ns, type=t0_ns)

    t1_ns = pa.timestamp('ns')
    data1_ns = np.array(data1 * 1000, dtype='int64')
    a1_ns = pa.array(data1_ns, type=t1_ns)

    expected = pa.Table.from_arrays([a0_ns, a1_ns, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])

    # int96 nanosecond timestamps produced upon request
    filename = tempdir / 'explicit_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 use_deprecated_int96_timestamps=True)
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)

    # int96 nanosecond timestamps implied by flavor 'spark'
    filename = tempdir / 'spark_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 flavor='spark')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)


@pytest.mark.pandas
@pytest.mark.parametrize('unit', ['s', 'ms', 'us', 'ns'])
def test_coerce_int96_timestamp_unit(unit):
    i_s = pd.Timestamp('2010-01-01').value // 1000000000  # 1262304000

    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
    d_ms = d_s * 1000
    d_us = d_ms * 1000
    d_ns = d_us * 1000

    a_s = pa.array(d_s, type=pa.timestamp('s'))
    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
    a_us = pa.array(d_us, type=pa.timestamp('us'))
    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))

    arrays = {"s": a_s, "ms": a_ms, "us": a_us, "ns": a_ns}
    names = ['ts_s', 'ts_ms', 'ts_us', 'ts_ns']
    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)

    # With INT96 storage, every column is written as nanoseconds; on read,
    # coerce_int96_timestamp_unit converts them all to the requested unit,
    # regardless of Parquet version.
    expected = pa.Table.from_arrays([arrays.get(unit)]*4, names)
    read_table_kwargs = {"coerce_int96_timestamp_unit": unit}
    _check_roundtrip(table, expected,
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)
    _check_roundtrip(table, expected, version='2.6',
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)
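

# Standalone usage sketch of the read-side option exercised above: write
# INT96 data to an in-memory buffer, then read it back at second resolution
# instead of the default nanoseconds. Names here are illustrative.
def _coerce_int96_unit_sketch():
    buf = io.BytesIO()
    table = pa.table({'ts': pa.array([0, 1], type=pa.timestamp('ns'))})
    pq.write_table(table, buf, use_deprecated_int96_timestamps=True)
    buf.seek(0)
    read = pq.read_table(buf, coerce_int96_timestamp_unit='s')
    assert read['ts'].type == pa.timestamp('s')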


@pytest.mark.pandas
@pytest.mark.parametrize('pq_reader_method', ['ParquetFile', 'read_table'])
def test_coerce_int96_timestamp_overflow(pq_reader_method, tempdir):

    def get_table(pq_reader_method, filename, **kwargs):
        if pq_reader_method == "ParquetFile":
            return pq.ParquetFile(filename, **kwargs).read()
        elif pq_reader_method == "read_table":
            return pq.read_table(filename, **kwargs)

    # Recreating the initial JIRA issue referenced in ARROW-12096
    oob_dts = [
        datetime.datetime(1000, 1, 1),
        datetime.datetime(2000, 1, 1),
        datetime.datetime(3000, 1, 1)
    ]
    df = pd.DataFrame({"a": oob_dts})
    table = pa.table(df)

    filename = tempdir / "test_round_trip_overflow.parquet"
    pq.write_table(table, filename, use_deprecated_int96_timestamps=True,
                   version="1.0")

    # with the default resolution of ns, INT96 values that fall outside the
    # nanosecond-representable range are read back as overflowed, wrong values
    tab_error = get_table(pq_reader_method, filename)
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore",
                                "Discarding nonzero nanoseconds in conversion",
                                UserWarning)
        assert tab_error["a"].to_pylist() != oob_dts

    # avoid this overflow by specifying the resolution to use for INT96 values
    tab_correct = get_table(
        pq_reader_method, filename, coerce_int96_timestamp_unit="s"
    )
    df_correct = tab_correct.to_pandas(timestamp_as_object=True)
    df["a"] = df["a"].astype(object)
    tm.assert_frame_equal(df, df_correct)
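

# Why timestamp_as_object=True above: years like 3000 overflow pandas'
# datetime64[ns] range, so the comparison has to happen on Python datetime
# objects. A minimal sketch of that behavior (helper name is illustrative):
def _timestamp_as_object_sketch():
    arr = pa.array([datetime.datetime(3000, 1, 1)], type=pa.timestamp('s'))
    df = pa.table({'a': arr}).to_pandas(timestamp_as_object=True)
    assert isinstance(df['a'][0], datetime.datetime)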


@pytest.mark.parametrize('unit', ['ms', 'us', 'ns'])
def test_timestamp_restore_timezone(unit):
    # ARROW-5888, restore timezone from serialized metadata
    ty = pa.timestamp(unit, tz='America/New_York')
    arr = pa.array([1, 2, 3], type=ty)
    # roundtrip and verify the timezone survives in the schema metadata
    # (column name below is illustrative)
    t = pa.table([arr], names=['f0'])
    _check_roundtrip(t)