# Repository URL to install this package:
# Version: 2022.10.0
import json
import os
import pandas as pd
import pytest
import dask
import dask.dataframe as dd
from dask.dataframe.utils import assert_eq
from dask.utils import tmpdir, tmpfile
# Module-level sample data shared by the tests below: a four-row pandas frame
# and the same data as a dask DataFrame split into two partitions.
df = pd.DataFrame({"x": list("abcd"), "y": list(range(1, 5))})
ddf = dd.from_pandas(df, npartitions=2)
@pytest.mark.parametrize("orient", ["split", "records", "index", "columns", "values"])
def test_read_json_with_path_column(orient):
    """Check that ``include_path_column=True`` adds a categorical ``path`` column.

    The dask result is compared against a pandas read of the same file with
    the expected ``path`` column attached by hand.
    """
    json_opts = {"orient": orient, "lines": False}
    with tmpfile("json") as f:
        df.to_json(f, **json_opts)
        actual = dd.read_json(f, include_path_column=True, **json_opts)
        expected = pd.read_json(f, **json_opts)

        # With include_path_column=True the column is named "path".
        # On Windows, Dask converts the path to forward slashes somewhere in
        # its file-reading chain, so the expected value is normalised the
        # same way here.
        normalized = f.replace(os.sep, "/")
        expected["path"] = pd.Series([normalized] * len(expected), dtype="category")

        assert actual["path"].dtype == "category"
        assert_eq(actual, expected)
def test_read_json_path_column_with_duplicate_name_is_error():
    """Naming the path column after an existing column ("x") must raise ValueError."""
    json_opts = {"orient": "records", "lines": False}
    with tmpfile("json") as f:
        df.to_json(f, **json_opts)
        expected_error = pytest.raises(ValueError, match="Files already contain")
        with expected_error:
            dd.read_json(f, include_path_column="x", **json_opts)
def test_read_json_with_path_converter():
    """Check that ``path_converter`` output is what ends up in the path column.

    The converter ignores its argument and always returns ``"asdf.json"``, so
    every row of the custom-named path column is expected to hold that value.
    """
    column = "filenames"
    json_opts = {"orient": "records", "lines": False}

    def to_fixed_name(_path):
        return "asdf.json"

    with tmpfile("json") as f:
        df.to_json(f, **json_opts)
        actual = dd.read_json(
            f,
            include_path_column=column,
            path_converter=to_fixed_name,
            **json_opts,
        )

        expected = pd.read_json(f, **json_opts)
        converted = to_fixed_name(f)
        expected[column] = pd.Series([converted] * len(expected), dtype="category")

        assert_eq(actual, expected)
def test_read_orient_not_records_and_lines():
    """Reading with ``lines=True`` and ``orient="split"`` must raise ValueError."""
    expected_error = pytest.raises(ValueError, match="Line-delimited JSON")
    with expected_error:
        dd.read_json("nofile.json", orient="split", lines=True)
def test_write_orient_not_records_and_lines():
    """Writing with ``lines=True`` and ``orient="split"`` must raise ValueError."""
    with tmpfile("json") as f, pytest.raises(
        ValueError, match="Line-delimited JSON"
    ):
        dd.to_json(ddf, f, orient="split", lines=True)
@pytest.mark.parametrize("blocksize", [5, 15, 33, 200, 90000])
def test_read_json_multiple_files_with_path_column(blocksize, tmpdir):
    """Read two line-delimited files via a glob and check the ``path`` column.

    Each row is expected to carry the forward-slash path of the file it came
    from, typed as a categorical whose categories are both file paths.
    """
    first_path, second_path = (
        str(tmpdir.join(name)).replace(os.sep, "/")
        for name in ("fil1.json", "fil2.json")
    )

    ints = pd.DataFrame({"x": range(5), "y": list("abcde")})
    floats = ints.assign(x=ints["x"] + 0.5)

    json_opts = {"orient": "records", "lines": True}
    path_dtype = pd.CategoricalDtype((first_path, second_path))

    # Write each frame first, then attach the path column it should come
    # back with; the column is added after writing so it is not serialised.
    for frame, target in ((ints, first_path), (floats, second_path)):
        frame.to_json(target, **json_opts)
        frame["path"] = pd.Series([target] * len(frame), dtype=path_dtype)

    expected = pd.concat([ints, floats])
    pattern = str(tmpdir.join("fil*.json"))
    result = dd.read_json(
        pattern,
        include_path_column=True,
        blocksize=blocksize,
        **json_opts,
    )
    assert_eq(result, expected, check_index=False)
@pytest.mark.parametrize("orient", ["split", "records", "index", "columns", "values"])
def test_read_json_basic(orient):
    """dask's ``read_json`` must agree with pandas and round-trip the sample frame."""
    json_opts = {"orient": orient, "lines": False}
    with tmpfile("json") as f:
        df.to_json(f, **json_opts)
        from_dask = dd.read_json(f, **json_opts)
        from_pandas = pd.read_json(f, **json_opts)
        assert_eq(from_dask, from_pandas)

        # For orient="values" the column labels are reassigned before the
        # comparison with the original frame.
        if orient == "values":
            from_dask.columns = list(df.columns)
        assert_eq(from_dask, df)
@pytest.mark.parametrize("fkeyword", ["pandas", "json"])
def test_read_json_fkeyword(fkeyword):
    """A user-supplied callable passed as ``engine`` is used to parse the file.

    Depending on ``fkeyword`` the custom reader parses with ``pd.read_json``
    or with ``json.load``; either way the result must match a plain pandas
    read.
    """

    def _my_json_reader(*args, **kwargs):
        # Keyword arguments are accepted but deliberately not forwarded.
        if fkeyword != "json":
            return pd.read_json(*args)
        parsed = json.load(*args)
        return pd.DataFrame.from_dict(parsed)

    json_opts = {"orient": "records", "lines": False}
    with tmpfile("json") as f:
        df.to_json(f, **json_opts)
        from_dask = dd.read_json(f, engine=_my_json_reader, **json_opts)
        from_pandas = pd.read_json(f, **json_opts)
        assert_eq(from_dask, from_pandas)
@pytest.mark.parametrize("orient", ["split", "records", "index", "columns", "values"])
def test_read_json_meta(orient, tmpdir):
    """Read two files with an explicit ``meta`` and compare to their concatenation.

    The first file holds integer ``x`` values and the second holds floats;
    ``meta`` is an empty slice of the float frame. Files are line-delimited
    only for ``orient="records"``, where a chunked read is checked as well.
    """
    ints = pd.DataFrame({"x": range(5), "y": list("abcde")})
    floats = ints.assign(x=ints["x"] + 0.5)
    lines = orient == "records"

    for frame, name in ((ints, "fil1.json"), (floats, "fil2.json")):
        frame.to_json(str(tmpdir.join(name)), orient=orient, lines=lines)

    expected = pd.concat([ints, floats])
    meta = floats.iloc[:0]
    if orient == "values":
        # orient=values loses column names
        positional = [0, 1]
        expected.columns = positional
        meta.columns = positional

    pattern = str(tmpdir.join("fil*.json"))
    result = dd.read_json(pattern, orient=orient, meta=meta, lines=lines)
    assert_eq(result, expected)

    if orient != "records":
        return

    # Also check the chunked version.
    chunked = dd.read_json(
        pattern,
        orient=orient,
        meta=meta,
        lines=True,
        blocksize=50,
    )
    assert_eq(chunked, expected, check_index=False)
@pytest.mark.parametrize("orient", ["split", "records", "index", "columns", "values"])
def test_write_json_basic(orient):
    """Write the sample frame to ``1.json`` and read it back through dask.

    NOTE(review): the file is written with the pandas ``df.to_json``, not with
    dask's writer, so despite its name this test only exercises the dask read
    path -- confirm whether that is intended.
    """
    json_opts = {"orient": orient, "lines": False}
    with tmpdir() as path:
        target = os.path.join(path, "1.json")
        df.to_json(target, **json_opts)
        round_tripped = dd.read_json(target, **json_opts)

        # For orient="values" the column labels are reassigned before the
        # comparison with the original frame.
        if orient == "values":
            round_tripped.columns = list(df.columns)
        assert_eq(round_tripped, df)
def test_to_json_with_get():
    """``compute_kwargs={"scheduler": ...}`` must reach the scheduler that runs the write.

    A wrapper around the multiprocessing ``get`` records that it was called;
    the written files are then read back and compared to the source frame.
    """
    from dask.multiprocessing import get as mp_get

    scheduler_called = False

    def tracking_get(*args, **kwargs):
        nonlocal scheduler_called
        scheduler_called = True
        return mp_get(*args, **kwargs)

    # Local frames, deliberately independent of the module-level ones.
    df = pd.DataFrame({"x": list("abcd"), "y": list(range(1, 5))})
    ddf = dd.from_pandas(df, npartitions=2)

    with tmpdir() as dn:
        ddf.to_json(dn, compute_kwargs={"scheduler": tracking_get})
        assert scheduler_called
        everything = os.path.join(dn, "*")
        result = dd.read_json(everything)
        assert_eq(result, df, check_index=False)
def test_read_json_error():
    """Invalid option combinations must raise ValueError.

    Covers writing ``orient="split"`` with ``lines=True`` through pandas, and
    reading an ``orient="split"`` file through dask with a ``blocksize``.
    """
    split = {"orient": "split"}
    with tmpfile("json") as f:
        with pytest.raises(ValueError):
            df.to_json(f, lines=True, **split)

        # Write a valid split-oriented file for the dask read below.
        df.to_json(f, lines=False, **split)

        with pytest.raises(ValueError):
            dd.read_json(f, blocksize=1, **split)
@pytest.mark.parametrize("block", [5, 15, 33, 200, 90000])
def test_read_chunked(block):
    """Reading line-delimited JSON with ``blocksize`` must split small blocks.

    For block sizes of 30 or less the result must span more than one
    partition; in every case the data must match the source frame.
    """
    with tmpdir() as path:
        target = os.path.join(path, "1.json")
        df.to_json(target, orient="records", lines=True)
        chunked = dd.read_json(target, blocksize=block, sample=10)

        # Larger blocks are allowed to end up as a single partition.
        if block <= 30:
            assert chunked.npartitions > 1
        assert_eq(chunked, df, check_index=False)
@pytest.mark.parametrize("compression", [None, "gzip", "xz"])
def test_json_compressed(compression):
    """Round-trip the dask frame through ``to_json``/``read_json`` per compression."""
    codec = {"compression": compression}
    with tmpdir() as path:
        dd.to_json(ddf, path, **codec)
        everything = os.path.join(path, "*")
        round_tripped = dd.read_json(everything, **codec)
        assert_eq(df, round_tripped, check_index=False)
def test_read_json_inferred_compression():
    """Read gzip-written ``*.json.gz`` files back without passing ``compression``."""
    with tmpdir() as path:
        pattern = os.path.join(path, "*.json.gz")
        dd.to_json(ddf, pattern, compression="gzip")
        # No compression argument on the read side.
        round_tripped = dd.read_json(pattern)
        assert_eq(df, round_tripped, check_index=False)
def test_to_json_results():
    """``to_json`` must return the written part paths, eagerly or via delayed.

    The expected paths are ``<target>/<n>.part`` for each partition index.
    """

    def expected_parts(root):
        return [os.path.join(root, f"{n}.part") for n in range(ddf.npartitions)]

    # Eager write: the paths come back as a list.
    with tmpfile("json") as f:
        written = ddf.to_json(f)
        assert written == expected_parts(f)

    # Deferred write: dask.compute returns a tuple rather than a list.
    with tmpfile("json") as f:
        pending = ddf.to_json(f, compute=False)
        written = dask.compute(*pending)
        assert written == tuple(expected_parts(f))