Repository URL to install this package:
Version:
0.1.16-1 ▾
|
odigos-demo-inventory
/
opt
/
odigos-demo-inventory
/
site-packages
/
poetry
/
vcs
/
git
/
backend.py
|
---|
from __future__ import annotations
import dataclasses
import logging
import re
from pathlib import Path
from subprocess import CalledProcessError
from typing import TYPE_CHECKING
from urllib.parse import urljoin
from urllib.parse import urlparse
from urllib.parse import urlunparse
from dulwich import porcelain
from dulwich.client import HTTPUnauthorized
from dulwich.client import get_transport_and_path
from dulwich.config import ConfigFile
from dulwich.config import parse_submodules
from dulwich.errors import NotGitRepository
from dulwich.index import IndexEntry
from dulwich.refs import ANNOTATED_TAG_SUFFIX
from dulwich.repo import Repo
from poetry.console.exceptions import PoetryRuntimeError
from poetry.utils.authenticator import get_default_authenticator
from poetry.utils.helpers import remove_directory
if TYPE_CHECKING:
from dulwich.client import FetchPackResult
from dulwich.client import GitClient
logger = logging.getLogger(__name__)

# A relative URL by definition starts with ../ or ./
RELATIVE_SUBMODULE_REGEX = re.compile(r"^\.{1,2}/")

# Common error messages.
# These are fragments that callers assemble into the ``info`` list passed to
# ``PoetryRuntimeError.create``; the ``{revision}`` / ``{remote}`` placeholders
# are filled in with ``str.format`` at the point of use.
ERROR_MESSAGE_NOTE = (
    "<b>Note:</> This error arises from interacting with "
    "the specified vcs source and is likely not a "
    "Poetry issue."
)
ERROR_MESSAGE_PROBLEMS_SECTION_START = (
    "This issue could be caused by any of the following:\n\n"
    "- there are network issues in this environment"
)
ERROR_MESSAGE_BAD_REVISION = (
    "- the revision ({revision}) you have specified\n"
    " - was misspelled\n"
    " - is invalid (must be a sha or symref)\n"
    " - is not present on remote"
)
ERROR_MESSAGE_BAD_REMOTE = (
    "- the remote ({remote}) you have specified\n"
    " - was misspelled\n"
    " - does not exist\n"
    " - requires credentials that were either not configured or are incorrect\n"
    " - is inaccessible due to network issues"
)
def is_revision_sha(revision: str | None) -> bool:
    """
    Tell whether `revision` looks like a (possibly abbreviated) commit sha.

    A value qualifies when it consists solely of 5 to 40 lowercase hexadecimal
    characters. ``None`` and the empty string never qualify.

    :param revision: the candidate revision string, or ``None``.
    :return: ``True`` if the whole string is a 5-40 character lowercase hex
        string, ``False`` otherwise.
    """
    # fullmatch (instead of match + "$") so that a value with a trailing
    # newline such as "abcde\n" is rejected rather than silently accepted.
    return re.fullmatch(r"[0-9a-f]{5,40}", revision or "") is not None
def annotated_tag(ref: str | bytes) -> bytes:
    """
    Build the annotated-tag variant of a ref name.

    :param ref: the ref name; a ``str`` is encoded as UTF-8 first, ``bytes``
        are used as given.
    :return: the ref name as bytes with ``ANNOTATED_TAG_SUFFIX`` appended.
    """
    raw = ref.encode("utf-8") if isinstance(ref, str) else ref
    return raw + ANNOTATED_TAG_SUFFIX
@dataclasses.dataclass
class GitRefSpec:
    """
    What to check out from a remote: a branch, a tag or a revision.

    At most one of `branch`, `revision` and `tag` is expected to carry the
    user's request; `resolve()` may move the value between these fields when
    the remote shows it was given under the wrong label, and fills in `ref`.
    """

    branch: str | None = None
    revision: str | None = None
    tag: str | None = None
    ref: bytes = dataclasses.field(default_factory=lambda: b"HEAD")

    def resolve(self, remote_refs: FetchPackResult, repo: Repo) -> None:
        """
        Reconcile this spec with what the remote advertises.

        First re-labels / expands the requested value, then points both
        `self.ref` and the remote's ``HEAD`` entry at the selected commit.
        A ``KeyError`` propagates when a required ref is absent.
        """
        self._normalise(remote_refs=remote_refs, repo=repo)
        self._set_head(remote_refs=remote_refs)

    def _normalise(self, remote_refs: FetchPackResult, repo: Repo) -> None:
        """
        Internal helper that corrects how the request is labelled.

        1. a "revision" that is really a tag or a branch / ref name is moved
           to the matching field (tags win over branches);
        2. a "branch" that only exists as a tag is moved to `tag`;
        3. a short sha is expanded to a full sha, looking first at the
           advertised refs and then at the objects in `repo`.
        """
        known = remote_refs.refs

        if self.revision:
            as_tag = f"refs/tags/{self.revision}".encode()
            if as_tag in known or annotated_tag(as_tag) in known:
                # a tag handed in as a revision; tags take priority
                self.tag, self.revision = self.revision, None
            elif (
                self.revision.encode("utf-8") in known
                or f"refs/heads/{self.revision}".encode() in known
            ):
                # most likely a ref spec or a branch handed in as a revision
                self.branch, self.revision = self.revision, None
        elif self.branch and f"refs/heads/{self.branch}".encode() not in known:
            as_tag = f"refs/tags/{self.branch}".encode()
            if as_tag in known or annotated_tag(as_tag) in known:
                # a tag handed in as a branch
                self.tag, self.branch = self.branch, None

        if not (self.revision and self.is_sha_short):
            return

        # revision is an abbreviated sha: expand it to the full one
        prefix = self.revision.encode("utf-8")

        advertised = next(
            (sha for sha in known.values() if sha.startswith(prefix)), None
        )
        if advertised is not None:
            self.revision = advertised.decode("utf-8")
            return

        # no advertised ref points at such a sha, fall back to all objects
        stored = next(iter(repo.object_store.iter_prefix(prefix)), None)
        if stored is not None:
            self.revision = stored.decode("utf-8")

    def _set_head(self, remote_refs: FetchPackResult) -> None:
        """
        Internal helper that selects `self.ref` and records the chosen sha in
        the remote refs, both under `self.ref` and under ``HEAD``.
        """
        refs = remote_refs.refs

        # default: whatever the remote's HEAD symbolically points at
        self.ref = remote_refs.symrefs[b"HEAD"]

        if self.revision:
            target_sha = self.revision.encode("utf-8")
        else:
            if self.tag:
                plain = f"refs/tags/{self.tag}".encode()
                peeled = annotated_tag(plain)
                if peeled in refs:
                    self.ref = peeled
                else:
                    self.ref = plain
            elif self.branch:
                if self.is_ref:
                    self.ref = self.branch.encode("utf-8")
                else:
                    self.ref = f"refs/heads/{self.branch}".encode()
            target_sha = refs[self.ref]

        refs[self.ref] = target_sha
        refs[b"HEAD"] = target_sha

    @property
    def key(self) -> str:
        """First non-empty of revision, branch, tag; else the decoded ref."""
        for candidate in (self.revision, self.branch, self.tag):
            if candidate:
                return candidate
        return self.ref.decode("utf-8")

    @property
    def is_sha(self) -> bool:
        """Whether `revision` looks like a (possibly short) sha."""
        return is_revision_sha(revision=self.revision)

    @property
    def is_ref(self) -> bool:
        """Whether `branch` is a full ref name (``refs/...``) or ``HEAD``."""
        if self.branch is None:
            return False
        return self.branch.startswith("refs/") or self.branch == "HEAD"

    @property
    def is_sha_short(self) -> bool:
        """Whether `revision` is a sha shorter than the full 40 characters."""
        if self.revision is None:
            return False
        return self.is_sha and len(self.revision) < 40
@dataclasses.dataclass
class GitRepoLocalInfo:
    """
    Snapshot of a local repository: its ``origin`` URL and current revision.

    `repo` is an init-only argument (either an opened repository or a path to
    one); `origin` and `revision` are computed from it after construction.
    """

    repo: dataclasses.InitVar[Repo | Path]
    origin: str = dataclasses.field(init=False)
    revision: str = dataclasses.field(init=False)

    def __post_init__(self, repo: Repo | Path) -> None:
        # accept a filesystem path as a convenience and open it on demand
        handle = repo if isinstance(repo, Repo) else Git.as_repo(repo=repo)
        self.origin = Git.get_remote_url(repo=handle, remote="origin")
        self.revision = Git.get_revision(repo=handle)
class Git:
    """
    Namespace of helpers for inspecting and cloning git repositories.

    Cloning is attempted with dulwich first; the system git client is used
    instead when it is enabled in the configuration or when dulwich reports
    an HTTP authorization failure.
    """

    @staticmethod
    def as_repo(repo: Path) -> Repo:
        """Open the repository located at the path `repo`."""
        return Repo(str(repo))

    @staticmethod
    def get_remote_url(repo: Repo, remote: str = "origin") -> str:
        """
        Read the configured URL of `remote` from the repository config.

        :return: the URL, or an empty string when the config has no section
            for that remote.
        """
        with repo:
            config = repo.get_config()
            section = (b"remote", remote.encode("utf-8"))

            url = ""
            if config.has_section(section):
                value = config.get(section, b"url")
                url = value.decode("utf-8")

            return url

    @staticmethod
    def get_revision(repo: Repo) -> str:
        """Return the peeled value of ``HEAD`` of `repo` as a string."""
        with repo:
            return repo.get_peeled(b"HEAD").decode("utf-8")

    @classmethod
    def info(cls, repo: Repo | Path) -> GitRepoLocalInfo:
        """Collect origin URL and current revision of `repo`."""
        return GitRepoLocalInfo(repo=repo)

    @staticmethod
    def get_name_from_source_url(url: str) -> str:
        """
        Derive a directory name from a repository URL.

        Takes the last ``/``-separated component of `url` (ignoring trailing
        slashes) and drops a trailing ``.git`` suffix if present.
        """
        # The dot is escaped so only a literal ".git" suffix is removed; an
        # unescaped "." would also eat names that merely end in "<x>git"
        # (e.g. "digit" -> "d").
        return re.sub(r"(\.git)?$", "", url.rstrip("/").rsplit("/", 1)[-1])

    @classmethod
    def _fetch_remote_refs(cls, url: str, local: Repo) -> FetchPackResult:
        """
        Helper method to fetch remote refs.

        Fetches from `url` into `local`, requesting everything the object
        store's ``determine_wants_all`` selects, and returns the fetch result.
        """
        client: GitClient
        path: str

        kwargs: dict[str, str] = {}
        credentials = get_default_authenticator().get_credentials_for_git_url(url=url)

        if credentials.password and credentials.username:
            # we do this conditionally as otherwise, dulwich might complain if these
            # parameters are passed in for an ssh url
            kwargs["username"] = credentials.username
            kwargs["password"] = credentials.password

        config = local.get_config_stack()
        client, path = get_transport_and_path(url, config=config, **kwargs)

        with local:
            result: FetchPackResult = client.fetch(
                path,
                local,
                determine_wants=local.object_store.determine_wants_all,
            )
            return result

    @staticmethod
    def _clone_legacy(url: str, refspec: GitRefSpec, target: Path) -> Repo:
        """
        Helper method to facilitate fallback to using system provided git client via
        subprocess calls.

        Any existing `target` directory is removed first. The repository is
        cloned and then checked out at the tag, branch or revision of
        `refspec` (in that order of preference), defaulting to ``HEAD``.

        :raises PoetryRuntimeError: when the clone or the checkout command
            fails.
        """
        from poetry.vcs.git.system import SystemGit

        logger.debug("Cloning '%s' using system git client", url)

        if target.exists():
            remove_directory(path=target, force=True)

        revision = refspec.tag or refspec.branch or refspec.revision or "HEAD"

        try:
            SystemGit.clone(url, target)
        except CalledProcessError as e:
            raise PoetryRuntimeError.create(
                reason=f"<error>Failed to clone <info>{url}</>, check your git configuration and permissions for this repository.</>",
                exception=e,
                info=[
                    ERROR_MESSAGE_NOTE,
                    ERROR_MESSAGE_PROBLEMS_SECTION_START,
                    ERROR_MESSAGE_BAD_REMOTE.format(remote=url),
                ],
            )

        # Hand the short name to `git checkout`. Strings are immutable, so
        # the stripped value has to be assigned back; the branch namespace
        # is "refs/heads/" (plural).
        for prefix in ("refs/heads/", "refs/tags/"):
            if revision.startswith(prefix):
                revision = revision[len(prefix) :]
                break

        try:
            SystemGit.checkout(revision, target)
        except CalledProcessError as e:
            raise PoetryRuntimeError.create(
                reason=f"<error>Failed to checkout {url} at '{revision}'.</>",
                exception=e,
                info=[
                    ERROR_MESSAGE_NOTE,
                    ERROR_MESSAGE_PROBLEMS_SECTION_START,
                    ERROR_MESSAGE_BAD_REVISION.format(revision=revision),
                ],
            )

        repo = Repo(str(target))
        return repo

    @classmethod
    def _clone(cls, url: str, refspec: GitRefSpec, target: Path) -> Repo:
        """
        Helper method to clone a remote repository at the given `url` at the specified
        ref spec.

        Initialises `target` (adding `url` as ``origin``) when it does not
        exist yet, otherwise reuses it; fetches, resolves `refspec`, updates
        the local refs and resets the index.

        :raises PoetryRuntimeError: when the requested ref cannot be resolved
            or checked out.
        """
        local: Repo
        if not target.exists():
            local = Repo.init(str(target), mkdir=True)
            porcelain.remote_add(local, "origin", url)
        else:
            local = Repo(str(target))

        remote_refs = cls._fetch_remote_refs(url=url, local=local)

        logger.debug(
            "Cloning <c2>%s</> at '<c2>%s</>' to <c1>%s</>", url, refspec.key, target
        )

        try:
            refspec.resolve(remote_refs=remote_refs, repo=local)
        except KeyError:  # branch / ref does not exist
            raise PoetryRuntimeError.create(
                reason=f"<error>Failed to clone {url} at '{refspec.key}', verify ref exists on remote.</>",
                info=[
                    ERROR_MESSAGE_NOTE,
                    ERROR_MESSAGE_PROBLEMS_SECTION_START,
                    ERROR_MESSAGE_BAD_REVISION.format(revision=refspec.key),
                ],
            )

        try:
            # ensure local HEAD matches remote
            local.refs[b"HEAD"] = remote_refs.refs[b"HEAD"]
        except ValueError:
            raise PoetryRuntimeError.create(
                reason=f"<error>Failed to clone {url} at '{refspec.key}', verify ref exists on remote.</>",
                info=[
                    ERROR_MESSAGE_NOTE,
                    ERROR_MESSAGE_PROBLEMS_SECTION_START,
                    ERROR_MESSAGE_BAD_REVISION.format(revision=refspec.key),
                    f"\nThis particular error is prevalent when {refspec.key} could not be resolved to a specific commit sha.",
                ],
            )

        if refspec.is_ref:
            # set ref to current HEAD
            local.refs[refspec.ref] = local.refs[b"HEAD"]

        # Mirror the remote's branches under refs/remotes/origin and its tags
        # under refs/tags, skipping the annotated-tag entries.
        # NOTE(review): the tag prefix has no trailing slash (unlike the heads
        # prefix), so the stripped tag names keep their leading "/" - confirm
        # this is what import_refs expects.
        for base, prefix in (
            (b"refs/remotes/origin", b"refs/heads/"),
            (b"refs/tags", b"refs/tags"),
        ):
            local.refs.import_refs(
                base=base,
                other={
                    n[len(prefix) :]: v
                    for (n, v) in remote_refs.refs.items()
                    if n.startswith(prefix) and not n.endswith(ANNOTATED_TAG_SUFFIX)
                },
            )

        try:
            with local:
                local.reset_index()
        except (AssertionError, KeyError) as e:
            # this implies the ref we need does not exist or is invalid
            if isinstance(e, KeyError):
                # the local copy is at a bad state, lets remove it
                logger.debug(
                    "Removing local clone (<c1>%s</>) of repository as it is in a"
                    " broken state.",
                    local.path,
                )
                remove_directory(Path(local.path), force=True)

            # only the "Invalid object name" assertion is translated into a
            # user-facing error; any other assertion is re-raised untouched
            if isinstance(e, AssertionError) and "Invalid object name" not in str(e):
                raise

            raise PoetryRuntimeError.create(
                reason=f"<error>Failed to clone {url} at '{refspec.key}', verify ref exists on remote.</>",
                info=[
                    ERROR_MESSAGE_NOTE,
                    ERROR_MESSAGE_PROBLEMS_SECTION_START,
                    ERROR_MESSAGE_BAD_REVISION.format(revision=refspec.key),
                ],
                exception=e,
            )

        return local

    @classmethod
    def _clone_submodules(cls, repo: Repo) -> None:
        """
        Helper method to identify configured submodules and clone them recursively.

        Each submodule is cloned into its path below the repository root at
        the revision reported by `_get_submodules`. An existing directory at
        that path that has no ``.git`` directory is cleaned before cloning.
        """
        repo_root = Path(repo.path)
        for submodule in cls._get_submodules(repo):
            path_absolute = repo_root / submodule.path
            source_root = path_absolute.parent
            source_root.mkdir(parents=True, exist_ok=True)
            cls.clone(
                url=submodule.url,
                source_root=source_root,
                name=path_absolute.name,
                revision=submodule.revision,
                clean=path_absolute.exists()
                and not path_absolute.joinpath(".git").is_dir(),
            )

    @classmethod
    def _get_submodules(cls, repo: Repo) -> list[SubmoduleInfo]:
        """
        List the submodules declared in the ``.gitmodules`` file of `repo`.

        Relative submodule URLs (starting with ``./`` or ``../``) are resolved
        against the repository's ``origin`` URL. Submodules whose path has no
        entry in the repository index are skipped.

        :return: one `SubmoduleInfo` per usable submodule; an empty list when
            there is no ``.gitmodules`` file.
        """
        modules_config = Path(repo.path, ".gitmodules")

        if not modules_config.exists():
            return []

        config = ConfigFile.from_path(str(modules_config))

        submodules: list[SubmoduleInfo] = []
        for path, url, name in parse_submodules(config):
            url_str = url.decode("utf-8")
            path_str = path.decode("utf-8")
            name_str = name.decode("utf-8")

            if RELATIVE_SUBMODULE_REGEX.search(url_str):
                url_str = urlpathjoin(f"{cls.get_remote_url(repo)}/", url_str)

            with repo:
                index = repo.open_index()

                try:
                    entry = index[path]
                except KeyError:
                    # log the decoded values so the message does not show
                    # bytes reprs such as b'vendor/lib'
                    logger.debug(
                        "Skip submodule %s in %s, path %s not found",
                        name_str,
                        repo.path,
                        path_str,
                    )
                    continue

                assert isinstance(entry, IndexEntry)
                # the index entry's sha is used as the revision to check out
                revision = entry.sha.decode("utf-8")

            submodules.append(
                SubmoduleInfo(
                    path=path_str,
                    url=url_str,
                    name=name_str,
                    revision=revision,
                )
            )

        return submodules

    @staticmethod
    def is_using_legacy_client() -> bool:
        """Return the ``system-git-client`` config setting (default ``False``)."""
        from poetry.config.config import Config

        legacy_client: bool = Config.create().get("system-git-client", False)
        return legacy_client

    @staticmethod
    def get_default_source_root() -> Path:
        """Return the ``src`` directory below the configured ``cache-dir``."""
        from poetry.config.config import Config

        return Path(Config.create().get("cache-dir")) / "src"

    @classmethod
    def clone(
        cls,
        url: str,
        name: str | None = None,
        branch: str | None = None,
        tag: str | None = None,
        revision: str | None = None,
        source_root: Path | None = None,
        clean: bool = False,
    ) -> Repo:
        """
        Clone `url` into ``source_root / name`` at the requested reference.

        :param url: location of the remote repository.
        :param name: directory name of the checkout; derived from `url` when
            omitted.
        :param branch: branch (or full ref name) to check out.
        :param tag: tag to check out.
        :param revision: revision to check out.
        :param source_root: parent directory of the checkout; defaults to
            `get_default_source_root()`. It is created if missing.
        :param clean: remove an existing checkout instead of reusing it.
        :return: the repository of the checkout.
        """
        source_root = source_root or cls.get_default_source_root()
        source_root.mkdir(parents=True, exist_ok=True)

        name = name or cls.get_name_from_source_url(url=url)
        target = source_root / name
        refspec = GitRefSpec(branch=branch, revision=revision, tag=tag)

        if target.exists():
            if clean:
                # force clean the local copy if it exists, do not reuse
                remove_directory(target, force=True)
            else:
                # check if the current local copy matches the requested ref spec
                try:
                    current_repo = Repo(str(target))

                    with current_repo:
                        # we use peeled sha here to ensure tags are resolved consistently
                        current_sha = current_repo.get_peeled(b"HEAD").decode("utf-8")
                except (NotGitRepository, AssertionError, KeyError):
                    # something is wrong with the current checkout, clean it
                    remove_directory(target, force=True)
                else:
                    if not is_revision_sha(revision=current_sha):
                        # head is not a sha, this will cause issues later, lets reset
                        remove_directory(target, force=True)
                    elif (
                        refspec.is_sha
                        and refspec.revision is not None
                        and current_sha.startswith(refspec.revision)
                    ):
                        # the checkout is already at the requested sha:
                        # short-circuit and skip the remote fetch
                        return current_repo

        try:
            if not cls.is_using_legacy_client():
                local = cls._clone(url=url, refspec=refspec, target=target)
                cls._clone_submodules(repo=local)
                return local
        except HTTPUnauthorized:
            # we do this here to handle http authenticated repositories as dulwich
            # does not currently support using credentials from git-credential helpers.
            # upstream issue: https://github.com/jelmer/dulwich/issues/873
            #
            # this is a little inefficient, however preferred as this is transparent
            # without additional configuration or changes for existing projects that
            # use http basic auth credentials.
            logger.debug(
                "Unable to fetch from private repository '%s', falling back to"
                " system git",
                url,
            )

        # fallback to legacy git client
        return cls._clone_legacy(url=url, refspec=refspec, target=target)
def urlpathjoin(base: str, path: str) -> str:
    """
    Join `path` onto the path component of any `base` URL.

    urllib.parse.urljoin only resolves relative references for the schemes
    listed in urllib.parse.uses_relative, and the schemes commonly used with
    git (ssh, git+ssh, ...) are not among them.

    To support every scheme, the base URL is taken apart with urlparse, only
    its path component is joined with `path`, and the URL is put back
    together with urlunparse; all other components are kept as they were.

    See: https://github.com/python-poetry/poetry/issues/6499#issuecomment-1564712609
    """
    scheme, netloc, base_path, params, query, fragment = urlparse(base)
    joined_path = urljoin(base_path, path)
    return urlunparse((scheme, netloc, joined_path, params, query, fragment))
@dataclasses.dataclass
class SubmoduleInfo:
    """
    Plain record describing one submodule of a repository, as assembled by
    ``Git._get_submodules`` from the ``.gitmodules`` file and the index.
    """

    # submodule path as declared in .gitmodules (decoded from bytes)
    path: str
    # submodule URL; relative URLs have already been joined onto the
    # parent repository's origin URL
    url: str
    # submodule name as declared in .gitmodules (decoded from bytes)
    name: str
    # sha recorded for the submodule path in the parent repository's index
    revision: str