Repository URL to install this package:
|
Version:
2022.10.0 ▾
|
import operator
import warnings
import numpy as np
import pandas as pd
import pytest
import dask
import dask.dataframe as dd
from dask.dataframe import _compat
from dask.dataframe._compat import check_numeric_only_deprecation, tm
from dask.dataframe.core import _concat
from dask.dataframe.utils import (
assert_eq,
clear_known_categories,
is_categorical_dtype,
make_meta,
)
# Generate a list of categorical series and indices
cat_series = []
for ordered in [True, False]:
s = pd.Series(pd.Categorical(list("bacbac"), ordered=ordered))
ds = dd.from_pandas(s, npartitions=2)
cat_series.append((s, ds))
s = pd.Series(range(6), index=pd.Categorical(list("bacbac")))
ds = dd.from_pandas(s, npartitions=2)
cat_series.append((ds.compute().index, ds.index))
a = pd.DataFrame(
{
"v": list("abcde"),
"w": list("xxxxx"),
"x": np.arange(5),
"y": list("abcbc"),
"z": np.arange(5, dtype="f8"),
}
)
b = pd.DataFrame(
{
"v": list("fghij"),
"w": list("yyyyy"),
"x": np.arange(5, 10),
"y": list("abbba"),
"z": np.arange(5, 10, dtype="f8"),
}
)
c = pd.DataFrame(
{
"v": list("klmno"),
"w": list("zzzzz"),
"x": np.arange(10, 15),
"y": list("bcbcc"),
"z": np.arange(10, 15, dtype="f8"),
}
)
frames = [a, b, c]
frames2 = []
for df in frames:
df.w = df.w.astype("category")
df.y = df.y.astype("category")
frames2.append(
df.assign(
w=df.w.cat.set_categories(list("xyz")),
y=df.y.cat.set_categories(list("abc")),
)
)
frames3 = [i.set_index(i.y) for i in frames]
frames4 = [i.set_index(i.y) for i in frames2]
frames5 = [i.set_index([i.y, i.x]) for i in frames]
frames6 = [i.set_index([i.y, i.x]) for i in frames2]
def test_concat_unions_categoricals():
# Categorical DataFrame, regular index
tm.assert_frame_equal(_concat(frames), pd.concat(frames2))
# Categorical Series, regular index
tm.assert_series_equal(
_concat([i.y for i in frames]), pd.concat([i.y for i in frames2])
)
# Categorical Index
tm.assert_index_equal(
_concat([i.index for i in frames3]), pd.concat([i for i in frames4]).index
)
# Categorical DataFrame, Categorical Index
tm.assert_frame_equal(_concat(frames3), pd.concat(frames4))
# Non-categorical DataFrame, Categorical Index
tm.assert_frame_equal(
_concat([i[["x", "z"]] for i in frames3]),
pd.concat([i[["x", "z"]] for i in frames4]),
)
# Categorical Series, Categorical Index
tm.assert_series_equal(
_concat([i.z for i in frames3]), pd.concat([i.z for i in frames4])
)
# Non-categorical Series, Categorical Index
tm.assert_series_equal(
_concat([i.x for i in frames3]), pd.concat([i.x for i in frames4])
)
# MultiIndex with Categorical Index
tm.assert_index_equal(
_concat([i.index for i in frames5]), pd.concat([i for i in frames6]).index
)
# DataFrame, MultiIndex with CategoricalIndex
tm.assert_frame_equal(_concat(frames5), pd.concat(frames6))
def test_unknown_categoricals(shuffle_method):
ddf = dd.DataFrame(
{("unknown", i): df for (i, df) in enumerate(frames)},
"unknown",
make_meta(
{"v": "object", "w": "category", "x": "i8", "y": "category", "z": "f8"},
parent_meta=frames[0],
),
[None] * 4,
)
# Compute
df = ddf.compute()
assert_eq(ddf.w.value_counts(), df.w.value_counts())
assert_eq(ddf.w.nunique(), df.w.nunique())
with check_numeric_only_deprecation():
expected = df.groupby(df.w).sum()
assert_eq(ddf.groupby(ddf.w).sum(), expected)
assert_eq(ddf.groupby(ddf.w).y.nunique(), df.groupby(df.w).y.nunique())
assert_eq(ddf.y.groupby(ddf.w).count(), df.y.groupby(df.w).count())
def test_is_categorical_dtype():
df = pd.DataFrame({"cat": pd.Categorical([1, 2, 3, 4]), "x": [1, 2, 3, 4]})
assert is_categorical_dtype(df["cat"])
assert not is_categorical_dtype(df["x"])
ddf = dd.from_pandas(df, 2)
assert is_categorical_dtype(ddf["cat"])
assert not is_categorical_dtype(ddf["x"])
def test_categorize():
# rename y to y_ to avoid pandas future warning about ambiguous
# levels
meta = clear_known_categories(frames4[0]).rename(columns={"y": "y_"})
ddf = dd.DataFrame(
{("unknown", i): df for (i, df) in enumerate(frames3)},
"unknown",
meta,
[None] * 4,
).rename(columns={"y": "y_"})
ddf = ddf.assign(w=ddf.w.cat.set_categories(["x", "y", "z"]))
assert ddf.w.cat.known
assert not ddf.y_.cat.known
assert not ddf.index.cat.known
df = ddf.compute()
for index in [None, True, False]:
known_index = index is not False
# By default categorize object and unknown cat columns
ddf2 = ddf.categorize(index=index)
assert ddf2.y_.cat.known
assert ddf2.v.cat.known
assert ddf2.index.cat.known == known_index
assert_eq(ddf2, df.astype({"v": "category"}), check_categorical=False)
# Specifying split_every works
ddf2 = ddf.categorize(index=index, split_every=2)
assert ddf2.y_.cat.known
assert ddf2.v.cat.known
assert ddf2.index.cat.known == known_index
assert_eq(ddf2, df.astype({"v": "category"}), check_categorical=False)
# Specifying one column doesn't affect others
ddf2 = ddf.categorize("v", index=index)
assert not ddf2.y_.cat.known
assert ddf2.v.cat.known
assert ddf2.index.cat.known == known_index
assert_eq(ddf2, df.astype({"v": "category"}), check_categorical=False)
ddf2 = ddf.categorize("y_", index=index)
assert ddf2.y_.cat.known
assert ddf2.v.dtype == "object"
assert ddf2.index.cat.known == known_index
assert_eq(ddf2, df)
ddf_known_index = ddf.categorize(columns=[], index=True)
assert ddf_known_index.index.cat.known
assert_eq(ddf_known_index, df)
# Specifying known categorical or no columns is a no-op:
assert ddf.categorize(["w"], index=False) is ddf
assert ddf.categorize([], index=False) is ddf
assert ddf_known_index.categorize(["w"]) is ddf_known_index
assert ddf_known_index.categorize([]) is ddf_known_index
# Bad split_every fails
with pytest.raises(ValueError):
ddf.categorize(split_every=1)
with pytest.raises(ValueError):
ddf.categorize(split_every="foo")
def test_categorical_dtype():
cat_dtype = dd.categorical.categorical_dtype(
meta=a, categories=["a", "b", "c"], ordered=False
)
assert_eq(cat_dtype.categories, pd.Index(["a", "b", "c"]))
assert_eq(cat_dtype.ordered, False)
cat_dtype = dd.categorical.categorical_dtype(meta=a, categories=["a", "b", "c"])
assert_eq(cat_dtype.categories, pd.Index(["a", "b", "c"]))
assert_eq(cat_dtype.ordered, False)
cat_dtype = dd.categorical.categorical_dtype(
meta=a, categories=[1, 100, 200], ordered=True
)
assert_eq(cat_dtype.categories, pd.Index([1, 100, 200]))
assert_eq(cat_dtype.ordered, True)
def test_categorize_index():
# Object dtype
ddf = dd.from_pandas(_compat.makeDataFrame(), npartitions=5)
df = ddf.compute()
ddf2 = ddf.categorize()
assert ddf2.index.cat.known
assert_eq(
ddf2,
df.set_index(pd.CategoricalIndex(df.index)),
check_divisions=False,
check_categorical=False,
)
assert ddf.categorize(index=False) is ddf
# Non-object dtype
ddf = dd.from_pandas(df.set_index(df.A.rename("idx")), npartitions=5)
df = ddf.compute()
ddf2 = ddf.categorize(index=True)
assert ddf2.index.cat.known
assert_eq(
ddf2,
df.set_index(pd.CategoricalIndex(df.index)),
check_divisions=False,
check_categorical=False,
)
assert ddf.categorize() is ddf
def test_categorical_set_index(shuffle_method):
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": ["a", "b", "b", "c"]})
df["y"] = pd.Categorical(df["y"], categories=["a", "b", "c"], ordered=True)
a = dd.from_pandas(df, npartitions=2)
with dask.config.set(scheduler="sync"):
b = a.set_index("y", npartitions=a.npartitions)
d1, d2 = b.get_partition(0), b.get_partition(1)
assert list(d1.index.compute()) == ["a"]
assert list(sorted(d2.index.compute())) == ["b", "b", "c"]
b = a.set_index(a.y, npartitions=a.npartitions)
d1, d2 = b.get_partition(0), b.get_partition(1)
assert list(d1.index.compute()) == ["a"]
assert list(sorted(d2.index.compute())) == ["b", "b", "c"]
b = a.set_index("y", divisions=["a", "b", "c"], npartitions=a.npartitions)
d1, d2 = b.get_partition(0), b.get_partition(1)
assert list(d1.index.compute()) == ["a"]
assert list(sorted(d2.index.compute())) == ["b", "b", "c"]
@pytest.mark.parametrize("ncategories", [1, 3, 6])
@pytest.mark.parametrize("npartitions", [1, 3, 6])
def test_categorical_set_index_npartitions_vs_ncategories(npartitions, ncategories):
"""https://github.com/dask/dask/issues/5343"""
rows_per_category = 10
n_rows = ncategories * rows_per_category
categories = ["CAT" + str(i) for i in range(ncategories)]
pdf = pd.DataFrame(
{"id": categories * rows_per_category, "value": np.random.random(n_rows)}
)
ddf = dd.from_pandas(pdf, npartitions=npartitions)
ddf["id"] = ddf["id"].astype("category").cat.as_ordered()
ddf = ddf.set_index("id")
# Test passes if this worked and didn't raise any warnings
@pytest.mark.parametrize("npartitions", [1, 4])
def test_repartition_on_categoricals(npartitions):
df = pd.DataFrame({"x": range(10), "y": list("abababcbcb")})
ddf = dd.from_pandas(df, npartitions=2)
ddf["y"] = ddf["y"].astype("category")
ddf2 = ddf.repartition(npartitions=npartitions)
df = df.copy()
df["y"] = df["y"].astype("category")
assert_eq(df, ddf)
assert_eq(df, ddf2)
def test_categorical_accessor_presence():
df = pd.DataFrame({"x": list("a" * 5 + "b" * 5 + "c" * 5), "y": range(15)})
df.x = df.x.astype("category")
ddf = dd.from_pandas(df, npartitions=2)
assert "cat" in dir(ddf.x)
assert "cat" not in dir(ddf.y)
assert hasattr(ddf.x, "cat")
assert not hasattr(ddf.y, "cat")
df2 = df.set_index(df.x)
ddf2 = dd.from_pandas(df2, npartitions=2, sort=False)
assert hasattr(ddf2.index, "categories")
assert not hasattr(ddf.index, "categories")
def test_categorize_nan():
df = dd.from_pandas(
pd.DataFrame({"A": ["a", "b", "a", float("nan")]}), npartitions=2
)
with warnings.catch_warnings(record=True) as record:
df.categorize().compute()
assert not record
def get_cat(x):
return x if isinstance(x, pd.CategoricalIndex) else x.cat
def assert_array_index_eq(left, right, check_divisions=False):
"""left and right are equal, treating index and array as equivalent"""
assert_eq(
left,
pd.Index(right) if isinstance(right, np.ndarray) else right,
check_divisions=check_divisions,
)
def test_return_type_known_categories():
df = pd.DataFrame({"A": ["a", "b", "c"]})
df["A"] = df["A"].astype("category")
dask_df = dd.from_pandas(df, 2)
ret_type = dask_df.A.cat.as_known()
assert isinstance(ret_type, dd.core.Series)
class TestCategoricalAccessor:
@pytest.mark.parametrize("series", cat_series)
@pytest.mark.parametrize(
"prop, compare",
[
("categories", assert_array_index_eq),
("ordered", assert_eq),
("codes", assert_array_index_eq),
],
)
def test_properties(self, series, prop, compare):
s, ds = series
expected = getattr(get_cat(s), prop)
result = getattr(get_cat(ds), prop)
compare(result, expected, check_divisions=False)
@pytest.mark.parametrize("series", cat_series)
@pytest.mark.parametrize(
"method, kwargs",
[
("add_categories", dict(new_categories=["d", "e"])),
("as_ordered", {}),
("as_unordered", {}),
("as_ordered", {}),
("remove_categories", dict(removals=["a"])),
("rename_categories", dict(new_categories=["d", "e", "f"])),
("reorder_categories", dict(new_categories=["a", "b", "c"])),
("set_categories", dict(new_categories=["a", "e", "b"])),
("remove_unused_categories", {}),
],
)
def test_callable(self, series, method, kwargs):
op = operator.methodcaller(method, **kwargs)
# Series
s, ds = series
expected = op(get_cat(s))
result = op(get_cat(ds))
assert_eq(result, expected, check_divisions=False)
assert_eq(
get_cat(result._meta).categories,
get_cat(expected).categories,
check_divisions=False,
)
assert_eq(
get_cat(result._meta).ordered,
get_cat(expected).ordered,
check_divisions=False,
)
def test_categorical_empty(self):
# GH 1705
def make_empty():
return pd.DataFrame({"A": pd.Categorical([np.nan, np.nan])})
def make_full():
return pd.DataFrame({"A": pd.Categorical(["a", "a"])})
a = dd.from_delayed([dask.delayed(make_empty)(), dask.delayed(make_full)()])
# Used to raise an IndexError
a.A.cat.categories
@pytest.mark.parametrize("series", cat_series)
def test_unknown_categories(self, series):
a, da = series
assert da.cat.known
da = da.cat.as_unknown()
assert not da.cat.known
with pytest.raises(NotImplementedError, match="with unknown categories"):
da.cat.categories
with pytest.raises(NotImplementedError, match="with unknown categories"):
da.cat.codes
# Also AttributeError so glob searching in IPython such as `da.cat.*?` works
with pytest.raises(AttributeError, match="with unknown categories"):
da.cat.categories
with pytest.raises(AttributeError, match="with unknown categories"):
da.cat.codes
db = da.cat.set_categories(["a", "b", "c"])
assert db.cat.known
tm.assert_index_equal(db.cat.categories, get_cat(a).categories)
assert_array_index_eq(db.cat.codes, get_cat(a).codes)
db = da.cat.as_known()
assert db.cat.known
res = db.compute()
tm.assert_index_equal(db.cat.categories, get_cat(res).categories)
assert_array_index_eq(db.cat.codes, get_cat(res).codes)
def test_categorical_string_ops(self):
a = pd.Series(["a", "a", "b"], dtype="category")
da = dd.from_pandas(a, 2)
result = da.str.upper()
expected = a.str.upper()
assert_eq(result, expected)
def test_categorical_non_string_raises(self):
a = pd.Series([1, 2, 3], dtype="category")
da = dd.from_pandas(a, 2)
with pytest.raises(AttributeError):
da.str.upper()