Repository URL to install this package:
|
Version:
2022.10.0 ▾
|
from datetime import datetime
import numpy as np
import pandas as pd
import pytest
import dask.array as da
import dask.dataframe as dd
from dask.blockwise import Blockwise
from dask.dataframe._compat import tm
from dask.dataframe.io.io import _meta_from_array
from dask.dataframe.optimize import optimize
from dask.dataframe.utils import assert_eq
from dask.delayed import Delayed, delayed
from dask.utils_test import hlg_layer_topological
##########
# Arrays #
##########
def test_meta_from_array():
    """Check ``_meta_from_array`` on 2-D arrays and ``from_array`` partitioning."""
    # Integer input, no explicit names: columns are labelled 0 and 1.
    int_arr = np.array([[1, 2], [3, 4]], dtype=np.int64)
    meta = _meta_from_array(int_arr)
    assert isinstance(meta, pd.DataFrame)
    assert meta[0].dtype == np.int64
    assert meta[1].dtype == np.int64
    tm.assert_index_equal(meta.columns, pd.Index([0, 1]))

    # Float input with explicit column names.
    float_arr = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float64)
    names = ["a", "b"]
    meta = _meta_from_array(float_arr, columns=names)
    assert isinstance(meta, pd.DataFrame)
    assert meta["a"].dtype == np.float64
    assert meta["b"].dtype == np.float64
    tm.assert_index_equal(meta.columns, pd.Index(names))

    # Three names for a two-column array must be rejected.
    with pytest.raises(ValueError):
        _meta_from_array(float_arr, columns=["a", "b", "c"])

    np.random.seed(42)
    random_rows = np.random.rand(201, 2)
    ddf = dd.from_array(random_rows, chunksize=50, columns=["a", "b"])
    assert len(ddf.divisions) == 6  # Should be 5 partitions and the end
def test_meta_from_1darray():
    """Check ``_meta_from_array`` on 1-D arrays for Series and DataFrame results."""
    # No name given: the result is a Series of matching dtype.
    floats = np.array([1.0, 2.0, 3.0], dtype=np.float64)
    meta = _meta_from_array(floats)
    assert isinstance(meta, pd.Series)
    assert meta.dtype == np.float64

    # A scalar string for ``columns`` becomes the Series name.
    objs = np.array([1, 2, 3], dtype=np.object_)
    meta = _meta_from_array(objs, columns="x")
    assert isinstance(meta, pd.Series)
    assert meta.name == "x"
    assert meta.dtype == np.object_

    # A one-element list for ``columns`` yields a DataFrame instead.
    objs = np.array([1, 2, 3], dtype=np.object_)
    meta = _meta_from_array(objs, columns=["x"])
    assert isinstance(meta, pd.DataFrame)
    assert meta["x"].dtype == np.object_
    tm.assert_index_equal(meta.columns, pd.Index(["x"]))

    # Two names for a 1-D array must be rejected.
    with pytest.raises(ValueError):
        _meta_from_array(objs, columns=["a", "b"])
def test_meta_from_recarray():
    """Check ``_meta_from_array`` on a structured (record) array."""
    record_dtype = [("a", np.float64), ("b", np.int64)]
    records = np.array([(i, i * 10) for i in range(10)], dtype=record_dtype)

    # Field names become the column names, in field order.
    meta = _meta_from_array(records)
    assert isinstance(meta, pd.DataFrame)
    assert meta["a"].dtype == np.float64
    assert meta["b"].dtype == np.int64
    tm.assert_index_equal(meta.columns, pd.Index(["a", "b"]))

    # Explicit ``columns`` reorders the fields; dtypes follow the fields.
    meta = _meta_from_array(records, columns=["b", "a"])
    assert isinstance(meta, pd.DataFrame)
    assert meta["a"].dtype == np.float64
    assert meta["b"].dtype == np.int64
    tm.assert_index_equal(meta.columns, pd.Index(["b", "a"]))

    # Requesting a column that is not a field must be rejected.
    with pytest.raises(ValueError):
        _meta_from_array(records, columns=["a", "b", "c"])
def test_from_array():
    """``dd.from_array`` on a 2-D NumPy array, with and without column names."""
    data = np.arange(10 * 3).reshape(10, 3)
    expected_divisions = (0, 4, 8, 9)

    # Default column labels.
    ddf = dd.from_array(data, chunksize=4)
    assert isinstance(ddf, dd.DataFrame)
    tm.assert_index_equal(ddf.columns, pd.Index([0, 1, 2]))
    assert ddf.divisions == expected_divisions
    assert (ddf.compute().values == data).all()

    # Explicit column labels.
    ddf = dd.from_array(data, chunksize=4, columns=list("abc"))
    assert isinstance(ddf, dd.DataFrame)
    tm.assert_index_equal(ddf.columns, pd.Index(["a", "b", "c"]))
    assert ddf.divisions == expected_divisions
    assert (ddf.compute().values == data).all()

    # A 3-D array must be rejected.
    cube = np.ones(shape=(10, 10, 10))
    with pytest.raises(ValueError):
        dd.from_array(cube)
def test_from_array_with_record_dtype():
    """``dd.from_array`` on a structured array uses the field names as columns."""
    record_dtype = [("a", "i4"), ("b", "i4")]
    records = np.array([(i, i * 10) for i in range(10)], dtype=record_dtype)

    ddf = dd.from_array(records, chunksize=4)

    assert isinstance(ddf, dd.DataFrame)
    assert list(ddf.columns) == ["a", "b"]
    assert ddf.divisions == (0, 4, 8, 9)
    # Round-trip back to records and compare element-wise with the input.
    round_tripped = ddf.compute().to_records(index=False)
    assert (round_tripped == records).all()
def test_from_pandas_dataframe():
    """``dd.from_pandas`` on a date-indexed DataFrame via npartitions and chunksize."""
    letters = list("aaaaaaabbbbbbbbccccccc")
    df = pd.DataFrame(
        dict(a=letters, b=np.random.randn(len(letters))),
        index=pd.date_range(start="20120101", periods=len(letters)),
    )
    first_label_type = type(df.index[0])

    # Partition count given positionally.
    ddf = dd.from_pandas(df, 3)
    assert len(ddf.dask) == 3
    assert len(ddf.divisions) == len(ddf.dask) + 1
    assert isinstance(ddf.divisions[0], first_label_type)
    tm.assert_frame_equal(df, ddf.compute())

    # Partitioning by chunk size; checked after the argument-validation cases.
    ddf = dd.from_pandas(df, chunksize=8)

    msg = "Exactly one of npartitions and chunksize must be specified."
    # Both arguments supplied.
    with pytest.raises(ValueError) as err:
        dd.from_pandas(df, npartitions=2, chunksize=2)
    assert msg in str(err.value)
    # Neither argument supplied.
    with pytest.raises((ValueError, AssertionError)) as err:
        dd.from_pandas(df)
    assert msg in str(err.value)

    assert len(ddf.dask) == 3
    assert len(ddf.divisions) == len(ddf.dask) + 1
    assert isinstance(ddf.divisions[0], first_label_type)
    tm.assert_frame_equal(df, ddf.compute())
def test_from_pandas_small():
    """``dd.from_pandas`` on tiny inputs, including empty frames and series."""
    df = pd.DataFrame({"x": [1, 2, 3]})
    for count in [1, 2, 30]:
        # ``count`` interpreted as the number of partitions.
        by_parts = dd.from_pandas(df, count)
        assert len(by_parts.compute()) == 3
        assert by_parts.divisions[0] == 0
        assert by_parts.divisions[-1] == 2

        # ``count`` interpreted as the chunk size.
        by_chunks = dd.from_pandas(df, chunksize=count)
        assert len(by_chunks.compute()) == 3
        assert by_chunks.divisions[0] == 0
        assert by_chunks.divisions[-1] == 2

    # More partitions requested than rows available (0 or 2 rows).
    for sort in [True, False]:
        for n_rows in [0, 2]:
            df = pd.DataFrame({"x": [0] * n_rows})
            ddf = dd.from_pandas(df, npartitions=5, sort=sort)
            assert_eq(df, ddf)

            ser = pd.Series([0] * n_rows, name="x", dtype=int)
            dser = dd.from_pandas(ser, npartitions=5, sort=sort)
            assert_eq(ser, dser)
@pytest.mark.parametrize("n", [1, 2, 4, 5])
def test_from_pandas_npartitions_is_accurate(n):
    """The resulting partition count never exceeds the requested ``npartitions``."""
    frame = pd.DataFrame(
        {"x": [1, 2, 3, 4, 5, 6], "y": list("abdabd")},
        index=[10, 20, 30, 40, 50, 60],
    )
    ddf = dd.from_pandas(frame, npartitions=n)
    assert ddf.npartitions <= n
def test_from_pandas_series():
    """``dd.from_pandas`` on a date-indexed Series via npartitions and chunksize."""
    length = 20
    dates = pd.date_range(start="20120101", periods=length)
    ser = pd.Series(np.random.randn(length), index=dates)
    first_label_type = type(ser.index[0])

    # Partition count given positionally.
    dser = dd.from_pandas(ser, 3)
    assert len(dser.dask) == 3
    assert len(dser.divisions) == len(dser.dask) + 1
    assert isinstance(dser.divisions[0], first_label_type)
    tm.assert_series_equal(ser, dser.compute())

    # Partitioning by chunk size.
    dser = dd.from_pandas(ser, chunksize=8)
    assert len(dser.dask) == 3
    assert len(dser.divisions) == len(dser.dask) + 1
    assert isinstance(dser.divisions[0], first_label_type)
    tm.assert_series_equal(ser, dser.compute())
def test_from_pandas_non_sorted():
    """With ``sort=False`` on an unsorted index, divisions are left unknown."""
    frame = pd.DataFrame({"x": [1, 2, 3]}, index=[3, 1, 2])

    by_parts = dd.from_pandas(frame, npartitions=2, sort=False)
    assert not by_parts.known_divisions
    assert_eq(frame, by_parts)

    by_chunks = dd.from_pandas(frame, chunksize=2, sort=False)
    assert not by_chunks.known_divisions
    assert_eq(frame, by_chunks)
def test_from_pandas_single_row():
    """A one-row frame yields divisions that repeat the single index label."""
    one_row = pd.DataFrame({"x": [1]}, index=[1])
    ddf = dd.from_pandas(one_row, npartitions=1)
    assert ddf.divisions == (1, 1)
    assert_eq(ddf, one_row)
def test_from_pandas_with_datetime_index():
    """``dd.from_pandas`` round-trips a frame holding a ``datetime64[ns]`` column."""
    date_strings = [
        "2015-08-28",
        "2015-08-27",
        "2015-08-26",
        "2015-08-25",
        "2015-08-24",
        "2015-08-21",
        "2015-08-20",
        "2015-08-19",
        "2015-08-18",
    ]
    df = pd.DataFrame({"Date": date_strings, "Val": list(range(9))})
    df.Date = df.Date.astype("datetime64[ns]")

    # Partition count given positionally.
    assert_eq(df, dd.from_pandas(df, 2))
    # Partitioning by chunk size.
    assert_eq(df, dd.from_pandas(df, chunksize=2))
@pytest.mark.parametrize("null_value", [None, pd.NaT, pd.NA])
def test_from_pandas_with_index_nulls(null_value):
    """``from_pandas`` rejects a string index that contains a null entry."""
    labels = ["C", null_value, "A"]
    frame = pd.DataFrame({"x": [1, 2, 3]}, index=labels)
    expected_msg = "is non-numeric and contains nulls"
    with pytest.raises(NotImplementedError, match=expected_msg):
        dd.from_pandas(frame, npartitions=2, sort=False)
def test_from_pandas_with_wrong_args():
    """Invalid arguments to ``from_pandas`` raise with informative messages."""
    frame = pd.DataFrame({"x": [1, 2, 3]}, index=[3, 2, 1])

    # Input that is not a pandas object.
    with pytest.raises(TypeError, match="must be a pandas DataFrame or Series"):
        dd.from_pandas("foo")

    # Neither npartitions nor chunksize supplied.
    missing_msg = "one of npartitions and chunksize must be specified"
    with pytest.raises(ValueError, match=missing_msg):
        dd.from_pandas(frame)

    # Non-integer npartitions.
    with pytest.raises(TypeError, match="provide npartitions as an int"):
        dd.from_pandas(frame, npartitions=5.2, sort=False)

    # Non-integer chunksize.
    with pytest.raises(TypeError, match="provide chunksize as an int"):
        dd.from_pandas(frame, chunksize=18.27)
def test_from_pandas_chunksize_one():
    """``chunksize=1`` on a range-indexed frame gives one row per partition."""
    # See: https://github.com/dask/dask/issues/9218
    values = np.random.randint(0, 10, size=(10, 4))
    frame = pd.DataFrame(values, columns=list("ABCD"))
    ddf = dd.from_pandas(frame, chunksize=1)

    partition_lengths = list(ddf.map_partitions(len).compute())

    # chunksize=1 with range index should
    # always have unit-length partitions
    assert partition_lengths == [1] * 10
@pytest.mark.parametrize(
    "index",
    [
        ["A", "B", "C", "C", "C", "C", "C", "C"],
        ["A", "B", "B", "B", "B", "B", "B", "C"],
        ["A", "A", "A", "A", "A", "B", "B", "C"],
    ],
)
def test_from_pandas_npartitions_duplicates(index):
    """Heavily duplicated index labels still give the expected divisions."""
    frame = pd.DataFrame({"a": range(8), "index": index})
    frame = frame.set_index("index")
    ddf = dd.from_pandas(frame, npartitions=3)
    assert ddf.divisions == ("A", "B", "C", "C")
@pytest.mark.gpu
def test_gpu_from_pandas_npartitions_duplicates():
    """cudf variant of the duplicated-index divisions check (skipped without cudf)."""
    cudf = pytest.importorskip("cudf")
    labels = ["A", "A", "A", "A", "A", "B", "B", "C"]
    gdf = cudf.DataFrame({"a": range(8), "index": labels}).set_index("index")
    ddf = dd.from_pandas(gdf, npartitions=3)
    assert ddf.divisions == ("A", "B", "C", "C")
def test_DataFrame_from_dask_array():
    """A 2-D dask array converts to a DataFrame without materializing the last layer."""
    names = ["a", "b", "c"]
    darr = da.ones((10, 3), chunks=(4, 2))
    expected = pd.DataFrame(np.ones((10, 3)), columns=names)

    ddf = dd.from_dask_array(darr, names)
    assert not hlg_layer_topological(ddf.dask, -1).is_materialized()
    assert_eq(ddf, expected)

    # dd.from_array should re-route to from_dask_array
    rerouted = dd.from_array(darr, columns=names)
    assert not hlg_layer_topological(rerouted.dask, -1).is_materialized()
    assert_eq(ddf, rerouted)
def test_DataFrame_from_dask_array_with_blockwise_ops():
    """An elementwise op before ``from_dask_array`` keeps every layer unmaterialized."""
    names = ["a", "b", "c"]
    darr = da.ones((10, 3), chunks=(4, 2))
    darr *= 2
    expected = pd.DataFrame(np.ones((10, 3)) * 2, columns=names)

    ddf = dd.from_dask_array(darr, names)

    # None of the layers in this graph should be materialized, everything should
    # be a HighLevelGraph still.
    n_layers = len(ddf.dask.layers)
    assert all(
        not hlg_layer_topological(ddf.dask, position).is_materialized()
        for position in range(n_layers)
    )
    assert_eq(ddf, expected)
def test_Series_from_dask_array():
    """A 1-D dask array converts to a Series, named or unnamed."""
    darr = da.ones(10, chunks=4)

    # Explicit name.
    named = dd.from_dask_array(darr, "a")
    assert_eq(named, pd.Series(np.ones(10), name="a"))

    # Not passing a name should result in the name == None
    unnamed = dd.from_dask_array(darr)
    assert_eq(unnamed, pd.Series(np.ones(10)))

    # dd.from_array should re-route to from_dask_array
    rerouted = dd.from_array(darr)
    assert_eq(unnamed, rerouted)
@pytest.mark.parametrize("as_frame", [True, False])
def test_from_dask_array_index(as_frame):
    """Rebuilding a collection from its ``values`` and ``index`` reproduces it."""
    source = pd.Series(range(10), index=list("abcdefghij"))
    collection = dd.from_pandas(source, npartitions=3)
    if as_frame:
        collection = collection.to_frame()

    rebuilt = dd.from_dask_array(collection.values, index=collection.index)

    assert_eq(collection, rebuilt)
def test_from_dask_array_index_raises():
    """``from_dask_array`` rejects a pandas index and mismatched block counts."""
    # A plain pandas Index is passed as the index.
    darr = da.random.uniform(size=(10,), chunks=(5,))
    pandas_index = pd.Index(np.arange(10))
    with pytest.raises(ValueError, match="must be an instance"):
        dd.from_dask_array(darr, index=pandas_index)

    # Values come from a 2-partition series, the index from a 4-partition one.
    two_parts = dd.from_pandas(pd.Series(range(12)), npartitions=2)
    four_parts = dd.from_pandas(pd.Series(range(12)), npartitions=4)
    with pytest.raises(ValueError, match=".*index.*numbers of blocks.*4 != 2"):
        dd.from_dask_array(two_parts.values, index=four_parts.index)
def test_from_array_raises_more_than_2D():
    """3-D input is rejected by both ``from_dask_array`` and ``from_array``."""
    expected_msg = "more than 2D array"
    dask_cube = da.ones((3, 3, 3), chunks=2)
    numpy_cube = np.ones((3, 3, 3))

    with pytest.raises(ValueError, match=expected_msg):
        dd.from_dask_array(dask_cube)  # dask

    with pytest.raises(ValueError, match=expected_msg):
        dd.from_array(numpy_cube)  # numpy
def test_from_dask_array_compat_numpy_array():
    """2-D dask and NumPy inputs produce equal DataFrames."""
    dask_ones = da.ones((10, 3), chunks=(3, 3))
    numpy_ones = np.ones((10, 3))

    from_dask = dd.from_dask_array(dask_ones)  # dask
    assert_eq(from_dask, pd.DataFrame(numpy_ones))

    from_numpy = dd.from_array(numpy_ones)  # numpy
    assert_eq(from_numpy, from_dask)
def test_from_array_wrong_column_shape_error():
    """One column name for a three-column array is rejected on both paths."""
    expected_msg = "names must match width"

    dask_ones = da.ones((10, 3), chunks=(3, 3))
    with pytest.raises(ValueError, match=expected_msg):
        dd.from_dask_array(dask_ones, columns=["a"])  # dask

    numpy_ones = np.ones((10, 3))
    with pytest.raises(ValueError, match=expected_msg):
        dd.from_array(numpy_ones, columns=["a"])  # numpy
def test_from_array_with_column_names():
    """Explicit column names are honoured for both dask and NumPy inputs."""
    names = ["a", "b", "c"]
    dask_ones = da.ones((10, 3), chunks=(3, 3))
    numpy_ones = np.ones((10, 3))

    from_dask = dd.from_dask_array(dask_ones, columns=names)  # dask
    assert_eq(from_dask, pd.DataFrame(numpy_ones, columns=names))

    from_numpy = dd.from_array(numpy_ones, columns=names)  # numpy
    assert_eq(from_dask, from_numpy)
def test_from_dask_array_compat_numpy_array_1d():
    """1-D dask and NumPy inputs produce equal Series."""
    dask_ones = da.ones(10, chunks=3)
    numpy_ones = np.ones(10)

    from_dask = dd.from_dask_array(dask_ones)  # dask
    assert_eq(from_dask, pd.Series(numpy_ones))

    from_numpy = dd.from_array(numpy_ones)  # numpy
    assert_eq(from_numpy, from_dask)
def test_from_array_1d_with_column_names():
    """A string passed as ``columns`` names the Series built from 1-D input."""
    dask_ones = da.ones(10, chunks=3)
    numpy_ones = np.ones(10)

    from_dask = dd.from_dask_array(dask_ones, columns="name")  # dask
    assert_eq(from_dask, pd.Series(numpy_ones, name="name"))

    # The NumPy input here is the computed dask array.
    from_numpy = dd.from_array(dask_ones.compute(), columns="name")  # numpy
    assert_eq(from_numpy, from_dask)
def test_from_array_1d_list_of_columns_gives_dataframe():
    """A list passed as ``columns`` turns 1-D input into a one-column DataFrame."""
    names = ["name"]
    dask_ones = da.ones(10, chunks=3)
    numpy_ones = np.ones(10)

    # passing list via columns results in DataFrame
    from_dask = dd.from_dask_array(dask_ones, columns=names)  # dask
    assert_eq(from_dask, pd.DataFrame(numpy_ones, columns=names))

    from_numpy = dd.from_array(numpy_ones, columns=names)  # numpy
    assert_eq(from_numpy, from_dask)
def test_from_dask_array_struct_dtype():
    """A structured dask array maps its fields to DataFrame columns."""
    record_dtype = [("a", "i4"), ("b", "object")]
    records = np.array([(1, "a"), (2, "b")], dtype=record_dtype)
    darr = da.from_array(records, chunks=(1,))

    # Default: columns follow the field order.
    ddf = dd.from_dask_array(darr)
    tm.assert_index_equal(ddf.columns, pd.Index(["a", "b"]))
    assert_eq(ddf, pd.DataFrame(records))

    # Explicit columns reorder the fields.
    reordered = ["b", "a"]
    assert_eq(
        dd.from_dask_array(darr, columns=reordered),
        pd.DataFrame(records, columns=reordered),
    )
def test_from_dask_array_unknown_chunks():
    """Arrays whose row chunk sizes are NaN convert with unknown divisions."""
    # Series
    series_graph = {("x", 0): np.arange(5), ("x", 1): np.arange(5, 11)}
    series_chunks = ((np.nan, np.nan),)
    darr = da.Array(series_graph, "x", series_chunks, np.arange(1).dtype)
    dser = dd.from_dask_array(darr)
    assert isinstance(dser, dd.Series)
    assert not dser.known_divisions
    assert_eq(dser, pd.Series(np.arange(11)), check_index=False)

    # DataFrame
    frame_graph = {
        ("x", 0, 0): np.random.random((2, 3)),
        ("x", 1, 0): np.random.random((5, 3)),
    }
    frame_chunks = ((np.nan, np.nan), (3,))
    darr = da.Array(frame_graph, "x", frame_chunks, np.float64)
    ddf = dd.from_dask_array(darr)
    assert isinstance(ddf, dd.DataFrame)
    assert not ddf.known_divisions
    assert_eq(ddf, pd.DataFrame(darr.compute()), check_index=False)
@pytest.mark.parametrize(
    "chunksizes, expected_divisions",
    [
        pytest.param((1, 2, 3, 0), (0, 1, 3, 5, 5)),
        pytest.param((0, 1, 2, 3), (0, 0, 1, 3, 5)),
        pytest.param((1, 0, 2, 3), (0, 1, 1, 3, 5)),
    ],
)
def test_from_dask_array_empty_chunks(chunksizes, expected_divisions):
    """Zero-length chunks produce repeated values in the resulting divisions."""
    values = np.arange(6)
    monotonic_index = da.from_array(values, chunks=chunksizes)
    result = dd.from_dask_array(monotonic_index)
    assert result.divisions == expected_divisions
def test_from_dask_array_unknown_width_error():
    """A NaN chunk size along axis 1 is rejected by ``from_dask_array``."""
    graph = {
        ("x", 0, 0): np.random.random((2, 3)),
        ("x", 1, 0): np.random.random((5, 3)),
    }
    chunks = ((np.nan, np.nan), (np.nan,))
    darr = da.Array(graph, "x", chunks, np.float64)
    with pytest.raises(ValueError, match="Shape along axis 1 must be known"):
        dd.from_dask_array(darr)
def test_to_bag():
    """``to_bag`` for frames and series, with and without index, tuple and dict formats."""
    pdf = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(pdf, 2)

    # DataFrame -> tuples, without and with the index.
    assert ddf.to_bag().compute() == list(pdf.itertuples(False))
    assert ddf.to_bag(True).compute() == list(pdf.itertuples(True))

    # DataFrame -> dicts, without and with the index.
    rows_without_index = [
        {"x": "a", "y": 2},
        {"x": "b", "y": 3},
        {"x": "c", "y": 4},
        {"x": "d", "y": 5},
    ]
    assert ddf.to_bag(format="dict").compute() == rows_without_index
    rows_with_index = [
        {"index": 1.0, "x": "a", "y": 2},
        {"index": 2.0, "x": "b", "y": 3},
        {"index": 3.0, "x": "c", "y": 4},
        {"index": 4.0, "x": "d", "y": 5},
    ]
    assert ddf.to_bag(True, format="dict").compute() == rows_with_index

    # Series -> (index, value) pairs or bare values.
    assert ddf.x.to_bag(True).compute() == list(pdf.x.items())
    assert ddf.x.to_bag().compute() == list(pdf.x)

    # Series -> dicts: the same records are expected with or without index.
    assert ddf.x.to_bag(True, format="dict").compute() == [
        {"x": "a"},
        {"x": "b"},
        {"x": "c"},
        {"x": "d"},
    ]
    assert ddf.x.to_bag(format="dict").compute() == [
        {"x": "a"},
        {"x": "b"},
        {"x": "c"},
        {"x": "d"},
    ]
def test_to_bag_frame():
    """``to_bag(format="frame")`` returns a Bag whose partitions are the frames."""
    from dask import get
    from dask.bag import Bag

    pdf = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(pdf, npartitions=2)

    # Convert to bag, and check that
    # collection type has changed, but
    # partition data has not
    bag = ddf.to_bag(format="frame")
    assert isinstance(bag, Bag)
    first_partition = get(bag.dask, (bag.name, 0))
    assert_eq(first_partition, ddf.partitions[0])
    second_partition = get(bag.dask, (bag.name, 1))
    assert_eq(second_partition, ddf.partitions[1])
def test_to_records():
    """``to_records`` on a dask frame matches ``pandas.DataFrame.to_records``."""
    pytest.importorskip("dask.array")
    # Array-aware ``assert_eq``; shadows the dataframe one inside this test only.
    from dask.array.utils import assert_eq

    pdf = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(pdf, 2)

    expected = pdf.to_records()
    actual = ddf.to_records()
    assert_eq(expected, actual, check_type=False)  # TODO: make check_type pass
@pytest.mark.parametrize("lengths", [[2, 2], True])
def test_to_records_with_lengths(lengths):
    """``to_records(lengths=...)`` returns a dask array with known chunk sizes."""
    pytest.importorskip("dask.array")
    # Array-aware ``assert_eq``; shadows the dataframe one inside this test only.
    from dask.array.utils import assert_eq

    pdf = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(pdf, 2)

    records = ddf.to_records(lengths=lengths)

    assert_eq(pdf.to_records(), records, check_type=False)  # TODO: make check_type pass
    assert isinstance(records, da.Array)
    # Two partitions of two rows each.
    assert records.chunks == ((2, 2),)
def test_to_records_raises():
    """Invalid ``lengths`` arguments to ``to_records`` raise ``ValueError``."""
    pytest.importorskip("dask.array")
    pdf = pd.DataFrame(
        {"x": ["a", "b", "c", "d"], "y": [2, 3, 4, 5]},
        index=pd.Index([1.0, 2.0, 3.0, 4.0], name="ind"),
    )
    ddf = dd.from_pandas(pdf, 2)

    # Three lengths supplied for a two-partition frame.
    with pytest.raises(ValueError):
        ddf.to_records(lengths=[2, 2, 2])
        # Reached only when the call above did not raise.
        pytest.fail("3 != 2")

    # A bare integer is passed as ``lengths``.
    with pytest.raises(ValueError):
        ddf.to_records(lengths=5)
        # Reached only when the call above did not raise.
        pytest.fail("Unexpected value")
def test_from_delayed():
    """Exercise ``dd.from_delayed`` for frames and series.

    Covers known and unknown divisions, tuple-style ``meta``, the Blockwise
    layer type of the result, and the errors raised for a wrong number of
    divisions and for metadata that does not match the computed data.
    """
    df = pd.DataFrame(data=np.random.normal(size=(10, 4)), columns=list("abcd"))
    # Four partitions of 1, 2, 3 and 4 rows respectively.
    parts = [df.iloc[:1], df.iloc[1:3], df.iloc[3:6], df.iloc[6:10]]
    dfs = [delayed(parts.__getitem__)(i) for i in range(4)]
    meta = dfs[0].compute()

    my_len = lambda x: pd.Series([len(x)])

    for divisions in [None, [0, 1, 3, 6, 10]]:
        ddf = dd.from_delayed(dfs, meta=meta, divisions=divisions)
        assert_eq(ddf, df)
        assert list(ddf.map_partitions(my_len).compute()) == [1, 2, 3, 4]
        assert ddf.known_divisions == (divisions is not None)

        s = dd.from_delayed([d.a for d in dfs], meta=meta.a, divisions=divisions)
        assert_eq(s, df.a)
        assert list(s.map_partitions(my_len).compute()) == [1, 2, 3, 4]
        # Check the Series itself (the original re-checked ``ddf`` here).
        assert s.known_divisions == (divisions is not None)

    meta2 = [(c, "f8") for c in df.columns]
    # Make sure `from_delayed` is Blockwise
    check_ddf = dd.from_delayed(dfs, meta=meta2)
    assert isinstance(check_ddf.dask.layers[check_ddf._name], Blockwise)
    assert_eq(dd.from_delayed(dfs, meta=meta2), df)
    assert_eq(dd.from_delayed([d.a for d in dfs], meta=("a", "f8")), df.a)

    # Four divisions for four partitions is one too few.
    with pytest.raises(ValueError):
        dd.from_delayed(dfs, meta=meta, divisions=[0, 1, 3, 6])

    # Series meta for frame partitions is detected at compute time.
    with pytest.raises(ValueError) as e:
        dd.from_delayed(dfs, meta=meta.a).compute()
    assert str(e.value).startswith("Metadata mismatch found in `from_delayed`")
def test_from_delayed_optimize_fusion():
    """Optimization collapses ``from_delayed`` plus a Blockwise op into one layer."""
    # Test that DataFrame optimization fuses a `from_delayed`
    # layer with other Blockwise layers and input Delayed tasks.
    # See: https://github.com/dask/dask/pull/8852
    make_frame = delayed(lambda x: pd.DataFrame({"x": [x] * 10}))
    meta = pd.DataFrame({"x": [0] * 10})
    ddf = dd.from_delayed(map(make_frame, range(10)), meta=meta) + 1

    # NOTE: Fusion requires `optimize_blockwise` and `fuse_roots`
    assert isinstance(ddf.dask.layers[ddf._name], Blockwise)
    optimized = optimize(ddf.dask, ddf.__dask_keys__())
    assert len(optimized.layers) == 1
def test_from_delayed_to_dask_array():
    """``from_delayed`` followed by ``to_dask_array`` survives blockwise optimization."""
    # Check that `from_delayed` can be followed
    # by `to_dask_array` without breaking
    # optimization behavior
    # See: https://github.com/dask-contrib/dask-sql/issues/497
    from dask.blockwise import optimize_blockwise

    delayed_frames = [delayed(pd.DataFrame)(np.ones((3, 2))) for _ in range(3)]
    arr = dd.from_delayed(delayed_frames).to_dask_array()

    # If we optimize this graph without calling
    # `fuse_roots`, the underlying `BlockwiseDep`
    # `mapping` keys will be 1-D (e.g. `(4,)`),
    # while the collection keys will be 2-D
    # (e.g. `(4, 0)`)
    keys = [row[0] for row in arr.__dask_keys__()]
    graph = optimize_blockwise(arr.dask, keys=keys)
    graph.cull(keys)

    computed = arr.compute()
    assert computed.shape == (9, 2)
def test_from_delayed_preserves_hlgs():
    """Layers and dependencies of the input Delayed objects appear in the result graph."""
    df = pd.DataFrame(data=np.random.normal(size=(10, 4)), columns=list("abcd"))
    parts = [df.iloc[:1], df.iloc[1:3], df.iloc[3:6], df.iloc[6:10]]
    dfs = [delayed(parts.__getitem__)(i) for i in range(4)]
    meta = dfs[0].compute()

    # Attribute access adds a further layer on top of each Delayed.
    chained = [d.a for d in dfs]
    result_graph = dd.from_delayed(chained, meta=meta).dask

    for item in chained:
        item_graph = item.dask
        for layer_name, layer in item_graph.layers.items():
            assert result_graph.layers[layer_name] == layer
            assert (
                result_graph.dependencies[layer_name]
                == item_graph.dependencies[layer_name]
            )
def test_from_delayed_misordered_meta():
    """A ``meta`` whose column order differs from the data raises at compute time."""
    df = pd.DataFrame(
        columns=["(1)", "(2)", "date", "ent", "val"],
        data=[range(i * 5, i * 5 + 5) for i in range(3)],
        index=range(3),
    )

    # meta with different order for columns
    misordered_meta = pd.DataFrame(
        columns=["date", "ent", "val", "(1)", "(2)"], data=[range(5)]
    )

    ddf = dd.from_delayed([delayed(lambda: df)()], meta=misordered_meta)

    with pytest.raises(ValueError) as info:
        # produces dataframe which does not match meta
        ddf.reset_index().compute(scheduler="sync")

    expected_fragment = (
        "The columns in the computed data do not match the columns in the"
        " provided metadata"
    )
    assert expected_fragment in str(info.value)
def test_from_delayed_sorted():
    """``divisions="sorted"`` gives known divisions for the two index-ordered parts."""
    first = pd.DataFrame({"x": [1, 2]}, index=[1, 10])
    second = pd.DataFrame({"x": [4, 1]}, index=[100, 200])

    ddf = dd.from_delayed([delayed(first), delayed(second)], divisions="sorted")

    assert ddf.known_divisions
    assert ddf.divisions == (1, 100, 200)
def test_to_delayed():
    """``to_delayed`` returns ``Delayed`` objects for frames and scalars."""
    pdf = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
    ddf = dd.from_pandas(pdf, npartitions=2)

    # Frame
    first, second = ddf.to_delayed()
    assert isinstance(first, Delayed)
    assert isinstance(second, Delayed)
    assert_eq(first.compute(), pdf.iloc[:2])

    # Scalar
    total = ddf.x.sum()
    delayed_total = total.to_delayed()
    assert isinstance(delayed_total, Delayed)
    assert_eq(delayed_total.compute(), total)
def test_to_delayed_optimize_graph():
    """``to_delayed`` optimizes the graph unless ``optimize_graph=False``."""
    pdf = pd.DataFrame({"x": list(range(20))})
    ddf = dd.from_pandas(pdf, npartitions=20)
    sliced = (ddf + 1).loc[:2]

    # Frame
    optimized = sliced.to_delayed()[0]
    assert len(optimized.dask) < 20
    unoptimized = sliced.to_delayed(optimize_graph=False)[0]
    # Without optimization the Delayed carries the collection's keys as-is.
    assert sorted(unoptimized.dask) == sorted(sliced.dask)
    assert_eq(sliced.get_partition(0), optimized.compute())
    assert_eq(sliced.get_partition(0), unoptimized.compute())

    # Scalar
    total = sliced.x.sum()
    optimized_total = total.to_delayed()
    unoptimized_total = total.to_delayed(optimize_graph=False)
    assert len(optimized_total.dask) < len(unoptimized_total.dask)
    assert_eq(optimized_total.compute(), unoptimized_total.compute())
def test_from_dask_array_index_dtype():
    """``from_dask_array`` keeps the dtype and name of the supplied dask index.

    Checked for a datetime index and for a float index.
    """
    x = da.ones((10,), chunks=(5,))

    # Datetime index. "1min" is the non-deprecated spelling of the minute
    # frequency; the "T" alias emits a FutureWarning on pandas >= 2.2.
    df = pd.DataFrame(
        {
            "date": pd.date_range("2019-01-01", periods=10, freq="1min"),
            "val1": list(range(10)),
        }
    )
    ddf = dd.from_pandas(df, npartitions=2).set_index("date")
    ddf2 = dd.from_dask_array(x, index=ddf.index, columns="val2")
    assert ddf.index.dtype == ddf2.index.dtype
    assert ddf.index.name == ddf2.index.name

    # Float index.
    df = pd.DataFrame({"idx": np.arange(0, 1, 0.1), "val1": list(range(10))})
    ddf = dd.from_pandas(df, npartitions=2).set_index("idx")
    ddf2 = dd.from_dask_array(x, index=ddf.index, columns="val2")
    assert ddf.index.dtype == ddf2.index.dtype
    assert ddf.index.name == ddf2.index.name
@pytest.mark.parametrize(
    "vals",
    [
        ("A", "B"),
        (3, 4),
        (datetime(2020, 10, 1), datetime(2022, 12, 31)),
    ],
)
def test_from_map_simple(vals):
    """``from_map`` with a function, one iterable and a keyword argument."""
    # Simple test to ensure required inputs (func & iterable)
    # and basic kwargs work as expected for `from_map`

    def func(pair, size=0):
        # Simple function to create Series with a
        # repeated value and index
        value, label = pair
        return pd.Series([value] * size, index=[label] * size)

    first, second = vals[0], vals[1]
    iterable = [(first, 1), (second, 2)]
    ser = dd.from_map(func, iterable, size=2)
    expect = pd.Series(
        [first, first, second, second],
        index=[1, 1, 2, 2],
    )

    # Make sure `from_map` produces single `Blockwise` layer
    graph_layers = ser.dask.layers
    assert len(graph_layers) == 1
    assert isinstance(graph_layers[ser._name], Blockwise)

    # Check that result and partition count make sense
    assert ser.npartitions == len(iterable)
    assert_eq(ser, expect)
def test_from_map_multi():
    """``from_map`` zips several iterables into the function's positional arguments."""
    # Test that `iterables` can contain multiple Iterables
    func = lambda x, y: pd.DataFrame({"add": x + y})
    left_inputs = [np.arange(2, dtype="int64"), np.arange(2, dtype="int64")]
    right_inputs = [
        np.array([2, 2], dtype="int64"),
        np.array([2, 2], dtype="int64"),
    ]
    iterables = (left_inputs, right_inputs)

    expected_index = np.array([0, 1, 0, 1], dtype="int64")
    expected_values = np.array([2, 3, 2, 3], dtype="int64")
    expect = pd.DataFrame({"add": expected_values}, index=expected_index)

    ddf = dd.from_map(func, *iterables)
    assert_eq(ddf, expect)
def test_from_map_args():
    """``from_map`` forwards the extra positional values given via ``args``."""
    # Test that the optional `args` argument works as expected
    func = lambda x, y, z: pd.DataFrame({"add": x + y + z})
    iterable = [np.arange(2, dtype="int64"), np.arange(2, dtype="int64")]

    expected_index = np.array([0, 1, 0, 1], dtype="int64")
    expected_values = np.array([5, 6, 5, 6], dtype="int64")
    expect = pd.DataFrame({"add": expected_values}, index=expected_index)

    ddf = dd.from_map(func, iterable, args=[2, 3])
    assert_eq(ddf, expect)
def test_from_map_divisions():
    """User-supplied ``divisions`` are attached unchanged by ``from_map``."""
    # Test that `divisions` argument works as expected for `from_map`
    func = lambda x: pd.Series([x[0]] * 2, index=range(x[1], x[1] + 2))
    # Each item is (value, first index label).
    iterable = [("B", 0), ("C", 2)]
    divisions = (0, 2, 4)

    ser = dd.from_map(func, iterable, divisions=divisions)

    expect = pd.Series(["B", "B", "C", "C"], index=[0, 1, 2, 3])
    assert ser.divisions == divisions
    assert_eq(ser, expect)
def test_from_map_meta():
    """``from_map`` honours ``meta`` and ``enforce_metadata``."""
    # Test that `meta` can be specified to `from_map`,
    # and that `enforce_metadata` works as expected
    func = lambda x, s=0: pd.DataFrame({"x": [x] * s})
    iterable = ["A", "B"]
    expect = pd.DataFrame({"x": ["A", "A", "B", "B"]}, index=[0, 1, 0, 1])

    # First Check - Pass in valid metadata
    good_meta = pd.DataFrame({"x": ["A"]}).iloc[:0]
    ddf = dd.from_map(func, iterable, meta=good_meta, s=2)
    assert_eq(ddf._meta, good_meta)
    assert_eq(ddf, expect)

    # Second Check - Pass in invalid metadata
    bad_meta = pd.DataFrame({"a": ["A"]}).iloc[:0]
    ddf = dd.from_map(func, iterable, meta=bad_meta, s=2)
    assert_eq(ddf._meta, bad_meta)
    with pytest.raises(ValueError, match="The columns in the computed data"):
        assert_eq(ddf.compute(), expect)

    # Third Check - Pass in invalid metadata,
    # but use `enforce_metadata=False`
    ddf = dd.from_map(func, iterable, meta=bad_meta, enforce_metadata=False, s=2)
    assert_eq(ddf._meta, bad_meta)
    assert_eq(ddf.compute(), expect)
def test_from_map_custom_name():
    """``label`` and ``token`` together determine the collection name."""
    # Test that `label` and `token` arguments to
    # `from_map` works as expected
    func = lambda x: pd.DataFrame({"x": [x] * 2})
    iterable = ["A", "B"]
    label, token = "my-label", "8675309"

    ddf = dd.from_map(func, iterable, label=label, token=token)

    expect = pd.DataFrame({"x": ["A", "A", "B", "B"]}, index=[0, 1, 0, 1])
    assert ddf._name == label + "-" + token
    assert_eq(ddf, expect)
def _generator():
# Simple generator for test_from_map_other_iterables
yield from enumerate(["A", "B", "C"])
@pytest.mark.parametrize(
    "iterable",
    [
        enumerate(["A", "B", "C"]),
        ((0, "A"), (1, "B"), (2, "C")),
        _generator(),
    ],
)
def test_from_map_other_iterables(iterable):
    """``from_map`` accepts an enumerate object, a tuple of pairs and a generator."""
    # Test that iterable arguments to `from_map`
    # can be enumerate and generator
    # See: https://github.com/dask/dask/issues/9064

    def func(t):
        # ``t`` is (position, letter); the letter is repeated position + 1 times.
        position, letter = t[0], t[1]
        return pd.Series([letter] * (position + 1))

    ddf = dd.from_map(func, iterable)
    expect = pd.Series(
        ["A", "B", "B", "C", "C", "C"],
        index=[0, 0, 1, 0, 1, 2],
    )
    assert_eq(ddf.compute(), expect)
def test_from_map_column_projection():
    """Selecting a column reaches the callable through ``project_columns``."""
    # Test that column projection works
    # as expected with from_map when
    # enforce_metadata=True

    # Column names seen by projected instances of MyFunc when called.
    projected = []

    class MyFunc:
        def __init__(self, columns=None):
            self.columns = columns

        def project_columns(self, columns):
            return MyFunc(columns)

        def __call__(self, t):
            n_rows = t[0] + 1
            value = t[1]
            frame = pd.DataFrame({"A": [value] * n_rows, "B": [10] * n_rows})
            if self.columns is not None:
                # Record the projection, then return only those columns.
                projected.extend(self.columns)
                return frame[self.columns]
            return frame

    ddf = dd.from_map(
        MyFunc(),
        enumerate([0, 1, 2]),
        label="myfunc",
        enforce_metadata=True,
    )
    expect = pd.DataFrame(
        {
            "A": [0, 1, 1, 2, 2, 2],
            "B": [10] * 6,
        },
        index=[0, 0, 1, 0, 1, 2],
    )

    assert_eq(ddf["A"], expect["A"])
    assert set(projected) == {"A"}
    assert_eq(ddf, expect)