import numpy as np
import pytest
import pandas as pd
from pandas import DataFrame, Series
import pandas.util.testing as tm
class TestGrouperGrouping:
    """Compare groupby-aware rolling/expanding windows with per-group application.

    Most tests compute a statistic through ``groupby(...).rolling(...)`` or
    ``groupby(...).expanding()`` and assert that it equals the result of
    applying the same window operation to each group with ``groupby.apply``.
    """

    def setup_method(self, method):
        """Create ``self.series`` (0..9) and ``self.frame`` (40 rows, keys in "A").

        ``method`` is accepted but not used here.
        """
        self.series = Series(np.arange(10))
        group_keys = [1] * 20 + [2] * 12 + [3] * 8
        self.frame = DataFrame({"A": group_keys, "B": np.arange(40)})

    def test_mutated(self):
        """Unknown groupby keywords raise; ``mutated`` is falsy unless passed."""
        expected_error = r"group\(\) got an unexpected keyword argument 'foo'"
        with pytest.raises(TypeError, match=expected_error):
            self.frame.groupby("A", foo=1)

        default_grouped = self.frame.groupby("A")
        assert not default_grouped.mutated

        mutated_grouped = self.frame.groupby("A", mutated=True)
        assert mutated_grouped.mutated

    def test_getitem(self):
        """Selecting "B" at any point around ``rolling(2).mean()`` gives one result."""
        grouped = self.frame.groupby("A")
        mutated_grouped = self.frame.groupby("A", mutated=True)
        expected = mutated_grouped.B.apply(lambda grp: grp.rolling(2).mean())

        # Each variant is evaluated lazily so it is computed right before its
        # own assertion, one after the other.
        variants = (
            lambda: grouped.rolling(2).mean().B,
            lambda: grouped.rolling(2).B.mean(),
            lambda: grouped.B.rolling(2).mean(),
            lambda: self.frame.B.groupby(self.frame.A).rolling(2).mean(),
        )
        for compute in variants:
            tm.assert_series_equal(compute(), expected)

    def test_getitem_multiple(self):
        """Selecting a column from the same rolling object twice works."""
        # GH 13174
        grouped = self.frame.groupby("A")
        roller = grouped.rolling(2)
        mutated_grouped = self.frame.groupby("A", mutated=True)
        expected = mutated_grouped.B.apply(lambda grp: grp.rolling(2).count())

        # The selection is intentionally repeated on the very same object.
        for _ in range(2):
            tm.assert_series_equal(roller.B.count(), expected)

    def test_rolling(self):
        """Grouped ``rolling(4)`` reductions equal applying them group by group."""
        grouped = self.frame.groupby("A")
        roller = grouped.rolling(window=4)

        # (method name, positional args, keyword args), in the order checked.
        plain = ("sum", "mean", "min", "max", "count", "kurt", "skew")
        cases = [(name, (), {}) for name in plain]
        cases.extend((name, (), {"ddof": 1}) for name in ("std", "var"))
        cases.append(("quantile", (0.5,), {}))

        for name, args, kwargs in cases:
            result = getattr(roller, name)(*args, **kwargs)

            def per_group(grp):
                return getattr(grp.rolling(4), name)(*args, **kwargs)

            expected = grouped.apply(per_group)
            tm.assert_frame_equal(result, expected)

    def test_rolling_corr_cov(self):
        """Grouped rolling ``corr``/``cov`` equal the per-group computation."""
        grouped = self.frame.groupby("A")
        roller = grouped.rolling(window=4)

        for name in ("corr", "cov"):
            # Whole-frame variant: ``self.frame`` is passed as the other operand.
            result = getattr(roller, name)(self.frame)
            expected = grouped.apply(
                lambda grp: getattr(grp.rolling(4), name)(self.frame)
            )
            tm.assert_frame_equal(result, expected)

            # Column "B" variant with ``pairwise=True``.
            result = getattr(roller.B, name)(pairwise=True)
            expected = grouped.apply(
                lambda grp: getattr(grp.B.rolling(4), name)(pairwise=True)
            )
            tm.assert_series_equal(result, expected)

    def test_rolling_apply(self, raw):
        """Grouped ``rolling(4).apply`` of a sum equals the per-group version.

        ``raw`` is forwarded to ``apply``; presumably it comes from a pytest
        fixture defined outside this class.
        """
        grouped = self.frame.groupby("A")
        roller = grouped.rolling(window=4)

        # reduction
        result = roller.apply(lambda window: window.sum(), raw=raw)

        def per_group(grp):
            return grp.rolling(4).apply(lambda window: window.sum(), raw=raw)

        expected = grouped.apply(per_group)
        tm.assert_frame_equal(result, expected)

    def test_rolling_apply_mutability(self):
        """A rolling sum gives the same result before and after ``g.sum()``."""
        # GH 14013
        df = pd.DataFrame({"A": ["foo"] * 3 + ["bar"] * 3, "B": [1] * 6})
        grouped = df.groupby("A")

        index = pd.MultiIndex.from_tuples(
            [("bar", 3), ("bar", 4), ("bar", 5), ("foo", 0), ("foo", 1), ("foo", 2)]
        )
        index.names = ["A", None]

        # Grouped column should not be a part of the output
        expected = pd.DataFrame([np.nan, 2.0, 2.0] * 2, columns=["B"], index=index)

        def rolling_sum():
            return grouped.rolling(window=2).sum()

        tm.assert_frame_equal(rolling_sum(), expected)

        # Call an arbitrary function on the groupby
        grouped.sum()

        # Make sure nothing has been mutated
        tm.assert_frame_equal(rolling_sum(), expected)

    def test_expanding(self):
        """Grouped ``expanding()`` reductions equal applying them group by group."""
        grouped = self.frame.groupby("A")
        expander = grouped.expanding()

        # (method name, positional args, keyword args), in the order checked.
        plain = ("sum", "mean", "min", "max", "count", "kurt", "skew")
        cases = [(name, (), {}) for name in plain]
        cases.extend((name, (), {"ddof": 0}) for name in ("std", "var"))
        cases.append(("quantile", (0.5,), {}))

        for name, args, kwargs in cases:
            result = getattr(expander, name)(*args, **kwargs)

            def per_group(grp):
                return getattr(grp.expanding(), name)(*args, **kwargs)

            expected = grouped.apply(per_group)
            tm.assert_frame_equal(result, expected)

    def test_expanding_corr_cov(self):
        """Grouped expanding ``corr``/``cov`` equal the per-group computation."""
        grouped = self.frame.groupby("A")
        expander = grouped.expanding()

        for name in ("corr", "cov"):
            # Whole-frame variant: ``self.frame`` is passed as the other operand.
            result = getattr(expander, name)(self.frame)
            expected = grouped.apply(
                lambda grp: getattr(grp.expanding(), name)(self.frame)
            )
            tm.assert_frame_equal(result, expected)

            # Column "B" variant with ``pairwise=True``.
            result = getattr(expander.B, name)(pairwise=True)
            expected = grouped.apply(
                lambda grp: getattr(grp.B.expanding(), name)(pairwise=True)
            )
            tm.assert_series_equal(result, expected)

    def test_expanding_apply(self, raw):
        """Grouped ``expanding().apply`` of a sum equals the per-group version.

        ``raw`` is forwarded to ``apply``; presumably it comes from a pytest
        fixture defined outside this class.
        """
        grouped = self.frame.groupby("A")
        expander = grouped.expanding()

        # reduction
        result = expander.apply(lambda window: window.sum(), raw=raw)

        def per_group(grp):
            return grp.expanding().apply(lambda window: window.sum(), raw=raw)

        expected = grouped.apply(per_group)
        tm.assert_frame_equal(result, expected)