Repository URL to install this package:
|
Version:
2022.10.0 ▾
|
import numpy as np
import pandas as pd
import pytest
import dask.dataframe as dd
rs = np.random.RandomState(96)
@pytest.mark.parametrize(
"df",
[
pd.DataFrame(
{
"x": [1, 2, 3] * 3,
"y": [1.2, 3.4, 5.6] * 3,
"z": -(np.arange(9, dtype=np.int8)),
}
),
pd.DataFrame(
{
"x": rs.randint(0, 1000000, (10000,)),
"y": rs.randn(10000),
"z": rs.uniform(0, 9999999, (10000,)),
}
),
pd.DataFrame(
{
"x": np.repeat(rs.randint(0, 1000000, (1000,)), 3),
"y": np.repeat(rs.randn(1000), 3),
"z": np.repeat(rs.uniform(0, 9999999, (1000,)), 3),
}
),
pd.DataFrame({"x": rs.randint(0, 1000000, (10000,))}),
pd.DataFrame(
{
"x": rs.randint(0, 1000000, (7,)),
"y": ["a", "bet", "is", "a", "tax", "on", "bs"],
}
),
pd.DataFrame(
{
"w": np.zeros((20000,)),
"x": np.zeros((20000,)),
"y": np.zeros((20000,)) + 4803592,
"z": np.zeros((20000,)),
}
),
pd.DataFrame({"x": [1, 2, 3] * 1000}),
pd.DataFrame({"x": np.random.random(1000)}),
pd.DataFrame(
{
"a": [1, 2, 3] * 3,
"b": [1.2, 3.4, 5.6] * 3,
"c": [1 + 2j, 3 + 4j, 5 + 6j] * 3,
"d": -(np.arange(9, dtype=np.int8)),
}
),
pd.Series([1, 2, 3] * 1000),
pd.Series(np.random.random(1000)),
pd.Series(np.random.random(1000), index=np.ones(1000)),
pd.Series(np.random.random(1000), index=np.random.random(1000)),
],
)
@pytest.mark.parametrize("npartitions", [2, 20])
def test_basic(df, npartitions):
ddf = dd.from_pandas(df, npartitions=npartitions)
approx = ddf.nunique_approx().compute(scheduler="sync")
exact = len(df.drop_duplicates())
assert abs(approx - exact) <= 2 or abs(approx - exact) / exact < 0.05
@pytest.mark.parametrize("split_every", [None, 2, 10])
@pytest.mark.parametrize("npartitions", [2, 20])
def test_split_every(split_every, npartitions):
df = pd.Series([1, 2, 3] * 1000)
ddf = dd.from_pandas(df, npartitions=npartitions)
approx = ddf.nunique_approx(split_every=split_every).compute(scheduler="sync")
exact = len(df.drop_duplicates())
assert abs(approx - exact) <= 2 or abs(approx - exact) / exact < 0.05
def test_larger_data():
df = dd.demo.make_timeseries(
"2000-01-01",
"2000-04-01",
{"value": float, "id": int},
freq="10s",
partition_freq="1D",
seed=1,
)
assert df.nunique_approx().compute() > 1000