# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, and NuGet packages.

# aaronreidsmith / pandas (python)

# Repository URL to install this package:

# Version: 0.25.3

# File: / tests / groupby / test_whitelist.py

"""
Test methods relating to generic function evaluation:
the so-called white/black lists.
"""

from string import ascii_lowercase

import numpy as np
import pytest

from pandas import DataFrame, Index, MultiIndex, Series, date_range
from pandas.util import testing as tm

# Reduction names used to parametrize ``test_regression_whitelist_methods``;
# each one is looked up by name on both the groupby object and the frame.
AGG_FUNCTIONS = [
    # totals and extrema
    "sum", "prod", "min", "max",
    # central tendency
    "median", "mean",
    # shape and spread
    "skew", "mad", "std", "var", "sem",
]
# Members of AGG_FUNCTIONS that the test calls with an explicit ``skipna=``
# keyword on the groupby side.
AGG_FUNCTIONS_WITH_SKIPNA = ["skew", "mad"]

# Attribute names that ``test_groupby_whitelist`` expects (as a set) in
# ``_apply_whitelist`` when the whole frame is grouped.  Also used to
# parametrize ``df_whitelist_fixture``.
df_whitelist = [
    "quantile", "fillna", "mad", "take",
    "idxmax", "idxmin",
    "tshift", "skew",
    "plot", "hist",
    "dtypes",
    "corrwith", "corr", "cov",
    "diff",
]


@pytest.fixture(params=df_whitelist)
def df_whitelist_fixture(request):
    """Parametrized fixture: hands out one ``df_whitelist`` entry per test run."""
    current_name = request.param
    return current_name


# Attribute names that ``test_groupby_whitelist`` expects (as a set) in
# ``_apply_whitelist`` when a single column is grouped.  Also used to
# parametrize ``s_whitelist_fixture``.
s_whitelist = [
    "quantile", "fillna", "mad", "take",
    "idxmax", "idxmin",
    "tshift", "skew",
    "plot", "hist",
    "dtype",
    "corr", "cov",
    "diff",
    "unique", "nlargest", "nsmallest",
    "is_monotonic_increasing", "is_monotonic_decreasing",
]


@pytest.fixture(params=s_whitelist)
def s_whitelist_fixture(request):
    """Parametrized fixture: hands out one ``s_whitelist`` entry per test run."""
    current_name = request.param
    return current_name


@pytest.fixture
def mframe():
    """Return a 10x3 ``np.random.randn`` frame on a two-level MultiIndex.

    The index levels are named "first" and "second"; the columns are
    "A", "B" and "C".
    """
    first_labels = ["foo", "bar", "baz", "qux"]
    second_labels = ["one", "two", "three"]
    first_codes = [0, 0, 0, 1, 1, 2, 2, 3, 3, 3]
    second_codes = [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]

    idx = MultiIndex(
        levels=[first_labels, second_labels],
        codes=[first_codes, second_codes],
        names=["first", "second"],
    )
    values = np.random.randn(10, 3)
    return DataFrame(values, index=idx, columns=["A", "B", "C"])


@pytest.fixture
def df():
    """Return an 8-row frame: string columns A and B, random float columns C and D."""
    key_a = ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"]
    key_b = ["one", "one", "two", "three", "two", "two", "one", "three"]
    # Draw C before D, one ``randn(8)`` call each.
    col_c = np.random.randn(8)
    col_d = np.random.randn(8)
    return DataFrame({"A": key_a, "B": key_b, "C": col_c, "D": col_d})


@pytest.fixture
def df_letters():
    """Return a 10-row frame with a random "floats" column and a random "letters" column.

    "letters" holds lowercase ASCII letters picked with ``np.random.randint``;
    "floats" holds ``np.random.random`` values.
    """
    N = 10
    alphabet = np.array(list(ascii_lowercase))
    # Letters are drawn before the floats.
    picks = np.random.randint(0, 26, N)
    random_letters = alphabet.take(picks)
    floats = N / 10 * Series(np.random.random(N))
    return DataFrame({"floats": floats, "letters": Series(random_letters)})


@pytest.mark.parametrize("whitelist", [df_whitelist, s_whitelist])
def test_groupby_whitelist(df_letters, whitelist):
    # The frame whitelist is checked against a groupby of the whole frame;
    # the other whitelist against a groupby of the "floats" column alone.
    is_frame_case = whitelist == df_whitelist
    obj = df_letters if is_frame_case else df_letters["floats"]

    grouped = obj.groupby(df_letters.letters)

    # Order is irrelevant: compare as sets.
    assert set(whitelist) == set(grouped._apply_whitelist)


def check_whitelist(obj, df, m):
    """Check that attribute ``m`` on the groupby type carries the right names.

    ``obj`` is grouped by ``df.letters`` and ``m`` is looked up on the *type*
    of the resulting groupby object.  If that attribute exposes ``__name__``
    it must equal ``m``; if it also exposes ``__qualname__`` that must end
    with ``m``.  Attributes lacking either dunder are accepted silently.
    """
    grouped_type = type(obj.groupby(df.letters))
    attr = getattr(grouped_type, m)

    # Sentinel distinguishes "attribute absent" from any real value.
    absent = object()

    # name
    name = getattr(attr, "__name__", absent)
    if name is absent:
        return
    assert name == m

    # qualname
    qualname = getattr(attr, "__qualname__", absent)
    if qualname is absent:
        return
    assert qualname.endswith(m)


def test_groupby_series_whitelist(df_letters, s_whitelist_fixture):
    # Run the name checks for one series-whitelist entry, grouping the
    # "letters" column by itself.
    check_whitelist(df_letters.letters, df_letters, s_whitelist_fixture)


def test_groupby_frame_whitelist(df_letters, df_whitelist_fixture):
    # Run the name checks for one frame-whitelist entry, grouping the whole
    # frame by its "letters" column.
    check_whitelist(df_letters, df_letters, df_whitelist_fixture)


@pytest.fixture
def raw_frame():
    """Return a 10x3 random frame on a two-level MultiIndex with a few NaNs.

    The index levels are named "first" and "second" and the column index is
    named "exp".  Row 1 (columns 1 and 2) and row 7 (columns 0 and 1) are
    set to NaN.
    """
    idx = MultiIndex(
        levels=[["foo", "bar", "baz", "qux"], ["one", "two", "three"]],
        codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3], [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
        names=["first", "second"],
    )
    values = np.random.randn(10, 3)
    column_index = Index(["A", "B", "C"], name="exp")
    frame = DataFrame(values, index=idx, columns=column_index)

    # (row position, column positions) pairs to blank out, in this order.
    for row, cols in ((1, [1, 2]), (7, [0, 1])):
        frame.iloc[row, cols] = np.nan
    return frame


@pytest.mark.parametrize("op", AGG_FUNCTIONS)
@pytest.mark.parametrize("level", [0, 1])
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("skipna", [True, False])
@pytest.mark.parametrize("sort", [True, False])
def test_regression_whitelist_methods(raw_frame, op, level, axis, skipna, sort):
    # GH6944
    # GH 17537
    # explicitly test the whitelist methods

    # For axis=1 the frame is transposed, so the MultiIndex sits on the columns.
    frame = raw_frame if axis == 0 else raw_frame.T

    # Only some ops are called with an explicit skipna keyword; the rest are
    # called with no extra arguments.
    extra_kwargs = {"skipna": skipna} if op in AGG_FUNCTIONS_WITH_SKIPNA else {}

    grouped = frame.groupby(level=level, axis=axis, sort=sort)
    result = getattr(grouped, op)(**extra_kwargs)
    expected = getattr(frame, op)(level=level, axis=axis, **extra_kwargs)
    if sort:
        expected = expected.sort_index(axis=axis, level=level)
    tm.assert_frame_equal(result, expected)


def test_groupby_blacklist(df_letters):
    # Every blacklisted name must raise AttributeError when looked up on a
    # groupby of either the frame or its "floats" column.
    frame = df_letters
    floats = df_letters.floats

    blacklist = [
        "eval",
        "query",
        "abs",
        "where",
        "mask",
        "align",
        "groupby",
        "clip",
        "astype",
        "at",
        "combine",
        "consolidate",
        "convert_objects",
    ]
    # Also include every ``to_*`` attribute that the frame exposes.
    blacklist += [attr for attr in dir(frame) if attr.startswith("to_")]

    # e.g., to_csv
    defined_but_not_allowed = "(?:^Cannot.+{0!r}.+{1!r}.+try using the 'apply' method$)"

    # e.g., query, eval
    not_defined = "(?:^{1!r} object has no attribute {0!r}$)"

    # Either message is acceptable; {0} is the attribute, {1} the groupby class.
    pattern_template = "|".join([defined_but_not_allowed, not_defined])

    for attr in blacklist:
        for target in (frame, floats):
            gb = target.groupby(frame.letters)
            expected_msg = pattern_template.format(attr, type(gb).__name__)
            with pytest.raises(AttributeError, match=expected_msg):
                getattr(gb, attr)


def test_tab_completion(mframe):
    # The public (non-underscore) names reported by dir() on a frame groupby
    # must match this set exactly.
    grouped = mframe.groupby(level="second")
    results = {name for name in dir(grouped) if not name.startswith("_")}

    # Listed alphabetically; "A", "B", "C" are the frame's column names.
    expected = {
        "A",
        "B",
        "C",
        "agg",
        "aggregate",
        "all",
        "any",
        "apply",
        "backfill",
        "bfill",
        "boxplot",
        "corr",
        "corrwith",
        "count",
        "cov",
        "cumcount",
        "cummax",
        "cummin",
        "cumprod",
        "cumsum",
        "describe",
        "diff",
        "dtypes",
        "expanding",
        "ffill",
        "fillna",
        "filter",
        "first",
        "get_group",
        "groups",
        "head",
        "hist",
        "idxmax",
        "idxmin",
        "indices",
        "last",
        "mad",
        "max",
        "mean",
        "median",
        "min",
        "ndim",
        "ngroup",
        "ngroups",
        "nth",
        "nunique",
        "ohlc",
        "pad",
        "pct_change",
        "pipe",
        "plot",
        "prod",
        "quantile",
        "rank",
        "resample",
        "rolling",
        "sem",
        "shift",
        "size",
        "skew",
        "std",
        "sum",
        "tail",
        "take",
        "transform",
        "tshift",
        "var",
    }
    assert results == expected


def test_groupby_function_rename(mframe):
    # Each of these groupby methods must report its own name via __name__.
    grouped = mframe.groupby(level="second")
    for method_name in ("sum", "prod", "min", "max", "first", "last"):
        assert getattr(grouped, method_name).__name__ == method_name


def test_groupby_selection_with_methods(df):
    # Selecting [["C"]] after grouping by "A" must give the same results as
    # selecting [["C"]] first and then grouping by the "A" column.

    # some methods which require DatetimeIndex
    df.index = date_range("2014", periods=len(df))

    selected = df.groupby(["A"])[["C"]]
    reference = df[["C"]].groupby(df["A"])
    # TODO check groupby with > 1 col ?

    def assert_same(call):
        # Apply ``call`` to the selection first, then to the reference
        # groupby, and require equal frames.
        tm.assert_frame_equal(call(selected), call(reference))

    # methods which are called as .foo()
    no_arg_methods = (
        "count",
        "corr",
        "cummax",
        "cummin",
        "cumprod",
        "describe",
        "rank",
        "quantile",
        "diff",
        "shift",
        "all",
        "any",
        "idxmin",
        "idxmax",
        "ffill",
        "bfill",
        "pct_change",
        "tshift",
    )
    for name in no_arg_methods:
        res = getattr(selected, name)()
        exp = getattr(reference, name)()

        # should always be frames!
        tm.assert_frame_equal(res, exp)

    # methods which aren't just .foo()
    assert_same(lambda gb: gb.fillna(0))
    assert_same(lambda gb: gb.dtypes)
    assert_same(lambda gb: gb.apply(lambda x: x.sum()))

    assert_same(lambda gb: gb.resample("D").mean())
    assert_same(lambda gb: gb.resample("D").ohlc())

    assert_same(lambda gb: gb.filter(lambda x: len(x) == 3))