Repository URL to install this package:
|
Version:
2022.10.0 ▾
|
import gzip
import os
import pathlib
import sys
from functools import partial
from time import sleep
import cloudpickle
import pytest
from fsspec.compression import compr
from fsspec.core import open_files
from fsspec.implementations.local import LocalFileSystem
from tlz import concat, valmap
from dask import compute
from dask.bytes.core import read_bytes
from dask.bytes.utils import compress
from dask.utils import filetexts
# Rebind ``compute`` so that every call made through this module-level name
# passes ``scheduler="sync"`` unless the caller supplies its own value.
compute = partial(compute, scheduler="sync")
# Fixture contents keyed by file name: two newline-delimited JSON files with
# four records each. The tests below write them to disk through
# ``filetexts(files, mode="b")`` and compare what is read back against these
# byte strings.
files = {
".test.accounts.1.json": (
b'{"amount": 100, "name": "Alice"}\n'
b'{"amount": 200, "name": "Bob"}\n'
b'{"amount": 300, "name": "Charlie"}\n'
b'{"amount": 400, "name": "Dennis"}\n'
),
".test.accounts.2.json": (
b'{"amount": 500, "name": "Alice"}\n'
b'{"amount": 600, "name": "Bob"}\n'
b'{"amount": 700, "name": "Charlie"}\n'
b'{"amount": 800, "name": "Dennis"}\n'
),
}
# Small CSV fixtures (a header row plus one data row each), one of them under
# a ``subdir/`` prefix. NOTE(review): not referenced by any test visible in
# this section of the file -- confirm whether it is still needed.
csv_files = {
".test.fakedata.1.csv": (b"a,b\n" b"1,2\n"),
".test.fakedata.2.csv": (b"a,b\n" b"3,4\n"),
"subdir/.test.fakedata.2.csv": (b"a,b\n" b"5,6\n"),
}
def to_uri(path):
    """Return the ``file://`` URI for *path* after making it absolute."""
    absolute = os.path.abspath(path)
    return pathlib.Path(absolute).as_uri()
def test_unordered_urlpath_errors():
    """``read_bytes`` must raise ``TypeError`` when given a ``set`` of paths."""
    # Unordered urlpath argument
    unordered_paths = {
        "sets/are.csv",
        "unordered/so/they.csv",
        "should/not/be.csv",
        "allowed.csv",
    }
    with pytest.raises(TypeError):
        read_bytes(unordered_paths)
def test_read_bytes():
    """Check the sample and the nested lazy blocks returned for a glob."""
    with filetexts(files, mode="b"):
        head, blocks = read_bytes(".test.accounts.*")

        # The sample is bytes, starts like the first fixture (in sorted name
        # order) and stops on a newline.
        first_name = sorted(files)[0]
        assert isinstance(head, bytes)
        assert head[:5] == files[first_name][:5]
        assert head.endswith(b"\n")

        # ``blocks`` is a sequence of per-file sequences of objects that
        # expose a ``dask`` attribute.
        assert isinstance(blocks, (list, tuple))
        assert isinstance(blocks[0], (list, tuple))
        assert hasattr(blocks[0][0], "dask")

        block_count = sum(len(per_file) for per_file in blocks)
        assert block_count >= len(files)
        computed = compute(*concat(blocks))
        assert set(computed) == set(files.values())
def test_read_bytes_sample_delimiter():
    """The sample must end on the delimiter for each path/size combination."""
    cases = [
        (".test.accounts.*", 80),
        (".test.accounts.1.json", 80),
        (".test.accounts.1.json", 2),
    ]
    with filetexts(files, mode="b"):
        for urlpath, sample_size in cases:
            head, _ = read_bytes(urlpath, sample=sample_size, delimiter=b"\n")
            assert head.endswith(b"\n")
def test_parse_sample_bytes():
    """A sample size given as the string ``"40 B"`` yields a 40-byte sample."""
    with filetexts(files, mode="b"):
        head, _ = read_bytes(".test.accounts.*", sample="40 B")
        assert len(head) == 40
def test_read_bytes_no_sample():
    """With ``sample=False`` the first returned item is ``False`` itself."""
    with filetexts(files, mode="b"):
        head, _blocks = read_bytes(".test.accounts.1.json", sample=False)
        assert head is False
def test_read_bytes_blocksize_none():
    """With ``blocksize=None`` the total block count equals the file count."""
    with filetexts(files, mode="b"):
        _, blocks = read_bytes(".test.accounts.*", blocksize=None)
        total_blocks = sum(len(per_file) for per_file in blocks)
        assert total_blocks == len(files)
@pytest.mark.parametrize("blocksize", [5.0, "5 B"])
def test_read_bytes_blocksize_types(blocksize):
    """An integral float and a byte-size string are both accepted as blocksize."""
    with filetexts(files, mode="b"):
        _, blocks = read_bytes(".test.account*", blocksize=blocksize)
        chunks = compute(*concat(blocks))
        # Compare as sets of lines: only the content is checked, not the order.
        lines_read = set(b"".join(chunks).split(b"\n"))
        lines_expected = set(b"".join(files.values()).split(b"\n"))
        assert lines_read == lines_expected
def test_read_bytes_blocksize_float_errs():
    """A non-integral float blocksize (``5.5``) must raise ``TypeError``."""
    with filetexts(files, mode="b"), pytest.raises(TypeError):
        read_bytes(".test.account*", blocksize=5.5)
def test_read_bytes_include_path():
    """``include_path=True`` adds a third return value listing the file paths."""
    with filetexts(files, mode="b"):
        _, _, paths = read_bytes(".test.accounts.*", include_path=True)
        basenames = {os.path.basename(path) for path in paths}
    assert basenames == files.keys()
def test_with_urls():
    """A ``file://`` URI ending in a glob is accepted as the urlpath."""
    with filetexts(files, mode="b"):
        # OS-independent file:// URI with glob *
        prefix = to_uri(".test.accounts.")
        glob_url = f"{prefix}*"
        _, blocks = read_bytes(glob_url, blocksize=None)
        assert sum(len(per_file) for per_file in blocks) == len(files)
@pytest.mark.skipif(sys.platform == "win32", reason="pathlib and moto clash on windows")
def test_with_paths():
    """``pathlib.Path`` globs are accepted; a ``file://`` text wrapped in a Path fails."""
    with filetexts(files, mode="b"):
        glob_path = pathlib.Path("./.test.accounts.*")
        _, blocks = read_bytes(glob_path, blocksize=None)
        assert sum(len(per_file) for per_file in blocks) == len(files)

    # relative path doesn't work
    bad_path = pathlib.Path("file://.test.accounts.*")
    with pytest.raises(OSError):
        read_bytes(bad_path, blocksize=None)
def test_read_bytes_block():
    """For several integer blocksizes, check block counts, total size and content."""
    with filetexts(files, mode="b"):
        contents = list(files.values())
        total_size = sum(len(blob) for blob in contents)
        expected_lines = set(b"".join(contents).split(b"\n"))

        for bs in [5, 15, 45, 1500]:
            _, blocks = read_bytes(".test.account*", blocksize=bs)

            # Each file yields len(file) // bs blocks, but never fewer than one.
            blocks_per_file = [len(per_file) for per_file in blocks]
            assert blocks_per_file == [max(len(blob) // bs, 1) for blob in contents]

            chunks = compute(*concat(blocks))
            assert sum(map(len, chunks)) == total_size

            lines_read = set(b"".join(chunks).split(b"\n"))
            assert lines_read == expected_lines
def test_read_bytes_delimited():
    """Blocks must break on the delimiter and reassemble to the original bytes."""
    with filetexts(files, mode="b"):
        # All fixture bytes concatenated in sorted file-name order.
        whole = b"".join(files[name] for name in sorted(files))

        for bs in [5, 15, 45, "1.5 kB"]:
            _, by_newline = read_bytes(".test.accounts*", blocksize=bs, delimiter=b"\n")
            _, by_foo = read_bytes(".test.accounts*", blocksize=bs, delimiter=b"foo")
            # A different delimiter must give different block keys.
            newline_keys = [blk.key for blk in concat(by_newline)]
            foo_keys = [blk.key for blk in concat(by_foo)]
            assert newline_keys != foo_keys

            nonempty = [chunk for chunk in compute(*concat(by_newline)) if chunk]
            assert all(chunk.endswith(b"\n") for chunk in nonempty)
            assert b"".join(nonempty).split(b"\n") == whole.split(b"\n")

            # delimiter not at the end
            brace = b"}"
            _, by_brace = read_bytes(".test.accounts*", blocksize=bs, delimiter=brace)
            nonempty = [chunk for chunk in compute(*concat(by_brace)) if chunk]
            # All should end in } except EOF
            ending_in_brace = sum(chunk.endswith(brace) for chunk in nonempty)
            assert ending_in_brace == len(nonempty) - 2
            assert b"".join(nonempty) == whole
# (format, blocksize) pairs: every key of ``compr`` with blocksize None first,
# then every key again with blocksize 10.
fmt_bs = [(fmt, bs) for bs in (None, 10) for fmt in compr]  # type: ignore
@pytest.mark.parametrize("fmt,blocksize", fmt_bs)
def test_compression(fmt, blocksize):
    """Read compressed fixtures; a truthy format plus a truthy blocksize must fail."""
    if fmt not in compress:
        pytest.skip("compression function not provided")

    compressed = valmap(compress[fmt], files)
    read_kwargs = dict(blocksize=blocksize, delimiter=b"\n", compression=fmt)

    with filetexts(compressed, mode="b"):
        if fmt and blocksize:
            # Asking for blocks of a compressed file is expected to be rejected.
            with pytest.raises(ValueError):
                read_bytes(".test.accounts.*.json", **read_kwargs)
        else:
            head, blocks = read_bytes(".test.accounts.*.json", **read_kwargs)
            names = sorted(files)
            # The sample and the computed blocks hold the uncompressed content.
            assert head[:5] == files[names[0]][:5]
            assert head.endswith(b"\n")

            chunks = compute(*concat(blocks))
            assert b"".join(chunks) == b"".join([files[name] for name in names])
def test_open_files():
    """``open_files`` returns one lazy file per fixture, in sorted name order."""
    with filetexts(files, mode="b"):
        lazy_files = open_files(".test.accounts.*")
        assert len(lazy_files) == len(files)
        for lazy_file, name in zip(lazy_files, sorted(files)):
            with lazy_file as handle:
                content = handle.read()
                assert content == files[name]
@pytest.mark.parametrize("encoding", ["utf-8", "ascii"])
def test_open_files_text_mode(encoding):
    """Opening in ``"rt"`` mode yields the fixtures decoded with *encoding*."""
    with filetexts(files, mode="b"):
        lazy_files = open_files(".test.accounts.*", mode="rt", encoding=encoding)
        assert len(lazy_files) == len(files)

        texts = []
        for lazy_file in lazy_files:
            with lazy_file as handle:
                texts.append(handle.read())

        expected = [files[name].decode(encoding) for name in sorted(files)]
        assert texts == expected
@pytest.mark.parametrize("mode", ["rt", "rb"])
@pytest.mark.parametrize("fmt", list(compr))
def test_open_files_compression(mode, fmt):
    """Compressed fixtures opened with ``compression=fmt`` read back uncompressed."""
    if fmt not in compress:
        pytest.skip("compression function not provided")

    compressed = valmap(compress[fmt], files)
    with filetexts(compressed, mode="b"):
        lazy_files = open_files(".test.accounts.*", mode=mode, compression=fmt)

        contents = []
        for lazy_file in lazy_files:
            with lazy_file as handle:
                contents.append(handle.read())

        raw = [files[name] for name in sorted(files)]
        # Text mode returns str, so decode the expected bytes to match.
        expected = [blob.decode() for blob in raw] if mode == "rt" else raw
        assert contents == expected
def test_bad_compression():
    """Both ``read_bytes`` and ``open_files`` reject an unknown compression name.

    The call result is deliberately not unpacked inside ``pytest.raises``:
    a failed tuple unpacking also raises ``ValueError``, which would let the
    test pass even if the compression argument were never validated.
    """
    with filetexts(files, mode="b"):
        for func in [read_bytes, open_files]:
            with pytest.raises(ValueError):
                func(".test.accounts.*", compression="not-found")
def test_not_found():
    """Reading a missing file raises an error whose message matches its name."""
    missing = "not-a-file"
    expected_errors = (FileNotFoundError, OSError)
    with pytest.raises(expected_errors, match=missing):
        read_bytes(missing)
@pytest.mark.slow
def test_names():
    """Block keys are stable across reads and differ once the files are modified."""

    def current_keys():
        # Flatten the per-file blocks and collect each block's ``_key``.
        _, blocks = read_bytes(".test.accounts.*")
        return [blk._key for blk in concat(blocks)]

    with filetexts(files, mode="b"):
        keys_first = current_keys()
        keys_second = current_keys()
        assert keys_first == keys_second

        # NOTE(review): presumably the pause lets the file timestamps move on
        # before the rewrite below -- confirm against read_bytes' tokenization.
        sleep(1)
        for name in files:
            with open(name, "ab") as handle:
                handle.write(b"x")

        keys_after_append = current_keys()
        assert keys_first != keys_after_append
@pytest.mark.parametrize("compression_opener", [(None, open), ("gzip", gzip.open)])
def test_open_files_write(tmpdir, compression_opener):
    """Write two part files via ``open_files`` and read the first one back."""
    compression, opener = compression_opener
    target_dir = str(tmpdir)

    writers = open_files(target_dir, num=2, mode="wb", compression=compression)
    assert len(writers) == 2
    assert {writer.mode for writer in writers} == {"wb"}
    for writer in writers:
        with writer as handle:
            handle.write(b"000")

    written = sorted(os.listdir(target_dir))
    assert written == ["0.part", "1.part"]

    # Read back with the opener paired with this compression setting.
    first_part = os.path.join(target_dir, written[0])
    with opener(first_part, "rb") as handle:
        payload = handle.read()
    assert payload == b"000"
def test_pickability_of_lazy_files(tmpdir):
    """Lazy files survive a cloudpickle round trip with path, fs type and content intact."""
    tmpdir = str(tmpdir)  # carried over from the original; not used below

    with filetexts(files, mode="b"):
        originals = open_files(".test.accounts.*")
        restored = cloudpickle.loads(cloudpickle.dumps(originals))

        for before, after in zip(originals, restored):
            assert before.path == after.path
            assert type(before.fs) == type(after.fs)
            with before as handle_before, after as handle_after:
                assert handle_before.read() == handle_after.read()
def test_py2_local_bytes(tmpdir):
    """Lines read from a gzip file opened in ``"rt"`` mode are ``str`` objects."""
    gz_path = str(tmpdir / "myfile.txt.gz")
    with gzip.open(gz_path, mode="wb") as raw:
        raw.write(b"hello\nworld")

    opened = open_files(gz_path, compression="gzip", mode="rt")

    with opened[0] as text:
        for line in text:
            assert isinstance(line, str)
def test_abs_paths(tmpdir):
    """Paths globbed relative to the cwd remain usable after leaving that directory.

    The test changes the process working directory to *tmpdir*, so the
    original directory is restored in a ``finally`` clause: a failing
    assertion or I/O error must not leave later tests running in *tmpdir*.
    """
    tmpdir = str(tmpdir)
    here = os.getcwd()
    os.chdir(tmpdir)
    try:
        with open("tmp", "w") as f:
            f.write("hi")
        out = LocalFileSystem().glob("*")
        assert len(out) == 1
        # The globbed entry must contain a directory separator and the file name.
        assert "/" in out[0]
        assert "tmp" in out[0]

        # Create the filesystem object while still inside tmpdir, as before.
        fs = LocalFileSystem()
    finally:
        os.chdir(here)

    # Back in the original directory, the globbed path must still open.
    with fs.open(out[0], "r") as f:
        res = f.read()
    assert res == "hi"