"""
test methods relating to generic function evaluation
the so-called white/black lists
"""
from string import ascii_lowercase
import numpy as np
import pytest
from pandas import DataFrame, Index, MultiIndex, Series, date_range
from pandas.util import testing as tm
# Operation names used to parametrize test_regression_whitelist_methods.
# The order is kept stable because it determines the parametrization order.
AGG_FUNCTIONS = [
    "sum", "prod", "min", "max",
    "median", "mean", "skew", "mad",
    "std", "var", "sem",
]

# Members of AGG_FUNCTIONS that the regression test invokes with an explicit
# ``skipna`` keyword; every other operation is called without it.
AGG_FUNCTIONS_WITH_SKIPNA = ["skew", "mad"]
# Attribute names that test_groupby_whitelist expects to find in the
# ``_apply_whitelist`` of a grouped DataFrame.  The list also parametrizes
# df_whitelist_fixture, so the order is kept as-is.
df_whitelist = [
    "quantile", "fillna", "mad", "take",
    "idxmax", "idxmin", "tshift", "skew",
    "plot", "hist", "dtypes",
    "corrwith", "corr", "cov", "diff",
]
@pytest.fixture(params=df_whitelist)
def df_whitelist_fixture(request):
    """Parametrized fixture: provide one entry of ``df_whitelist`` per test run."""
    method_name = request.param
    return method_name
# Attribute names that test_groupby_whitelist expects to find in the
# ``_apply_whitelist`` of a grouped Series.  The list also parametrizes
# s_whitelist_fixture, so the order is kept as-is.
s_whitelist = [
    "quantile", "fillna", "mad", "take",
    "idxmax", "idxmin", "tshift", "skew",
    "plot", "hist", "dtype",
    "corr", "cov", "diff",
    "unique", "nlargest", "nsmallest",
    "is_monotonic_increasing", "is_monotonic_decreasing",
]
@pytest.fixture(params=s_whitelist)
def s_whitelist_fixture(request):
    """Parametrized fixture: provide one entry of ``s_whitelist`` per test run."""
    method_name = request.param
    return method_name
@pytest.fixture
def mframe():
    """Build a 10x3 frame of random normals indexed by a two-level MultiIndex.

    The index levels are named "first" and "second"; the columns are
    "A", "B" and "C".
    """
    first_labels = ["foo", "bar", "baz", "qux"]
    second_labels = ["one", "two", "three"]
    first_codes = [0, 0, 0, 1, 1, 2, 2, 3, 3, 3]
    second_codes = [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]

    row_index = MultiIndex(
        levels=[first_labels, second_labels],
        codes=[first_codes, second_codes],
        names=["first", "second"],
    )
    values = np.random.randn(10, 3)
    return DataFrame(values, index=row_index, columns=["A", "B", "C"])
@pytest.fixture
def df():
    """Build an 8-row frame with two string key columns and two random columns.

    "A" and "B" hold string keys; "C" and "D" hold random normal floats.
    """
    key_a = ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"]
    key_b = ["one", "one", "two", "three", "two", "two", "one", "three"]
    # Draw "C" before "D" so the random stream is consumed in the same order.
    values_c = np.random.randn(8)
    values_d = np.random.randn(8)
    return DataFrame({"A": key_a, "B": key_b, "C": values_c, "D": values_d})
@pytest.fixture
def df_letters():
    """Build a 10-row frame with a random "floats" column and a random "letters" column.

    "letters" holds lowercase ASCII letters picked at random (with
    replacement); "floats" holds uniform random numbers scaled by N / 10.
    """
    n_rows = 10
    alphabet = np.array(list(ascii_lowercase))

    # The letter positions are drawn first, then the floats, so the random
    # stream is consumed in a fixed order.
    picked_positions = np.random.randint(0, 26, n_rows)
    picked_letters = alphabet.take(picked_positions)
    uniform_floats = Series(np.random.random(n_rows))

    columns = {
        "floats": n_rows / 10 * uniform_floats,
        "letters": Series(picked_letters),
    }
    return DataFrame(columns)
@pytest.mark.parametrize("whitelist", [df_whitelist, s_whitelist])
def test_groupby_whitelist(df_letters, whitelist):
    """Check that ``_apply_whitelist`` of a groupby matches the expected names.

    The frame whitelist is checked against a groupby of the whole frame,
    the series whitelist against a groupby of its "floats" column.
    """
    if whitelist == df_whitelist:
        # dataframe
        target = df_letters
    else:
        target = df_letters["floats"]

    grouped = target.groupby(df_letters.letters)

    # Compare as sets: only membership matters, not order.
    assert set(whitelist) == set(grouped._apply_whitelist)
def check_whitelist(obj, df, m):
    """Check that attribute ``m`` of the groupby class carries a matching name.

    ``obj`` is grouped by ``df.letters`` and ``m`` is looked up on the type of
    the resulting groupby object.  If the attribute has a ``__name__`` it must
    equal ``m``; if it also has a ``__qualname__`` that must end with ``m``.
    Attributes lacking either dunder are accepted without further checks.

    Raises AttributeError if the groupby class has no attribute ``m``.
    """
    grouped = obj.groupby(df.letters)
    attribute = getattr(type(grouped), m)

    # A private sentinel distinguishes "attribute absent" from any real value.
    absent = object()

    # name
    name = getattr(attribute, "__name__", absent)
    if name is absent:
        return
    assert name == m

    # qualname
    qualname = getattr(attribute, "__qualname__", absent)
    if qualname is absent:
        return
    assert qualname.endswith(m)
def test_groupby_series_whitelist(df_letters, s_whitelist_fixture):
    """Run ``check_whitelist`` for each series whitelist entry on the "letters" column."""
    check_whitelist(df_letters.letters, df_letters, s_whitelist_fixture)
def test_groupby_frame_whitelist(df_letters, df_whitelist_fixture):
    """Run ``check_whitelist`` for each frame whitelist entry on the whole frame."""
    check_whitelist(df_letters, df_letters, df_whitelist_fixture)
@pytest.fixture
def raw_frame():
    """Build a 10x3 random frame on a two-level MultiIndex with a few NaNs.

    The row index levels are named "first" and "second"; the column index
    ("A", "B", "C") is named "exp".  Four cells are overwritten with NaN.
    """
    first_labels = ["foo", "bar", "baz", "qux"]
    second_labels = ["one", "two", "three"]
    row_index = MultiIndex(
        levels=[first_labels, second_labels],
        codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3], [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
        names=["first", "second"],
    )
    column_index = Index(["A", "B", "C"], name="exp")

    frame = DataFrame(np.random.randn(10, 3), index=row_index, columns=column_index)

    # Inject missing values into two rows (two cells each).
    frame.iloc[1, [1, 2]] = np.nan
    frame.iloc[7, [0, 1]] = np.nan
    return frame
@pytest.mark.parametrize("op", AGG_FUNCTIONS)
@pytest.mark.parametrize("level", [0, 1])
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("skipna", [True, False])
@pytest.mark.parametrize("sort", [True, False])
def test_regression_whitelist_methods(raw_frame, op, level, axis, skipna, sort):
    """Compare ``groupby(level=...).op()`` with ``frame.op(level=...)``.

    For axis 1 the frame is transposed first, so the MultiIndex sits on the
    grouped axis.  Operations listed in AGG_FUNCTIONS_WITH_SKIPNA receive
    ``skipna`` on both sides; all others are called without it.
    """
    # GH6944
    # GH 17537
    # explicitly test the whitelist methods
    frame = raw_frame if axis == 0 else raw_frame.T

    # Only some operations are called with skipna; for the rest pass nothing.
    if op in AGG_FUNCTIONS_WITH_SKIPNA:
        skipna_kwargs = {"skipna": skipna}
    else:
        skipna_kwargs = {}

    grouped = frame.groupby(level=level, axis=axis, sort=sort)
    result = getattr(grouped, op)(**skipna_kwargs)
    expected = getattr(frame, op)(level=level, axis=axis, **skipna_kwargs)

    if sort:
        # Sort the frame-level result to line up with the sorted groupby output.
        expected = expected.sort_index(axis=axis, level=level)

    tm.assert_frame_equal(result, expected)
def test_groupby_blacklist(df_letters):
    """Check that blacklisted names raise AttributeError on groupby objects.

    Each name is accessed on a groupby of the frame and on a groupby of its
    "floats" column (both grouped by "letters"); the error message must match
    one of two accepted patterns.
    """
    frame = df_letters
    floats = df_letters.floats

    explicit_names = [
        "eval",
        "query",
        "abs",
        "where",
        "mask",
        "align",
        "groupby",
        "clip",
        "astype",
        "at",
        "combine",
        "consolidate",
        "convert_objects",
    ]
    # Every ``to_*`` attribute of the frame is blacklisted as well.
    to_names = [attr for attr in dir(frame) if attr.startswith("to_")]
    blacklist = explicit_names + to_names

    # e.g., to_csv
    defined_but_not_allowed = "(?:^Cannot.+{0!r}.+{1!r}.+try using the 'apply' method$)"
    # e.g., query, eval
    not_defined = "(?:^{1!r} object has no attribute {0!r}$)"
    # Either message is acceptable, so the two patterns are alternated.
    pattern_template = "|".join([defined_but_not_allowed, not_defined])

    for name in blacklist:
        for target in (frame, floats):
            grouped = target.groupby(frame.letters)
            expected_msg = pattern_template.format(name, type(grouped).__name__)
            with pytest.raises(AttributeError, match=expected_msg):
                getattr(grouped, name)
def test_tab_completion(mframe):
grp = mframe.groupby(level="second")
results = {v for v in dir(grp) if not v.startswith("_")}
expected = {
"A",
"B",
"C",
"agg",
"aggregate",
"apply",
"boxplot",
"filter",
"first",
"get_group",
"groups",
"hist",
"indices",
"last",
"max",
"mean",
"median",
"min",
"ngroups",
"nth",
"ohlc",
"plot",
"prod",
"size",
"std",
"sum",
"transform",
"var",
"sem",
"count",
"nunique",
"head",
"describe",
"cummax",
"quantile",
"rank",
"cumprod",
"tail",
"resample",
"cummin",
"fillna",
"cumsum",
"cumcount",
"ngroup",
"all",
"shift",
"skew",
"take",
"tshift",
"pct_change",
"any",
"mad",
"corr",
"corrwith",
"cov",
"dtypes",
"ndim",
"diff",
"idxmax",
"idxmin",
"ffill",
"bfill",
"pad",
"backfill",
"rolling",
"expanding",
"pipe",
}
assert results == expected
def test_groupby_function_rename(mframe):
grp = mframe.groupby(level="second")
for name in ["sum", "prod", "min", "max", "first", "last"]:
f = getattr(grp, name)
assert f.__name__ == name
def test_groupby_selection_with_methods(df):
    """Check that selecting a column after groupby equals grouping the selection.

    ``df.groupby(["A"])[["C"]]`` and ``df[["C"]].groupby(df["A"])`` must give
    equal frames for a range of groupby methods.  Note that the index of
    ``df`` is replaced in place with a daily date range.
    """
    # some methods which require DatetimeIndex
    df.index = date_range("2014", periods=len(df))

    selected = df.groupby(["A"])[["C"]]
    reference = df[["C"]].groupby(df["A"])

    # TODO check groupby with > 1 col ?

    # methods which are called as .foo()
    no_arg_methods = (
        "count",
        "corr",
        "cummax",
        "cummin",
        "cumprod",
        "describe",
        "rank",
        "quantile",
        "diff",
        "shift",
        "all",
        "any",
        "idxmin",
        "idxmax",
        "ffill",
        "bfill",
        "pct_change",
        "tshift",
    )
    for method_name in no_arg_methods:
        res = getattr(selected, method_name)()
        exp = getattr(reference, method_name)()

        # should always be frames!
        tm.assert_frame_equal(res, exp)

    # methods which aren't just .foo()
    # Each callable is applied to the selected groupby first, then to the
    # reference groupby, and the two resulting frames are compared.
    other_calls = (
        lambda gb: gb.fillna(0),
        lambda gb: gb.dtypes,
        lambda gb: gb.apply(lambda x: x.sum()),
        lambda gb: gb.resample("D").mean(),
        lambda gb: gb.resample("D").ohlc(),
        lambda gb: gb.filter(lambda x: len(x) == 3),
    )
    for call in other_calls:
        tm.assert_frame_equal(call(selected), call(reference))