Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, NuGet packages

aaronreidsmith / pandas   python

Repository URL to install this package:

Version: 0.25.3 

/ tests / resample / test_time_grouper.py

from datetime import datetime
from operator import methodcaller

import numpy as np
import pytest

import pandas as pd
from pandas import DataFrame, Series
from pandas.core.groupby.grouper import Grouper
from pandas.core.indexes.datetimes import date_range
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal, assert_series_equal

# Module-level sample data: 1000 random values on a date_range starting at
# 1/1/2000. Shared by test_apply, test_count and test_numpy_reduction.
test_series = Series(np.random.randn(1000), index=date_range("1/1/2000", periods=1000))


def test_apply():
    """Applying a function per yearly Grouper matches a groupby on ``x.year``."""

    def last_three_sorted(values):
        # the three largest entries of the group, in ascending order
        return values.sort_values()[-3:]

    yearly = Grouper(freq="A", label="right", closed="right")
    result = test_series.groupby(yearly).apply(last_three_sorted)
    expected = test_series.groupby(lambda stamp: stamp.year).apply(last_three_sorted)

    # Only the inner index level is compared: the outer (group key) level is
    # dropped from both sides before the assertion.
    result.index = result.index.droplevel(0)
    expected.index = expected.index.droplevel(0)
    assert_series_equal(result, expected)


def test_count():
    """Check ``count`` through a yearly Grouper and through ``resample("A")``.

    Every third value is set to NaN first. Both results must then equal a
    plain groupby on the calendar year once the reference has been given the
    result's index.
    """
    # Punch the holes into a private copy. Assigning into the module-level
    # ``test_series`` would leak the NaNs into every other test that reads it
    # and make their input depend on test execution order.
    series = test_series.copy()
    series[::3] = np.nan

    expected = series.groupby(lambda x: x.year).count()

    grouper = Grouper(freq="A", label="right", closed="right")
    result = series.groupby(grouper).count()
    # Reuse the result's index on the reference so that only the counts
    # themselves are compared.
    expected.index = result.index
    assert_series_equal(result, expected)

    result = series.resample("A").count()
    expected.index = result.index
    assert_series_equal(result, expected)


def test_numpy_reduction():
    """Yearly ``resample(...).prod()`` agrees with ``np.prod`` aggregated per year."""
    by_year = test_series.groupby(lambda stamp: stamp.year)
    result = test_series.resample("A", closed="right").prod()

    expected = by_year.agg(np.prod)
    # give the reference the result's index so only the products are compared
    expected.index = result.index

    assert_series_equal(result, expected)


def test_apply_iteration():
    # #2300
    N = 1000
    ind = pd.date_range(start="2000-01-01", freq="D", periods=N)
    df = DataFrame({"open": 1, "close": 2}, index=ind)
    tg = Grouper(freq="M")

    _, grouper, _ = tg._get_grouper(df)

    # Errors
    grouped = df.groupby(grouper, group_keys=False)

    def f(df):
        return df["close"] / df["open"]

    # it works!
    result = grouped.apply(f)
    tm.assert_index_equal(result.index, df.index)


@pytest.mark.parametrize(
    "name, func",
    [
        ("Int64Index", tm.makeIntIndex),
        ("Index", tm.makeUnicodeIndex),
        ("Float64Index", tm.makeFloatIndex),
        ("MultiIndex", lambda m: tm.makeCustomIndex(m, 2)),
    ],
)
def test_fails_on_no_datetime_index(name, func):
    n = 2
    index = func(n)
    df = DataFrame({"a": np.random.randn(n)}, index=index)

    msg = (
        "Only valid with DatetimeIndex, TimedeltaIndex "
        "or PeriodIndex, but got an instance of '{}'".format(name)
    )
    with pytest.raises(TypeError, match=msg):
        df.groupby(Grouper(freq="D"))


def test_aaa_group_order():
    """GH 12840: rows inside each daily group keep their original order."""
    n_rows = 20
    df = DataFrame(np.random.randn(n_rows, 4), columns=["A", "B", "C", "D"])
    # Keys cycle through 2013-01-01 .. 2013-01-05, so the rows for the k-th
    # day are exactly df[k::5].
    df["key"] = [datetime(2013, 1, day) for day in range(1, 6)] * 4
    grouped = df.groupby(Grouper(key="key", freq="D"))

    for offset in range(5):
        day = datetime(2013, 1, offset + 1)
        tm.assert_frame_equal(grouped.get_group(day), df[offset::5])


def test_aggregate_normal(resample_method):
    """Check TimeGrouper's aggregation is identical as normal groupby."""

    if resample_method == "ohlc":
        pytest.xfail(reason="DataError: No numeric types to aggregate")

    data = np.random.randn(20, 4)
    normal_df = DataFrame(data, columns=["A", "B", "C", "D"])
    normal_df["key"] = [1, 2, 3, 4, 5] * 4

    dt_df = DataFrame(data, columns=["A", "B", "C", "D"])
    dt_df["key"] = [
        datetime(2013, 1, 1),
        datetime(2013, 1, 2),
        datetime(2013, 1, 3),
        datetime(2013, 1, 4),
        datetime(2013, 1, 5),
    ] * 4

    normal_grouped = normal_df.groupby("key")
    dt_grouped = dt_df.groupby(Grouper(key="key", freq="D"))

    expected = getattr(normal_grouped, resample_method)()
    dt_result = getattr(dt_grouped, resample_method)()
    expected.index = date_range(start="2013-01-01", freq="D", periods=5, name="key")
    tm.assert_equal(expected, dt_result)

    # if TimeGrouper is used included, 'nth' doesn't work yet

    """
    for func in ['nth']:
        expected = getattr(normal_grouped, func)(3)
        expected.index = date_range(start='2013-01-01',
                                    freq='D', periods=5, name='key')
        dt_result = getattr(dt_grouped, func)(3)
        assert_frame_equal(expected, dt_result)
    """


@pytest.mark.parametrize(
    "method, method_args, unit",
    [
        ("sum", dict(), 0),
        ("sum", dict(min_count=0), 0),
        ("sum", dict(min_count=1), np.nan),
        ("prod", dict(), 1),
        ("prod", dict(min_count=0), 1),
        ("prod", dict(min_count=1), np.nan),
    ],
)
def test_resample_entirly_nat_window(method, method_args, unit):
    s = pd.Series([0] * 2 + [np.nan] * 2, index=pd.date_range("2017", periods=4))
    result = methodcaller(method, **method_args)(s.resample("2d"))
    expected = pd.Series(
        [0.0, unit], index=pd.to_datetime(["2017-01-01", "2017-01-03"])
    )
    tm.assert_series_equal(result, expected)


@pytest.mark.parametrize(
    "func, fill_value",
    [("min", np.nan), ("max", np.nan), ("sum", 0), ("prod", 1), ("count", 0)],
)
def test_aggregate_with_nat(func, fill_value):
    # check TimeGrouper's aggregation is identical as normal groupby
    # if NaT is included, 'var', 'std', 'mean', 'first','last'
    # and 'nth' doesn't work yet

    n = 20
    data = np.random.randn(n, 4).astype("int64")
    normal_df = DataFrame(data, columns=["A", "B", "C", "D"])
    normal_df["key"] = [1, 2, np.nan, 4, 5] * 4

    dt_df = DataFrame(data, columns=["A", "B", "C", "D"])
    dt_df["key"] = [
        datetime(2013, 1, 1),
        datetime(2013, 1, 2),
        pd.NaT,
        datetime(2013, 1, 4),
        datetime(2013, 1, 5),
    ] * 4

    normal_grouped = normal_df.groupby("key")
    dt_grouped = dt_df.groupby(Grouper(key="key", freq="D"))

    normal_result = getattr(normal_grouped, func)()
    dt_result = getattr(dt_grouped, func)()

    pad = DataFrame([[fill_value] * 4], index=[3], columns=["A", "B", "C", "D"])
    expected = normal_result.append(pad)
    expected = expected.sort_index()
    expected.index = date_range(start="2013-01-01", freq="D", periods=5, name="key")
    assert_frame_equal(expected, dt_result)
    assert dt_result.index.name == "key"


def test_aggregate_with_nat_size():
    # GH 9925
    n = 20
    data = np.random.randn(n, 4).astype("int64")
    normal_df = DataFrame(data, columns=["A", "B", "C", "D"])
    normal_df["key"] = [1, 2, np.nan, 4, 5] * 4

    dt_df = DataFrame(data, columns=["A", "B", "C", "D"])
    dt_df["key"] = [
        datetime(2013, 1, 1),
        datetime(2013, 1, 2),
        pd.NaT,
        datetime(2013, 1, 4),
        datetime(2013, 1, 5),
    ] * 4

    normal_grouped = normal_df.groupby("key")
    dt_grouped = dt_df.groupby(Grouper(key="key", freq="D"))

    normal_result = normal_grouped.size()
    dt_result = dt_grouped.size()

    pad = Series([0], index=[3])
    expected = normal_result.append(pad)
    expected = expected.sort_index()
    expected.index = date_range(start="2013-01-01", freq="D", periods=5, name="key")
    assert_series_equal(expected, dt_result)
    assert dt_result.index.name == "key"


def test_repr():
    # GH18203
    result = repr(Grouper(key="A", freq="H"))
    expected = (
        "TimeGrouper(key='A', freq=<Hour>, axis=0, sort=True, "
        "closed='left', label='left', how='mean', "
        "convention='e', base=0)"
    )
    assert result == expected


@pytest.mark.parametrize(
    "method, method_args, expected_values",
    [
        ("sum", dict(), [1, 0, 1]),
        ("sum", dict(min_count=0), [1, 0, 1]),
        ("sum", dict(min_count=1), [1, np.nan, 1]),
        ("sum", dict(min_count=2), [np.nan, np.nan, np.nan]),
        ("prod", dict(), [1, 1, 1]),
        ("prod", dict(min_count=0), [1, 1, 1]),
        ("prod", dict(min_count=1), [1, np.nan, 1]),
        ("prod", dict(min_count=2), [np.nan, np.nan, np.nan]),
    ],
)
def test_upsample_sum(method, method_args, expected_values):
    s = pd.Series(1, index=pd.date_range("2017", periods=2, freq="H"))
    resampled = s.resample("30T")
    index = pd.to_datetime(
        ["2017-01-01T00:00:00", "2017-01-01T00:30:00", "2017-01-01T01:00:00"]
    )
    result = methodcaller(method, **method_args)(resampled)
    expected = pd.Series(expected_values, index=index)
    tm.assert_series_equal(result, expected)