Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
dask / dask / dataframe / io / tests / test_demo.py
Size: Mime:
import pandas as pd

import dask
import dask.dataframe as dd
from dask.blockwise import Blockwise, optimize_blockwise
from dask.dataframe._compat import tm
from dask.dataframe.optimize import optimize_dataframe_getitem
from dask.dataframe.utils import assert_eq


def test_make_timeseries():
    """make_timeseries honors dtypes, divisions, seeding, and graph naming."""
    dtypes = {"A": float, "B": int, "C": str}
    df = dd.demo.make_timeseries(
        "2000", "2015", dtypes, freq="2D", partition_freq="6M"
    )

    # Divisions land on a 6-month calendar grid between start and stop.
    assert df.divisions[0] == pd.Timestamp("2000-01-31")
    assert df.divisions[-1] == pd.Timestamp("2014-07-31")
    assert df.divisions == tuple(pd.date_range(start="2000", end="2015", freq="6M"))

    # Columns and dtypes match the requested spec.
    tm.assert_index_equal(df.columns, pd.Index(["A", "B", "C"]))
    assert df["A"].head().dtype == float
    assert df["B"].head().dtype == int
    assert df["C"].head().dtype == object

    # The index is named "timestamp" and the name survives head().
    assert df.index.name == "timestamp"
    assert df.head().index.name == df.index.name

    # Repeated materialization of the same collection is deterministic.
    tm.assert_frame_equal(df.head(), df.head())

    def build(freq="2D", partition_freq="6M", seed=None):
        # Local helper: same date range and dtypes, varying knobs only.
        return dd.demo.make_timeseries(
            "2000",
            "2015",
            dtypes,
            freq=freq,
            partition_freq=partition_freq,
            seed=seed,
        )

    a = build(seed=123)
    b = build(seed=123)
    c = build(seed=456)
    d = build(partition_freq="3M", seed=123)
    e = build(freq="1D", seed=123)

    # Identical seed -> identical data and identical tokenized name.
    tm.assert_frame_equal(a.head(), b.head())
    assert a._name == b._name
    # Different seed -> different data; any parameter change -> new name.
    assert not (a.head(10) == c.head(10)).all().all()
    assert a._name != c._name
    assert a._name != d._name
    assert a._name != e._name


def test_make_timeseries_no_args():
    """With all defaults, make_timeseries still yields a reasonable frame."""
    ddf = dd.demo.make_timeseries()
    # A sane, non-trivial partition count and a multi-column,
    # multi-dtype result.
    assert 1 < ddf.npartitions < 1000
    assert len(ddf.columns) > 1
    assert len(set(ddf.dtypes)) > 1


def test_make_timeseries_blockwise():
    """Column projection and blockwise fusion optimize the demo graph."""
    ddf = dd.demo.make_timeseries()[["x", "y"]]
    keys = [(ddf._name, i) for i in range(ddf.npartitions)]

    # `optimize_dataframe_getitem` should push the ["x", "y"] selection
    # down into the `columns` attribute of the "make-timeseries" layer.
    graph = optimize_dataframe_getitem(ddf.__dask_graph__(), keys)
    projected = next(
        k for k in graph.layers.keys() if k.startswith("make-timeseries-")
    )
    assert set(graph.layers[projected].columns) == {"x", "y"}

    # `optimize_blockwise` should fuse both Blockwise layers together
    # into a single Blockwise layer.
    fused = optimize_blockwise(ddf.__dask_graph__(), keys)
    assert len(fused.layers) == 1
    only_name = next(iter(fused.layers))
    assert isinstance(fused.layers[only_name], Blockwise)


def test_no_overlaps():
    """Consecutive partitions must not share or overlap timestamps.

    Each partition's maximum index value must be strictly less than the
    next partition's minimum. Note: the loop previously ran over
    ``range(df.npartitions - 2)``, which skipped the final adjacent
    pair of partitions; ``npartitions - 1`` covers every pair.
    """
    df = dd.demo.make_timeseries(
        "2000", "2001", {"A": float}, freq="3H", partition_freq="3M"
    )

    assert all(
        df.get_partition(i).index.max().compute()
        < df.get_partition(i + 1).index.min().compute()
        for i in range(df.npartitions - 1)
    )


def test_make_timeseries_keywords():
    """Per-column keyword args (``A_lam``, ``B_lam``) shape the data."""
    ddf = dd.demo.make_timeseries(
        "2000",
        "2001",
        {"A": int, "B": int, "C": str},
        freq="1D",
        partition_freq="6M",
        A_lam=1000000,
        B_lam=2,
    )

    # A large lam should produce high cardinality, a small lam low
    # cardinality; compute both in one pass.
    aa, bb = dask.compute(
        ddf.A.nunique(), ddf.B.nunique(), scheduler="single-threaded"
    )

    assert 100 < aa <= 10000000
    assert 1 < bb <= 100


def test_make_timeseries_fancy_keywords():
    """Keyword routing works even when column names contain underscores."""
    ddf = dd.demo.make_timeseries(
        "2000",
        "2001",
        {"A_B": int, "B_": int, "C": str},
        freq="1D",
        partition_freq="6M",
        A_B_lam=1000000,
        B__lam=2,
    )

    # ``A_B_lam`` must bind to column "A_B" and ``B__lam`` to "B_",
    # despite the ambiguous underscore positions.
    aa, bb = dask.compute(
        ddf.A_B.nunique(), ddf.B_.nunique(), scheduler="single-threaded"
    )

    assert 100 < aa <= 10000000
    assert 1 < bb <= 100


def test_make_timeseries_getitem_compute():
    """Boolean-mask getitem survives graph optimization at compute time.

    Regression test — see https://github.com/dask/dask/issues/7692
    """
    ddf = dd.demo.make_timeseries()
    filtered = ddf[ddf.y > 0].compute()
    # The mask was applied, and no columns were dropped by optimization.
    assert filtered["y"].min() > 0
    assert list(ddf.columns) == list(filtered.columns)


def test_make_timeseries_column_projection():
    """Projected/aggregated lazy results match eager pandas equivalents."""
    ddf = dd.demo.make_timeseries(
        "2001", "2002", freq="1D", partition_freq="3M", seed=42
    )
    # seed=42 makes the generated data deterministic, so computing the
    # full frame once is a valid reference for both comparisons.
    expected = ddf.compute()

    assert_eq(ddf[["x"]].compute(), expected[["x"]])

    spec = {"x": "sum", "y": "max"}
    assert_eq(
        ddf.groupby("name").aggregate(spec).compute(),
        expected.groupby("name").aggregate(spec),
    )