import collections
import contextlib
import operator
import pickle
import warnings
import numpy as np
import pandas as pd
import pytest
import dask
import dask.dataframe as dd
from dask.dataframe import _compat
from dask.dataframe._compat import (
PANDAS_GT_110,
PANDAS_GT_130,
PANDAS_GT_150,
check_numeric_only_deprecation,
tm,
)
from dask.dataframe.backends import grouper_dispatch
from dask.dataframe.utils import assert_dask_graph, assert_eq, assert_max_deps
from dask.utils import M
from dask.utils_test import hlg_layer
CHECK_FREQ = {}
if dd._compat.PANDAS_GT_110:
CHECK_FREQ["check_freq"] = False
AGG_FUNCS = [
"sum",
"mean",
"median",
"min",
"max",
"count",
"size",
"std",
"var",
"cov",
"corr",
"nunique",
"first",
"last",
"prod",
]
@pytest.fixture(params=AGG_FUNCS)
def agg_func(request):
"""
Aggregations supported for groups
"""
return request.param
# Wrapper fixture for shuffle_method to auto-apply it to all the tests in this module,
# as we don't want to auto-apply the fixture repo-wide.
@pytest.fixture(autouse=True)
def auto_shuffle_method(shuffle_method):
yield
@pytest.mark.xfail(reason="uncertain how to handle. See issue #3481.")
def test_groupby_internal_repr_xfail():
pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
ddf = dd.from_pandas(pdf, 3)
gp = pdf.groupby("y")["x"]
dp = ddf.groupby("y")["x"]
assert isinstance(dp.obj, dd.Series)
assert_eq(dp.obj, gp.obj)
gp = pdf.groupby(pdf.y)["x"]
dp = ddf.groupby(ddf.y)["x"]
    assert isinstance(dp.obj, dd.Series)
    assert_eq(dp.obj, gp.obj)
def test_groupby_internal_repr():
pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
ddf = dd.from_pandas(pdf, 3)
gp = pdf.groupby("y")
dp = ddf.groupby("y")
assert isinstance(dp, dd.groupby.DataFrameGroupBy)
assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
assert isinstance(dp.obj, dd.DataFrame)
assert_eq(dp.obj, gp.obj)
gp = pdf.groupby("y")["x"]
dp = ddf.groupby("y")["x"]
assert isinstance(dp, dd.groupby.SeriesGroupBy)
assert isinstance(dp._meta, pd.core.groupby.SeriesGroupBy)
gp = pdf.groupby("y")[["x"]]
dp = ddf.groupby("y")[["x"]]
assert isinstance(dp, dd.groupby.DataFrameGroupBy)
assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    # slicing should not affect the internal object
assert isinstance(dp.obj, dd.DataFrame)
assert_eq(dp.obj, gp.obj)
gp = pdf.groupby(pdf.y)["x"]
dp = ddf.groupby(ddf.y)["x"]
assert isinstance(dp, dd.groupby.SeriesGroupBy)
assert isinstance(dp._meta, pd.core.groupby.SeriesGroupBy)
gp = pdf.groupby(pdf.y)[["x"]]
dp = ddf.groupby(ddf.y)[["x"]]
assert isinstance(dp, dd.groupby.DataFrameGroupBy)
assert isinstance(dp._meta, pd.core.groupby.DataFrameGroupBy)
    # slicing should not affect the internal object
assert isinstance(dp.obj, dd.DataFrame)
assert_eq(dp.obj, gp.obj)
def test_groupby_error():
pdf = pd.DataFrame({"x": [0, 1, 2, 3, 4, 6, 7, 8, 9, 10], "y": list("abcbabbcda")})
ddf = dd.from_pandas(pdf, 3)
with pytest.raises(KeyError):
ddf.groupby("A")
with pytest.raises(KeyError):
ddf.groupby(["x", "A"])
dp = ddf.groupby("y")
msg = "Column not found: "
with pytest.raises(KeyError) as err:
dp["A"]
assert msg in str(err.value)
msg = "Columns not found: "
with pytest.raises(KeyError) as err:
dp[["x", "A"]]
assert msg in str(err.value)
msg = (
"DataFrameGroupBy does not allow compute method."
"Please chain it with an aggregation method (like ``.mean()``) or get a "
"specific group using ``.get_group()`` before calling ``compute()``"
)
with pytest.raises(NotImplementedError) as err:
dp.compute()
assert msg in str(err.value)
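# Illustrative sketch (not part of the original suite): the error message above
# steers users toward chaining an aggregation or selecting a group before
# computing. A minimal usage example with hypothetical toy data:
def _example_groupby_compute_usage():
    pdf = pd.DataFrame({"x": [0, 1, 2, 3], "y": list("abab")})
    ddf = dd.from_pandas(pdf, npartitions=2)
    # OK: reduce with an aggregation, then compute
    ddf.groupby("y").x.mean().compute()
    # OK: materialize a single group, then compute
    ddf.groupby("y").get_group("a").compute()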
def test_full_groupby():
df = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
ddf = dd.from_pandas(df, npartitions=3)
pytest.raises(KeyError, lambda: ddf.groupby("does_not_exist"))
pytest.raises(AttributeError, lambda: ddf.groupby("a").does_not_exist)
assert "b" in dir(ddf.groupby("a"))
def func(df):
return df.assign(b=df.b - df.b.mean())
expected = df.groupby("a").apply(func)
with pytest.warns(UserWarning, match="`meta` is not specified"):
assert ddf.groupby("a").apply(func)._name.startswith("func")
assert_eq(expected, ddf.groupby("a").apply(func))
def test_full_groupby_apply_multiarg():
df = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
ddf = dd.from_pandas(df, npartitions=3)
def func(df, c, d=3):
return df.assign(b=df.b - df.b.mean() + c * d)
c = df.a.sum()
d = df.b.mean()
c_scalar = ddf.a.sum()
d_scalar = ddf.b.mean()
c_delayed = dask.delayed(lambda: c)()
d_delayed = dask.delayed(lambda: d)()
with pytest.warns(UserWarning, match="`meta` is not specified"):
assert_eq(
df.groupby("a").apply(func, c, d=d),
ddf.groupby("a").apply(func, c, d=d_scalar),
)
assert_eq(df.groupby("a").apply(func, c), ddf.groupby("a").apply(func, c))
assert_eq(
df.groupby("a").apply(func, c, d=d), ddf.groupby("a").apply(func, c, d=d)
)
assert_eq(
df.groupby("a").apply(func, c),
ddf.groupby("a").apply(func, c_scalar),
check_dtype=False,
)
meta = df.groupby("a").apply(func, c)
assert_eq(
df.groupby("a").apply(func, c),
ddf.groupby("a").apply(func, c_scalar, meta=meta),
)
assert_eq(
df.groupby("a").apply(func, c, d=d),
ddf.groupby("a").apply(func, c, d=d_scalar, meta=meta),
)
# Delayed arguments work, but only if metadata is provided
with pytest.raises(ValueError) as exc:
ddf.groupby("a").apply(func, c, d=d_delayed)
assert "dask.delayed" in str(exc.value) and "meta" in str(exc.value)
with pytest.raises(ValueError) as exc:
ddf.groupby("a").apply(func, c_delayed, d=d)
assert "dask.delayed" in str(exc.value) and "meta" in str(exc.value)
assert_eq(
df.groupby("a").apply(func, c),
ddf.groupby("a").apply(func, c_delayed, meta=meta),
)
assert_eq(
df.groupby("a").apply(func, c, d=d),
ddf.groupby("a").apply(func, c, d=d_delayed, meta=meta),
)
@pytest.mark.parametrize(
"grouper",
[
lambda df: ["a"],
lambda df: ["a", "b"],
lambda df: df["a"],
lambda df: [df["a"], df["b"]],
pytest.param(
lambda df: [df["a"] > 2, df["b"] > 1],
marks=pytest.mark.xfail(reason="not yet supported"),
),
],
)
@pytest.mark.parametrize("reverse", [True, False])
def test_full_groupby_multilevel(grouper, reverse):
index = [0, 1, 3, 5, 6, 8, 9, 9, 9]
if reverse:
index = index[::-1]
df = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5, 6, 7, 8, 9],
"d": [1, 2, 3, 4, 5, 6, 7, 8, 9],
"b": [4, 5, 6, 3, 2, 1, 0, 0, 0],
},
index=index,
)
ddf = dd.from_pandas(df, npartitions=3)
def func(df):
return df.assign(b=df.b - df.b.mean())
with pytest.warns(UserWarning, match="`meta` is not specified"):
assert_eq(
df.groupby(grouper(df)).apply(func), ddf.groupby(grouper(ddf)).apply(func)
)
def test_groupby_dir():
df = pd.DataFrame({"a": range(10), "b c d e": range(10)})
ddf = dd.from_pandas(df, npartitions=2)
g = ddf.groupby("a")
assert "a" in dir(g)
assert "b c d e" not in dir(g)
@pytest.mark.parametrize("scheduler", ["sync", "threads"])
def test_groupby_on_index(scheduler):
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
ddf = dd.from_pandas(pdf, npartitions=3)
ddf2 = ddf.set_index("a")
pdf2 = pdf.set_index("a")
assert_eq(ddf.groupby("a").b.mean(), ddf2.groupby(ddf2.index).b.mean())
def func(df):
return df.assign(b=df.b - df.b.mean())
def func2(df):
return df[["b"]] - df[["b"]].mean()
def func3(df):
return df.mean()
with dask.config.set(scheduler=scheduler):
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
assert_eq(ddf.groupby("a").apply(func), pdf.groupby("a").apply(func))
assert_eq(
ddf.groupby("a").apply(func).set_index("a"),
pdf.groupby("a").apply(func).set_index("a"),
)
assert_eq(
pdf2.groupby(pdf2.index).apply(func2),
ddf2.groupby(ddf2.index).apply(func2),
)
assert_eq(
ddf2.b.groupby("a").apply(func3), pdf2.b.groupby("a").apply(func3)
)
assert_eq(
ddf2.b.groupby(ddf2.index).apply(func3),
pdf2.b.groupby(pdf2.index).apply(func3),
)
@pytest.mark.parametrize(
"grouper",
[
lambda df: df.groupby("a")["b"],
lambda df: df.groupby(["a", "b"]),
lambda df: df.groupby(["a", "b"])["c"],
lambda df: df.groupby(df["a"])[["b", "c"]],
lambda df: df.groupby("a")[["b", "c"]],
lambda df: df.groupby("a")[["b"]],
lambda df: df.groupby(["a", "b", "c"]),
],
)
def test_groupby_multilevel_getitem(grouper, agg_func):
# nunique is not implemented for DataFrameGroupBy
if agg_func == "nunique":
return
df = pd.DataFrame(
{
"a": [1, 2, 3, 1, 2, 3],
"b": [1, 2, 1, 4, 2, 1],
"c": [1, 3, 2, 1, 1, 2],
"d": [1, 2, 1, 1, 2, 2],
}
)
ddf = dd.from_pandas(df, 2)
dask_group = grouper(ddf)
pandas_group = grouper(df)
    # covariance/correlation requires multiple value columns, so skip SeriesGroupBy
if isinstance(pandas_group, pd.core.groupby.SeriesGroupBy) and agg_func in (
"cov",
"corr",
):
return
dask_agg = getattr(dask_group, agg_func)
pandas_agg = getattr(pandas_group, agg_func)
assert isinstance(dask_group, dd.groupby._GroupBy)
assert isinstance(pandas_group, pd.core.groupby.GroupBy)
if agg_func == "mean":
assert_eq(dask_agg(), pandas_agg().astype(float))
else:
a = dask_agg()
with warnings.catch_warnings():
# pandas does `.cov([[1], [1]])` which numpy warns on (all NaN).
# Pandas does strange things with exceptions in groupby.
warnings.simplefilter("ignore", RuntimeWarning)
b = pandas_agg()
assert_eq(a, b)
def test_groupby_multilevel_agg():
df = pd.DataFrame(
{
"a": [1, 2, 3, 1, 2, 3],
"b": [1, 2, 1, 4, 2, 1],
"c": [1, 3, 2, 1, 1, 2],
"d": [1, 2, 1, 1, 2, 2],
}
)
ddf = dd.from_pandas(df, 2)
sol = df.groupby(["a"]).mean()
res = ddf.groupby(["a"]).mean()
assert_eq(res, sol)
sol = df.groupby(["a", "c"]).mean()
res = ddf.groupby(["a", "c"]).mean()
assert_eq(res, sol)
sol = df.groupby([df["a"], df["c"]]).mean()
res = ddf.groupby([ddf["a"], ddf["c"]]).mean()
assert_eq(res, sol)
@pytest.mark.parametrize("categoricals", [True, False])
def test_groupby_get_group(categoricals):
dsk = {
("x", 0): pd.DataFrame({"a": [1, 2, 6], "b": [4, 2, 7]}, index=[0, 1, 3]),
("x", 1): pd.DataFrame({"a": [4, 2, 6], "b": [3, 3, 1]}, index=[5, 6, 8]),
("x", 2): pd.DataFrame({"a": [4, 3, 7], "b": [1, 1, 3]}, index=[9, 9, 9]),
}
meta = dsk[("x", 0)]
d = dd.DataFrame(dsk, "x", meta, [0, 4, 9, 9])
full = d.compute()
by_keys = [("b", "b"), (d.b, full.b)]
if categoricals:
d = d.categorize(columns=["b"])
full = d.compute()
else:
by_keys.append((d.b + 1, full.b + 1))
for ddkey, pdkey in by_keys:
ddgrouped = d.groupby(ddkey)
pdgrouped = full.groupby(pdkey)
# DataFrame
assert_eq(ddgrouped.get_group(2), pdgrouped.get_group(2))
assert_eq(ddgrouped.get_group(3), pdgrouped.get_group(3))
# Series
assert_eq(ddgrouped.a.get_group(3), pdgrouped.a.get_group(3))
assert_eq(ddgrouped.a.get_group(2), pdgrouped.a.get_group(2))
def test_dataframe_groupby_nunique():
strings = list("aaabbccccdddeee")
data = np.random.randn(len(strings))
ps = pd.DataFrame(dict(strings=strings, data=data))
s = dd.from_pandas(ps, npartitions=3)
expected = ps.groupby("strings")["data"].nunique()
assert_eq(s.groupby("strings")["data"].nunique(), expected)
def test_dataframe_groupby_nunique_across_group_same_value():
strings = list("aaabbccccdddeee")
data = list(map(int, "123111223323412"))
ps = pd.DataFrame(dict(strings=strings, data=data))
s = dd.from_pandas(ps, npartitions=3)
expected = ps.groupby("strings")["data"].nunique()
assert_eq(s.groupby("strings")["data"].nunique(), expected)
def test_series_groupby_propagates_names():
df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
ddf = dd.from_pandas(df, 2)
func = lambda df: df["y"].sum()
with pytest.warns(UserWarning): # meta inference
result = ddf.groupby("x").apply(func)
expected = df.groupby("x").apply(func)
assert_eq(result, expected)
@pytest.mark.parametrize("npartitions", (1, 2))
@pytest.mark.parametrize("func", ("cumsum", "cumprod", "cumcount"))
def test_series_groupby_cumfunc_with_named_index(npartitions, func):
df = pd.DataFrame(
{"x": [1, 2, 3, 4, 5, 6, 7], "y": [8, 9, 6, 2, 3, 5, 6]}
).set_index("x")
ddf = dd.from_pandas(df, npartitions)
assert ddf.npartitions == npartitions
expected = getattr(df["y"].groupby("x"), func)()
result = getattr(ddf["y"].groupby("x"), func)()
assert_eq(result, expected)
def test_series_groupby():
s = pd.Series([1, 2, 2, 1, 1])
pd_group = s.groupby(s)
ss = dd.from_pandas(s, npartitions=2)
dask_group = ss.groupby(ss)
pd_group2 = s.groupby(s + 1)
dask_group2 = ss.groupby(ss + 1)
    for dg, pdg in [(dask_group, pd_group), (dask_group2, pd_group2)]:
assert_eq(dg.count(), pdg.count())
assert_eq(dg.sum(), pdg.sum())
assert_eq(dg.min(), pdg.min())
assert_eq(dg.max(), pdg.max())
assert_eq(dg.size(), pdg.size())
assert_eq(dg.first(), pdg.first())
assert_eq(dg.last(), pdg.last())
assert_eq(dg.prod(), pdg.prod())
def test_series_groupby_errors():
s = pd.Series([1, 2, 2, 1, 1])
ss = dd.from_pandas(s, npartitions=2)
msg = "No group keys passed!"
with pytest.raises(ValueError) as err:
s.groupby([]) # pandas
assert msg in str(err.value)
with pytest.raises(ValueError) as err:
ss.groupby([]) # dask should raise the same error
assert msg in str(err.value)
sss = dd.from_pandas(s, npartitions=5)
with pytest.raises(NotImplementedError):
ss.groupby(sss)
with pytest.raises(KeyError):
s.groupby("x") # pandas
with pytest.raises(KeyError):
ss.groupby("x") # dask should raise the same error
def test_groupby_index_array():
df = _compat.makeTimeDataFrame()
ddf = dd.from_pandas(df, npartitions=2)
# first select column, then group
assert_eq(
df.A.groupby(df.index.month).nunique(),
ddf.A.groupby(ddf.index.month).nunique(),
check_names=False,
)
# first group, then select column
assert_eq(
df.groupby(df.index.month).A.nunique(),
ddf.groupby(ddf.index.month).A.nunique(),
check_names=False,
)
def test_groupby_set_index():
df = _compat.makeTimeDataFrame()
ddf = dd.from_pandas(df, npartitions=2)
pytest.raises(TypeError, lambda: ddf.groupby(df.index.month, as_index=False))
@pytest.mark.parametrize("empty", [True, False])
def test_split_apply_combine_on_series(empty):
if empty:
pdf = pd.DataFrame({"a": [1.0], "b": [1.0]}, index=[0]).iloc[:0]
# There's a bug in pandas where df.groupby(...).var(ddof=0) results in
# no columns. Just skip these checks for now.
ddofs = []
else:
ddofs = [0, 1, 2]
pdf = pd.DataFrame(
{"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
ddf = dd.from_pandas(pdf, npartitions=3)
for ddkey, pdkey in [("b", "b"), (ddf.b, pdf.b), (ddf.b + 1, pdf.b + 1)]:
assert_eq(ddf.groupby(ddkey).a.min(), pdf.groupby(pdkey).a.min())
assert_eq(ddf.groupby(ddkey).a.max(), pdf.groupby(pdkey).a.max())
assert_eq(ddf.groupby(ddkey).a.count(), pdf.groupby(pdkey).a.count())
assert_eq(ddf.groupby(ddkey).a.mean(), pdf.groupby(pdkey).a.mean())
assert_eq(ddf.groupby(ddkey).a.nunique(), pdf.groupby(pdkey).a.nunique())
assert_eq(ddf.groupby(ddkey).a.size(), pdf.groupby(pdkey).a.size())
assert_eq(ddf.groupby(ddkey).a.first(), pdf.groupby(pdkey).a.first())
assert_eq(ddf.groupby(ddkey).a.last(), pdf.groupby(pdkey).a.last())
assert_eq(ddf.groupby(ddkey).a.tail(), pdf.groupby(pdkey).a.tail())
assert_eq(ddf.groupby(ddkey).a.head(), pdf.groupby(pdkey).a.head())
for ddof in ddofs:
assert_eq(ddf.groupby(ddkey).a.var(ddof), pdf.groupby(pdkey).a.var(ddof))
assert_eq(ddf.groupby(ddkey).a.std(ddof), pdf.groupby(pdkey).a.std(ddof))
assert_eq(ddf.groupby(ddkey).sum(), pdf.groupby(pdkey).sum())
assert_eq(ddf.groupby(ddkey).min(), pdf.groupby(pdkey).min())
assert_eq(ddf.groupby(ddkey).max(), pdf.groupby(pdkey).max())
assert_eq(ddf.groupby(ddkey).count(), pdf.groupby(pdkey).count())
assert_eq(ddf.groupby(ddkey).mean(), pdf.groupby(pdkey).mean())
assert_eq(ddf.groupby(ddkey).size(), pdf.groupby(pdkey).size())
assert_eq(ddf.groupby(ddkey).first(), pdf.groupby(pdkey).first())
assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last())
assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod())
for ddof in ddofs:
assert_eq(
ddf.groupby(ddkey).var(ddof),
pdf.groupby(pdkey).var(ddof),
check_dtype=False,
)
assert_eq(
ddf.groupby(ddkey).std(ddof),
pdf.groupby(pdkey).std(ddof),
check_dtype=False,
)
for ddkey, pdkey in [(ddf.b, pdf.b), (ddf.b + 1, pdf.b + 1)]:
assert_eq(
ddf.a.groupby(ddkey).sum(), pdf.a.groupby(pdkey).sum(), check_names=False
)
assert_eq(
ddf.a.groupby(ddkey).max(), pdf.a.groupby(pdkey).max(), check_names=False
)
assert_eq(
ddf.a.groupby(ddkey).count(),
pdf.a.groupby(pdkey).count(),
check_names=False,
)
assert_eq(
ddf.a.groupby(ddkey).mean(), pdf.a.groupby(pdkey).mean(), check_names=False
)
assert_eq(
ddf.a.groupby(ddkey).nunique(),
pdf.a.groupby(pdkey).nunique(),
check_names=False,
)
assert_eq(
ddf.a.groupby(ddkey).first(),
pdf.a.groupby(pdkey).first(),
check_names=False,
)
assert_eq(
ddf.a.groupby(ddkey).last(), pdf.a.groupby(pdkey).last(), check_names=False
)
assert_eq(
ddf.a.groupby(ddkey).prod(), pdf.a.groupby(pdkey).prod(), check_names=False
)
for ddof in ddofs:
assert_eq(ddf.a.groupby(ddkey).var(ddof), pdf.a.groupby(pdkey).var(ddof))
assert_eq(ddf.a.groupby(ddkey).std(ddof), pdf.a.groupby(pdkey).std(ddof))
for i in [0, 4, 7]:
assert_eq(ddf.groupby(ddf.b > i).a.sum(), pdf.groupby(pdf.b > i).a.sum())
assert_eq(ddf.groupby(ddf.b > i).a.min(), pdf.groupby(pdf.b > i).a.min())
assert_eq(ddf.groupby(ddf.b > i).a.max(), pdf.groupby(pdf.b > i).a.max())
assert_eq(ddf.groupby(ddf.b > i).a.count(), pdf.groupby(pdf.b > i).a.count())
assert_eq(ddf.groupby(ddf.b > i).a.mean(), pdf.groupby(pdf.b > i).a.mean())
assert_eq(
ddf.groupby(ddf.b > i).a.nunique(), pdf.groupby(pdf.b > i).a.nunique()
)
assert_eq(ddf.groupby(ddf.b > i).a.size(), pdf.groupby(pdf.b > i).a.size())
assert_eq(ddf.groupby(ddf.b > i).a.first(), pdf.groupby(pdf.b > i).a.first())
assert_eq(ddf.groupby(ddf.b > i).a.last(), pdf.groupby(pdf.b > i).a.last())
assert_eq(ddf.groupby(ddf.b > i).a.tail(), pdf.groupby(pdf.b > i).a.tail())
assert_eq(ddf.groupby(ddf.b > i).a.head(), pdf.groupby(pdf.b > i).a.head())
assert_eq(ddf.groupby(ddf.b > i).a.prod(), pdf.groupby(pdf.b > i).a.prod())
assert_eq(ddf.groupby(ddf.a > i).b.sum(), pdf.groupby(pdf.a > i).b.sum())
assert_eq(ddf.groupby(ddf.a > i).b.min(), pdf.groupby(pdf.a > i).b.min())
assert_eq(ddf.groupby(ddf.a > i).b.max(), pdf.groupby(pdf.a > i).b.max())
assert_eq(ddf.groupby(ddf.a > i).b.count(), pdf.groupby(pdf.a > i).b.count())
assert_eq(ddf.groupby(ddf.a > i).b.mean(), pdf.groupby(pdf.a > i).b.mean())
assert_eq(
ddf.groupby(ddf.a > i).b.nunique(), pdf.groupby(pdf.a > i).b.nunique()
)
assert_eq(ddf.groupby(ddf.b > i).b.size(), pdf.groupby(pdf.b > i).b.size())
assert_eq(ddf.groupby(ddf.b > i).b.first(), pdf.groupby(pdf.b > i).b.first())
assert_eq(ddf.groupby(ddf.b > i).b.last(), pdf.groupby(pdf.b > i).b.last())
assert_eq(ddf.groupby(ddf.b > i).b.tail(), pdf.groupby(pdf.b > i).b.tail())
assert_eq(ddf.groupby(ddf.b > i).b.head(), pdf.groupby(pdf.b > i).b.head())
assert_eq(ddf.groupby(ddf.b > i).b.prod(), pdf.groupby(pdf.b > i).b.prod())
assert_eq(ddf.groupby(ddf.b > i).sum(), pdf.groupby(pdf.b > i).sum())
assert_eq(ddf.groupby(ddf.b > i).min(), pdf.groupby(pdf.b > i).min())
assert_eq(ddf.groupby(ddf.b > i).max(), pdf.groupby(pdf.b > i).max())
assert_eq(ddf.groupby(ddf.b > i).count(), pdf.groupby(pdf.b > i).count())
assert_eq(ddf.groupby(ddf.b > i).mean(), pdf.groupby(pdf.b > i).mean())
assert_eq(ddf.groupby(ddf.b > i).size(), pdf.groupby(pdf.b > i).size())
assert_eq(ddf.groupby(ddf.b > i).first(), pdf.groupby(pdf.b > i).first())
assert_eq(ddf.groupby(ddf.b > i).last(), pdf.groupby(pdf.b > i).last())
assert_eq(ddf.groupby(ddf.b > i).prod(), pdf.groupby(pdf.b > i).prod())
assert_eq(ddf.groupby(ddf.a > i).sum(), pdf.groupby(pdf.a > i).sum())
assert_eq(ddf.groupby(ddf.a > i).min(), pdf.groupby(pdf.a > i).min())
assert_eq(ddf.groupby(ddf.a > i).max(), pdf.groupby(pdf.a > i).max())
assert_eq(ddf.groupby(ddf.a > i).count(), pdf.groupby(pdf.a > i).count())
assert_eq(ddf.groupby(ddf.a > i).mean(), pdf.groupby(pdf.a > i).mean())
assert_eq(ddf.groupby(ddf.a > i).size(), pdf.groupby(pdf.a > i).size())
assert_eq(ddf.groupby(ddf.a > i).first(), pdf.groupby(pdf.a > i).first())
assert_eq(ddf.groupby(ddf.a > i).last(), pdf.groupby(pdf.a > i).last())
assert_eq(ddf.groupby(ddf.a > i).prod(), pdf.groupby(pdf.a > i).prod())
for ddof in ddofs:
assert_eq(
ddf.groupby(ddf.b > i).std(ddof), pdf.groupby(pdf.b > i).std(ddof)
)
for ddkey, pdkey in [
("a", "a"),
(ddf.a, pdf.a),
(ddf.a + 1, pdf.a + 1),
(ddf.a > 3, pdf.a > 3),
]:
assert_eq(ddf.groupby(ddkey).b.sum(), pdf.groupby(pdkey).b.sum())
assert_eq(ddf.groupby(ddkey).b.min(), pdf.groupby(pdkey).b.min())
assert_eq(ddf.groupby(ddkey).b.max(), pdf.groupby(pdkey).b.max())
assert_eq(ddf.groupby(ddkey).b.count(), pdf.groupby(pdkey).b.count())
assert_eq(ddf.groupby(ddkey).b.mean(), pdf.groupby(pdkey).b.mean())
assert_eq(ddf.groupby(ddkey).b.nunique(), pdf.groupby(pdkey).b.nunique())
assert_eq(ddf.groupby(ddkey).b.size(), pdf.groupby(pdkey).b.size())
assert_eq(ddf.groupby(ddkey).b.first(), pdf.groupby(pdkey).b.first())
assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last())
assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod())
assert_eq(ddf.groupby(ddkey).sum(), pdf.groupby(pdkey).sum())
assert_eq(ddf.groupby(ddkey).min(), pdf.groupby(pdkey).min())
assert_eq(ddf.groupby(ddkey).max(), pdf.groupby(pdkey).max())
assert_eq(ddf.groupby(ddkey).count(), pdf.groupby(pdkey).count())
assert_eq(ddf.groupby(ddkey).mean(), pdf.groupby(pdkey).mean().astype(float))
assert_eq(ddf.groupby(ddkey).size(), pdf.groupby(pdkey).size())
assert_eq(ddf.groupby(ddkey).first(), pdf.groupby(pdkey).first())
assert_eq(ddf.groupby(ddkey).last(), pdf.groupby(pdkey).last())
assert_eq(ddf.groupby(ddkey).prod(), pdf.groupby(pdkey).prod())
for ddof in ddofs:
assert_eq(ddf.groupby(ddkey).b.std(ddof), pdf.groupby(pdkey).b.std(ddof))
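    # determinism: building the same groupby expression twice should yield
    # identical graph keys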
assert sorted(ddf.groupby("b").a.sum().dask) == sorted(
ddf.groupby("b").a.sum().dask
)
assert sorted(ddf.groupby(ddf.a > 3).b.mean().dask) == sorted(
ddf.groupby(ddf.a > 3).b.mean().dask
)
# test raises with incorrect key
pytest.raises(KeyError, lambda: ddf.groupby("x"))
pytest.raises(KeyError, lambda: ddf.groupby(["a", "x"]))
pytest.raises(KeyError, lambda: ddf.groupby("a")["x"])
with warnings.catch_warnings():
# pandas warns about using tuples before throwing the KeyError
warnings.simplefilter("ignore", FutureWarning)
pytest.raises(KeyError, lambda: ddf.groupby("a")["b", "x"])
pytest.raises(KeyError, lambda: ddf.groupby("a")[["b", "x"]])
# test graph node labels
assert_dask_graph(ddf.groupby("b").a.sum(), "series-groupby-sum")
assert_dask_graph(ddf.groupby("b").a.min(), "series-groupby-min")
assert_dask_graph(ddf.groupby("b").a.max(), "series-groupby-max")
assert_dask_graph(ddf.groupby("b").a.count(), "series-groupby-count")
assert_dask_graph(ddf.groupby("b").a.var(), "series-groupby-var")
assert_dask_graph(ddf.groupby("b").a.cov(), "series-groupby-cov")
assert_dask_graph(ddf.groupby("b").a.first(), "series-groupby-first")
assert_dask_graph(ddf.groupby("b").a.last(), "series-groupby-last")
assert_dask_graph(ddf.groupby("b").a.tail(), "series-groupby-tail")
assert_dask_graph(ddf.groupby("b").a.head(), "series-groupby-head")
assert_dask_graph(ddf.groupby("b").a.prod(), "series-groupby-prod")
    # mean is computed from sum and count operations
assert_dask_graph(ddf.groupby("b").a.mean(), "series-groupby-sum")
assert_dask_graph(ddf.groupby("b").a.mean(), "series-groupby-count")
assert_dask_graph(ddf.groupby("b").a.nunique(), "series-groupby-nunique")
assert_dask_graph(ddf.groupby("b").a.size(), "series-groupby-size")
assert_dask_graph(ddf.groupby("b").sum(), "dataframe-groupby-sum")
assert_dask_graph(ddf.groupby("b").min(), "dataframe-groupby-min")
assert_dask_graph(ddf.groupby("b").max(), "dataframe-groupby-max")
assert_dask_graph(ddf.groupby("b").count(), "dataframe-groupby-count")
assert_dask_graph(ddf.groupby("b").first(), "dataframe-groupby-first")
assert_dask_graph(ddf.groupby("b").last(), "dataframe-groupby-last")
assert_dask_graph(ddf.groupby("b").prod(), "dataframe-groupby-prod")
    # mean is computed from sum and count operations
assert_dask_graph(ddf.groupby("b").mean(), "dataframe-groupby-sum")
assert_dask_graph(ddf.groupby("b").mean(), "dataframe-groupby-count")
assert_dask_graph(ddf.groupby("b").size(), "dataframe-groupby-size")
@pytest.mark.parametrize("keyword", ["split_every", "split_out"])
def test_groupby_reduction_split(keyword):
pdf = pd.DataFrame(
{"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 100, "b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100}
)
ddf = dd.from_pandas(pdf, npartitions=15)
def call(g, m, **kwargs):
return getattr(g, m)(**kwargs)
# DataFrame
for m in AGG_FUNCS:
# nunique is not implemented for DataFrameGroupBy
# covariance/correlation is not a series aggregation
if m in ("nunique", "cov", "corr"):
continue
res = call(ddf.groupby("b", sort=False), m, **{keyword: 2})
sol = call(pdf.groupby("b"), m)
assert_eq(res, sol)
assert call(ddf.groupby("b"), m)._name != res._name
res = call(ddf.groupby("b", sort=False), "var", ddof=2, **{keyword: 2})
sol = call(pdf.groupby("b"), "var", ddof=2)
assert_eq(res, sol)
assert call(ddf.groupby("b"), "var", ddof=2)._name != res._name
# Series, post select
for m in AGG_FUNCS:
# covariance/correlation is not a series aggregation
if m in ("cov", "corr"):
continue
res = call(ddf.groupby("b", sort=False).a, m, **{keyword: 2})
sol = call(pdf.groupby("b").a, m)
assert_eq(res, sol)
assert call(ddf.groupby("b").a, m)._name != res._name
res = call(ddf.groupby("b", sort=False).a, "var", ddof=2, **{keyword: 2})
sol = call(pdf.groupby("b").a, "var", ddof=2)
assert_eq(res, sol)
assert call(ddf.groupby("b").a, "var", ddof=2)._name != res._name
# Series, pre select
for m in AGG_FUNCS:
# covariance/correlation is not a series aggregation
if m in ("cov", "corr"):
continue
res = call(ddf.a.groupby(ddf.b, sort=False), m, **{keyword: 2})
sol = call(pdf.a.groupby(pdf.b), m)
# There's a bug in pandas 0.18.0 with `pdf.a.groupby(pdf.b).count()`
# not forwarding the series name. Skip name checks here for now.
assert_eq(res, sol, check_names=False)
assert call(ddf.a.groupby(ddf.b), m)._name != res._name
res = call(ddf.a.groupby(ddf.b, sort=False), "var", ddof=2, **{keyword: 2})
sol = call(pdf.a.groupby(pdf.b), "var", ddof=2)
assert_eq(res, sol)
assert call(ddf.a.groupby(ddf.b), "var", ddof=2)._name != res._name
@pytest.mark.parametrize(
"grouped",
[
lambda df: df.groupby("A"),
lambda df: df.groupby(df["A"]),
lambda df: df.groupby(df["A"] + 1),
lambda df: df.groupby("A")["B"],
# SeriesGroupBy:
lambda df: df.groupby("A")["B"],
lambda df: df.groupby(df["A"])["B"],
lambda df: df.groupby(df["A"] + 1)["B"],
# Series.groupby():
lambda df: df.B.groupby(df["A"]),
lambda df: df.B.groupby(df["A"] + 1),
# DataFrameGroupBy with column slice:
lambda df: df.groupby("A")[["B", "C"]],
lambda df: df.groupby(df["A"])[["B", "C"]],
lambda df: df.groupby(df["A"] + 1)[["B", "C"]],
],
)
@pytest.mark.parametrize(
"func",
[
lambda grp: grp.apply(lambda x: x.sum()),
lambda grp: grp.transform(lambda x: x.sum()),
],
)
def test_apply_or_transform_shuffle(grouped, func):
pdf = pd.DataFrame(
{
"A": [1, 2, 3, 4] * 5,
"B": np.random.randn(20),
"C": np.random.randn(20),
"D": np.random.randn(20),
}
)
ddf = dd.from_pandas(pdf, 3)
with pytest.warns(UserWarning): # meta inference
assert_eq(func(grouped(pdf)), func(grouped(ddf)))
@pytest.mark.parametrize(
"grouper",
[
lambda df: "AA",
lambda df: ["AA", "AB"],
lambda df: df["AA"],
lambda df: [df["AA"], df["AB"]],
lambda df: df["AA"] + 1,
pytest.param(
lambda df: [df["AA"] + 1, df["AB"] + 1],
marks=pytest.mark.xfail("NotImplemented"),
),
],
)
@pytest.mark.parametrize(
"func",
[
lambda grouped: grouped.apply(lambda x: x.sum()),
lambda grouped: grouped.transform(lambda x: x.sum()),
],
)
def test_apply_or_transform_shuffle_multilevel(grouper, func):
pdf = pd.DataFrame(
{
"AB": [1, 2, 3, 4] * 5,
"AA": [1, 2, 3, 4] * 5,
"B": np.random.randn(20),
"C": np.random.randn(20),
"D": np.random.randn(20),
}
)
ddf = dd.from_pandas(pdf, 3)
with pytest.warns(UserWarning):
# DataFrameGroupBy
assert_eq(func(ddf.groupby(grouper(ddf))), func(pdf.groupby(grouper(pdf))))
# SeriesGroupBy
assert_eq(
func(ddf.groupby(grouper(ddf))["B"]), func(pdf.groupby(grouper(pdf))["B"])
)
# DataFrameGroupBy with column slice
assert_eq(
func(ddf.groupby(grouper(ddf))[["B", "C"]]),
func(pdf.groupby(grouper(pdf))[["B", "C"]]),
)
def test_numeric_column_names():
# df.groupby(0)[df.columns] fails if all columns are numbers (pandas bug)
# This ensures that we cast all column iterables to list beforehand.
df = pd.DataFrame({0: [0, 1, 0, 1], 1: [1, 2, 3, 4], 2: [0, 1, 0, 1]})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.groupby(0).sum(), df.groupby(0).sum())
assert_eq(ddf.groupby([0, 2]).sum(), df.groupby([0, 2]).sum())
assert_eq(
ddf.groupby(0).apply(lambda x: x, meta={0: int, 1: int, 2: int}),
df.groupby(0).apply(lambda x: x),
)
def test_groupby_apply_tasks(shuffle_method):
if shuffle_method == "disk":
pytest.skip("Tasks-only shuffle test")
df = _compat.makeTimeDataFrame()
df["A"] = df.A // 0.1
df["B"] = df.B // 0.1
ddf = dd.from_pandas(df, npartitions=10)
for ind in [lambda x: "A", lambda x: x.A]:
a = df.groupby(ind(df)).apply(len)
with pytest.warns(UserWarning):
b = ddf.groupby(ind(ddf)).apply(len)
assert_eq(a, b.compute())
assert not any("partd" in k[0] for k in b.dask)
a = df.groupby(ind(df)).B.apply(len)
with pytest.warns(UserWarning):
b = ddf.groupby(ind(ddf)).B.apply(len)
assert_eq(a, b.compute())
assert not any("partd" in k[0] for k in b.dask)
def test_groupby_multiprocessing():
df = pd.DataFrame({"A": [1, 2, 3, 4, 5], "B": ["1", "1", "a", "a", "a"]})
ddf = dd.from_pandas(df, npartitions=3)
with dask.config.set(scheduler="processes"):
assert_eq(
ddf.groupby("B").apply(lambda x: x, meta={"A": int, "B": object}),
df.groupby("B").apply(lambda x: x),
)
def test_groupby_normalize_by():
full = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
)
d = dd.from_pandas(full, npartitions=3)
assert d.groupby("a").by == "a"
assert d.groupby(d["a"]).by == "a"
assert d.groupby(d["a"] > 2).by._name == (d["a"] > 2)._name
assert d.groupby(["a", "b"]).by == ["a", "b"]
assert d.groupby([d["a"], d["b"]]).by == ["a", "b"]
assert d.groupby([d["a"], "b"]).by == ["a", "b"]
def test_aggregate__single_element_groups(agg_func):
spec = agg_func
    # nunique/cov/corr are not supported in aggregation specs
if spec in ("nunique", "cov", "corr"):
return
pdf = pd.DataFrame(
{"a": [1, 1, 3, 3], "b": [4, 4, 16, 16], "c": [1, 1, 4, 4], "d": [1, 1, 3, 3]},
columns=["c", "b", "a", "d"],
)
ddf = dd.from_pandas(pdf, npartitions=3)
expected = pdf.groupby(["a", "d"]).agg(spec)
    # NOTE: for std the result is not recast to the original dtype
if spec in {"mean", "var"}:
expected = expected.astype(float)
shuffle = {"shuffle": "tasks"} if agg_func == "median" else {}
assert_eq(expected, ddf.groupby(["a", "d"]).agg(spec, **shuffle))
def test_aggregate_build_agg_args__reuse_of_intermediates():
"""Aggregate reuses intermediates. For example, with sum, count, and mean
the sums and counts are only calculated once across the graph and reused to
compute the mean.
"""
from dask.dataframe.groupby import _build_agg_args
no_mean_spec = [("foo", "sum", "input"), ("bar", "count", "input")]
with_mean_spec = [
("foo", "sum", "input"),
("bar", "count", "input"),
("baz", "mean", "input"),
]
no_mean_chunks, no_mean_aggs, no_mean_finalizers = _build_agg_args(no_mean_spec)
with_mean_chunks, with_mean_aggs, with_mean_finalizers = _build_agg_args(
with_mean_spec
)
assert len(no_mean_chunks) == len(with_mean_chunks)
assert len(no_mean_aggs) == len(with_mean_aggs)
assert len(no_mean_finalizers) == len(no_mean_spec)
assert len(with_mean_finalizers) == len(with_mean_spec)
def test_aggregate_dask():
dask_holder = collections.namedtuple("dask_holder", ["dask"])
get_agg_dask = lambda obj: dask_holder(
{
k: v
for (k, v) in obj.dask.items()
# Skip "chunk" tasks, because they include
# SubgraphCallable object with non-deterministic
# (uuid-based) function names
if (k[0].startswith("aggregate") and "-chunk-" not in k[0])
}
)
specs = [
{"b": {"c": "mean"}, "c": {"a": "max", "b": "min"}},
{"b": "mean", "c": ["min", "max"]},
[
"sum",
"mean",
"min",
"max",
"count",
"size",
],
[
"std",
"var",
"first",
"last",
"prod",
],
"sum",
"mean",
"min",
"max",
"count",
"std",
"var",
"first",
"last",
"prod"
# NOTE: the 'size' spec is special since it bypasses aggregate
# 'size'
]
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 1, 1, 2, 4, 3, 7] * 100,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 100,
"d": [3, 2, 1, 3, 2, 1, 2, 6, 4] * 100,
},
columns=["c", "b", "a", "d"],
)
ddf = dd.from_pandas(pdf, npartitions=100)
for spec in specs:
result1 = ddf.groupby(["a", "b"]).agg(spec, split_every=2)
result2 = ddf.groupby(["a", "b"]).agg(spec, split_every=2)
agg_dask1 = get_agg_dask(result1)
agg_dask2 = get_agg_dask(result2)
# check that the number of partitions used is fixed by split_every
assert_max_deps(agg_dask1, 2)
assert_max_deps(agg_dask2, 2)
# Make sure dict-based aggregation specs result in an
# explicit `getitem` layer to improve column projection
if isinstance(spec, dict):
assert hlg_layer(result1.dask, "getitem")
# check for deterministic key names and values.
# Require pickle since "partial" concat functions
# used in tree-reduction cannot be compared
assert pickle.dumps(agg_dask1[0]) == pickle.dumps(agg_dask2[0])
        # the length of the dask graph should not depend on the passed spec
for other_spec in specs:
# Note: List-based aggregation specs may result in
# an extra delayed layer. This is because a "long" list
# arg will be detected in `dask.array.core.normalize_arg`.
# Also, dict-based aggregation specs will result in
# an extra `getitem` layer (to improve column projection)
if (isinstance(spec, list) == isinstance(other_spec, list)) and (
isinstance(spec, dict) == isinstance(other_spec, dict)
):
other = ddf.groupby(["a", "b"]).agg(other_spec, split_every=2)
assert len(other.dask) == len(result1.dask)
assert len(other.dask) == len(result2.dask)
@pytest.mark.parametrize("split_every", [1, 8])
@pytest.mark.parametrize("split_out", [2, 32])
def test_shuffle_aggregate(shuffle_method, split_out, split_every):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 1, 1, 2, 4, 3, 7] * 100,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 100,
"d": [3, 2, 1, 3, 2, 1, 2, 6, 4] * 100,
},
columns=["c", "b", "a", "d"],
)
ddf = dd.from_pandas(pdf, npartitions=100)
spec = {"b": "mean", "c": ["min", "max"]}
result = ddf.groupby(["a", "b"], sort=False).agg(
spec, split_out=split_out, split_every=split_every, shuffle=shuffle_method
)
expect = pdf.groupby(["a", "b"]).agg(spec)
# Make sure "mean" dtype is consistent.
# Pandas<1.3 will return int instead of float
expect[("b", "mean")] = expect[("b", "mean")].astype(result[("b", "mean")].dtype)
assert_eq(expect, result)
@pytest.mark.parametrize("sort", [True, False])
def test_shuffle_aggregate_sort(shuffle_method, sort):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 1, 1, 2, 4, 3, 7] * 100,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 100,
"d": [3, 2, 1, 3, 2, 1, 2, 6, 4] * 100,
},
columns=["c", "b", "a", "d"],
)
ddf = dd.from_pandas(pdf, npartitions=100)
spec = {"b": "mean", "c": ["min", "max"]}
result = ddf.groupby("a", sort=sort).agg(spec, split_out=2, shuffle=shuffle_method)
expect = pdf.groupby("a", sort=sort).agg(spec)
# Make sure "mean" dtype is consistent.
# Pandas<1.3 will return int instead of float
expect[("b", "mean")] = expect[("b", "mean")].astype(result[("b", "mean")].dtype)
assert_eq(expect, result)
if sort:
# Cannot use sort=True with multiple groupby keys
with pytest.raises(NotImplementedError):
ddf.groupby(["a", "b"], sort=sort).agg(
spec, split_out=2, shuffle=shuffle_method
)
def test_shuffle_aggregate_defaults(shuffle_method):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 1, 1, 2, 4, 3, 7] * 100,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 100,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 100,
"d": [3, 2, 1, 3, 2, 1, 2, 6, 4] * 100,
},
columns=["c", "b", "a", "d"],
)
ddf = dd.from_pandas(pdf, npartitions=100)
spec = {"b": "mean", "c": ["min", "max"]}
# No shuffle layer when split_out = 1
dsk = ddf.groupby("a").agg(spec, split_out=1).dask
assert not any("shuffle" in l for l in dsk.layers)
# split_every=1 is invalid for tree reduction
with pytest.raises(ValueError):
ddf.groupby("a").agg(spec, split_out=1, split_every=1)
# If split_out > 1, default to shuffling.
dsk = ddf.groupby("a", sort=False).agg(spec, split_out=2, split_every=1).dask
assert any("shuffle" in l for l in dsk.layers)
@pytest.mark.parametrize("spec", [{"c": "median"}, {"b": np.median, "c": np.max}])
@pytest.mark.parametrize("keys", ["a", ["a", "d"]])
def test_aggregate_median(spec, keys, shuffle_method):
pdf = pd.DataFrame(
{
"a": [1, 2, 3, 1, 1, 2, 4, 3, 7] * 10,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
"d": [3, 2, 1, 3, 2, 1, 2, 6, 4] * 10,
},
columns=["c", "b", "a", "d"],
)
ddf = dd.from_pandas(pdf, npartitions=10)
actual = ddf.groupby(keys).aggregate(spec, shuffle=shuffle_method)
expected = pdf.groupby(keys).aggregate(spec)
assert_eq(actual, expected)
with pytest.raises(ValueError, match="must use shuffl"):
ddf.groupby(keys).aggregate(spec, shuffle=False)
with pytest.raises(ValueError, match="must use shuffl"):
ddf.groupby(keys).median(shuffle=False)
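# ``median`` cannot be decomposed into associative chunk/combine steps, so dask
# only implements groupby median via shuffle-based aggregation; that is why
# ``shuffle=False`` raises ValueError above.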
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("group_keys", [True, False, None])
@pytest.mark.parametrize("method", ["ffill", "bfill"])
@pytest.mark.parametrize("limit", [None, 1, 4])
def test_fillna(axis, group_keys, method, limit):
df = pd.DataFrame(
{
"A": [1, 1, 2, 2],
"B": [3, 4, 3, 4],
"C": [np.nan, 3, np.nan, np.nan],
"D": [4, np.nan, 5, np.nan],
"E": [6, np.nan, 7, np.nan],
}
)
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(
df.groupby("A", group_keys=group_keys).fillna(0, axis=axis),
ddf.groupby("A", group_keys=group_keys).fillna(0, axis=axis),
)
assert_eq(
df.groupby("A", group_keys=group_keys).B.fillna(0),
ddf.groupby("A", group_keys=group_keys).B.fillna(0),
)
assert_eq(
df.groupby(["A", "B"], group_keys=group_keys).fillna(0),
ddf.groupby(["A", "B"], group_keys=group_keys).fillna(0),
)
assert_eq(
df.groupby("A", group_keys=group_keys).fillna(
method=method, limit=limit, axis=axis
),
ddf.groupby("A", group_keys=group_keys).fillna(
method=method, limit=limit, axis=axis
),
)
assert_eq(
df.groupby(["A", "B"], group_keys=group_keys).fillna(
method=method, limit=limit, axis=axis
),
ddf.groupby(["A", "B"], group_keys=group_keys).fillna(
method=method, limit=limit, axis=axis
),
)
with pytest.raises(NotImplementedError):
ddf.groupby("A").fillna({"A": 0})
with pytest.raises(NotImplementedError):
ddf.groupby("A").fillna(pd.Series(dtype=int))
with pytest.raises(NotImplementedError):
ddf.groupby("A").fillna(pd.DataFrame)
def test_ffill():
df = pd.DataFrame(
{
"A": [1, 1, 2, 2],
"B": [3, 4, 3, 4],
"C": [np.nan, 3, np.nan, np.nan],
"D": [4, np.nan, 5, np.nan],
"E": [6, np.nan, 7, np.nan],
}
)
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(
df.groupby("A").ffill(),
ddf.groupby("A").ffill(),
)
assert_eq(
df.groupby("A").B.ffill(),
ddf.groupby("A").B.ffill(),
)
assert_eq(
df.groupby(["A", "B"]).ffill(),
ddf.groupby(["A", "B"]).ffill(),
)
def test_bfill():
df = pd.DataFrame(
{
"A": [1, 1, 2, 2],
"B": [3, 4, 3, 4],
"C": [np.nan, 3, np.nan, np.nan],
"D": [np.nan, 4, np.nan, 5],
"E": [np.nan, 6, np.nan, 7],
}
)
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(
df.groupby("A").bfill(),
ddf.groupby("A").bfill(),
)
assert_eq(
df.groupby("A").B.bfill(),
ddf.groupby("A").B.bfill(),
)
assert_eq(
df.groupby(["A", "B"]).bfill(),
ddf.groupby(["A", "B"]).bfill(),
)
@pytest.mark.parametrize(
"grouper",
[
lambda df: ["a"],
lambda df: ["a", "b"],
lambda df: df["a"],
lambda df: [df["a"], df["b"]],
lambda df: [df["a"] > 2, df["b"] > 1],
],
)
@pytest.mark.parametrize("split_out", [1, 2])
def test_dataframe_aggregations_multilevel(grouper, split_out, agg_func):
sort = split_out == 1 # Don't sort for split_out > 1
def call(g, m, **kwargs):
return getattr(g, m)(**kwargs)
pdf = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
"d": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
},
columns=["c", "b", "a", "d"],
)
ddf = dd.from_pandas(pdf, npartitions=10)
    # covariance/correlation only works with more than one value column
if agg_func not in ("cov", "corr"):
assert_eq(
call(pdf.groupby(grouper(pdf), sort=sort)["c"], agg_func),
call(
ddf.groupby(grouper(ddf), sort=sort)["c"],
agg_func,
split_out=split_out,
split_every=2,
),
)
    # nunique is not implemented for DataFrameGroupBy
if agg_func != "nunique":
if agg_func in ("cov", "corr") and split_out > 1:
pytest.skip("https://github.com/dask/dask/issues/9509")
assert_eq(
call(pdf.groupby(grouper(pdf), sort=sort)[["c", "d"]], agg_func),
call(
ddf.groupby(grouper(ddf), sort=sort)[["c", "d"]],
agg_func,
split_out=split_out,
split_every=2,
),
)
if agg_func in ("cov", "corr"):
        # ordering differs between pandas and dask's chunked cov/corr,
        # so sort rows and columns before comparing
df = call(pdf.groupby(grouper(pdf), sort=sort), agg_func).sort_index()
cols = sorted(list(df.columns))
df = df[cols]
dddf = call(
ddf.groupby(grouper(ddf), sort=sort),
agg_func,
split_out=split_out,
split_every=2,
).compute()
dddf = dddf.sort_index()
cols = sorted(list(dddf.columns))
dddf = dddf[cols]
assert_eq(df, dddf)
else:
assert_eq(
call(pdf.groupby(grouper(pdf), sort=sort), agg_func),
call(
ddf.groupby(grouper(ddf), sort=sort),
agg_func,
split_out=split_out,
split_every=2,
),
)
@pytest.mark.parametrize(
"grouper",
[
lambda df: df["a"],
lambda df: [df["a"], df["b"]],
lambda df: [df["a"] > 2, df["b"] > 1],
],
)
@pytest.mark.parametrize("split_out", [1, 2])
def test_series_aggregations_multilevel(grouper, split_out, agg_func):
"""
similar to ``test_dataframe_aggregations_multilevel``, but series do not
support all groupby args.
"""
sort = split_out == 1 # Don't sort for split_out > 1
def call(g, m, **kwargs):
return getattr(g, m)(**kwargs)
# covariance/correlation is not a series aggregation
if agg_func in ("cov", "corr"):
return
pdf = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
},
columns=["c", "b", "a"],
)
ddf = dd.from_pandas(pdf, npartitions=10)
assert_eq(
call(pdf["c"].groupby(grouper(pdf), sort=sort), agg_func),
call(
ddf["c"].groupby(grouper(ddf), sort=sort),
agg_func,
split_out=split_out,
split_every=2,
),
        # for pandas ~0.18, the name is not properly propagated for
        # the mean aggregation
check_names=(agg_func not in {"mean", "nunique"}),
)
@pytest.mark.parametrize(
"grouper",
[
lambda df: df["a"],
lambda df: df["a"] > 2,
lambda df: [df["a"], df["b"]],
lambda df: [df["a"] > 2],
pytest.param(
lambda df: [df["a"] > 2, df["b"] > 1],
marks=pytest.mark.xfail(
not PANDAS_GT_150,
reason="index dtype does not coincide: boolean != empty",
),
),
],
)
@pytest.mark.parametrize(
"group_and_slice",
[
lambda df, grouper: df.groupby(grouper(df)),
lambda df, grouper: df["c"].groupby(grouper(df)),
lambda df, grouper: df.groupby(grouper(df))["c"],
],
)
def test_groupby_meta_content(group_and_slice, grouper):
pdf = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
},
columns=["c", "b", "a"],
)
ddf = dd.from_pandas(pdf, npartitions=10)
expected = group_and_slice(pdf, grouper).first().head(0)
meta = group_and_slice(ddf, grouper)._meta.first()
meta_nonempty = group_and_slice(ddf, grouper)._meta_nonempty.first().head(0)
assert_eq(expected, meta)
assert_eq(expected, meta_nonempty)
def test_groupy_non_aligned_index():
pdf = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
},
columns=["c", "b", "a"],
)
ddf3 = dd.from_pandas(pdf, npartitions=3)
ddf7 = dd.from_pandas(pdf, npartitions=7)
# working examples
ddf3.groupby(["a", "b"])
ddf3.groupby([ddf3["a"], ddf3["b"]])
# misaligned divisions
with pytest.raises(NotImplementedError):
ddf3.groupby(ddf7["a"])
with pytest.raises(NotImplementedError):
ddf3.groupby([ddf7["a"], ddf7["b"]])
with pytest.raises(NotImplementedError):
ddf3.groupby([ddf7["a"], ddf3["b"]])
with pytest.raises(NotImplementedError):
ddf3.groupby([ddf3["a"], ddf7["b"]])
with pytest.raises(NotImplementedError):
ddf3.groupby([ddf7["a"], "b"])
def test_groupy_series_wrong_grouper():
df = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 10,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 10,
"c": [0, 1, 2, 3, 4, 5, 6, 7, 8] * 10,
},
columns=["c", "b", "a"],
)
df = dd.from_pandas(df, npartitions=3)
s = df["a"]
# working index values
s.groupby(s)
s.groupby([s, s])
    # non-working index values
with pytest.raises(KeyError):
s.groupby("foo")
with pytest.raises(KeyError):
s.groupby([s, "foo"])
with pytest.raises(ValueError):
s.groupby(df)
with pytest.raises(ValueError):
s.groupby([s, df])
@pytest.mark.parametrize("npartitions", [1, 4, 20])
@pytest.mark.parametrize("split_every", [2, 5])
@pytest.mark.parametrize("split_out", [1, 5, 20])
def test_hash_groupby_aggregate(npartitions, split_every, split_out):
df = pd.DataFrame({"x": np.arange(100) % 10, "y": np.ones(100)})
ddf = dd.from_pandas(df, npartitions)
result = ddf.groupby("x", sort=(split_out == 1)).y.var(
split_every=split_every, split_out=split_out
)
dsk = result.__dask_optimize__(result.dask, result.__dask_keys__())
from dask.core import get_deps
dependencies, dependents = get_deps(dsk)
assert result.npartitions == split_out
assert len([k for k, v in dependencies.items() if not v]) == npartitions
assert_eq(result, df.groupby("x").y.var())
def test_split_out_multi_column_groupby():
df = pd.DataFrame(
{"x": np.arange(100) % 10, "y": np.ones(100), "z": [1, 2, 3, 4, 5] * 20}
)
ddf = dd.from_pandas(df, npartitions=10)
result = ddf.groupby(["x", "y"], sort=False).z.mean(split_out=4)
expected = df.groupby(["x", "y"]).z.mean()
assert_eq(result, expected, check_dtype=False)
def test_groupby_split_out_num():
# GH 1841
ddf = dd.from_pandas(
pd.DataFrame({"A": [1, 1, 2, 2], "B": [1, 2, 3, 4]}), npartitions=2
)
assert ddf.groupby("A").sum().npartitions == 1
assert ddf.groupby("A", sort=False).sum(split_out=2).npartitions == 2
assert ddf.groupby("A", sort=False).sum(split_out=3).npartitions == 3
with pytest.raises(TypeError):
# groupby doesn't accept split_out
ddf.groupby("A", split_out=2)
def test_groupby_not_supported():
ddf = dd.from_pandas(
pd.DataFrame({"A": [1, 1, 2, 2], "B": [1, 2, 3, 4]}), npartitions=2
)
with pytest.raises(TypeError):
ddf.groupby("A", axis=1)
with pytest.raises(TypeError):
ddf.groupby("A", level=1)
with pytest.raises(TypeError):
ddf.groupby("A", as_index=False)
with pytest.raises(TypeError):
ddf.groupby("A", squeeze=True)
def test_groupby_numeric_column():
df = pd.DataFrame({"A": ["foo", "foo", "bar"], 0: [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=3)
assert_eq(ddf.groupby(ddf.A)[0].sum(), df.groupby(df.A)[0].sum())
@pytest.mark.parametrize("sel", ["a", "c", "d", ["a", "b"], ["c", "d"]])
@pytest.mark.parametrize("key", ["a", ["a", "b"]])
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "cumcount"])
def test_cumulative(func, key, sel):
if (
not PANDAS_GT_130
and not func == "cumcount"
and sel == ["a", "b"]
and key == ["a", "b"]
):
pytest.xfail(
reason="cumsum and cumprod will raise DataError: No numeric types to aggregate"
)
df = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 6,
"b": [4, 2, 7, 3, 3, 1, 1, 1, 2] * 6,
"c": np.random.randn(54),
"d": np.random.randn(54),
},
columns=["a", "b", "c", "d"],
)
df.iloc[[-18, -12, -6], -1] = np.nan
ddf = dd.from_pandas(df, npartitions=10)
g, dg = (d.groupby(key)[sel] for d in (df, ddf))
assert_eq(getattr(g, func)(), getattr(dg, func)())
if func == "cumcount":
with pytest.warns(
FutureWarning,
match="`axis` keyword argument is deprecated and will removed in a future release",
):
dg.cumcount(axis=0)
@pytest.mark.parametrize("func", ["cumsum", "cumprod"])
def test_cumulative_axis1(func):
df = pd.DataFrame(
{
"a": [1, 2, 6, 4, 4, 6, 4, 3, 7] * 2,
"b": np.random.randn(18),
"c": np.random.randn(18),
}
)
df.iloc[-6, -1] = np.nan
ddf = dd.from_pandas(df, npartitions=4)
assert_eq(
getattr(df.groupby("a"), func)(axis=1), getattr(ddf.groupby("a"), func)(axis=1)
)
with pytest.raises(ValueError, match="No axis named 1 for object type Series"):
getattr(ddf.groupby("a").b, func)(axis=1)
with pytest.warns(
FutureWarning,
match="`axis` keyword argument is deprecated and will removed in a future release",
):
ddf.groupby("a").cumcount(axis=1)
def test_groupby_unaligned_index():
df = pd.DataFrame(
{
"a": np.random.randint(0, 10, 50),
"b": np.random.randn(50),
"c": np.random.randn(50),
}
)
ddf = dd.from_pandas(df, npartitions=5)
filtered = df[df.b < 0.5]
dfiltered = ddf[ddf.b < 0.5]
ddf_group = dfiltered.groupby(ddf.a)
ds_group = dfiltered.b.groupby(ddf.a)
bad = [
ddf_group.mean(),
ddf_group.var(),
ddf_group.b.nunique(),
ddf_group.get_group(0),
ds_group.mean(),
ds_group.var(),
ds_group.nunique(),
ds_group.get_group(0),
]
for obj in bad:
with pytest.raises(ValueError):
obj.compute()
def add1(x):
return x + 1
df_group = filtered.groupby(df.a)
good = [
(ddf_group.apply(add1, meta=ddf), df_group.apply(add1)),
(ddf_group.b.apply(add1, meta=ddf.b), df_group.b.apply(add1)),
]
for (res, sol) in good:
assert_eq(res, sol)
def test_groupby_string_label():
df = pd.DataFrame({"foo": [1, 1, 4], "B": [2, 3, 4], "C": [5, 6, 7]})
ddf = dd.from_pandas(pd.DataFrame(df), npartitions=1)
ddf_group = ddf.groupby("foo")
result = ddf_group.get_group(1).compute()
expected = pd.DataFrame(
{"foo": [1, 1], "B": [2, 3], "C": [5, 6]}, index=pd.Index([0, 1])
)
tm.assert_frame_equal(result, expected)
def test_groupby_dataframe_cum_caching():
"""Test caching behavior of cumulative operations on grouped dataframes.
Relates to #3756.
"""
df = pd.DataFrame(
dict(a=list("aabbcc")), index=pd.date_range(start="20100101", periods=6)
)
df["ones"] = 1
df["twos"] = 2
ddf = dd.from_pandas(df, npartitions=3)
ops = ["cumsum", "cumprod"]
for op in ops:
ddf0 = getattr(ddf.groupby(["a"]), op)()
ddf1 = ddf.rename(columns={"ones": "foo", "twos": "bar"})
ddf1 = getattr(ddf1.groupby(["a"]), op)()
        # the _a and _b results should be equal
res0_a, res1_a = dask.compute(ddf0, ddf1)
res0_b, res1_b = ddf0.compute(), ddf1.compute()
assert res0_a.equals(res0_b)
assert res1_a.equals(res1_b)
def test_groupby_series_cum_caching():
"""Test caching behavior of cumulative operations on grouped Series
Relates to #3755
"""
df = pd.DataFrame(
dict(a=list("aabbcc")), index=pd.date_range(start="20100101", periods=6)
)
df["ones"] = 1
df["twos"] = 2
ops = ["cumsum", "cumprod"]
for op in ops:
ddf = dd.from_pandas(df, npartitions=3)
dcum = ddf.groupby(["a"])
res0_a, res1_a = dask.compute(
getattr(dcum["ones"], op)(), getattr(dcum["twos"], op)()
)
cum = df.groupby(["a"])
res0_b, res1_b = (getattr(cum["ones"], op)(), getattr(cum["twos"], op)())
assert res0_a.equals(res0_b)
assert res1_a.equals(res1_b)
def test_groupby_slice_agg_reduces():
d = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2, 3, 4, 5]})
a = dd.from_pandas(d, npartitions=2)
result = a.groupby("a")["b"].agg(["min", "max"])
expected = d.groupby("a")["b"].agg(["min", "max"])
assert_eq(result, expected)
def test_groupby_agg_grouper_single():
# https://github.com/dask/dask/issues/2255
d = pd.DataFrame({"a": [1, 2, 3, 4]})
a = dd.from_pandas(d, npartitions=2)
result = a.groupby("a")["a"].agg(["min", "max"])
expected = d.groupby("a")["a"].agg(["min", "max"])
assert_eq(result, expected)
@pytest.mark.parametrize("slice_", ["a", ["a"], ["a", "b"], ["b"]])
def test_groupby_agg_grouper_multiple(slice_):
# https://github.com/dask/dask/issues/2255
d = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})
a = dd.from_pandas(d, npartitions=2)
result = a.groupby("a")[slice_].agg(["min", "max"])
expected = d.groupby("a")[slice_].agg(["min", "max"])
assert_eq(result, expected)
@pytest.mark.parametrize(
"agg_func",
[
"cumprod",
"cumcount",
"cumsum",
"var",
"sum",
"mean",
"count",
"size",
"std",
"min",
"max",
"first",
"last",
"prod",
],
)
def test_groupby_column_and_index_agg_funcs(agg_func):
def call(g, m, **kwargs):
return getattr(g, m)(**kwargs)
df = pd.DataFrame(
{
"idx": [1, 1, 1, 2, 2, 2],
"a": [1, 2, 1, 2, 1, 2],
"b": np.arange(6),
"c": [1, 1, 1, 2, 2, 2],
}
).set_index("idx")
ddf = dd.from_pandas(df, npartitions=df.index.nunique())
ddf_no_divs = dd.from_pandas(df, npartitions=df.index.nunique(), sort=False)
# Index and then column
# Compute expected result
expected = call(df.groupby(["idx", "a"]), agg_func)
if agg_func in {"mean", "var"}:
expected = expected.astype(float)
result = call(ddf.groupby(["idx", "a"]), agg_func)
assert_eq(expected, result)
result = call(ddf_no_divs.groupby(["idx", "a"]), agg_func)
assert_eq(expected, result)
    # apply-concat-apply (aca) aggregation functions
aca_agg = {"sum", "mean", "var", "size", "std", "count", "first", "last", "prod"}
# Test aggregate strings
if agg_func in aca_agg:
result = ddf_no_divs.groupby(["idx", "a"]).agg(agg_func)
assert_eq(expected, result)
# Column and then index
# Compute expected result
expected = call(df.groupby(["a", "idx"]), agg_func)
if agg_func in {"mean", "var"}:
expected = expected.astype(float)
result = call(ddf.groupby(["a", "idx"]), agg_func)
assert_eq(expected, result)
result = call(ddf_no_divs.groupby(["a", "idx"]), agg_func)
assert_eq(expected, result)
# Test aggregate strings
if agg_func in aca_agg:
result = ddf_no_divs.groupby(["a", "idx"]).agg(agg_func)
assert_eq(expected, result)
# Index only
# Compute expected result
expected = call(df.groupby("idx"), agg_func)
if agg_func in {"mean", "var"}:
expected = expected.astype(float)
result = call(ddf.groupby("idx"), agg_func)
assert_eq(expected, result)
result = call(ddf_no_divs.groupby("idx"), agg_func)
assert_eq(expected, result)
# Test aggregate strings
if agg_func in aca_agg:
result = ddf_no_divs.groupby("idx").agg(agg_func)
assert_eq(expected, result)
@pytest.mark.parametrize("group_args", [["idx", "a"], ["a", "idx"], ["idx"], "idx"])
@pytest.mark.parametrize(
"apply_func",
[np.min, np.mean, lambda s, axis=None: np.max(s.values) - np.mean(s.values)],
)
def test_groupby_column_and_index_apply(group_args, apply_func):
df = pd.DataFrame(
{"idx": [1, 1, 1, 2, 2, 2], "a": [1, 2, 1, 2, 1, 2], "b": np.arange(6)}
).set_index("idx")
ddf = dd.from_pandas(df, npartitions=df.index.nunique())
ddf_no_divs = dd.from_pandas(df, npartitions=df.index.nunique(), sort=False)
# Expected result
expected = df.groupby(group_args).apply(apply_func, axis=0)
# Compute on dask DataFrame with divisions (no shuffling)
result = ddf.groupby(group_args).apply(apply_func, axis=0, meta=expected)
assert_eq(expected, result, check_divisions=False)
# Check that partitioning is preserved
assert ddf.divisions == result.divisions
# Check that no shuffling occurred.
# The groupby operation should add only 1 task per partition
assert len(result.dask) == (len(ddf.dask) + ddf.npartitions)
expected = df.groupby(group_args).apply(apply_func, axis=0)
# Compute on dask DataFrame without divisions (requires shuffling)
result = ddf_no_divs.groupby(group_args).apply(apply_func, axis=0, meta=expected)
assert_eq(expected, result, check_divisions=False)
# Check that divisions were preserved (all None in this case)
assert ddf_no_divs.divisions == result.divisions
# Crude check to see if shuffling was performed.
    # The groupby operation should add more than 1 task per partition
assert len(result.dask) > (len(ddf_no_divs.dask) + ddf_no_divs.npartitions)
custom_mean = dd.Aggregation(
"mean",
lambda s: (s.count(), s.sum()),
lambda s0, s1: (s0.sum(), s1.sum()),
lambda s0, s1: s1 / s0,
)
custom_sum = dd.Aggregation("sum", lambda s: s.sum(), lambda s0: s0.sum())
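# ``dd.Aggregation`` composes three stages: ``chunk`` runs on each partition's
# groupby, ``agg`` combines the per-partition results, and an optional
# ``finalize`` maps the combined values to the output. ``custom_mean`` carries
# (count, sum) pairs through the reduction and finalizes with sum / count;
# ``custom_sum`` needs no finalizer because summation is associative.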
@pytest.mark.parametrize(
"pandas_spec, dask_spec, check_dtype",
[
({"b": "mean"}, {"b": custom_mean}, False),
({"b": "sum"}, {"b": custom_sum}, True),
(["mean", "sum"], [custom_mean, custom_sum], False),
({"b": ["mean", "sum"]}, {"b": [custom_mean, custom_sum]}, False),
],
)
def test_dataframe_groupby_agg_custom_sum(pandas_spec, dask_spec, check_dtype):
df = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
ddf = dd.from_pandas(df, npartitions=2)
expected = df.groupby("g").aggregate(pandas_spec)
result = ddf.groupby("g").aggregate(dask_spec)
assert_eq(result, expected, check_dtype=check_dtype)
@pytest.mark.parametrize(
"pandas_spec, dask_spec",
[
("mean", custom_mean),
(["mean"], [custom_mean]),
(["mean", "sum"], [custom_mean, custom_sum]),
],
)
def test_series_groupby_agg_custom_mean(pandas_spec, dask_spec):
d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
a = dd.from_pandas(d, npartitions=2)
expected = d["b"].groupby(d["g"]).aggregate(pandas_spec)
result = a["b"].groupby(a["g"]).aggregate(dask_spec)
assert_eq(result, expected, check_dtype=False)
def test_groupby_agg_custom__name_clash_with_internal_same_column():
"""for a single input column only unique names are allowed"""
d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3})
a = dd.from_pandas(d, npartitions=2)
agg_func = dd.Aggregation("sum", lambda s: s.sum(), lambda s0: s0.sum())
with pytest.raises(ValueError):
a.groupby("g").aggregate({"b": [agg_func, "sum"]})
def test_groupby_agg_custom__name_clash_with_internal_different_column():
"""custom aggregation functions can share the name of a builtin function"""
d = pd.DataFrame({"g": [0, 0, 1] * 3, "b": [1, 2, 3] * 3, "c": [4, 5, 6] * 3})
a = dd.from_pandas(d, npartitions=2)
# NOTE: this function is purposefully misnamed
agg_func = dd.Aggregation(
"sum",
lambda s: (s.count(), s.sum()),
lambda s0, s1: (s0.sum(), s1.sum()),
lambda s0, s1: s1 / s0,
)
# NOTE: the name of agg-func is suppressed in the output,
# since only a single agg func per column was specified
result = a.groupby("g").aggregate({"b": agg_func, "c": "sum"})
expected = d.groupby("g").aggregate({"b": "mean", "c": "sum"})
assert_eq(result, expected, check_dtype=False)
def test_groupby_agg_custom__mode():
# A mode function that passes its intermediates around as pure Python objects.
# To protect the results from pandas' handling in apply, intermediates are
# returned as single-item lists.
def agg_mode(s):
def impl(s):
(res,) = s.iloc[0]
for (i,) in s.iloc[1:]:
res = res.add(i, fill_value=0)
return [res]
return s.apply(impl)
agg_func = dd.Aggregation(
"custom_mode",
lambda s: s.apply(lambda s: [s.value_counts()]),
agg_mode,
lambda s: s.map(lambda i: i[0].idxmax()),
)
d = pd.DataFrame(
{
"g0": [0, 0, 0, 1, 1] * 3,
"g1": [0, 0, 0, 1, 1] * 3,
"cc": [4, 5, 4, 6, 6] * 3,
}
)
a = dd.from_pandas(d, npartitions=5)
actual = a["cc"].groupby([a["g0"], a["g1"]]).agg(agg_func)
# cheat to get the correct index
expected = pd.DataFrame({"g0": [0, 1], "g1": [0, 1], "cc": [4, 6]})
expected = expected["cc"].groupby([expected["g0"], expected["g1"]]).agg("sum")
assert_eq(actual, expected)
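# Illustrative sketch (editorial addition): why ``agg_mode`` above wraps its
# intermediates in single-item lists. Returning a ``value_counts`` Series
# directly from ``apply`` would let pandas try to align and expand it, whereas
# a one-element list is treated as an opaque scalar. The helper name
# ``_illustrate_mode_wrapping`` is hypothetical.
def _illustrate_mode_wrapping():
    s = pd.Series([4, 5, 4])
    wrapped = [s.value_counts()]  # intermediate kept opaque to pandas
    return wrapped[0].idxmax()  # finalize: the most frequent value, 4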
@pytest.mark.parametrize("func", ["var", list])
def test_groupby_select_column_agg(func):
pdf = pd.DataFrame(
{
"A": [1, 2, 3, 1, 2, 3, 1, 2, 4],
"B": [-0.776, -0.4, -0.873, 0.054, 1.419, -0.948, -0.967, -1.714, -0.666],
}
)
ddf = dd.from_pandas(pdf, npartitions=4)
actual = ddf.groupby("A")["B"].agg(func)
expected = pdf.groupby("A")["B"].agg(func)
assert_eq(actual, expected)
@pytest.mark.parametrize(
"func",
[
lambda x: x.std(numeric_only=True),
lambda x: x.groupby("x").std(),
lambda x: x.groupby("x").var(),
lambda x: x.groupby("x").mean(),
lambda x: x.groupby("x").sum(),
lambda x: x.groupby("x").z.std(),
],
)
def test_std_object_dtype(func):
df = pd.DataFrame({"x": [1, 2, 1], "y": ["a", "b", "c"], "z": [11.0, 22.0, 33.0]})
ddf = dd.from_pandas(df, npartitions=2)
with check_numeric_only_deprecation():
expected = func(df)
assert_eq(expected, func(ddf))
def test_std_columns_int():
# Make sure std() works when grouping a DataFrame whose column names are integers
# Regression test for issue #3560
df = pd.DataFrame({0: [5], 1: [5]})
ddf = dd.from_pandas(df, npartitions=2)
by = dask.array.from_array([0, 1]).to_dask_dataframe()
ddf.groupby(by).std()
def test_timeseries():
    df = dask.datasets.timeseries().partitions[:2]
    # Comparing the expression against itself still exercises the computation;
    # assert_eq also validates each dask object's meta against the computed result
    assert_eq(df.groupby("name").std(), df.groupby("name").std())
@pytest.mark.parametrize("min_count", [0, 1, 2, 3])
def test_with_min_count(min_count):
dfs = [
pd.DataFrame(
{
"group": ["A", "A", "B"],
"val1": [np.nan, 2, 3],
"val2": [np.nan, 5, 6],
"val3": [5, 4, 9],
}
),
pd.DataFrame(
{
"group": ["A", "A", "B"],
"val1": [2, np.nan, np.nan],
"val2": [np.nan, 5, 6],
"val3": [5, 4, 9],
}
),
]
ddfs = [dd.from_pandas(df, npartitions=4) for df in dfs]
for df, ddf in zip(dfs, ddfs):
assert_eq(
df.groupby("group").sum(min_count=min_count),
ddf.groupby("group").sum(min_count=min_count),
)
assert_eq(
df.groupby("group").prod(min_count=min_count),
ddf.groupby("group").prod(min_count=min_count),
)
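# Illustrative sketch (editorial addition): what ``min_count`` does. If a group
# has fewer than ``min_count`` non-NA values, ``sum``/``prod`` return NaN
# instead of their identity element (0 or 1). The helper name is hypothetical.
def _illustrate_min_count():
    s = pd.Series([np.nan, np.nan], index=["A", "A"])
    g = s.groupby(level=0)
    # min_count=0 yields the identity 0.0; min_count=1 yields NaN
    return g.sum(min_count=0), g.sum(min_count=1)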
@pytest.mark.parametrize("group_keys", [True, False, None])
def test_groupby_group_keys(group_keys):
df = pd.DataFrame({"a": [1, 2, 2, 3], "b": [2, 3, 4, 5]})
ddf = dd.from_pandas(df, npartitions=2).set_index("a")
pdf = df.set_index("a")
func = lambda g: g.copy()
expected = pdf.groupby("a").apply(func)
assert_eq(expected, ddf.groupby("a").apply(func, meta=expected))
expected = pdf.groupby("a", group_keys=group_keys).apply(func)
assert_eq(
expected, ddf.groupby("a", group_keys=group_keys).apply(func, meta=expected)
)
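# Illustrative sketch (editorial addition): ``group_keys`` controls whether
# ``apply`` prepends the group labels as an extra index level on the result.
# The helper name is hypothetical.
def _illustrate_group_keys():
    pdf = pd.DataFrame({"a": [1, 1, 2], "b": [10, 20, 30]})
    func = lambda g: pd.DataFrame({"total": [g["b"].sum()]})
    keyed = pdf.groupby("a", group_keys=True).apply(func)
    flat = pdf.groupby("a", group_keys=False).apply(func)
    return keyed.index.nlevels, flat.index.nlevels  # (2, 1)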
@pytest.mark.parametrize(
"columns",
[["a", "b", "c"], np.array([1.0, 2.0, 3.0]), ["1", "2", "3"], ["", "a", "b"]],
)
def test_groupby_cov(columns):
rows = 20
cols = 3
data = np.random.randn(rows, cols)
df = pd.DataFrame(data, columns=columns)
df["key"] = [0] * 10 + [1] * 5 + [2] * 5
ddf = dd.from_pandas(df, npartitions=3)
expected = df.groupby("key").cov()
result = ddf.groupby("key").cov()
# When using numerical values for columns, the column mapping and
# stacking leads to a float-typed MultiIndex, whereas pandas would
# normally create an object-typed MultiIndex
if isinstance(columns, np.ndarray):
result = result.compute()
# don't bother checking index -- MultiIndex levels are in a frozenlist
result.columns = result.columns.astype(np.dtype("O"))
assert_eq(expected, result, check_index=False)
else:
assert_eq(expected, result)
def test_df_groupby_idxmin():
pdf = pd.DataFrame(
{"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=2)
expected = pd.DataFrame({"group": [1, 2], "value": [0, 3]}).set_index("group")
result_pd = pdf.groupby("group").idxmin()
result_dd = ddf.groupby("group").idxmin()
assert_eq(result_pd, result_dd)
assert_eq(expected, result_dd)
@pytest.mark.parametrize("skipna", [True, False])
def test_df_groupby_idxmin_skipna(skipna):
pdf = pd.DataFrame(
{
"idx": list(range(4)),
"group": [1, 1, 2, 2],
"value": [np.nan, 20.1, np.nan, 10.1],
}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=2)
result_pd = pdf.groupby("group").idxmin(skipna=skipna)
result_dd = ddf.groupby("group").idxmin(skipna=skipna)
assert_eq(result_pd, result_dd)
def test_df_groupby_idxmax():
pdf = pd.DataFrame(
{"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=3)
expected = pd.DataFrame({"group": [1, 2], "value": [1, 2]}).set_index("group")
result_pd = pdf.groupby("group").idxmax()
result_dd = ddf.groupby("group").idxmax()
assert_eq(result_pd, result_dd)
assert_eq(expected, result_dd)
@pytest.mark.parametrize("skipna", [True, False])
def test_df_groupby_idxmax_skipna(skipna):
pdf = pd.DataFrame(
{
"idx": list(range(4)),
"group": [1, 1, 2, 2],
"value": [np.nan, 20.1, np.nan, 10.1],
}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=2)
result_pd = pdf.groupby("group").idxmax(skipna=skipna)
result_dd = ddf.groupby("group").idxmax(skipna=skipna)
assert_eq(result_pd, result_dd)
def test_series_groupby_idxmin():
pdf = pd.DataFrame(
{"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=2)
expected = (
pd.DataFrame({"group": [1, 2], "value": [0, 3]}).set_index("group").squeeze()
)
result_pd = pdf.groupby("group")["value"].idxmin()
result_dd = ddf.groupby("group")["value"].idxmin()
assert_eq(result_pd, result_dd)
assert_eq(expected, result_dd)
@pytest.mark.parametrize("skipna", [True, False])
def test_series_groupby_idxmin_skipna(skipna):
pdf = pd.DataFrame(
{
"idx": list(range(4)),
"group": [1, 1, 2, 2],
"value": [np.nan, 20.1, np.nan, 10.1],
}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=2)
result_pd = pdf.groupby("group")["value"].idxmin(skipna=skipna)
result_dd = ddf.groupby("group")["value"].idxmin(skipna=skipna)
assert_eq(result_pd, result_dd)
def test_series_groupby_idxmax():
pdf = pd.DataFrame(
{"idx": list(range(4)), "group": [1, 1, 2, 2], "value": [10, 20, 20, 10]}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=3)
expected = (
pd.DataFrame({"group": [1, 2], "value": [1, 2]}).set_index("group").squeeze()
)
result_pd = pdf.groupby("group")["value"].idxmax()
result_dd = ddf.groupby("group")["value"].idxmax()
assert_eq(result_pd, result_dd)
assert_eq(expected, result_dd)
@pytest.mark.parametrize("skipna", [True, False])
def test_series_groupby_idxmax_skipna(skipna):
pdf = pd.DataFrame(
{
"idx": list(range(4)),
"group": [1, 1, 2, 2],
"value": [np.nan, 20.1, np.nan, 10.1],
}
).set_index("idx")
ddf = dd.from_pandas(pdf, npartitions=2)
result_pd = pdf.groupby("group")["value"].idxmax(skipna=skipna)
result_dd = ddf.groupby("group")["value"].idxmax(skipna=skipna)
assert_eq(result_pd, result_dd)
def test_groupby_unique():
rng = np.random.RandomState(42)
df = pd.DataFrame(
{"foo": rng.randint(3, size=100), "bar": rng.randint(10, size=100)}
)
ddf = dd.from_pandas(df, npartitions=10)
pd_gb = df.groupby("foo")["bar"].unique()
dd_gb = ddf.groupby("foo")["bar"].unique()
# Use explode because each value in the result is an array of uniques, for which a direct equality check fails
assert_eq(dd_gb.explode(), pd_gb.explode())
def test_groupby_value_counts():
rng = np.random.RandomState(42)
df = pd.DataFrame(
{"foo": rng.randint(3, size=100), "bar": rng.randint(4, size=100)}
)
ddf = dd.from_pandas(df, npartitions=2)
pd_gb = df.groupby("foo")["bar"].value_counts()
dd_gb = ddf.groupby("foo")["bar"].value_counts()
assert_eq(dd_gb, pd_gb)
@pytest.mark.parametrize("npartitions", [1, 2, 5])
@pytest.mark.parametrize("period", [1, -1, 10])
@pytest.mark.parametrize("axis", [0, 1])
def test_groupby_shift_basic_input(npartitions, period, axis):
pdf = pd.DataFrame(
{
"a": [0, 0, 1, 1, 2, 2, 3, 3, 3],
"b": [4, 5, 6, 3, 2, 1, 0, 0, 0],
"c": [0, 0, 0, 0, 0, 1, 1, 1, 1],
},
)
ddf = dd.from_pandas(pdf, npartitions=npartitions)
with pytest.warns(UserWarning):
assert_eq(
pdf.groupby(["a", "c"]).shift(period, axis=axis),
ddf.groupby(["a", "c"]).shift(period, axis=axis),
)
with pytest.warns(UserWarning):
assert_eq(
pdf.groupby(["a"]).shift(period, axis=axis),
ddf.groupby(["a"]).shift(period, axis=axis),
)
with pytest.warns(UserWarning):
assert_eq(
pdf.groupby(pdf.c).shift(period, axis=axis),
ddf.groupby(ddf.c).shift(period, axis=axis),
)
def test_groupby_shift_series():
pdf = pd.DataFrame(
{
"a": [0, 0, 1, 1, 2, 2, 3, 3, 3],
"b": [4, 5, 6, 3, 2, 1, 0, 0, 0],
},
)
ddf = dd.from_pandas(pdf, npartitions=3)
with pytest.warns(UserWarning):
assert_eq(
pdf.groupby("a")["b"].shift(periods=2),
ddf.groupby("a")["b"].shift(periods=2),
)
def test_groupby_shift_lazy_input():
pdf = pd.DataFrame(
{
"a": [0, 0, 1, 1, 2, 2, 3, 3, 3],
"b": [4, 5, 6, 3, 2, 1, 0, 0, 0],
"c": [0, 0, 0, 0, 0, 1, 1, 1, 1],
},
)
delayed_periods = dask.delayed(lambda: 1)()
ddf = dd.from_pandas(pdf, npartitions=3)
assert_eq(
pdf.groupby(pdf.c).shift(periods=1),
ddf.groupby(ddf.c).shift(periods=delayed_periods, meta={"a": int, "b": int}),
)
with pytest.warns(UserWarning):
assert_eq(
pdf.groupby(pdf.c).shift(periods=1, fill_value=pdf.b.max()),
ddf.groupby(ddf.c).shift(periods=1, fill_value=ddf.b.max()),
)
@pytest.mark.filterwarnings("ignore:`meta` is not specified")
def test_groupby_shift_within_partition_sorting():
# Result is non-deterministic. We run the assertion a few times to keep
# the probability of false pass low.
for _ in range(10):
df = pd.DataFrame(
{
"a": range(60),
"b": [2, 4, 3, 1] * 15,
"c": [None, 10, 20, None, 30, 40] * 10,
}
)
df = df.set_index("a").sort_index()
ddf = dd.from_pandas(df, npartitions=6)
assert_eq(
df.groupby("b")["c"].shift(1),
ddf.groupby("b")["c"].shift(1),
scheduler="threads",
)
def test_groupby_shift_with_freq():
pdf = pd.DataFrame(
dict(a=[1, 2, 3, 4, 5, 6], b=[0, 0, 0, 1, 1, 1]),
index=pd.date_range(start="20100101", periods=6),
)
ddf = dd.from_pandas(pdf, npartitions=3)
# just pass the pandas result as meta for convenience
df_result = pdf.groupby(pdf.index).shift(periods=-2, freq="D")
# Groupby/shift on the index should avoid shuffle and let the `freq` pass
# unmodified, but that is currently broken: https://github.com/dask/dask/issues/8959
# TODO: remove check_freq condition once fixed.
assert_eq(
df_result,
ddf.groupby(ddf.index).shift(periods=-2, freq="D", meta=df_result),
**CHECK_FREQ,
)
df_result = pdf.groupby("b").shift(periods=-2, freq="D")
assert_eq(df_result, ddf.groupby("b").shift(periods=-2, freq="D", meta=df_result))
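# Illustrative sketch (editorial addition): with ``freq`` given, ``shift``
# moves the index labels by the offset and leaves the values in place, rather
# than sliding values along a fixed index. The helper name is hypothetical.
def _illustrate_shift_freq():
    s = pd.Series([1, 2], index=pd.date_range("2010-01-01", periods=2))
    return s.shift(periods=1, freq="D")  # values 1, 2 now under Jan 2 / Jan 3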
@pytest.mark.parametrize(
"transformation", [lambda x: x.sum(), np.sum, "sum", pd.Series.rank]
)
def test_groupby_transform_funcs(transformation):
pdf = pd.DataFrame(
{
"A": [1, 2, 3, 4] * 5,
"B": np.random.randn(20),
"C": np.random.randn(20),
"D": np.random.randn(20),
}
)
ddf = dd.from_pandas(pdf, 3)
with pytest.warns(UserWarning):
# DataFrame
assert_eq(
pdf.groupby("A").transform(transformation),
ddf.groupby("A").transform(transformation),
)
# Series
assert_eq(
pdf.groupby("A")["B"].transform(transformation),
ddf.groupby("A")["B"].transform(transformation),
)
@pytest.mark.parametrize("npartitions", list(range(1, 10)))
@pytest.mark.parametrize("indexed", [True, False])
def test_groupby_transform_ufunc_partitioning(npartitions, indexed):
pdf = pd.DataFrame({"group": [1, 2, 3, 4, 5] * 20, "value": np.random.randn(100)})
if indexed:
pdf = pdf.set_index("group")
ddf = dd.from_pandas(pdf, npartitions)
with pytest.warns(UserWarning):
# DataFrame
assert_eq(
pdf.groupby("group").transform(lambda series: series - series.mean()),
ddf.groupby("group").transform(lambda series: series - series.mean()),
)
# Series
assert_eq(
pdf.groupby("group")["value"].transform(
lambda series: series - series.mean()
),
ddf.groupby("group")["value"].transform(
lambda series: series - series.mean()
),
)
@pytest.mark.parametrize(
"grouping,agg",
[
(
lambda df: df.drop(columns="category_2").groupby("category_1"),
lambda grp: grp.mean(),
),
(
lambda df: df.drop(columns="category_2").groupby("category_1"),
lambda grp: grp.agg("mean"),
),
(lambda df: df.groupby(["category_1", "category_2"]), lambda grp: grp.mean()),
(
lambda df: df.groupby(["category_1", "category_2"]),
lambda grp: grp.agg("mean"),
),
],
)
def test_groupby_aggregate_categoricals(grouping, agg):
pdf = pd.DataFrame(
{
"category_1": pd.Categorical(list("AABBCC")),
"category_2": pd.Categorical(list("ABCABC")),
"value": np.random.uniform(size=6),
}
)
ddf = dd.from_pandas(pdf, 2)
# DataFrameGroupBy
assert_eq(agg(grouping(pdf)), agg(grouping(ddf)))
# SeriesGroupBy
assert_eq(agg(grouping(pdf)["value"]), agg(grouping(ddf)["value"]))
@pytest.mark.xfail(
    not PANDAS_GT_110,
    reason="dropna kwarg not supported in pandas < 1.1.0.",
)
@pytest.mark.parametrize("dropna", [False, True])
def test_groupby_dropna_pandas(dropna):
df = pd.DataFrame(
{"a": [1, 2, 3, 4, None, None, 7, 8], "e": [4, 5, 6, 3, 2, 1, 0, 0]}
)
ddf = dd.from_pandas(df, npartitions=3)
dask_result = ddf.groupby("a", dropna=dropna).e.sum()
pd_result = df.groupby("a", dropna=dropna).e.sum()
assert_eq(dask_result, pd_result)
@pytest.mark.gpu
@pytest.mark.parametrize("dropna", [False, True, None])
@pytest.mark.parametrize("by", ["a", "c", "d", ["a", "b"], ["a", "c"], ["a", "d"]])
@pytest.mark.parametrize(
"group_keys",
[
True,
False,
None,
],
)
def test_groupby_dropna_cudf(dropna, by, group_keys):
# NOTE: This test requires cudf/dask_cudf, and will
# be skipped by non-GPU CI
cudf = pytest.importorskip("cudf")
dask_cudf = pytest.importorskip("dask_cudf")
df = cudf.DataFrame(
{
"a": [1, 2, 3, 4, None, None, 7, 8],
"b": [1, 0] * 4,
"c": ["a", "b", None, None, "e", "f", "g", "h"],
"e": [4, 5, 6, 3, 2, 1, 0, 0],
}
)
df["d"] = df["c"].astype("category")
ddf = dask_cudf.from_cudf(df, npartitions=3)
if dropna is None:
dask_result = ddf.groupby(by, group_keys=group_keys).e.sum()
cudf_result = df.groupby(by, group_keys=group_keys).e.sum()
else:
dask_result = ddf.groupby(by, dropna=dropna, group_keys=group_keys).e.sum()
cudf_result = df.groupby(by, dropna=dropna, group_keys=group_keys).e.sum()
if by in ["c", "d"]:
# The string/category index name is lost in cudf...
dask_result = dask_result.compute()
dask_result.index.name = cudf_result.index.name
assert_eq(dask_result, cudf_result)
@pytest.mark.gpu
@pytest.mark.parametrize("key", ["a", "b"])
def test_groupby_grouper_dispatch(key):
cudf = pytest.importorskip("cudf")
# not directly used but must be imported
dask_cudf = pytest.importorskip("dask_cudf") # noqa: F841
pdf = pd.DataFrame(
{
"a": ["a", "b", "c", "d", "e", "f", "g", "h"],
"b": [1, 2, 3, 4, 5, 6, 7, 8],
"c": [1.0, 2.0, 3.5, 4.1, 5.5, 6.6, 7.9, 8.8],
}
)
gdf = cudf.from_pandas(pdf)
pd_grouper = grouper_dispatch(pdf)(key=key)
gd_grouper = grouper_dispatch(gdf)(key=key)
expect = pdf.groupby(pd_grouper).sum()
got = gdf.groupby(gd_grouper).sum()
assert_eq(expect, got)
@pytest.mark.xfail(
    not PANDAS_GT_110,
    reason="Should work starting from pandas 1.1.0",
)
def test_groupby_dropna_with_agg():
# https://github.com/dask/dask/issues/6986
df = pd.DataFrame(
{"id1": ["a", None, "b"], "id2": [1, 2, None], "v1": [4.5, 5.5, None]}
)
expected = df.groupby(["id1", "id2"], dropna=False).agg("sum")
ddf = dd.from_pandas(df, 1, sort=False)
actual = ddf.groupby(["id1", "id2"], dropna=False).agg("sum")
assert_eq(expected, actual)
def test_groupby_observed_with_agg():
df = pd.DataFrame(
{
"cat_1": pd.Categorical(list("AB"), categories=list("ABCDE")),
"cat_2": pd.Categorical([1, 2], categories=[1, 2, 3]),
"value_1": np.random.uniform(size=2),
}
)
expected = df.groupby(["cat_1", "cat_2"], observed=True).agg("sum")
ddf = dd.from_pandas(df, 2)
actual = ddf.groupby(["cat_1", "cat_2"], observed=True).agg("sum")
assert_eq(expected, actual)
def test_rounding_negative_var():
x = [-0.00179999999 for _ in range(10)]
ids = [1 for _ in range(5)] + [2 for _ in range(5)]
df = pd.DataFrame({"ids": ids, "x": x})
ddf = dd.from_pandas(df, npartitions=2)
assert_eq(ddf.groupby("ids").x.std(), df.groupby("ids").x.std())
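# Illustrative sketch (editorial addition): why the test above matters. The
# naive variance formula E[x^2] - E[x]^2 can dip a hair below zero for
# near-constant data due to floating-point cancellation, so ``std`` has to
# guard the square root. The helper name is hypothetical.
def _illustrate_negative_var():
    x = np.full(10, -0.00179999999)
    naive = (x**2).mean() - x.mean() ** 2  # may be a tiny negative number
    return max(naive, 0.0) ** 0.5  # guarded sqrt avoids NaN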
@pytest.mark.parametrize("split_out", [2, 3])
@pytest.mark.parametrize("column", [["b", "c"], ["b", "d"], ["b", "e"]])
def test_groupby_split_out_multiindex(split_out, column):
df = pd.DataFrame(
{
"a": np.arange(8),
"b": [1, 0, 0, 2, 1, 1, 2, 0],
"c": [0, 1] * 4,
"d": ["dog", "cat", "cat", "dog", "dog", "dog", "cat", "bird"],
}
).fillna(0)
df["e"] = df["d"].astype("category")
ddf = dd.from_pandas(df, npartitions=3)
ddf_result_so1 = (
ddf.groupby(column, sort=False).a.mean(split_out=1).compute().dropna()
)
ddf_result = (
ddf.groupby(column, sort=False).a.mean(split_out=split_out).compute().dropna()
)
assert_eq(ddf_result, ddf_result_so1)
@pytest.mark.parametrize(
"backend",
[
"pandas",
pytest.param("cudf", marks=pytest.mark.gpu),
],
)
def test_groupby_large_ints_exception(backend):
data_source = pytest.importorskip(backend)
if backend == "cudf":
dask_cudf = pytest.importorskip("dask_cudf")
data_frame = dask_cudf.from_cudf
else:
data_frame = dd.from_pandas
imax = np.iinfo(np.uint64).max  # avoid shadowing the ``max`` builtin
sqrt = imax**0.5
series = data_source.Series(
np.concatenate([sqrt * np.arange(5), np.arange(35)])
).astype("int64")
df = data_source.DataFrame({"x": series, "z": np.arange(40), "y": np.arange(40)})
ddf = data_frame(df, npartitions=1)
assert_eq(
df.groupby("x").std(),
ddf.groupby("x").std().compute(scheduler="single-threaded"),
)
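# Illustrative sketch (editorial addition): why values near the int64 limit are
# hazardous for ``std``. A sum-of-squares intermediate computed in int64 wraps
# around, which is the failure mode the test above guards against. The helper
# name is hypothetical.
def _illustrate_int64_square_overflow():
    big = np.int64(3037000499)  # roughly sqrt(np.iinfo(np.int64).max)
    arr = np.array([big, big], dtype="int64")
    return (arr * arr).sum()  # exceeds the int64 range and wraps negative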
@pytest.mark.parametrize("by", ["a", "b", "c", ["a", "b"], ["a", "c"]])
@pytest.mark.parametrize("agg", ["count", "mean", "std"])
@pytest.mark.parametrize("sort", [True, False])
def test_groupby_sort_argument(by, agg, sort):
df = pd.DataFrame(
{
"a": [1, 2, 3, 4, None, None, 7, 8],
"b": [1, 0] * 4,
"c": ["a", "b", None, None, "e", "f", "g", "h"],
"e": [4, 5, 6, 3, 2, 1, 0, 0],
}
)
ddf = dd.from_pandas(df, npartitions=3)
gb = ddf.groupby(by, sort=sort)
gb_pd = df.groupby(by, sort=sort)
# Basic groupby aggregation
result_1 = getattr(gb, agg)
def result_1_pd():
with check_numeric_only_deprecation():
return getattr(gb_pd, agg)()
# Choose single column
result_2 = getattr(gb.e, agg)
result_2_pd = getattr(gb_pd.e, agg)
# Use the `agg()` API
result_3 = gb.agg({"e": agg})
result_3_pd = gb_pd.agg({"e": agg})
if agg == "mean":
assert_eq(result_1(), result_1_pd().astype("float"))
assert_eq(result_2(), result_2_pd().astype("float"))
assert_eq(result_3, result_3_pd.astype("float"))
else:
assert_eq(result_1(), result_1_pd())
assert_eq(result_2(), result_2_pd())
assert_eq(result_3, result_3_pd)
@pytest.mark.parametrize("agg", [M.sum, M.prod, M.max, M.min])
@pytest.mark.parametrize("sort", [True, False])
def test_groupby_sort_argument_agg(agg, sort):
df = pd.DataFrame({"x": [4, 2, 1, 2, 3, 1], "y": [1, 2, 3, 4, 5, 6]})
ddf = dd.from_pandas(df, npartitions=3)
result = agg(ddf.groupby("x", sort=sort))
result_pd = agg(df.groupby("x", sort=sort))
assert_eq(result, result_pd)
if sort:
# Check order of index if sort==True
# (no guarantee that order will match otherwise)
assert_eq(result.index, result_pd.index)
def test_groupby_sort_true_split_out():
df = pd.DataFrame({"x": [4, 2, 1, 2, 3, 1], "y": [1, 2, 3, 4, 5, 6]})
ddf = dd.from_pandas(df, npartitions=3)
# Works fine for split_out==1 or sort=False
M.sum(ddf.groupby("x", sort=True), split_out=1)
M.sum(ddf.groupby("x", sort=False), split_out=2)
# Warns for sort=None
with pytest.warns(FutureWarning, match="split_out>1"):
M.sum(ddf.groupby("x"), split_out=2)
with pytest.raises(NotImplementedError):
# Cannot use sort=True with split_out>1 using non-shuffle-based approach
M.sum(ddf.groupby("x", sort=True), shuffle=False, split_out=2)
# Can use sort=True with split_out>1 with agg() if shuffle=True
ddf.groupby("x", sort=True).agg("sum", split_out=2, shuffle=True)
@pytest.mark.skipif(
not PANDAS_GT_110, reason="observed only supported for newer pandas"
)
@pytest.mark.parametrize("known_cats", [True, False], ids=["known", "unknown"])
@pytest.mark.parametrize("ordered_cats", [True, False], ids=["ordered", "unordererd"])
@pytest.mark.parametrize("groupby", ["cat_1", ["cat_1", "cat_2"]])
@pytest.mark.parametrize("observed", [True, False], ids=["observed", "unobserved"])
def test_groupby_aggregate_categorical_observed(
known_cats, ordered_cats, agg_func, groupby, observed
):
if agg_func in ["cov", "corr", "nunique"]:
pytest.skip("Not implemented for DataFrameGroupBy yet.")
if agg_func in ["sum", "count", "prod"] and groupby != "cat_1":
pytest.skip("Gives zeros rather than nans.")
if agg_func in ["std", "var"] and observed:
pytest.skip("Can't calculate observed with all nans")
pdf = pd.DataFrame(
{
"cat_1": pd.Categorical(
list("AB"), categories=list("ABCDE"), ordered=ordered_cats
),
"cat_2": pd.Categorical([1, 2], categories=[1, 2, 3], ordered=ordered_cats),
"value_1": np.random.uniform(size=2),
}
)
ddf = dd.from_pandas(pdf, 2)
if not known_cats:
ddf["cat_1"] = ddf["cat_1"].cat.as_unknown()
ddf["cat_2"] = ddf["cat_2"].cat.as_unknown()
def agg(grp, **kwargs):
if isinstance(grp, pd.core.groupby.DataFrameGroupBy):
ctx = check_numeric_only_deprecation
else:
ctx = contextlib.nullcontext
with ctx():
return getattr(grp, agg_func)(**kwargs)
# only include numeric columns when passing to "min" or "max"
# pandas default is numeric_only=False
if ordered_cats is False and agg_func in ["min", "max"] and groupby == "cat_1":
pdf = pdf[["cat_1", "value_1"]]
ddf = ddf[["cat_1", "value_1"]]
assert_eq(
agg(pdf.groupby(groupby, observed=observed)),
agg(ddf.groupby(groupby, observed=observed)),
)
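# Illustrative sketch (editorial addition): the effect of ``observed``. With
# observed=False pandas emits a row for every category combination, NaN-filled
# for reductions like mean (sum/count instead produce their identity, the
# "zeros rather than nans" the skip above refers to); observed=True keeps only
# the combinations actually present. The helper name is hypothetical.
def _illustrate_observed():
    df = pd.DataFrame(
        {
            "c": pd.Categorical(["A", "B"], categories=["A", "B", "C"]),
            "v": [1.0, 2.0],
        }
    )
    full = df.groupby("c", observed=False)["v"].mean()  # index: A, B, C (NaN for C)
    seen = df.groupby("c", observed=True)["v"].mean()  # index: A, B
    return full, seen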
def test_empty_partitions_with_value_counts():
# https://github.com/dask/dask/issues/7065
df = pd.DataFrame(
data=[
["a1", "b1"],
["a1", None],
["a1", "b1"],
[None, None],
[None, None],
[None, None],
["a3", "b3"],
["a3", "b3"],
["a5", "b5"],
],
columns=["A", "B"],
)
expected = df.groupby("A")["B"].value_counts()
ddf = dd.from_pandas(df, npartitions=3)
actual = ddf.groupby("A")["B"].value_counts()
assert_eq(expected, actual)
def test_groupby_with_pd_grouper():
ddf = dd.from_pandas(
pd.DataFrame(
{"key1": ["a", "b", "a"], "key2": ["c", "c", "c"], "value": [1, 2, 3]}
),
npartitions=3,
)
with pytest.raises(NotImplementedError):
ddf.groupby(pd.Grouper(key="key1"))
with pytest.raises(NotImplementedError):
ddf.groupby(["key1", pd.Grouper(key="key2")])
# TODO: Remove filter once https://github.com/pandas-dev/pandas/issues/46814 is resolved
@pytest.mark.filterwarnings("ignore:Invalid value encountered:RuntimeWarning")
@pytest.mark.parametrize("operation", ["head", "tail"])
def test_groupby_empty_partitions_with_rows_operation(operation):
df = pd.DataFrame(
data=[
["a1", "b1"],
["a1", None],
["a1", "b1"],
[None, None],
[None, None],
[None, None],
["a3", "b3"],
["a3", "b3"],
["a5", "b5"],
],
columns=["A", "B"],
)
caller = operator.methodcaller(operation, 1)
expected = caller(df.groupby("A")["B"])
ddf = dd.from_pandas(df, npartitions=3)
actual = caller(ddf.groupby("A")["B"])
assert_eq(expected, actual)
@pytest.mark.parametrize("operation", ["head", "tail"])
def test_groupby_with_row_operations(operation):
df = pd.DataFrame(
data=[
["a0", "b1"],
["a0", "b2"],
["a1", "b1"],
["a3", "b3"],
["a3", "b3"],
["a5", "b5"],
["a1", "b1"],
["a1", "b1"],
["a1", "b1"],
],
columns=["A", "B"],
)
caller = operator.methodcaller(operation)
expected = caller(df.groupby("A")["B"])
ddf = dd.from_pandas(df, npartitions=3)
actual = caller(ddf.groupby("A")["B"])
assert_eq(expected, actual)
@pytest.mark.parametrize("operation", ["head", "tail"])
def test_groupby_multi_index_with_row_operations(operation):
df = pd.DataFrame(
data=[
["a0", "b1"],
["a0", "b2"],
["a1", "b1"],
["a3", "b3"],
["a3", "b3"],
["a5", "b5"],
["a1", "b1"],
["a1", "b1"],
["a1", "b1"],
],
columns=["A", "B"],
)
caller = operator.methodcaller(operation)
expected = caller(df.groupby(["A", df["A"].eq("a1")])["B"])
ddf = dd.from_pandas(df, npartitions=3)
actual = caller(ddf.groupby(["A", ddf["A"].eq("a1")])["B"])
assert_eq(expected, actual)
def test_groupby_iter_fails():
df = pd.DataFrame(
data=[
["a0", "b1"],
["a1", "b1"],
["a3", "b3"],
["a5", "b5"],
],
columns=["A", "B"],
)
ddf = dd.from_pandas(df, npartitions=1)
with pytest.raises(NotImplementedError, match="computing the groups"):
list(ddf.groupby("A"))
def test_groupby_None_split_out_warns():
df = pd.DataFrame({"a": [1, 1, 2], "b": [2, 3, 4]})
ddf = dd.from_pandas(df, npartitions=1)
with pytest.warns(FutureWarning, match="split_out=None"):
ddf.groupby("a").agg({"b": "max"}, split_out=None)