# Repository URL to install this package:
# Version: 2022.10.0
import warnings
from datetime import datetime
import numpy as np
import pandas as pd
import pytest
from pandas.api.types import is_scalar
import dask.dataframe as dd
from dask.dataframe._compat import (
PANDAS_GT_120,
PANDAS_VERSION,
check_numeric_only_deprecation,
)
from dask.dataframe.utils import assert_dask_graph, assert_eq, make_meta
try:
import scipy
except ImportError:
scipy = None
@pytest.mark.slow
def test_arithmetics():
    """Compare operators on dask Series/DataFrames with their pandas results.

    The first batch pairs operands that carry the same index, so the helper
    checks run with comparison operators enabled.  The second batch pairs
    operands with differing indexes (or differing columns) and therefore runs
    the helpers with ``allow_comparison_ops=False``.
    """
    # Hand-assembled three-partition frames; the last partition repeats index 9.
    x_parts = {
        ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
        ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
        ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
    }
    int_meta = make_meta(
        {"a": "i8", "b": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame()
    )
    ddf1 = dd.DataFrame(x_parts, "x", int_meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()

    pdf2 = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": [5, 6, 7, 8, 1, 2, 3, 4]})
    pdf3 = pd.DataFrame({"a": [5, 6, 7, 8, 4, 3, 2, 1], "b": [2, 4, 5, 3, 4, 2, 1, 0]})
    ddf2 = dd.from_pandas(pdf2, 3)
    ddf3 = dd.from_pandas(pdf3, 2)

    y_parts = {
        ("y", 0): pd.DataFrame({"a": [3, 2, 1], "b": [7, 8, 9]}, index=[0, 1, 3]),
        ("y", 1): pd.DataFrame({"a": [5, 2, 8], "b": [4, 2, 3]}, index=[5, 6, 8]),
        ("y", 2): pd.DataFrame({"a": [1, 4, 10], "b": [1, 0, 5]}, index=[9, 9, 9]),
    }
    ddf4 = dd.DataFrame(y_parts, "y", int_meta, [0, 4, 9, 9])
    pdf4 = ddf4.compute()

    # Each case is (dask left, dask-or-pandas right, pandas left, pandas right).
    same_index_cases = [
        (ddf1, ddf1, pdf1, pdf1),
        (ddf1, ddf1.repartition([0, 1, 3, 6, 9]), pdf1, pdf1),
        (ddf2, ddf3, pdf2, pdf3),
        (ddf2.repartition([0, 3, 6, 7]), ddf3.repartition([0, 7]), pdf2, pdf3),
        (ddf2.repartition([0, 7]), ddf3.repartition([0, 2, 4, 5, 7]), pdf2, pdf3),
        (ddf1, ddf4, pdf1, pdf4),
        (ddf1, ddf4.repartition([0, 9]), pdf1, pdf4),
        (ddf1.repartition([0, 3, 9]), ddf4.repartition([0, 5, 9]), pdf1, pdf4),
        # dask + pandas
        (ddf1, pdf4, pdf1, pdf4),
        (ddf2, pdf3, pdf2, pdf3),
    ]
    for left, right, expected_left, expected_right in same_index_cases:
        check_series_arithmetics(left.a, right.b, expected_left.a, expected_right.b)
        check_frame_arithmetics(left, right, expected_left, expected_right)

    # Operands below have different indexes; pandas raises ValueError for
    # comparison operators in that situation, so those are not exercised.
    pdf5 = pd.DataFrame(
        {"a": [3, 2, 1, 5, 2, 8, 1, 4, 10], "b": [7, 8, 9, 4, 2, 3, 1, 0, 5]},
        index=[0, 1, 3, 5, 6, 8, 9, 9, 9],
    )
    ddf5 = dd.from_pandas(pdf5, 2)

    pdf6 = pd.DataFrame(
        {"a": [3, 2, 1, 5, 2, 8, 1, 4, 10], "b": [7, 8, 9, 5, 7, 8, 4, 2, 5]},
        index=[0, 1, 2, 3, 4, 5, 6, 7, 9],
    )
    ddf6 = dd.from_pandas(pdf6, 4)

    # String-labelled indexes.
    pdf7 = pd.DataFrame(
        {"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": [5, 6, 7, 8, 1, 2, 3, 4]},
        index=list("aaabcdeh"),
    )
    pdf8 = pd.DataFrame(
        {"a": [5, 6, 7, 8, 4, 3, 2, 1], "b": [2, 4, 5, 3, 4, 2, 1, 0]},
        index=list("abcdefgh"),
    )
    ddf7 = dd.from_pandas(pdf7, 3)
    ddf8 = dd.from_pandas(pdf8, 4)

    # Frames whose column sets only partially coincide.
    pdf9 = pd.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7, 8],
            "b": [5, 6, 7, 8, 1, 2, 3, 4],
            "c": [5, 6, 7, 8, 1, 2, 3, 4],
        },
        index=list("aaabcdeh"),
    )
    pdf10 = pd.DataFrame(
        {
            "b": [5, 6, 7, 8, 4, 3, 2, 1],
            "c": [2, 4, 5, 3, 4, 2, 1, 0],
            "d": [2, 4, 5, 3, 4, 2, 1, 0],
        },
        index=list("abcdefgh"),
    )
    ddf9 = dd.from_pandas(pdf9, 3)
    ddf10 = dd.from_pandas(pdf10, 4)

    different_index_cases = [
        (ddf5, ddf6, pdf5, pdf6),
        (ddf5.repartition([0, 9]), ddf6, pdf5, pdf6),
        (ddf5.repartition([0, 5, 9]), ddf6.repartition([0, 7, 9]), pdf5, pdf6),
        (ddf7, ddf8, pdf7, pdf8),
        (ddf7.repartition(["a", "c", "h"]), ddf8.repartition(["a", "h"]), pdf7, pdf8),
        (
            ddf7.repartition(["a", "b", "e", "h"]),
            ddf8.repartition(["a", "e", "h"]),
            pdf7,
            pdf8,
        ),
        (ddf9, ddf10, pdf9, pdf10),
        (ddf9.repartition(["a", "c", "h"]), ddf10.repartition(["a", "h"]), pdf9, pdf10),
        # dask + pandas
        (ddf5, pdf6, pdf5, pdf6),
        (ddf7, pdf8, pdf7, pdf8),
        (ddf9, pdf10, pdf9, pdf10),
    ]
    for left, right, expected_left, expected_right in different_index_cases:
        check_series_arithmetics(
            left.a,
            right.b,
            expected_left.a,
            expected_right.b,
            allow_comparison_ops=False,
        )
        check_frame_arithmetics(
            left, right, expected_left, expected_right, allow_comparison_ops=False
        )
def test_deterministic_arithmetic_names():
    """Graph keys must be equal for equal expressions and differ otherwise."""
    frame = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
    ddf = dd.from_pandas(frame, npartitions=2)

    def graph_keys(build):
        # Build the expression anew on every call so that each side of a
        # comparison comes from an independently constructed collection.
        return sorted(build().dask)

    def x_plus_y_squared():
        return ddf.x + ddf.y**2

    def x_plus_y_cubed():
        return ddf.x + ddf.y**3

    def x_minus_y_squared():
        return ddf.x - ddf.y**2

    assert graph_keys(x_plus_y_squared) == graph_keys(x_plus_y_squared)
    assert graph_keys(x_plus_y_squared) != graph_keys(x_plus_y_cubed)
    assert graph_keys(x_plus_y_squared) != graph_keys(x_minus_y_squared)
@pytest.mark.slow
def test_arithmetics_different_index():
    """Arithmetic between operands whose indexes only partially coincide.

    Exercises overlapping, disjoint, nested and interleaved integer indexes
    for dask/dask and dask/pandas operand pairs.  All checks run with
    ``allow_comparison_ops=False``.
    """

    def two_column_frame(a, b, index):
        # Every fixture here is an ``a``/``b`` frame with an explicit index.
        return pd.DataFrame({"a": a, "b": b}, index=index)

    # Indexes differ but overlap.
    pdf1 = two_column_frame([1, 2, 3, 4, 5], [3, 5, 2, 5, 7], [1, 2, 3, 4, 5])
    ddf1 = dd.from_pandas(pdf1, 2)
    pdf2 = two_column_frame([3, 2, 6, 7, 8], [9, 4, 2, 6, 2], [3, 4, 5, 6, 7])
    ddf2 = dd.from_pandas(pdf2, 2)

    # Indexes do not overlap at all.
    pdf3 = two_column_frame([1, 2, 3, 4, 5], [3, 5, 2, 5, 7], [1, 2, 3, 4, 5])
    ddf3 = dd.from_pandas(pdf3, 2)
    pdf4 = two_column_frame([3, 2, 6, 7, 8], [9, 4, 2, 6, 2], [10, 11, 12, 13, 14])
    ddf4 = dd.from_pandas(pdf4, 2)

    # One index range is contained in the other.
    pdf5 = two_column_frame([1, 2, 3, 4, 5], [3, 5, 2, 5, 7], [1, 3, 5, 7, 9])
    ddf5 = dd.from_pandas(pdf5, 2)
    pdf6 = two_column_frame([3, 2, 6, 7, 8], [9, 4, 2, 6, 2], [2, 3, 4, 5, 6])
    ddf6 = dd.from_pandas(pdf6, 2)

    # Each case is (dask left, dask-or-pandas right, pandas left, pandas right).
    first_batch = [
        (ddf1, ddf2, pdf1, pdf2),
        (ddf2, ddf1, pdf2, pdf1),
        (ddf1.repartition([1, 3, 5]), ddf2.repartition([3, 4, 7]), pdf1, pdf2),
        (ddf2.repartition([3, 4, 5, 7]), ddf1.repartition([1, 2, 4, 5]), pdf2, pdf1),
        (ddf3, ddf4, pdf3, pdf4),
        (ddf4, ddf3, pdf4, pdf3),
        (
            ddf3.repartition([1, 2, 3, 4, 5]),
            ddf4.repartition([10, 11, 12, 13, 14]),
            pdf3,
            pdf4,
        ),
        (ddf4.repartition([10, 14]), ddf3.repartition([1, 3, 4, 5]), pdf4, pdf3),
        (ddf5, ddf6, pdf5, pdf6),
        (ddf6, ddf5, pdf6, pdf5),
        (ddf5.repartition([1, 7, 8, 9]), ddf6.repartition([2, 3, 4, 6]), pdf5, pdf6),
        (ddf6.repartition([2, 6]), ddf5.repartition([1, 3, 7, 9]), pdf6, pdf5),
        # dask + pandas
        (ddf1, pdf2, pdf1, pdf2),
        (ddf2, pdf1, pdf2, pdf1),
        (ddf3, pdf4, pdf3, pdf4),
        (ddf4, pdf3, pdf4, pdf3),
        (ddf5, pdf6, pdf5, pdf6),
        (ddf6, pdf5, pdf6, pdf5),
    ]
    for left, right, expected_left, expected_right in first_batch:
        check_series_arithmetics(
            left.a,
            right.b,
            expected_left.a,
            expected_right.b,
            allow_comparison_ops=False,
        )
        check_frame_arithmetics(
            left, right, expected_left, expected_right, allow_comparison_ops=False
        )

    # Longer, interleaved indexes split over different partition counts.
    pdf7 = two_column_frame(
        [1, 2, 3, 4, 5, 6, 7, 8],
        [5, 6, 7, 8, 1, 2, 3, 4],
        [0, 2, 4, 8, 9, 10, 11, 13],
    )
    pdf8 = two_column_frame(
        [5, 6, 7, 8, 4, 3, 2, 1],
        [2, 4, 5, 3, 4, 2, 1, 0],
        [1, 3, 4, 8, 9, 11, 12, 13],
    )
    ddf7 = dd.from_pandas(pdf7, 3)
    ddf8 = dd.from_pandas(pdf8, 2)

    pdf9 = two_column_frame(
        [1, 2, 3, 4, 5, 6, 7, 8],
        [5, 6, 7, 8, 1, 2, 3, 4],
        [0, 2, 4, 8, 9, 10, 11, 13],
    )
    pdf10 = two_column_frame(
        [5, 6, 7, 8, 4, 3, 2, 1],
        [2, 4, 5, 3, 4, 2, 1, 0],
        [0, 3, 4, 8, 9, 11, 12, 13],
    )
    ddf9 = dd.from_pandas(pdf9, 3)
    ddf10 = dd.from_pandas(pdf10, 2)

    second_batch = [
        (ddf7, ddf8, pdf7, pdf8),
        (ddf8, ddf7, pdf8, pdf7),
        # (ddf7.repartition([0, 13]),
        #  ddf8.repartition([0, 4, 11, 14], force=True),
        #  pdf7, pdf8),
        (
            ddf8.repartition([-5, 10, 15], force=True),
            ddf7.repartition([-1, 4, 11, 14], force=True),
            pdf8,
            pdf7,
        ),
        (
            ddf7.repartition([0, 8, 12, 13]),
            ddf8.repartition([0, 2, 8, 12, 13], force=True),
            pdf7,
            pdf8,
        ),
        (
            ddf8.repartition([-5, 0, 10, 20], force=True),
            ddf7.repartition([-1, 4, 11, 13], force=True),
            pdf8,
            pdf7,
        ),
        (ddf9, ddf10, pdf9, pdf10),
        (ddf10, ddf9, pdf10, pdf9),
        # dask + pandas
        (ddf7, pdf8, pdf7, pdf8),
        (ddf8, pdf7, pdf8, pdf7),
        (ddf9, pdf10, pdf9, pdf10),
        (ddf10, pdf9, pdf10, pdf9),
    ]
    for left, right, expected_left, expected_right in second_batch:
        check_series_arithmetics(
            left.a,
            right.b,
            expected_left.a,
            expected_right.b,
            allow_comparison_ops=False,
        )
        check_frame_arithmetics(
            left, right, expected_left, expected_right, allow_comparison_ops=False
        )
def check_series_arithmetics(l, r, el, er, allow_comparison_ops=True):
    """Assert that operators applied to a dask Series match pandas.

    Parameters
    ----------
    l : dd.Series
        Left dask operand.
    r : dd.Series or pd.Series
        Right operand.
    el, er : pd.Series
        pandas counterparts of ``l`` and ``r``; expected values are computed
        from these.
    allow_comparison_ops : bool, default True
        When true, logical and comparison operators between ``l`` and ``r``
        are checked too.  Callers pass ``False`` for operands that do not
        share an index.
    """
    assert isinstance(l, dd.Series)
    assert isinstance(r, (dd.Series, pd.Series))
    assert isinstance(el, pd.Series)
    assert isinstance(er, pd.Series)

    # l, r may be repartitioned, test whether repartition keeps original data
    assert_eq(l, el)
    assert_eq(r, er)

    # Operator tables.  The arithmetic table is split in two only so that the
    # literal-operand checks below can interleave the logical operators at the
    # same position in the sequence of assertions.
    arith_first = (
        lambda a, b: a + b,
        lambda a, b: a * b,
        lambda a, b: a - b,
        lambda a, b: a / b,
    )
    arith_rest = (
        lambda a, b: a // b,
        lambda a, b: a**b,
        lambda a, b: a % b,
    )
    logical = (
        lambda a, b: a & b,
        lambda a, b: a | b,
        lambda a, b: a ^ b,
    )
    comparison = (
        lambda a, b: a > b,
        lambda a, b: a < b,
        lambda a, b: a >= b,
        lambda a, b: a <= b,
        lambda a, b: a == b,
        lambda a, b: a != b,
    )
    comparison_methods = ("lt", "gt", "le", "ge", "ne", "eq")

    # Series <op> Series.
    for op in arith_first + arith_rest:
        assert_eq(op(l, r), op(el, er))

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        for op in logical + comparison:
            assert_eq(op(l, r), op(el, er))
        for method in comparison_methods:
            assert_eq(getattr(l, method)(r), getattr(el, method)(er))

    # (operator, literal) pairs shared by the two literal-operand passes.
    literal_cases = (
        [(op, 2) for op in arith_first]
        + [(op, True) for op in logical]
        + [(op, 2) for op in arith_rest]
        + [(op, 2) for op in comparison]
    )

    # Series <op> literal.
    for op, literal in literal_cases:
        assert_eq(op(l, literal), op(el, literal))

    # literal <op> Series (reflected operators, applied to the right operand).
    for op, literal in literal_cases:
        assert_eq(op(literal, r), op(literal, er))

    for method in comparison_methods:
        assert_eq(getattr(l, method)(2), getattr(el, method)(2))

    # Unary operators.
    assert_eq(-l, -el)
    assert_eq(abs(l), abs(el))

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        assert_eq(~(l == r), ~(el == er))
def check_frame_arithmetics(l, r, el, er, allow_comparison_ops=True):
    """Assert that operators applied to a dask DataFrame match pandas.

    Parameters
    ----------
    l : dd.DataFrame
        Left dask operand.
    r : dd.DataFrame or pd.DataFrame
        Right operand.
    el, er : pd.DataFrame
        pandas counterparts of ``l`` and ``r``; expected values are computed
        from these.
    allow_comparison_ops : bool, default True
        When true, logical and comparison operators between ``l`` and ``r``
        are checked too.  Callers pass ``False`` for operands that do not
        share an index.
    """
    assert isinstance(l, dd.DataFrame)
    assert isinstance(r, (dd.DataFrame, pd.DataFrame))
    assert isinstance(el, pd.DataFrame)
    assert isinstance(er, pd.DataFrame)

    # l, r may be repartitioned, test whether repartition keeps original data
    assert_eq(l, el)
    assert_eq(r, er)

    # Operator tables.  The arithmetic table is split in two only so that the
    # literal-operand checks below can interleave the logical operators at the
    # same position in the sequence of assertions.
    arith_first = (
        lambda a, b: a + b,
        lambda a, b: a * b,
        lambda a, b: a - b,
        lambda a, b: a / b,
    )
    arith_rest = (
        lambda a, b: a // b,
        lambda a, b: a**b,
        lambda a, b: a % b,
    )
    logical = (
        lambda a, b: a & b,
        lambda a, b: a | b,
        lambda a, b: a ^ b,
    )
    comparison = (
        lambda a, b: a > b,
        lambda a, b: a < b,
        lambda a, b: a >= b,
        lambda a, b: a <= b,
        lambda a, b: a == b,
        lambda a, b: a != b,
    )
    comparison_methods = ("lt", "gt", "le", "ge", "ne", "eq")

    # DataFrame <op> DataFrame.
    for op in arith_first + arith_rest:
        assert_eq(op(l, r), op(el, er))

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        for op in logical + comparison:
            assert_eq(op(l, r), op(el, er))
        for method in comparison_methods:
            assert_eq(getattr(l, method)(r), getattr(el, method)(er))

    # (operator, literal) pairs shared by the two literal-operand passes.
    literal_cases = (
        [(op, 2) for op in arith_first]
        + [(op, True) for op in logical]
        + [(op, 2) for op in arith_rest]
        + [(op, 2) for op in comparison]
    )

    # DataFrame <op> literal.
    for op, literal in literal_cases:
        assert_eq(op(l, literal), op(el, literal))

    # literal <op> DataFrame (reflected operators, applied to the left operand).
    for op, literal in literal_cases:
        assert_eq(op(literal, l), op(literal, el))

    for method in comparison_methods:
        assert_eq(getattr(l, method)(2), getattr(el, method)(2))

    # Unary operators.
    assert_eq(-l, -el)
    assert_eq(abs(l), abs(el))

    if allow_comparison_ops:
        # comparison is allowed if data have same index
        assert_eq(~(l == r), ~(el == er))
def test_scalar_arithmetics():
    """Operators on ``dd.core.Scalar`` must agree with the wrapped numpy ints."""
    expected_left = np.int64(10)
    expected_right = np.int64(4)

    left = dd.core.Scalar({("l", 0): expected_left}, "l", "i8")
    right = dd.core.Scalar({("r", 0): expected_right}, "r", "i8")

    assert isinstance(left, dd.core.Scalar)
    assert isinstance(right, dd.core.Scalar)

    assert_eq(left, expected_left)
    assert_eq(right, expected_right)

    # Operator tables.  The arithmetic table is split in two only so that the
    # literal-operand checks below can interleave the logical operators at the
    # same position in the sequence of assertions.
    arith_first = (
        lambda a, b: a + b,
        lambda a, b: a * b,
        lambda a, b: a - b,
        lambda a, b: a / b,
    )
    arith_rest = (
        lambda a, b: a // b,
        lambda a, b: a**b,
        lambda a, b: a % b,
    )
    logical = (
        lambda a, b: a & b,
        lambda a, b: a | b,
        lambda a, b: a ^ b,
    )
    comparison = (
        lambda a, b: a > b,
        lambda a, b: a < b,
        lambda a, b: a >= b,
        lambda a, b: a <= b,
        lambda a, b: a == b,
        lambda a, b: a != b,
    )

    # Scalar <op> Scalar.
    for op in arith_first + arith_rest + logical + comparison:
        assert_eq(op(left, right), op(expected_left, expected_right))

    literal_cases = (
        [(op, 2) for op in arith_first]
        + [(op, True) for op in logical]
        + [(op, 2) for op in arith_rest]
        + [(op, 2) for op in comparison]
    )

    # Scalar <op> literal.
    for op, literal in literal_cases:
        assert_eq(op(left, literal), op(expected_left, literal))

    # literal <op> Scalar (reflected operators).
    for op, literal in literal_cases:
        assert_eq(op(literal, right), op(literal, expected_right))

    # Unary operators and inversion of a comparison result.
    assert_eq(-left, -expected_left)
    assert_eq(abs(left), abs(expected_left))

    assert_eq(~(left == right), ~(expected_left == expected_right))
def test_scalar_arithmetics_with_dask_instances():
    """Adding a dask Scalar to pandas/dask Series and DataFrames.

    With a pandas object on the left the result is expected to be a pandas
    object; in every other arrangement a dask collection is expected.
    """
    s = dd.core.Scalar({("s", 0): 10}, "s", "i8")
    e = 10

    pds = pd.Series([1, 2, 3, 4, 5, 6, 7])
    dds = dd.from_pandas(pds, 2)

    pdf = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [7, 6, 5, 4, 3, 2, 1]})
    ddf = dd.from_pandas(pdf, 2)

    # (pandas object, dask object, pandas type, dask type)
    variants = [
        (pds, dds, pd.Series, dd.Series),
        (pdf, ddf, pd.DataFrame, dd.DataFrame),
    ]
    for pandas_obj, dask_obj, pandas_type, dask_type in variants:
        # pandas on the left: the result is a pandas object (automatically computed)
        result = pandas_obj + s
        assert isinstance(result, pandas_type)
        assert_eq(result, pandas_obj + e)

        # Scalar on the left of a pandas object: the result is a dask collection
        result = s + pandas_obj
        assert isinstance(result, dask_type)
        assert_eq(result, pandas_obj + e)

        # dask collection with the Scalar on either side stays a dask collection
        result = dask_obj + s
        assert isinstance(result, dask_type)
        assert_eq(result, pandas_obj + e)

        result = s + dask_obj
        assert isinstance(result, dask_type)
        assert_eq(result, pandas_obj + e)
# NOTE(review): PANDAS_VERSION is imported from dask.dataframe._compat, which
# is not visible here.  If it is a ``packaging`` Version object, comparing it
# directly with a ``str`` is never equal and the xfail could not trigger, so
# the comparison is made on the string form instead -- confirm against _compat.
@pytest.mark.xfail(
    str(PANDAS_VERSION) == "1.0.2",
    reason="https://github.com/pandas-dev/pandas/issues/32685",
)
def test_frame_series_arithmetic_methods():
    """Named arithmetic methods (``add``, ``rsub``, ...) must match pandas.

    Three groups are covered:

    * dask/dask, dask/int and dask/Scalar operands with ``fill_value``;
    * DataFrame with a dask Series along ``axis=0`` (``axis=1`` must raise
      ``ValueError``);
    * DataFrame with pandas operands along every spelling of ``axis``.
    """
    pdf1 = pd.DataFrame(
        {
            "A": np.arange(10),
            "B": [np.nan, 1, 2, 3, 4] * 2,
            "C": [np.nan] * 10,
            "D": np.arange(10),
        },
        index=list("abcdefghij"),
        columns=list("ABCD"),
    )
    pdf2 = pd.DataFrame(
        np.random.randn(10, 4), index=list("abcdefghjk"), columns=list("ABCX")
    )
    ps1 = pdf1.A
    ps2 = pdf2.A
    ps3 = pd.Series(np.random.randn(10), index=list("ABCDXabcde"))

    ddf1 = dd.from_pandas(pdf1, 2)
    ddf2 = dd.from_pandas(pdf2, 2)
    ds1 = ddf1.A
    ds2 = ddf2.A

    s = dd.core.Scalar({("s", 0): 4}, "s", "i8")

    # (method name, fill_value) in the order they are checked; ``floordiv`` is
    # the only method given a fill value of 1 rather than 0.
    fill_value_methods = [
        ("add", 0),
        ("sub", 0),
        ("mul", 0),
        ("div", 0),
        ("divide", 0),
        ("truediv", 0),
        ("floordiv", 1),
        ("pow", 0),
        ("mod", 0),
        ("radd", 0),
        ("rsub", 0),
        ("rmul", 0),
        ("rdiv", 0),
        ("rtruediv", 0),
        ("rpow", 0),
        ("rmod", 0),
    ]
    fill_value_cases = [
        (ddf1, ddf2, pdf1, pdf2),
        (ds1, ds2, ps1, ps2),
        (ddf1.repartition(["a", "f", "j"]), ddf2, pdf1, pdf2),
        (ds1.repartition(["a", "b", "f", "j"]), ds2, ps1, ps2),
        (ddf1, ddf2.repartition(["a", "k"]), pdf1, pdf2),
        (ds1, ds2.repartition(["a", "b", "d", "h", "k"]), ps1, ps2),
        (ddf1, 3, pdf1, 3),
        (ds1, 3, ps1, 3),
        (ddf1, s, pdf1, 4),
        (ds1, s, ps1, 4),
    ]
    for left, right, expected_left, expected_right in fill_value_cases:
        # left/right may be repartitioned, test whether repartition keeps
        # original data
        assert_eq(left, expected_left)
        assert_eq(right, expected_right)

        for method, fill_value in fill_value_methods:
            assert_eq(
                getattr(left, method)(right, fill_value=fill_value),
                getattr(expected_left, method)(expected_right, fill_value=fill_value),
            )

    axis0_methods = (
        "add",
        "sub",
        "mul",
        "div",
        "divide",
        "truediv",
        "floordiv",
        "mod",
        "pow",
        "radd",
        "rsub",
        "rmul",
        "rdiv",
        "rtruediv",
        "rmod",
        "rpow",
    )
    frame_series_cases = [(ddf1, ds2, pdf1, ps2), (ddf1, ddf2.X, pdf1, pdf2.X)]
    for left, right, expected_left, expected_right in frame_series_cases:
        assert_eq(left, expected_left)
        assert_eq(right, expected_right)

        # must specify axis=0 to add Series to each column
        # axis=1 is not supported (add to each row)
        for method in axis0_methods:
            assert_eq(
                getattr(left, method)(right, axis=0),
                getattr(expected_left, method)(expected_right, axis=0),
            )

        with pytest.raises(ValueError):
            left.add(right, axis=1)

    any_axis_methods = (
        "add",
        "sub",
        "mul",
        "div",
        "divide",
        "truediv",
        "floordiv",
        "mod",
        "pow",
        "rdiv",
        "rtruediv",
        "rpow",
        "rmod",
        "radd",
        "rsub",
        "rmul",
    )
    frame_pandas_cases = [(ddf1, pdf2, pdf1, pdf2), (ddf1, ps3, pdf1, ps3)]
    for left, right, expected_left, expected_right in frame_pandas_cases:
        assert_eq(left, expected_left)
        assert_eq(right, expected_right)

        for axis in [0, 1, "index", "columns"]:
            for method in any_axis_methods:
                assert_eq(
                    getattr(left, method)(right, axis=axis),
                    getattr(expected_left, method)(expected_right, axis=axis),
                )
@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions(split_every):
    """Series and Index reductions on dask must agree with pandas.

    Covers integer and boolean columns, NaN-heavy float Series and a plain
    boolean Series, then checks the layer names of the generated graphs and a
    few reductions on the index.
    """
    partitions = {
        ("x", 0): pd.DataFrame(
            {"a": [1, 2, 3], "b": [4, 5, 6], "c": [True, True, False]}, index=[0, 1, 3]
        ),
        ("x", 1): pd.DataFrame(
            {"a": [4, 5, 6], "b": [3, 2, 1], "c": [False, False, False]},
            index=[5, 6, 8],
        ),
        ("x", 2): pd.DataFrame(
            {
                "a": [13094304034, 3489385935, 100006774],
                "b": [0, 0, 0],
                "c": [True, True, True],
            },
            index=[9, 9, 9],
        ),
    }
    meta = make_meta(
        {"a": "i8", "b": "i8", "c": "bool"},
        index=pd.Index([], "i8"),
        parent_meta=pd.DataFrame(),
    )
    ddf1 = dd.DataFrame(partitions, "x", meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()

    nans1 = pd.Series([1] + [np.nan] * 4 + [2] + [np.nan] * 3)
    nands1 = dd.from_pandas(nans1, 2)
    nans2 = pd.Series([1] + [np.nan] * 8)
    nands2 = dd.from_pandas(nans2, 2)
    nans3 = pd.Series([np.nan] * 9)
    nands3 = dd.from_pandas(nans3, 2)

    bools = pd.Series([True, False, True, False, True], dtype=bool)
    boolds = dd.from_pandas(bools, 2)

    def dask_reduce(obj, name, **kwargs):
        # Every dask reduction below is called with the parametrized split_every.
        return getattr(obj, name)(split_every=split_every, **kwargs)

    def pandas_reduce(obj, name, **kwargs):
        return getattr(obj, name)(**kwargs)

    spread = ("std", "var", "sem")

    series_pairs = [
        (ddf1.a, pdf1.a),
        (ddf1.b, pdf1.b),
        (ddf1.c, pdf1.c),
        (ddf1["a"], pdf1["a"]),
        (ddf1["b"], pdf1["b"]),
        (nands1, nans1),
        (nands2, nans2),
        (nands3, nans3),
        (boolds, bools),
    ]
    for dds, pds in series_pairs:
        assert isinstance(dds, dd.Series)
        assert isinstance(pds, pd.Series)

        for name in ("sum", "prod", "product", "min", "max", "count"):
            assert_eq(dask_reduce(dds, name), pandas_reduce(pds, name))

        if scipy:
            n = pds.shape[0]

            # pandas uses unbiased skew, need to correct for that
            bias_factor = (n * (n - 1)) ** 0.5 / (n - 2)
            assert_eq(dds.skew(), pds.skew() / bias_factor)

            # pandas uses a bias factor for kurtosis, need to correct for that
            factor = ((n - 1) * (n + 1)) / ((n - 2) * (n - 3))
            offset = (6 * (n - 1)) / ((n - 2) * (n - 3))
            assert_eq(factor * dds.kurtosis() + offset, pds.kurtosis())

        # First pass uses the default ddof, second pass ddof=0.
        # runtime warnings; https://github.com/dask/dask/issues/2381
        # dask.dataframe should probably filter this, to match pandas, but
        # it seems quite difficult.
        for ddof_kwargs in ({}, {"ddof": 0}):
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", RuntimeWarning)
                for name in spread:
                    assert_eq(
                        dask_reduce(dds, name, **ddof_kwargs),
                        pandas_reduce(pds, name, **ddof_kwargs),
                    )

        for name in ("mean", "nunique"):
            assert_eq(dask_reduce(dds, name), pandas_reduce(pds, name))

        for name in ("sum", "prod", "product", "min", "max") + spread:
            assert_eq(
                dask_reduce(dds, name, skipna=False),
                pandas_reduce(pds, name, skipna=False),
            )
        for name in spread:
            assert_eq(
                dask_reduce(dds, name, skipna=False, ddof=0),
                pandas_reduce(pds, name, skipna=False, ddof=0),
            )
        assert_eq(
            dask_reduce(dds, "mean", skipna=False),
            pandas_reduce(pds, "mean", skipna=False),
        )

    # Graph layer names of the individual reductions.
    for name in ("sum", "prod", "min", "max", "count") + spread:
        assert_dask_graph(dask_reduce(ddf1.b, name), "series-" + name)
    for name in spread:
        assert_dask_graph(dask_reduce(ddf1.b, name, ddof=0), "series-" + name)
    assert_dask_graph(dask_reduce(ddf1.b, "mean"), "series-mean")
    # nunique is performed using drop-duplicates
    assert_dask_graph(dask_reduce(ddf1.b, "nunique"), "drop-duplicates")

    # testing index
    for name in ("min", "max"):
        assert_eq(dask_reduce(ddf1.index, name), pandas_reduce(pdf1.index, name))
    assert_eq(dask_reduce(ddf1.index, "count"), pd.notnull(pdf1.index).sum())
@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions_timedelta(split_every):
    """``sum``/``min``/``max``/``count`` on a timedelta Series containing a NaN."""
    expected = pd.Series(pd.to_timedelta([2, 3, 4, np.nan, 5]))
    actual = dd.from_pandas(expected, 2)

    for reduction in ("sum", "min", "max", "count"):
        assert_eq(
            getattr(actual, reduction)(split_every=split_every),
            getattr(expected, reduction)(),
        )
@pytest.mark.parametrize(
    "frame,axis,out",
    [
        (
            pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
            0,
            pd.Series([], dtype="float64"),
        ),
        (
            pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
            1,
            pd.Series([], dtype="float64"),
        ),
        (pd.Series([1, 2.5, 6]), None, None),
    ],
)
@pytest.mark.parametrize(
    "redfunc", ["sum", "prod", "product", "min", "max", "mean", "var", "std"]
)
def test_reductions_out(frame, axis, out, redfunc):
    """Reductions called with ``out=`` must leave the pandas result in ``out``.

    The reduction is invoked three ways: through the numpy function of the
    same name, and through the dask method with ``split_every`` set to
    ``False`` and to ``2``.
    """
    dsk_in = dd.from_pandas(frame, 3)

    if out is None:
        # No target supplied: use the result of a one-element Series ``sum``
        # as the placeholder that receives the output.
        dsk_out = dd.from_pandas(pd.Series([0]), 1).sum()
    else:
        dsk_out = dd.from_pandas(out, 3)

    np_redfunc = getattr(np, redfunc)
    pd_redfunc = getattr(type(frame), redfunc)
    dsk_redfunc = getattr(type(dsk_in), redfunc)

    def expected():
        return pd_redfunc(frame, axis=axis)

    # numpy has default ddof value 0 while dask and pandas have 1, so ddof
    # should be passed explicitly when calling np.var(dask) / np.std(dask)
    numpy_kwargs = {"ddof": 1} if redfunc in ["var", "std"] else {}
    np_redfunc(dsk_in, axis=axis, out=dsk_out, **numpy_kwargs)
    assert_eq(dsk_out, expected())

    for split_every in (False, 2):
        dsk_redfunc(dsk_in, axis=axis, split_every=split_every, out=dsk_out)
        assert_eq(dsk_out, expected())
@pytest.mark.parametrize("split_every", [False, 2])
def test_allany(split_every):
    """``all``/``any`` on frames and Series, with and without an ``out`` target."""
    df = pd.DataFrame(
        np.random.choice([True, False], size=(100, 4)), columns=["A", "B", "C", "D"]
    )
    df["E"] = list("abcde") * 20
    ddf = dd.from_pandas(df, 10)

    reductions = ("all", "any")

    # Frame-level results for the default axis and both explicit axes.
    for name in reductions:
        dask_reduction = getattr(ddf, name)
        pandas_reduction = getattr(df, name)
        assert_eq(dask_reduction(split_every=split_every), pandas_reduction())
        assert_eq(
            dask_reduction(axis=1, split_every=split_every), pandas_reduction(axis=1)
        )
        assert_eq(
            dask_reduction(axis=0, split_every=split_every), pandas_reduction(axis=0)
        )

    # Series-level results.
    for name in reductions:
        assert_eq(getattr(ddf.A, name)(split_every=split_every), getattr(df.A, name)())

    # testing numpy functions with out param
    ddf_out_axis_default = dd.from_pandas(
        pd.Series([False, False, False, False, False], index=["A", "B", "C", "D", "E"]),
        10,
    )
    ddf_out_axis1 = dd.from_pandas(
        pd.Series(np.random.choice([True, False], size=(100,))), 10
    )

    for name in reductions:
        dask_reduction = getattr(ddf, name)
        pandas_reduction = getattr(df, name)

        dask_reduction(split_every=split_every, out=ddf_out_axis_default)
        assert_eq(ddf_out_axis_default, pandas_reduction())

        dask_reduction(axis=1, split_every=split_every, out=ddf_out_axis1)
        assert_eq(ddf_out_axis1, pandas_reduction(axis=1))

        dask_reduction(split_every=split_every, axis=0, out=ddf_out_axis_default)
        assert_eq(ddf_out_axis_default, pandas_reduction(axis=0))
@pytest.mark.parametrize("split_every", [False, 2])
def test_deterministic_reduction_names(split_every):
    """Building the same reduction twice must give the same ``_name``."""
    df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [5, 6, 7, 8]})
    ddf = dd.from_pandas(df, npartitions=2)

    def reduction_name(obj, reduction):
        # A fresh reduction object is created on every call.
        return getattr(obj, reduction)(split_every=split_every)._name

    shared_reductions = (
        "sum",
        "prod",
        "product",
        "min",
        "max",
        "count",
        "std",
        "var",
        "sem",
        "mean",
    )
    for x in [ddf, ddf.x]:
        for reduction in shared_reductions:
            assert reduction_name(x, reduction) == reduction_name(x, reduction)

    # nunique is only checked on the Series.
    assert reduction_name(ddf.x, "nunique") == reduction_name(ddf.x, "nunique")
def test_reduction_series_invalid_axis():
    """Series reductions must reject column-wise axes for dask and pandas alike."""
    partitions = {
        ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
        ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
        ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
    }
    meta = make_meta(
        {"a": "i8", "b": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame()
    )
    ddf1 = dd.DataFrame(partitions, "x", meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()

    # (reduction, expected exception), in the order they are checked.
    expected_errors = [
        ("sum", ValueError),
        ("prod", ValueError),
        ("product", ValueError),
        ("min", ValueError),
        ("max", ValueError),
        # only count doesn't have axis keyword
        ("count", TypeError),
        ("std", ValueError),
        ("var", ValueError),
        ("sem", ValueError),
        ("mean", ValueError),
    ]

    for axis in [1, "columns"]:
        for s in [ddf1.a, pdf1.a]:  # both must behave the same
            for reduction, error in expected_errors:
                with pytest.raises(error):
                    getattr(s, reduction)(axis=axis)
def test_reductions_non_numeric_dtypes():
    """Reductions on string, categorical, datetime and timedelta Series.

    For each dtype, the reductions are either compared against pandas or
    asserted to raise ``TypeError``/``ValueError`` on both dask and pandas.
    """
    # test non-numeric blocks

    def check_raises(d, p, func):
        # The dask side is computed inside the guarded block as well.
        with pytest.raises((TypeError, ValueError)):
            getattr(d, func)().compute()
        with pytest.raises((TypeError, ValueError)):
            getattr(p, func)()

    def check_equal(d, p, func):
        assert_eq(getattr(d, func)(), getattr(p, func)())

    # String data.
    pds = pd.Series(["a", "b", "c", "d", "e"])
    dds = dd.from_pandas(pds, 2)
    check_equal(dds, pds, "sum")
    for func in ("prod", "product"):
        check_raises(dds, pds, func)
    for func in ("min", "max", "count"):
        check_equal(dds, pds, func)
    for func in ("std", "var", "sem", "skew", "kurtosis"):
        check_raises(dds, pds, func)
    check_equal(dds, pds, "nunique")

    # Ordered categoricals and datetimes.
    for pds in [
        pd.Series(pd.Categorical([1, 2, 3, 4, 5], ordered=True)),
        pd.Series(pd.Categorical(list("abcde"), ordered=True)),
        pd.Series(pd.date_range("2011-01-01", freq="D", periods=5)),
    ]:
        dds = dd.from_pandas(pds, 2)

        for func in ("sum", "prod", "product"):
            check_raises(dds, pds, func)
        for func in ("min", "max", "count"):
            check_equal(dds, pds, func)
        # std is implemented for datetimes in pandas 1.2.0, but dask
        # implementation depends on var which isn't, so the std check is
        # skipped for that combination only.
        if not (PANDAS_GT_120 and pds.dtype == "datetime64[ns]"):
            check_raises(dds, pds, "std")
        for func in ("var", "sem", "skew", "kurtosis"):
            check_raises(dds, pds, func)
        check_equal(dds, pds, "nunique")

    # Timedeltas.
    pds = pd.Series(pd.timedelta_range("1 days", freq="D", periods=5))
    dds = dd.from_pandas(pds, 2)

    for func in ("sum", "min", "max", "count"):
        check_equal(dds, pds, func)

    # both pandas and dask skew calculations do not support timedelta
    for func in ("skew", "kurtosis"):
        check_raises(dds, pds, func)

    # ToDo: pandas supports timedelta std, dask returns float64
    # assert_eq(dds.std(), pds.std())

    # ToDo: pandas supports timedelta std, otherwise dask raises:
    # TypeError: unsupported operand type(s) for *: 'float' and 'Timedelta'
    # assert_eq(dds.mean(), pds.mean())

    check_equal(dds, pds, "nunique")
@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions_frame(split_every):
    """Compare DataFrame reductions with pandas and check graph key names.

    Each reduction is evaluated once without an ``axis`` argument and then
    once per axis spelling (``0``, ``1``, ``"index"``, ``"columns"``).
    ``split_every`` is forwarded to every dask call.
    """
    partitions = {
        ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
        ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
        ("x", 2): pd.DataFrame({"a": [7, 8, 9], "b": [0, 0, 0]}, index=[9, 9, 9]),
    }
    meta = make_meta(
        {"a": "i8", "b": "i8"}, index=pd.Index([], "i8"), parent_meta=pd.DataFrame()
    )
    ddf1 = dd.DataFrame(partitions, "x", meta, [0, 4, 9, 9])
    pdf1 = ddf1.compute()

    plain_reductions = (
        "sum",
        "prod",
        "product",
        "min",
        "max",
        "count",
        "std",
        "var",
        "sem",
    )
    ddof_reductions = ("std", "var", "sem")

    def compare(name, **kwargs):
        # The dask result is built first, then the pandas reference.
        actual = getattr(ddf1, name)(split_every=split_every, **kwargs)
        expected = getattr(pdf1, name)(**kwargs)
        assert_eq(actual, expected)

    def compare_all(**kwargs):
        for name in plain_reductions:
            compare(name, **kwargs)
        for name in ddof_reductions:
            compare(name, ddof=0, **kwargs)
        compare("mean", **kwargs)

    compare_all()
    for axis in [0, 1, "index", "columns"]:
        compare_all(axis=axis)

    with pytest.raises(ValueError):
        ddf1.sum(axis="incorrect").compute()

    # axis=0: std, var, sem, and mean consist of moment_* / sum / count
    # operations, so several labels are looked up for each of them.
    axis0_labels = [
        ("sum", ["dataframe-sum"]),
        ("prod", ["dataframe-prod"]),
        ("min", ["dataframe-min"]),
        ("max", ["dataframe-max"]),
        ("count", ["dataframe-count"]),
        ("std", ["dataframe-var", "moment_chunk", "moment_agg", "values"]),
        ("var", ["moment_chunk", "moment_agg", "values"]),
        ("sem", ["dataframe-var", "moment_chunk", "moment_agg", "values"]),
        ("mean", ["dataframe-sum", "dataframe-count"]),
    ]
    for name, labels in axis0_labels:
        for label in labels:
            # A fresh collection is built for every label lookup.
            assert_dask_graph(getattr(ddf1, name)(split_every=split_every), label)

    # axis=1: the label is "dataframe-" followed by the reduction name.
    for name in ["sum", "prod", "min", "max", "count", "std", "var", "sem", "mean"]:
        assert_dask_graph(
            getattr(ddf1, name)(axis=1, split_every=split_every),
            f"dataframe-{name}",
        )
@pytest.mark.filterwarnings(
    "ignore:Dropping of nuisance columns:FutureWarning"
)  # https://github.com/dask/dask/issues/7714
def test_reductions_frame_dtypes():
    """Compare reductions on a mixed-dtype frame between pandas and dask."""
    df = pd.DataFrame(
        {
            "int": [1, 2, 3, 4, 5, 6, 7, 8],
            "float": [1.0, 2.0, 3.0, 4.0, np.nan, 6.0, 7.0, 8.0],
            "dt": [pd.NaT] + [datetime(2011, i, 1) for i in range(1, 8)],
            "str": list("abcdefgh"),
            "timedelta": pd.to_timedelta([1, 2, 3, 4, 5, 6, 7, np.nan]),
            "bool": [True, False] * 4,
        }
    )
    ddf = dd.from_pandas(df, 3)

    def check(pandas_obj, dask_obj, name, **kwargs):
        # Only the pandas call runs inside check_numeric_only_deprecation();
        # the dask result is built and compared afterwards.
        with check_numeric_only_deprecation():
            expected = getattr(pandas_obj, name)(**kwargs)
        assert_eq(expected, getattr(dask_obj, name)(**kwargs))

    # TODO: std and mean do not support timedelta dtype
    df_no_timedelta = df.drop("timedelta", axis=1, inplace=False)
    ddf_no_timedelta = dd.from_pandas(df_no_timedelta, 3)

    assert_eq(df.drop(columns="dt").sum(), ddf.drop(columns="dt").sum())
    check(
        df_no_timedelta.drop(columns="dt"),
        ddf_no_timedelta.drop(columns="dt"),
        "mean",
    )

    check(df, ddf, "prod")
    check(df, ddf, "product")

    for name in ("min", "max", "count"):
        assert_eq(getattr(df, name)(), getattr(ddf, name)())

    check(df, ddf, "sem")
    check(df, ddf, "sem", ddof=0)

    no_timedelta_cases = [
        ("std", {}),
        ("std", {"skipna": False}),
        ("std", {"ddof": 0}),
        ("var", {}),
        ("var", {"skipna": False}),
        ("var", {"ddof": 0}),
        ("var", {"ddof": 0, "skipna": False}),
    ]
    for name, kwargs in no_timedelta_cases:
        check(df_no_timedelta, ddf_no_timedelta, name, **kwargs)

    assert_eq(df._get_numeric_data(), ddf._get_numeric_data())

    # For a selection of numeric columns, _get_numeric_data() must yield a
    # graph that compares equal to the selection's own graph.
    numerics = ddf[["int", "float"]]
    assert numerics._get_numeric_data().dask == numerics.dask

    # var corner case: a frame holding only the timedelta column
    df_td = df[["timedelta"]]
    ddf_td = dd.from_pandas(df_td, 3)
    check(df_td, ddf_td, "var", ddof=0)
    check(df_td, ddf_td, "var")

    # var corner case: a frame holding only numeric columns
    df_numerics = df[["int", "float", "bool"]]
    ddf_numerics = dd.from_pandas(df_numerics, 3)
    assert_eq(df_numerics.var(), ddf_numerics.var())
def test_reductions_frame_dtypes_numeric_only():
    """Check ``numeric_only=True`` reductions and that ``False`` is rejected."""
    df = pd.DataFrame(
        {
            "int": [1, 2, 3, 4, 5, 6, 7, 8],
            "float": [1.0, 2.0, 3.0, 4.0, np.nan, 6.0, 7.0, 8.0],
            "dt": [pd.NaT] + [datetime(2011, i, 1) for i in range(1, 8)],
            "str": list("abcdefgh"),
            "timedelta": pd.to_timedelta([1, 2, 3, 4, 5, 6, 7, np.nan]),
            "bool": [True, False] * 4,
        }
    )
    ddf = dd.from_pandas(df, 3)

    numeric_only = {"numeric_only": True}
    reductions = [
        "sum",
        "prod",
        "product",
        "min",
        "max",
        "mean",
        "var",
        "std",
        "count",
        "sem",
    ]
    dtype_checked = ["mean", "max"]

    for name in reductions:
        expected = getattr(df, name)(**numeric_only)
        actual = getattr(ddf, name)(**numeric_only)
        # dtypes are compared only for mean/max, and only when PANDAS_GT_120.
        assert_eq(
            expected,
            actual,
            check_dtype=name in dtype_checked and PANDAS_GT_120,
        )
        with pytest.raises(NotImplementedError, match="'numeric_only=False"):
            getattr(ddf, name)(numeric_only=False)

    for name in ("sem", "std", "var"):
        assert_eq(
            getattr(df, name)(ddof=0, **numeric_only),
            getattr(ddf, name)(ddof=0, **numeric_only),
        )
    for extra in ({"skipna": False}, {"skipna": False, "ddof": 0}):
        assert_eq(
            df.var(**extra, **numeric_only),
            ddf.var(**extra, **numeric_only),
        )

    # ------ only include numerics columns ------ #
    assert_eq(df._get_numeric_data(), ddf._get_numeric_data())

    numeric_columns = ["int", "float", "bool"]
    df_numerics = df[numeric_columns]
    ddf_numerics = ddf[numeric_columns]

    assert_eq(df_numerics, ddf._get_numeric_data())
    assert ddf_numerics._get_numeric_data().dask == ddf_numerics.dask

    for name in reductions:
        expected = getattr(df_numerics, name)()
        actual = getattr(ddf_numerics, name)()
        assert_eq(
            expected,
            actual,
            check_dtype=name in dtype_checked and PANDAS_GT_120,
        )
@pytest.mark.parametrize("split_every", [False, 2])
def test_reductions_frame_nan(split_every):
    """Compare reductions on NaN-containing columns between pandas and dask.

    Column ``c`` is entirely NaN. Reductions run with default arguments,
    with ``skipna=False``, and with ``axis=1, skipna=False``.
    """
    df = pd.DataFrame(
        {
            "a": [1, 2, np.nan, 4, 5, 6, 7, 8],
            "b": [1, 2, np.nan, np.nan, np.nan, 5, np.nan, np.nan],
            "c": [np.nan] * 8,
        }
    )
    ddf = dd.from_pandas(df, 3)

    def check(name, **kwargs):
        # The pandas reference is evaluated first, then the dask collection.
        expected = getattr(df, name)(**kwargs)
        actual = getattr(ddf, name)(split_every=split_every, **kwargs)
        assert_eq(expected, actual)

    basic = ("sum", "prod", "product", "min", "max")
    spread = ("std", "var", "sem")

    for name in basic + ("count",):
        check(name)

    with warnings.catch_warnings():
        # dask.dataframe should probably filter this, to match pandas, but
        # it seems quite difficult.
        warnings.simplefilter("ignore", RuntimeWarning)
        for name in spread:
            check(name)
        for name in spread:
            check(name, ddof=0)

    check("mean")

    with warnings.catch_warnings(record=True):
        # First pass uses skipna=False alone, second pass adds axis=1.
        for extra in ({}, {"axis": 1}):
            for name in basic + spread:
                check(name, skipna=False, **extra)
            for name in spread:
                check(name, skipna=False, ddof=0, **extra)
            check("mean", skipna=False, **extra)
@pytest.mark.parametrize("comparison", ["lt", "gt", "le", "ge", "ne", "eq"])
def test_series_comparison_nan(comparison):
    """Check a comparison method with ``fill_value`` against a NaN operand."""
    left = pd.Series([1, 2, 3, 4, 5, 6, 7])
    right_with_nan = pd.Series([1, -1, 8, np.nan, 5, 6, 2.4])
    dask_left = dd.from_pandas(left, 3)
    dask_right_with_nan = dd.from_pandas(right_with_nan, 3)

    actual = getattr(dask_left, comparison)(dask_right_with_nan, fill_value=7)
    expected = getattr(left, comparison)(right_with_nan, fill_value=7)
    assert_eq(actual, expected)
def test_sum_intna():
    """Sum an ``Int32Dtype`` series containing a missing value."""
    pandas_series = pd.Series([1, None, 2], dtype=pd.Int32Dtype())
    dask_series = dd.from_pandas(pandas_series, 2)
    expected = pandas_series.sum()
    assert_eq(expected, dask_series.sum())
def test_divmod():
    """Check ``divmod`` of a dask series by a scalar and by another series."""
    pd_left = pd.Series(np.random.rand(10))
    pd_right = pd.Series(np.random.rand(10))
    dask_left = dd.from_pandas(pd_left, npartitions=3)
    dask_right = dd.from_pandas(pd_right, npartitions=3)

    # (dask divisor, pandas divisor): the scalar case, then the series case.
    divisors = [(2.0, 2.0), (dask_right, pd_right)]
    for dask_divisor, pd_divisor in divisors:
        quotient, remainder = divmod(dask_left, dask_divisor)
        expected_quotient, expected_remainder = divmod(pd_left, pd_divisor)
        assert_eq(quotient, expected_quotient)
        assert_eq(remainder, expected_remainder)
@pytest.mark.skipif("not scipy")
def test_moment():
    """Compare ``dask.array.stats.moment`` on a series with scipy's result."""
    from dask.array import stats as dask_stats
    from dask.array.utils import assert_eq as assert_array_eq

    values = pd.Series(list(range(10)))
    dask_values = dd.from_pandas(values, npartitions=2)

    actual = dask_stats.moment(dask_values, 2, 0)
    expected = scipy.stats.moment(values, 2, 0)
    assert_array_eq(actual, expected)
@pytest.mark.parametrize("func", ["sum", "count", "mean", "var", "sem"])
def test_empty_df_reductions(func):
    """Run a reduction on column-less frames, without and with an index."""
    frames = [
        pd.DataFrame(),
        pd.DataFrame(index=pd.date_range("2000", periods=4)),
    ]
    for pdf in frames:
        ddf = dd.from_pandas(pdf, npartitions=1)
        # The reductions are looked up on the classes and called unbound.
        dask_reduction = getattr(type(ddf), func)
        pandas_reduction = getattr(type(pdf), func)
        assert_eq(dask_reduction(ddf), pandas_reduction(pdf))
@pytest.mark.parametrize("method", ["sum", "prod", "product"])
@pytest.mark.parametrize("min_count", [0, 9])
def test_series_agg_with_min_count(method, min_count):
    """Check ``min_count`` handling of Series ``sum``/``prod``/``product``.

    The series holds the single value ``1``. With ``min_count=0`` the
    aggregation must return ``1``; with ``min_count=9`` it must return NaN.
    """
    df = pd.DataFrame([[1]], columns=["a"])
    ddf = dd.from_pandas(df, npartitions=1)
    func = getattr(ddf["a"], method)
    result = func(min_count=min_count).compute()
    if min_count == 0:
        assert result == 1
    else:
        # Test NaN by value rather than by identity with the ``np.nan``
        # object: any NaN float (for example ``np.float64("nan")``) is a
        # correct result, and ``is`` would reject it.
        assert np.isnan(result)
# Default absolute tolerance of 2000 nanoseconds
def assert_near_timedeltas(t1, t2, atol=2000):
    """Assert that two timedelta results agree within ``atol``.

    Scalar arguments are wrapped in a one-element ``pd.Series``. The two
    dtypes must be equal, and the values are compared after ``pd.to_numeric``
    conversion using ``assert_eq`` with the given absolute tolerance.
    """
    t1, t2 = (
        pd.Series([value]) if is_scalar(value) else value for value in (t1, t2)
    )
    assert t1.dtype == t2.dtype
    numeric_t1 = pd.to_numeric(t1)
    numeric_t2 = pd.to_numeric(t2)
    assert_eq(numeric_t1, numeric_t2, atol=atol)
@pytest.mark.skipif(
    not PANDAS_GT_120, reason="std() for datetime only added in pandas>=1.2"
)
@pytest.mark.parametrize("axis", [0, 1])
def test_datetime_std_creates_copy_cols(axis):
    """Run datetime ``std`` twice to check the inputs were not mutated."""
    start = 1636426700
    # Column name -> spacing in seconds between consecutive timestamps.
    spacing = {"dt1": 250000, "dt2": 300000}
    pdf = pd.DataFrame(
        {
            column: [datetime.fromtimestamp(start + (i * step)) for i in range(10)]
            for column, step in spacing.items()
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    # Series test (repeated to make sure data structure wasn't mutated)
    for _ in range(2):
        assert_eq(ddf["dt1"].std(), pdf["dt1"].std())

    # DataFrame test (repeated to make sure data structure wasn't mutated)
    for _ in range(2):
        assert_near_timedeltas(ddf.std(axis=axis).compute(), pdf.std(axis=axis))
@pytest.mark.skipif(
    not PANDAS_GT_120, reason="std() for datetime only added in pandas>=1.2"
)
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize("skipna", [False, True])
def test_datetime_std_with_larger_dataset(axis, skipna):
    """Compare datetime ``std`` on a 250-row frame that starts with NaT rows."""
    num_rows = 250
    num_missing = 15

    # dt1: NaT for the first rows, evenly spaced timestamps for the rest.
    missing_part = pd.Series([pd.NaT] * num_missing, index=range(num_missing))
    present_part = pd.to_datetime(
        pd.Series(
            [
                datetime.fromtimestamp(1636426704 + (i * 250000))
                for i in range(num_rows - num_missing)
            ],
            index=range(num_missing, num_rows),
        )
    )
    dt1 = pd.concat([missing_part, present_part], ignore_index=False)

    base_numbers = [
        (1638290040706793300 + (i * 69527182702409)) for i in range(num_rows)
    ]
    pdf = pd.DataFrame(
        {"dt1": dt1, "dt2": pd.to_datetime(pd.Series(base_numbers))},
        index=range(num_rows),
    )
    # dt3..dt7 are built from base_numbers shifted by 0.12 * i before int().
    for i in range(3, 8):
        shifted = [int(x + (0.12 * i)) for x in base_numbers]
        pdf[f"dt{i}"] = pd.to_datetime(pd.Series(shifted))

    ddf = dd.from_pandas(pdf, 8)

    # Single-column frame
    actual_single = ddf[["dt1"]].std(axis=axis, skipna=skipna).compute()
    expected_single = pdf[["dt1"]].std(axis=axis, skipna=skipna)
    assert_near_timedeltas(actual_single, expected_single)

    # Same thing but as Series. No axis, since axis=1 raises error
    actual_series = ddf["dt1"].std(skipna=skipna).compute()
    expected_series = pdf["dt1"].std(skipna=skipna)
    assert_near_timedeltas(actual_series, expected_series)

    # Computation on full dataset
    actual_full = ddf.std(axis=axis, skipna=skipna).compute()
    expected_full = pdf.std(axis=axis, skipna=skipna)
    assert_near_timedeltas(actual_full, expected_full)
@pytest.mark.skipif(
    not PANDAS_GT_120, reason="std() for datetime only added in pandas>=1.2"
)
@pytest.mark.filterwarnings(
    "ignore:Dropping of nuisance columns:FutureWarning"
)  # https://github.com/dask/dask/issues/7714
@pytest.mark.parametrize("skipna", [False, True])
def test_datetime_std_across_axis1_null_results(skipna):
    """Compare ``std(axis=1)`` with pandas for frames with datetime columns."""

    def timestamps(step, count):
        # `count` datetimes starting at 1636426704, spaced `step` seconds apart.
        return [datetime.fromtimestamp(1636426704 + (i * step)) for i in range(count)]

    pdf = pd.DataFrame(
        {
            "dt1": timestamps(250000, 10),
            "dt2": timestamps(217790, 10),
            "nums": list(range(10)),
        }
    )
    ddf = dd.from_pandas(pdf, 3)

    # Single column always results in NaT
    single_dask = ddf[["dt1"]].std(axis=1, skipna=skipna)
    single_pandas = pdf[["dt1"]].std(axis=1, skipna=skipna)
    assert_eq(single_dask, single_pandas)

    # Mix of datetimes with other numeric types produces NaNs
    assert_eq(ddf.std(axis=1, skipna=skipna), pdf.std(axis=1, skipna=skipna))

    # Test with mix of na and truthy datetimes
    pdf2 = pd.DataFrame(
        {
            "dt1": [pd.NaT] + timestamps(250000, 10) + [pd.NaT],
            "dt2": timestamps(250000, 12),
            "dt3": timestamps(282616, 12),
        }
    )
    ddf2 = dd.from_pandas(pdf2, 3)
    assert_eq(ddf2.std(axis=1, skipna=skipna), pdf2.std(axis=1, skipna=skipna))
def test_std_raises_on_index():
    """Calling ``std`` on a dask Index must raise ``NotImplementedError``."""
    frame = dd.from_pandas(pd.DataFrame({"test": [1, 2]}), npartitions=2)
    expected_message = (
        "`std` is only supported with objects that are Dataframes or Series"
    )
    with pytest.raises(NotImplementedError, match=expected_message):
        frame.index.std()