Repository URL to install this package:
|
Version:
3.16.7 ▾
|
dvc-data
/
cli.py
|
|---|
import enum
import errno
import json
import math
import os
import posixpath
import random
import stat
import sys
from collections import deque
from itertools import accumulate
from pathlib import Path
from posixpath import relpath
from typing import Optional
import click
import typer
from attrs import asdict
from dvc_objects.errors import ObjectFormatError
from dvc_objects.fs import LocalFileSystem, MemoryFileSystem
from rich.traceback import install
from tqdm import tqdm
from dvc_data.callbacks import TqdmCallback
from dvc_data.hashfile import load
from dvc_data.hashfile.build import build as _build
from dvc_data.hashfile.checkout import checkout as _checkout
from dvc_data.hashfile.db import HashFileDB
from dvc_data.hashfile.diff import ROOT
from dvc_data.hashfile.diff import diff as _diff
from dvc_data.hashfile.hash import algorithms_available
from dvc_data.hashfile.hash import file_md5 as _file_md5
from dvc_data.hashfile.hash import fobj_md5 as _fobj_md5
from dvc_data.hashfile.hash_info import HashInfo
from dvc_data.hashfile.obj import HashFile
from dvc_data.hashfile.state import State
from dvc_data.hashfile.transfer import transfer as _transfer
from dvc_data.hashfile.tree import Tree, merge
from dvc_data.hashfile.tree import du as _du
from dvc_data.repo import NotARepoError, Repo
install(show_locals=True, suppress=[typer, click])
file_type = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
resolve_path=True,
allow_dash=True,
path_type=str,
)
dir_file_type = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=True,
readable=True,
resolve_path=True,
allow_dash=True,
path_type=str,
)
dir_file_type_no_dash_no_resolve = typer.Argument(
...,
exists=True,
file_okay=True,
dir_okay=True,
readable=True,
path_type=str,
)
HashEnum = enum.Enum( # type: ignore[misc]
"HashEnum", {h: h for h in sorted(algorithms_available)}
)
LinkEnum = enum.Enum( # type: ignore[misc]
"LinkEnum", {lt: lt for lt in ["reflink", "hardlink", "symlink", "copy"]}
)
SIZE_HELP = "Human readable size, eg: '1kb', '100Mb', '10GB' etc"
# https://github.com/aws/aws-cli/blob/5aa599949f60b6af554fd5714d7161aa272716f7/awscli/customizations/s3/utils.py
MULTIPLIERS = {
"kb": 1024,
"mb": 1024**2,
"gb": 1024**3,
"tb": 1024**4,
"kib": 1024,
"mib": 1024**2,
"gib": 1024**3,
"tib": 1024**4,
}
def human_readable_to_bytes(value: str) -> int:
value = value.lower()
suffix = ""
if value.endswith(tuple(MULTIPLIERS.keys())):
size = 2
size += value[-2] == "i" # KiB, MiB etc
value, suffix = value[:-size], value[-size:]
multiplier = MULTIPLIERS.get(suffix, 1)
return int(value) * multiplier
class Application(typer.Typer):
def __init__(self, *args, **kwargs):
kwargs.setdefault("no_args_is_help", True)
super().__init__(*args, **kwargs)
def command(self, *args, **kwargs):
kwargs.setdefault("no_args_is_help", True)
return super().command(*args, **kwargs)
app = Application(
name="dvc-data",
help="dvc-data testingtool",
context_settings={"help_option_names": ["-h", "--help"]},
add_completion=False,
)
@app.command(name="hash", help="Compute checksum of the file")
def hash_file(
file: Path = file_type,
name: HashEnum = typer.Option("md5", "-n", "--name"),
):
path = relpath(file)
hash_name = name.value
if path == "-":
hash_value = _fobj_md5(sys.stdin.buffer, name=hash_name)
else:
hash_value = _file_md5(path, name=hash_name)
print(hash_name, hash_value, sep=": ")
@app.command(help="Generate sparse file")
def gensparse(
file: Path = typer.Argument(...),
size: str = typer.Argument(..., help=SIZE_HELP),
):
with file.open("wb") as f:
f.seek(human_readable_to_bytes(size) - 1)
f.write(b"\0")
@app.command(help="Generate file with random contents")
def genrand(
file: Path = typer.Argument(...),
size: str = typer.Argument(..., help=SIZE_HELP),
):
with file.open("wb") as f:
f.write(os.urandom(human_readable_to_bytes(size)))
def rand_gauss_int(num: int, sigma=0.2) -> int:
return round(random.gauss(num, sigma * num))
def _gentree(root: Path, num_dirs, num_files, fsize, depth=1):
for num in range(rand_gauss_int(num_files)):
nzeros = int(math.log10(num_files)) + 1
genrand(root / f"{num:0{nzeros}}.txt", size=str(rand_gauss_int(fsize)))
if depth < 1:
return
for num in range(rand_gauss_int(num_dirs)):
nzeros = int(math.log10(num_dirs)) + 1
(dir_ := root / f"dir{num:0{nzeros}}").mkdir()
_gentree(dir_, num_dirs, num_files, fsize, depth=depth - 1)
@app.command()
def gentree(
path: Path,
num: int,
size: str = typer.Argument(0, help=SIZE_HELP),
depth: int = 2,
seed: int = 0,
):
"""Generate a random tree structure.
Example:
gentree dataset 10000
gentree dataset 10000 1Gb
gentree dataset 10000 --depth 5
"""
assert depth >= 1
files_per_dir = max(num // 1000, 1)
total_dirs = num / files_per_dir
dirs_per_dir = round(math.pow(total_dirs, 1 / depth))
file_size = round(human_readable_to_bytes(size) / num)
path.mkdir(parents=True)
print(f"{total_dirs=}, {dirs_per_dir=}, {files_per_dir=}, {file_size=}, {depth=}")
random.seed(seed)
_gentree(path, dirs_per_dir, files_per_dir, file_size, depth=depth)
def from_shortoid(odb: HashFileDB, oid: str) -> str:
oid = oid if oid != "-" else sys.stdin.read().strip()
try:
return odb.exists_prefix(oid)
except KeyError as exc:
typer.echo(f"Not a valid {oid=}", err=True)
raise typer.Exit(1) from exc
except ValueError as exc:
typer.echo(f"Ambiguous {oid=}", err=True)
raise typer.Exit(1) from exc
def get_odb(**config):
try:
repo = Repo.discover()
except NotARepoError as exc:
typer.echo(exc, err=True)
raise typer.Abort(1) # noqa: B904
if "state" not in config:
config.setdefault("state", State(root_dir=repo.root, tmp_dir=repo.tmp_dir))
return HashFileDB(repo.fs, repo.object_dir, **config)
@app.command(help="Oid to path")
def o2p(oid: str = typer.Argument(..., allow_dash=True)):
odb = get_odb()
path = odb.oid_to_path(from_shortoid(odb, oid))
print(path)
@app.command(help="Path to Oid")
def p2o(path: Path = typer.Argument(..., allow_dash=True)):
odb = get_odb()
fs_path = relpath(path)
if fs_path == "-":
fs_path = sys.stdin.read().strip()
oid = odb.path_to_oid(fs_path)
print(oid)
def _cat_object(odb, oid):
path = odb.oid_to_path(oid)
contents = odb.fs.cat_file(path)
return typer.echo(contents)
@app.command(help="Provide content of the objects")
def cat(
oid: str = typer.Argument(..., allow_dash=True),
check: bool = typer.Option(False, "--check", "-c"),
):
odb = get_odb()
oid = from_shortoid(odb, oid)
if check:
try:
return odb.check(oid, check_hash=True)
except ObjectFormatError as exc:
typer.echo(exc, err=True)
raise typer.Exit(1) from exc
return _cat_object(odb, oid)
@app.command(help="Build and optionally write object to the database")
def build(
path: Path = dir_file_type,
write: bool = typer.Option(False, "--write", "-w"),
shallow: bool = False,
):
odb = get_odb()
fs_path = os.fspath(path)
fs = odb.fs
if fs_path == "-":
fs = MemoryFileSystem()
fs.put_file(sys.stdin.buffer, fs_path)
object_store, _, obj = _build(odb, fs_path, fs, name="md5")
if write:
_transfer(
object_store,
odb,
{obj.hash_info},
hardlink=True,
shallow=shallow,
)
print(obj)
return obj
def _ls_tree(tree):
for key, _, hash_info in tree:
print(hash_info.value, posixpath.sep.join(key), sep="\t")
@app.command("ls", help="List objects in a tree")
@app.command("ls-tree", help="List objects in a tree")
def ls(oid: str = typer.Argument(..., allow_dash=True)):
odb = get_odb()
oid = from_shortoid(odb, oid)
try:
tree = Tree.load(odb, HashInfo("md5", oid))
except ObjectFormatError as exc:
typer.echo(exc, err=True)
raise typer.Exit(1) from exc
return _ls_tree(tree)
@app.command(help="Show various types of objects")
def show(oid: str = typer.Argument(..., allow_dash=True)):
odb = get_odb()
oid = from_shortoid(odb, oid)
obj = load(odb, odb.get(oid).hash_info)
if isinstance(obj, Tree):
return _ls_tree(obj)
if isinstance(obj, HashFile):
return _cat_object(odb, obj.oid)
raise AssertionError(f"unknown object of type {type(obj)}")
@app.command(help="Summarize disk usage by an object")
def du(oid: str = typer.Argument(..., allow_dash=True)):
odb = get_odb()
oid = from_shortoid(odb, oid)
obj = load(odb, odb.get(oid).hash_info)
if not isinstance(obj, HashFile):
raise AssertionError(f"unknown object of type {type(obj)}")
if isinstance(obj, Tree):
tree = obj
else:
tree = Tree()
tree.add(ROOT, None, obj.hash_info)
total = _du(odb, tree)
print(tqdm.format_sizeof(total, suffix="B", divisor=1024))
@app.command(help="Remove object from the ODB")
def rm(oid: str = typer.Argument(..., allow_dash=True)):
odb = get_odb()
oid = from_shortoid(odb, oid)
odb.delete(oid)
@app.command(help="Count objects and their disk consumption", no_args_is_help=False)
def count_objects():
odb = get_odb()
it = (odb.fs.size(odb.oid_to_path(oid)) for oid in odb.all())
item = deque(enumerate(accumulate(it), 1), maxlen=1)
count, total = item[0] if item else (0, 0)
hsize = tqdm.format_sizeof(total, suffix="B", divisor=1024)
print(f"{count} objects, {hsize} size")
@app.command(help="Verify objects in the database", no_args_is_help=False)
def fsck():
odb = get_odb()
ret = 0
for oid in odb.all():
try:
odb.check(oid, check_hash=True)
except ObjectFormatError as exc:
ret = 1
print(exc)
raise typer.Exit(ret)
@app.command(help="Diff two objects in the database")
def diff(
short_oid1,
short_oid2: str,
unchanged: bool = False,
check_cache: bool = False,
check_hash: bool = False,
):
odb = get_odb()
obj1 = odb.get(from_shortoid(odb, short_oid1))
obj2 = odb.get(from_shortoid(odb, short_oid2))
odb_check = odb.check
def check(oid: str, check_hash: bool = check_hash):
return not check_cache or odb_check(oid, check_hash=check_hash)
odb.check = check
d = _diff(load(odb, obj1.hash_info), load(odb, obj2.hash_info), odb)
def _prepare_info(entry):
path = posixpath.join(*entry.key) or "ROOT"
oid = entry.oid.value
if not oid.endswith(".dir"):
oid = entry.oid.value[:9]
cache_info = "" if entry.in_cache else ", missing"
return f"{path} ({oid}{cache_info})"
for state, changes in asdict(d, recurse=False).items():
for change in changes:
if not unchanged and state == "unchanged" and change.new.in_cache:
continue
if state == "modified":
info1 = _prepare_info(change.old)
info2 = _prepare_info(change.new)
info = f"{info1} -> {info2}"
elif state == "added":
info = _prepare_info(change.new)
else:
# for unchanged, it does not matter which entry we use
# for deleted, we should be using old entry
info = _prepare_info(change.old)
print(state, info, sep=": ")
@app.command(help="Merge two trees and optionally write to the database.")
def merge_tree(oid1: str, oid2: str, force: bool = False):
odb = get_odb()
oid1 = from_shortoid(odb, oid1)
oid2 = from_shortoid(odb, oid2)
obj1 = load(odb, odb.get(oid1).hash_info)
obj2 = load(odb, odb.get(oid2).hash_info)
assert isinstance(obj1, Tree)
assert isinstance(obj2, Tree), "not a tree obj"
if not force:
# detect conflicts
d = _diff(obj1, obj2, odb)
modified = [
posixpath.join(*change.old.key)
for change in d.modified
if change.old.key != ROOT
]
if modified:
print("Following files in conflicts:", *modified, sep="\n")
raise typer.Exit(1)
tree = merge(odb, None, obj1.hash_info, obj2.hash_info)
tree.digest()
print(tree)
odb.add(tree.path, tree.fs, tree.oid, hardlink=True)
def process_patch(patch_file, **kwargs):
patch = []
if patch_file:
with typer.open_file(patch_file) as f:
text = f.read()
patch = json.loads(text)
for appl in patch:
op = appl.get("op")
path = appl.get("path")
if op and path and op in ("add", "modify"):
appl["path"] = os.fspath(patch_file.parent.joinpath(path))
for op, items in kwargs.items():
for item in items:
if isinstance(item, tuple):
path, to = item
extra = {"path": os.fspath(path), "to": to}
else:
extra = {"path": item}
patch.append({"op": op, **extra})
return patch
def apply_op(odb, obj, application):
assert "op" in application
op = application["op"]
path = application["path"]
keys = tuple(path.split("/"))
if op in ("add", "modify"):
new = tuple(application["to"].split("/"))
if op == "add" and new in obj._dict:
raise FileExistsError(errno.EEXIST, os.strerror(errno.EEXIST), path)
fs = LocalFileSystem()
_, meta, new_obj = _build(odb, path, fs, "md5")
odb.add(path, fs, new_obj.hash_info.value, hardlink=False)
return obj.add(new, meta, new_obj.hash_info)
if keys not in obj._dict:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)
if op == "test":
return
if op == "remove":
obj._dict.pop(keys)
obj.__dict__.pop("_trie", None)
return
if op in ("copy", "move"):
new = tuple(application["to"].split("/"))
obj.add(new, *obj.get(keys))
if op == "move":
obj._dict.pop(keys)
return
raise ValueError(f"unknown {op=}")
def multi_value(*opts, **kwargs):
return click.option(*opts, multiple=True, required=False, **kwargs)
cl_path = click.Path(
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
path_type=Path,
resolve_path=True,
)
cl_path_dash = click.Path(
exists=True,
file_okay=True,
dir_okay=False,
readable=True,
allow_dash=True,
path_type=Path,
resolve_path=True,
)
@click.command()
@click.argument("oid")
@click.option("--patch-file", type=cl_path_dash)
@multi_value(
"--add",
type=(cl_path, str),
help="Add file from specified local path to a given path in the tree",
)
@multi_value(
"--modify",
type=(cl_path, str),
help="Modify file with specified local path to a given path in the tree",
)
@multi_value("--move", type=(str, str), help="Move a file in the tree")
@multi_value("--copy", type=(str, str), help="Copy path from a tree to another path")
@multi_value("--remove", type=str, help="Remove path from a tree")
@multi_value("--test", type=str, help="Check for the existence of the path")
def update_tree(oid, patch_file, add, modify, move, copy, remove, test):
"""Update tree contents virtually with a patch file in json format.
Example patch file for reference:
[\n
{"op": "remove", "path": "test/0/00004.png"},\n
{"op": "move", "path": "test/1/00003.png", "to": "test/0/00003.png"},\n
{"op": "copy", "path": "test/1/00003.png", "to": "test/1/11113.png"},\n
{"op": "test", "path": "test/1/00003.png"},\n
{"op": "add", "path": "local/path/to/patch.json", "to": "foo"},\n
{"op": "modify", "path": "local/path/to/patch.json", "to": "bar"}\n
]\n
Example: ./cli.py update-tree f23d4 patch.json
"""
odb = get_odb()
oid = from_shortoid(odb, oid)
obj = load(odb, odb.get(oid).hash_info)
assert isinstance(obj, Tree)
patch = process_patch(
patch_file,
add=add,
remove=remove,
modify=modify,
copy=copy,
move=move,
test=test,
)
for application in patch:
try:
apply_op(odb, obj, application)
except (FileExistsError, FileNotFoundError) as exc:
typer.echo(exc, err=True)
raise typer.Exit(1) from exc
obj.digest()
print(obj)
odb.add(obj.path, obj.fs, obj.oid, hardlink=True)
@app.command(help="Check object link")
def check_link( # noqa: C901
path: Path = dir_file_type_no_dash_no_resolve, object_dir: Optional[Path] = None
):
if object_dir is None:
object_dir = Repo.discover().object_dir
fs = LocalFileSystem()
def get_object_inodes(path) -> dict[int, str]:
ret = {}
for entry in os.scandir(path):
if entry.is_dir(follow_symlinks=False) and len(entry.name) == 2:
for e in os.scandir(entry.path):
if e.is_file(follow_symlinks=False):
ret[e.inode()] = e.path
return ret
object_inodes = None
files = [os.fspath(path)] if path.is_file() else fs.find(os.fspath(path))
for file in files:
try:
info = fs.info(file)
except OSError as e:
if e.errno != errno.ENOENT or not os.path.islink(file):
raise
infos = ["broken", os.readlink(file)]
mode = 0
else:
infos = ["unknown"]
inode = info["ino"]
if info["islink"]:
infos = ["symlink", info["destination"]]
elif info["nlink"] > 1:
if object_inodes is None:
object_inodes = get_object_inodes(object_dir)
if object_path := object_inodes.get(inode):
infos = ["hardlink", object_path]
mode = info["mode"]
print(stat.filemode(mode), file, *infos)
@app.command(help="Checkout from the object into a given path")
def checkout(
oid: str,
path: Path = typer.Argument(..., resolve_path=True),
relink: bool = False,
force: bool = False,
type: list[LinkEnum] = typer.Option(["copy"]), # noqa: A002
):
odb = get_odb(type=[t.value for t in type])
oid = from_shortoid(odb, oid)
obj = load(odb, odb.get(oid).hash_info)
with TqdmCallback(size=len(obj), desc="Checking out", unit="obj") as callback:
_checkout(
os.fspath(path),
LocalFileSystem(),
obj,
odb,
relink=relink,
force=force,
prompt=typer.confirm,
state=odb.state,
progress_callback=callback,
)
cmd = typer.main.get_command(app)
wrapper = click.version_option()
main = wrapper(cmd)
main.add_command(update_tree, "update-tree") # type: ignore[attr-defined]
if __name__ == "__main__":
main()