Repository URL to install this package:
|
Version:
1.26.0.dev0+gite506aa5f ▾
|
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import binascii
import io
import logging
import os
import subprocess
import traceback
from contextlib import contextmanager
from pants.scm.scm import Scm
from pants.util.contextutil import pushd
from pants.util.memo import memoized_method
from pants.util.strutil import ensure_text
# 40 is Linux's hard-coded limit for total symlinks followed when resolving a path.
# It is the symlink budget GitRepositoryReader._realpath passes to _read_object.
MAX_SYMLINKS_IN_REALPATH = 40
# Number of raw (binary) object-id bytes that follow each entry name in git tree data.
GIT_HASH_LENGTH = 20
# Byte-string constants. NUL and SPACE are the field delimiters used when parsing
# `git cat-file --batch` output in GitRepositoryReader.
NUL = b"\0"
SPACE = b" "
NEWLINE = b"\n"
EMPTY_STRING = b""
# Module-level logger; git command lines are logged at DEBUG level.
logger = logging.getLogger(__name__)
class Git(Scm):
    """An Scm implementation backed by git.

    Every operation shells out to the configured git binary with explicit
    `--git-dir` and `--work-tree` arguments (see `_create_git_cmdline`).
    """

    @classmethod
    def detect_worktree(cls, binary="git", subdir=None):
        """Detect the git working tree above cwd and return it; else, return None.

        :param string binary: The path to the git binary to use, 'git' by default.
        :param string subdir: The path to start searching for a git repo.
        :returns: path to the directory where the git working tree is rooted.
        :rtype: string
        """
        # TODO(John Sirois): This is only used as a factory for a Git instance in
        # pants.base.build_environment.get_scm, encapsulate in a true factory method.
        cmd = [binary, "rev-parse", "--show-toplevel"]
        try:
            # stderr is discarded: "not a git repository" is an expected outcome here.
            if subdir:
                with pushd(subdir):
                    process, out = cls._invoke(cmd, stderr=subprocess.DEVNULL)
            else:
                process, out = cls._invoke(cmd, stderr=subprocess.DEVNULL)
            cls._check_result(cmd, process.returncode, raise_type=Scm.ScmException)
        except Scm.ScmException:
            return None
        return cls._cleanse(out)

    @classmethod
    def clone(cls, repo_url, dest, binary="git"):
        """Clone the repo at repo_url into dest.

        :param string repo_url: The URL (or path) of the repository to clone.
        :param string dest: The directory to clone into.
        :param string binary: The path to the git binary to use, 'git' by default.
        :returns: an instance of this class representing the cloned repo.
        :rtype: Git
        :raises: Scm.ScmException if the clone command exits non-zero.
        """
        cmd = [binary, "clone", repo_url, dest]
        process, out = cls._invoke(cmd)
        cls._check_result(cmd, process.returncode)
        return cls(binary=binary, worktree=dest)

    @classmethod
    def _invoke(cls, cmd, stderr=None):
        """Invoke the given command, and return a tuple of process and raw binary output.

        If stderr is defined as None, it will flow to wherever it is currently mapped
        for the parent process, generally to the terminal where the user can see the error
        (cf. https://docs.python.org/3.7/library/subprocess.html#subprocess.Popen ). In
        some cases we want to treat it specially, which is why it is exposed
        in the signature of _invoke.

        :param list cmd: The command in the form of a list of strings
        :param stderr: Passed through unchanged as the `stderr` argument of subprocess.Popen.
        :returns: The completed process object and its standard output.
        :raises: Scm.LocalException if there was a problem exec'ing the command at all.
        """
        try:
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
        except OSError as e:
            # Binary DNE or is not executable
            cmd_str = " ".join(cmd)
            raise cls.LocalException(f"Failed to execute command {cmd_str}: {e!r}") from e
        out, _ = process.communicate()
        return process, out

    @classmethod
    def _cleanse(cls, output, errors="strict"):
        """Strip surrounding whitespace from raw command output and decode it as UTF-8.

        :param bytes output: Raw bytes captured from the command's stdout.
        :param string errors: The `errors` policy handed to `bytes.decode`.
        :rtype: string
        """
        return output.strip().decode("utf-8", errors=errors)

    @classmethod
    def _check_result(cls, cmd, result, failure_msg=None, raise_type=Scm.ScmException):
        """Raise if `result` (a process exit code) is non-zero.

        :param list cmd: The command that was run; used to build the default message.
        :param int result: The exit code of the command.
        :param string failure_msg: Optional message overriding the default one.
        :param raise_type: The exception class to raise. `None` is accepted and treated as
          Scm.ScmException, because `_check_call` / `_check_output` forward their own
          `raise_type=None` default here.
        """
        if result != 0:
            # Without this fallback a caller that omitted raise_type (e.g. set_state) would
            # hit `raise None(...)`, i.e. a TypeError that hides the real git failure.
            exception_type = raise_type or Scm.ScmException
            cmd_str = " ".join(cmd)
            raise exception_type(failure_msg or f"{cmd_str} failed with exit code {result}")

    def __init__(self, binary="git", gitdir=None, worktree=None, remote=None, branch=None):
        """Creates a git scm proxy that assumes the git repository is in the cwd by default.

        binary:    The path to the git binary to use, 'git' by default.
        gitdir:    The path to the repository's git metadata directory (typically '.git').
        worktree:  The path to the git repository working tree directory (typically '.').
        remote:    The default remote to use.
        branch:    The default remote branch to use.
        """
        super().__init__()
        self._gitcmd = binary
        self._worktree = os.path.realpath(worktree or os.getcwd())
        self._gitdir = os.path.realpath(gitdir) if gitdir else os.path.join(self._worktree, ".git")
        self._remote = remote
        self._branch = branch

    @property
    def current_rev_identifier(self):
        """The symbolic name of the current revision; always "HEAD"."""
        return "HEAD"

    @property
    def worktree(self):
        """The real path of the working tree this instance operates on."""
        return self._worktree

    @property
    def commit_id(self):
        """The output of `git rev-parse HEAD`."""
        return self._check_output(["rev-parse", "HEAD"], raise_type=Scm.LocalException)

    @property
    def server_url(self):
        """The push URL of the remote named 'origin'.

        :raises: Scm.LocalException if there is not exactly one such URL.
        """
        git_output = self._check_output(["remote", "--verbose"], raise_type=Scm.LocalException)

        def origin_urls():
            for line in git_output.splitlines():
                fields = line.split()
                if len(fields) < 3:
                    continue
                # Only the first three fields are meaningful here; any trailing fields are
                # ignored rather than breaking the unpacking.
                name, url, action = fields[:3]
                if name == "origin" and action == "(push)":
                    yield url

        origins = list(origin_urls())
        if len(origins) != 1:
            raise Scm.LocalException(
                "Unable to find remote named 'origin' that accepts pushes "
                f"amongst:\n{git_output}"
            )
        return origins[0]

    @property
    def tag_name(self):
        """The output of `git describe --tags --always`, or None if it contains "cannot"."""
        # Calls to git describe can have bad performance on large repos.  Be aware
        # of the performance hit if you use this property.
        tag = self._check_output(["describe", "--tags", "--always"], raise_type=Scm.LocalException)
        return None if "cannot" in tag else tag

    @property
    def branch_name(self):
        """The current branch name, or None when rev-parse reports a bare "HEAD"."""
        branch = self._check_output(
            ["rev-parse", "--abbrev-ref", "HEAD"], raise_type=Scm.LocalException
        )
        return None if branch == "HEAD" else branch

    def fix_git_relative_path(self, worktree_path, relative_to):
        """Re-express a worktree-relative path as a path relative to `relative_to`."""
        return os.path.relpath(os.path.join(self._worktree, worktree_path), relative_to)

    def changed_files(self, from_commit=None, include_untracked=False, relative_to=None):
        """Return the set of changed file paths, relative to `relative_to`.

        :param string from_commit: If given, also include files changed between the merge-base
          of this commit and HEAD, and HEAD itself.
        :param bool include_untracked: If true, also include untracked, non-ignored files.
        :param string relative_to: Directory used both to limit the query and to relativize
          results; defaults to the worktree root.
        :rtype: set of string
        """
        relative_to = relative_to or self._worktree
        rel_suffix = ["--", relative_to]
        uncommitted_changes = self._check_output(
            ["diff", "--name-only", "HEAD"] + rel_suffix, raise_type=Scm.LocalException
        )
        # All three listings are split per line (not on arbitrary whitespace) so that paths
        # containing spaces stay intact.
        files = set(uncommitted_changes.splitlines())
        if from_commit:
            # Grab the diff from the merge-base to HEAD using ... syntax.  This ensures we have just
            # the changes that have occurred on the current branch.
            committed_cmd = ["diff", "--name-only", from_commit + "...HEAD"] + rel_suffix
            committed_changes = self._check_output(committed_cmd, raise_type=Scm.LocalException)
            files.update(committed_changes.splitlines())
        if include_untracked:
            untracked_cmd = [
                "ls-files",
                "--other",
                "--exclude-standard",
                "--full-name",
            ] + rel_suffix
            untracked = self._check_output(untracked_cmd, raise_type=Scm.LocalException)
            files.update(untracked.splitlines())
        # git will report changed files relative to the worktree: re-relativize to relative_to
        return {self.fix_git_relative_path(f, relative_to) for f in files}

    def changes_in(self, diffspec, relative_to=None):
        """Return the set of files touched by `diffspec`, relative to `relative_to`.

        :param string diffspec: Passed to `git diff-tree -r --name-only`.
        :param string relative_to: Directory to relativize results to; defaults to the
          worktree root.
        :rtype: set of string
        """
        relative_to = relative_to or self._worktree
        cmd = ["diff-tree", "--no-commit-id", "--name-only", "-r", diffspec]
        # One path per line; splitting on lines keeps paths with embedded spaces whole.
        files = self._check_output(cmd, raise_type=Scm.LocalException).splitlines()
        return {self.fix_git_relative_path(f, relative_to) for f in files if f}

    def changelog(self, from_commit=None, files=None):
        """Return `git log --stat` output, optionally limited to a commit range and/or files.

        :param string from_commit: If given, only log `from_commit..HEAD`.
        :param list files: If given, only log changes touching these paths.
        :rtype: string
        """
        # We force the log output encoding to be UTF-8 here since the user may have a git config that
        # overrides the git UTF-8 default log output encoding.
        args = [
            "log",
            "--encoding=UTF-8",
            "--no-merges",
            "--stat",
            "--find-renames",
            "--find-copies",
        ]
        if from_commit:
            args.append(from_commit + "..HEAD")
        if files:
            args.append("--")
            args.extend(files)
        # There are various circumstances that can lead to git logs that are not transcodeable to utf-8,
        # for example: http://comments.gmane.org/gmane.comp.version-control.git/262685
        # Git will not error in these cases and we do not wish to either.  Here we direct byte sequences
        # that can not be utf-8 decoded to be replaced with the utf-8 replacement character.
        return self._check_output(args, raise_type=Scm.LocalException, errors="replace")

    def merge_base(self, left="master", right="HEAD"):
        """Returns the merge-base of left and right; in bash: `git merge-base left right`"""
        return self._check_output(["merge-base", left, right], raise_type=Scm.LocalException)

    def refresh(self, leave_clean=False):
        """Attempt to pull-with-rebase from upstream.

        This is implemented as fetch-plus-rebase so that we can distinguish between errors in the
        fetch stage (likely network errors) and errors in the rebase stage (conflicts).  If
        leave_clean is true, then in the event of a rebase failure, the branch will be rolled back.
        Otherwise, it will be left in the conflicted state.

        :raises: Scm.RemoteException if the fetch fails; Scm.LocalException if the rebase fails.
        """
        remote, merge = self._get_upstream()
        self._check_call(["fetch", "--tags", remote, merge], raise_type=Scm.RemoteException)
        try:
            self._check_call(["rebase", "FETCH_HEAD"], raise_type=Scm.LocalException)
        except Scm.LocalException:
            if leave_clean:
                logger.debug("Cleaning up after failed rebase")
                try:
                    self._check_call(["rebase", "--abort"], raise_type=Scm.LocalException)
                except Scm.LocalException:
                    logger.debug("Failed to clean up after failed rebase")
                    # format_exc() formats the exception currently being handled (the abort
                    # failure); its positional argument is a frame limit, not an exception.
                    logger.debug(traceback.format_exc())
            # But let the original exception propagate, since that's the more interesting one
            raise

    def tag(self, name, message=None):
        """Create an annotated tag called `name` and push it to the upstream remote."""
        # We use -a here instead of --annotate to maintain maximum git compatibility.
        # --annotate was only introduced in 1.7.8 via:
        #   https://github.com/git/git/commit/c97eff5a95d57a9561b7c7429e7fcc5d0e3a7f5d
        self._check_call(
            ["tag", "-a", "--message=" + (message or ""), name], raise_type=Scm.LocalException
        )
        self.push("refs/tags/" + name)

    def commit(self, message, verify=True):
        """Run `git commit --all` with the given message; `verify=False` adds --no-verify."""
        cmd = ["commit", "--all", "--message=" + message]
        if not verify:
            cmd.append("--no-verify")
        self._check_call(cmd, raise_type=Scm.LocalException)

    def add(self, *paths):
        """Run `git add` on the given paths."""
        self._check_call(["add"] + list(paths), raise_type=Scm.LocalException)

    def commit_date(self, commit_reference):
        """Return the committer date (git's `%ci` format) of `commit_reference`."""
        return self._check_output(
            ["log", "-1", "--pretty=tformat:%ci", commit_reference], raise_type=Scm.LocalException
        )

    def push(self, *refs):
        """Push the upstream merge branch, plus any extra `refs`, to the upstream remote."""
        remote, merge = self._get_upstream()
        self._check_call(["push", remote, merge] + list(refs), raise_type=Scm.RemoteException)

    def set_state(self, rev):
        """Check out `rev` in the worktree.

        :raises: Scm.ScmException if the checkout fails.
        """
        self._check_call(["checkout", rev])

    def _get_upstream(self):
        """Return the remote and remote merge branch for the current branch.

        Values passed to the constructor win; anything missing is read from the repository's
        local git config and cached on the instance.
        """
        if not self._remote or not self._branch:
            branch = self.branch_name
            if not branch:
                raise Scm.LocalException("Failed to determine local branch")

            def get_local_config(key):
                value = self._check_output(
                    ["config", "--local", "--get", key], raise_type=Scm.LocalException
                )
                return value.strip()

            self._remote = self._remote or get_local_config(f"branch.{branch}.remote")
            self._branch = self._branch or get_local_config(f"branch.{branch}.merge")
        return self._remote, self._branch

    def _check_call(self, args, failure_msg=None, raise_type=None):
        """Run a git subcommand with inherited stdio, raising on a non-zero exit code.

        :param list args: git arguments (without the binary or --git-dir/--work-tree).
        :param string failure_msg: Optional message for the raised exception.
        :param raise_type: Exception class to raise on failure; None means Scm.ScmException.
        """
        cmd = self._create_git_cmdline(args)
        self._log_call(cmd)
        result = subprocess.call(cmd)
        self._check_result(cmd, result, failure_msg, raise_type)

    def _check_output(self, args, failure_msg=None, raise_type=None, errors="strict"):
        """Run a git subcommand and return its stripped, UTF-8 decoded stdout.

        :param list args: git arguments (without the binary or --git-dir/--work-tree).
        :param string failure_msg: Optional message for the raised exception.
        :param raise_type: Exception class to raise on failure; None means Scm.ScmException.
        :param string errors: Decode error policy, forwarded to `_cleanse`.
        :rtype: string
        """
        cmd = self._create_git_cmdline(args)
        self._log_call(cmd)
        process, out = self._invoke(cmd)
        self._check_result(cmd, process.returncode, failure_msg, raise_type)
        return self._cleanse(out, errors=errors)

    def _create_git_cmdline(self, args):
        """Prefix `args` with the git binary and explicit --git-dir / --work-tree options."""
        return [self._gitcmd, "--git-dir=" + self._gitdir, "--work-tree=" + self._worktree] + args

    def _log_call(self, cmd):
        """Log the command line about to be executed at DEBUG level."""
        logger.debug("Executing: %s", " ".join(cmd))

    def repo_reader(self, rev):
        """Return a GitRepositoryReader for reading this repository's content at `rev`."""
        return GitRepositoryReader(self, rev)
class GitRepositoryReader:
    """Allows reading from files and directory information from an arbitrary git commit.

    This is useful for pants-aware git sparse checkouts.

    Objects are fetched through a single long-lived `git cat-file --batch` subprocess that
    is started lazily on first use.
    """

    def __init__(self, scm, rev):
        """
        :param scm: The Git instance whose `_create_git_cmdline` is used to launch cat-file.
        :param rev: The revision to read from.
        """
        self.scm = scm
        self.rev = rev
        self._cat_file_process = None
        # Trees is a dict from path to {entry name (bytes): Dir, Symlink or File object}
        self._trees = {}
        self._realpath_cache = {".": "./", "": "./"}

    def _maybe_start_cat_file_process(self):
        """Start the `git cat-file --batch` subprocess if it is not already running."""
        if not self._cat_file_process:
            cmdline = self.scm._create_git_cmdline(["cat-file", "--batch"])
            self._cat_file_process = subprocess.Popen(
                cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE
            )

    class MissingFileException(Exception):
        """The requested path does not exist at the given revision."""

        def __init__(self, rev, relpath):
            self.relpath = relpath
            self.rev = rev

        def __str__(self):
            return f"MissingFileException({self.relpath}, {self.rev})"

    class IsDirException(Exception):
        """A file was requested but the path is a directory."""

        def __init__(self, rev, relpath):
            self.relpath = relpath
            self.rev = rev

        def __str__(self):
            return f"IsDirException({self.relpath}, {self.rev})"

    class NotADirException(Exception):
        """A directory was required but the path is a file."""

        def __init__(self, rev, relpath):
            self.relpath = relpath
            self.rev = rev

        def __str__(self):
            return f"NotADirException({self.relpath}, {self.rev})"

    class SymlinkLoopException(Exception):
        """Too many symlinks were followed while resolving the path."""

        def __init__(self, rev, relpath):
            self.relpath = relpath
            self.rev = rev

        def __str__(self):
            return f"SymlinkLoop({self.relpath}, {self.rev})"

    class ExternalSymlinkException(Exception):
        """A symlink on the path points outside the repository."""

        def __init__(self, rev, relpath):
            self.relpath = relpath
            self.rev = rev

        def __str__(self):
            return f"ExternalSymlink({self.relpath}, {self.rev})"

    class GitDiedException(Exception):
        """The cat-file subprocess exited while a reply was being read."""

    class UnexpectedGitObjectTypeException(Exception):
        """A tree entry was neither a File, Dir nor Symlink."""

        # Programmer error

    def _safe_realpath(self, relpath):
        """Like `_realpath`, but returns None for missing paths or non-directory parents."""
        try:
            return self._realpath(relpath)
        except (self.MissingFileException, self.NotADirException):
            return None

    def _safe_read_object(self, relpath, max_symlinks):
        """Like `_read_object`, but returns `(None, relpath)` instead of raising for missing
        paths or non-directory parents."""
        try:
            return self._read_object(relpath, max_symlinks)
        except (self.MissingFileException, self.NotADirException):
            return None, relpath

    def exists(self, relpath):
        """Return True if `relpath` resolves to a file or directory at this revision."""
        path = self._safe_realpath(relpath)
        return bool(path)

    def isfile(self, relpath):
        """Return True if `relpath` resolves to a file (resolved path has no trailing '/')."""
        path = self._safe_realpath(relpath)
        if path:
            return not path.endswith("/")
        return False

    def isdir(self, relpath):
        """Return True if `relpath` resolves to a directory (resolved path ends with '/')."""
        path = self._safe_realpath(relpath)
        if path:
            return path.endswith("/")
        return False

    def lstat(self, relpath):
        """Return the Dir, File or Symlink object for `relpath` without following a final
        symlink, or None if the path cannot be resolved."""
        obj, _ = self._safe_read_object(relpath, max_symlinks=0)
        return obj

    def readlink(self, relpath):
        """Return the path reached by following one symlink at `relpath`, or None if
        `relpath` is not a symlink or cannot be resolved."""
        # TODO: Relatively inefficient, but easier than changing read_object, unfortunately.
        if not isinstance(self.lstat(relpath), self.Symlink):
            return None
        obj, path_so_far = self._safe_read_object(relpath, max_symlinks=1)
        if obj is None:
            return None
        return path_so_far

    class Symlink:
        """A symlink tree entry: its name (bytes) and hex object id (bytes)."""

        def __init__(self, name, sha):
            self.name = name
            self.sha = sha

    class Dir:
        """A directory (sub-tree) entry: its name (bytes) and hex object id (bytes)."""

        def __init__(self, name, sha):
            self.name = name
            self.sha = sha

    class File:
        """A file tree entry: its name (bytes) and hex object id (bytes)."""

        def __init__(self, name, sha):
            self.name = name
            self.sha = sha

    def listdir(self, relpath):
        """Like os.listdir, but reads from the git repository.

        :returns: a list of entry names; names read from git tree data are bytes.
        :raises: NotADirException if `relpath` does not resolve to a directory.
        """
        path = self._realpath(relpath)
        if not path.endswith("/"):
            raise self.NotADirException(self.rev, relpath)
        if path[0] == "/" or path.startswith("../"):
            return os.listdir(path)
        tree = self._read_tree(path[:-1])
        return list(tree)

    @contextmanager
    def open(self, relpath):
        """Read a file out of the repository at a certain revision.

        This is complicated because, unlike vanilla git cat-file, this follows symlinks in the repo.
        A resolved path that starts with '/' or '../' is read from the local filesystem, and that
        file is closed when the context exits.

        NOTE(review): `_read_object` raises ExternalSymlinkException for symlinks that leave the
        repo, so the filesystem branch may not be reachable through `_realpath` -- confirm.

        :returns: (as a context manager) a binary file-like object with the file's content.
        :raises: IsDirException if `relpath` resolves to a directory.
        """
        path = self._realpath(relpath)
        if path.endswith("/"):
            raise self.IsDirException(self.rev, relpath)
        if path.startswith("../") or path[0] == "/":
            # `with` guarantees the handle is released when the caller's block ends.
            with open(path, "rb") as external_file:
                yield external_file
            return
        object_type, data = self._read_object_from_repo(rev=self.rev, relpath=path)
        if object_type == b"tree":
            raise self.IsDirException(self.rev, relpath)
        assert object_type == b"blob"
        yield io.BytesIO(data)

    @memoized_method
    def _realpath(self, relpath):
        """Follow symlinks to find the real path to a file or directory in the repo.

        :returns: if the expanded path points to a file, the relative path
                  to that file; if a directory, the relative path + '/'; if
                  a symlink outside the repo, a path starting with / or ../.
        :raises: SymlinkLoopException if more than MAX_SYMLINKS_IN_REALPATH symlinks are hit.
        """
        obj, path_so_far = self._read_object(relpath, MAX_SYMLINKS_IN_REALPATH)
        if isinstance(obj, self.Symlink):
            # _read_object only hands back a Symlink when the symlink budget was exhausted.
            raise self.SymlinkLoopException(self.rev, relpath)
        return path_so_far

    def _read_object(self, relpath, max_symlinks):
        """Walk `relpath` component by component, following at most `max_symlinks` symlinks.

        :returns: a tuple of (Dir, File or Symlink object, resolved path). Directory paths end
          with '/'. A Symlink is returned only when following it would exceed `max_symlinks`.
        :raises: MissingFileException, NotADirException, ExternalSymlinkException,
          UnexpectedGitObjectTypeException.
        """
        path_so_far = ""
        components = list(relpath.split(os.path.sep))
        symlinks = 0
        # Consume components to build path_so_far
        while components:
            component = components.pop(0)
            if component == "" or component == ".":
                continue
            parent_tree = self._read_tree(path_so_far)
            parent_path = path_so_far
            if path_so_far != "":
                path_so_far += "/"
            path_so_far += component
            try:
                obj = parent_tree[component.encode()]
            except KeyError:
                raise self.MissingFileException(self.rev, relpath) from None
            if isinstance(obj, self.File):
                if components:
                    # We've encountered a file while searching for a directory
                    raise self.NotADirException(self.rev, relpath)
                else:
                    return obj, path_so_far
            elif isinstance(obj, self.Dir):
                if not components:
                    return obj, path_so_far + "/"
                # A dir is OK; we just descend from here
            elif isinstance(obj, self.Symlink):
                symlinks += 1
                if symlinks > max_symlinks:
                    return obj, path_so_far
                # A git symlink is stored as a blob containing the name of the target.
                # Read that blob.
                object_type, path_data = self._read_object_from_repo(sha=obj.sha)
                assert object_type == b"blob"
                # startswith (not path_data[0] == b"/"): indexing bytes yields an int, which
                # never equals a bytes object, and indexing fails outright on an empty blob.
                if path_data.startswith(b"/"):
                    # Is absolute, thus likely points outside the repo.
                    raise self.ExternalSymlinkException(self.rev, relpath)
                link_to = os.path.normpath(os.path.join(parent_path, path_data.decode()))
                if link_to.startswith("../") or link_to[0] == "/":
                    # Points outside the repo.
                    raise self.ExternalSymlinkException(self.rev, relpath)
                # Restart our search at the top with the new path.
                # Git stores symlinks in terms of Unix paths, so split on '/' instead of os.path.sep
                components = link_to.split("/") + components
                path_so_far = ""
            else:
                # Programmer error
                raise self.UnexpectedGitObjectTypeException()
        # Every component was empty or '.': the path denotes the repository root.
        return self.Dir("./", None), "./"

    def _fixup_dot_relative(self, path):
        """Git doesn't understand dot-relative paths."""
        if path.startswith("./"):
            return path[2:]
        elif path == ".":
            return ""
        return path

    def _read_tree(self, path):
        """Given a revision and path, parse the tree data out of git cat-file output.

        Parsed trees are cached per path on the instance.

        :returns: a dict from entry name (bytes) -> Symlink, Dir or File object
        """
        path = self._fixup_dot_relative(path)
        tree = self._trees.get(path)
        # `is not None` so that a cached *empty* tree is also a cache hit.
        if tree is not None:
            return tree
        tree = {}
        object_type, tree_data = self._read_object_from_repo(rev=self.rev, relpath=path)
        assert object_type == b"tree"
        # The tree data here is (mode ' ' filename \0 20-byte-sha)*
        i = 0
        while i < len(tree_data):
            space_at = tree_data.index(SPACE, i)
            mode = tree_data[i:space_at]
            nul_at = tree_data.index(NUL, space_at + 1)
            name = tree_data[space_at + 1 : nul_at]
            sha = tree_data[nul_at + 1 : nul_at + 1 + GIT_HASH_LENGTH]
            sha_hex = binascii.hexlify(sha)
            i = nul_at + 1 + GIT_HASH_LENGTH
            if mode == b"120000":
                tree[name] = self.Symlink(name, sha_hex)
            elif mode == b"40000":
                tree[name] = self.Dir(name, sha_hex)
            else:
                tree[name] = self.File(name, sha_hex)
        self._trees[path] = tree
        return tree

    def _read_object_from_repo(self, rev=None, relpath=None, sha=None):
        """Read an object from the git repo.

        This is implemented via a pipe to git cat-file --batch

        Either `sha` (hex object id as bytes) or both `rev` and `relpath` must be given.

        :returns: a tuple of (object type as bytes, e.g. b"blob" or b"tree"; object content).
        :raises: MissingFileException if git reports the object as missing;
          GitDiedException if the cat-file process has exited.
        """
        if sha:
            spec = sha + b"\n"
        else:
            assert rev is not None
            assert relpath is not None
            rev = ensure_text(rev)
            relpath = ensure_text(relpath)
            relpath = self._fixup_dot_relative(relpath)
            spec = f"{rev}:{relpath}\n".encode()
        self._maybe_start_cat_file_process()
        process = self._cat_file_process
        process.stdin.write(spec)
        process.stdin.flush()
        header = None
        while not header:
            header = process.stdout.readline()
            if process.poll() is not None:
                requested = spec.decode(errors="replace").rstrip()
                raise self.GitDiedException(
                    f"Git cat-file died while trying to read '{requested}'."
                )
            header = header.rstrip()
        parts = header.rsplit(SPACE, 2)
        # A "missing" reply echoes the requested spec, which may itself contain spaces, so
        # test the last field instead of relying on the number of fields.
        if parts[-1] == b"missing":
            raise self.MissingFileException(rev, relpath)
        _, object_type, object_len = parts
        size = int(object_len)
        # Read the object data
        blob = bytes(process.stdout.read(size))
        # Read the trailing newline. This must happen outside the assert: asserts are removed
        # under `python -O`, and an unread newline would desynchronize every later reply.
        terminator = process.stdout.read(1)
        assert terminator == b"\n"
        assert len(blob) == size
        return object_type, blob

    def __del__(self):
        """Let the cat-file subprocess finish (communicate() closes its stdin and waits)."""
        # getattr: __del__ also runs for instances whose __init__ did not complete.
        process = getattr(self, "_cat_file_process", None)
        if process:
            process.communicate()