"""Handles all VCS (version control) support"""
from __future__ import absolute_import
import errno
import logging
import os
import shutil
import sys
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.exceptions import BadCommand
from pip._internal.utils.misc import (
display_path, backup_dir, call_subprocess, rmtree, ask_path_exists,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import Dict, Optional, Tuple # noqa: F401
from pip._internal.cli.base_command import Command # noqa: F401
# Names exported by a star-import of this module.
__all__ = ['vcs', 'get_src_requirement']
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class RevOptions(object):

    """
    Bundle the revision to install with the extra VCS command options
    that accompany it.

    Treat instances as immutable: use make_new() to derive an object with
    a different revision instead of reassigning attributes.
    """

    def __init__(self, vcs, rev=None, extra_args=None):
        """
        Args:
          vcs: the VersionControl object these options belong to.
          rev: the name of the revision to install, or None to fall back
            to the backend's default_arg_rev.
          extra_args: a list of extra options; None means no extra options.
        """
        self.vcs = vcs
        self.rev = rev
        self.extra_args = [] if extra_args is None else extra_args

    def __repr__(self):
        template = '<RevOptions {}: rev={!r}>'
        return template.format(self.vcs.name, self.rev)

    @property
    def arg_rev(self):
        """
        The revision to pass to the VCS command: the explicit rev when one
        was given, otherwise the backend's default_arg_rev.
        """
        return self.vcs.default_arg_rev if self.rev is None else self.rev

    def to_args(self):
        """
        Return the VCS-specific command arguments as a new list: the
        backend's base revision arguments (when there is a revision to
        pass) followed by the extra arguments.
        """
        revision = self.arg_rev
        if revision is None:
            base_args = []
        else:
            base_args = self.vcs.get_base_rev_args(revision)
        return list(base_args) + list(self.extra_args)

    def to_display(self):
        """
        Return a human-readable suffix naming the revision, or an empty
        string when no explicit revision is set.
        """
        if self.rev:
            return ' (to revision {})'.format(self.rev)
        return ''

    def make_new(self, rev):
        """
        Make a copy of the current instance, but with a new rev.

        The copy is built by the owning backend's make_rev_options() and
        carries over this object's extra arguments.

        Args:
          rev: the name of the revision for the new object.
        """
        carried_over = self.extra_args
        return self.vcs.make_rev_options(rev, extra_args=carried_over)
class VcsSupport(object):
    """
    Registry of version control backend classes, keyed by backend name.

    The registry dict is a class attribute that the methods mutate in
    place, so every VcsSupport instance sees the same registered backends.
    """
    _registry = {}  # type: Dict[str, Command]
    schemes = ['ssh', 'git', 'hg', 'bzr', 'sftp', 'svn']

    def __init__(self):
        # Register more schemes with urlparse for various version control
        # systems.  These urllib lists are process-wide, so only the
        # schemes that are missing get appended: creating several
        # VcsSupport objects (or naming a scheme urllib already knows)
        # must not pile up duplicate entries.
        self._extend_unique(urllib_parse.uses_netloc, self.schemes)
        # uses_fragment is not available on every Python version (the
        # original note cites Python >= 2.7.4 and 3.3), so it is only
        # touched when present and non-empty.
        uses_fragment = getattr(urllib_parse, 'uses_fragment', None)
        if uses_fragment:
            self._extend_unique(uses_fragment, self.schemes)
        super(VcsSupport, self).__init__()

    @staticmethod
    def _extend_unique(target, items):
        """Append to ``target`` each of ``items`` it does not contain yet."""
        for item in items:
            if item not in target:
                target.append(item)

    def __iter__(self):
        """Iterate over the names of the registered backends."""
        return self._registry.__iter__()

    @property
    def backends(self):
        """A list of the registered backend classes."""
        return list(self._registry.values())

    @property
    def dirnames(self):
        """The ``dirname`` attribute of every registered backend."""
        return [backend.dirname for backend in self.backends]

    @property
    def all_schemes(self):
        """All URL schemes of all registered backends, as one flat list."""
        schemes = []
        for backend in self.backends:
            schemes.extend(backend.schemes)
        return schemes

    def register(self, cls):
        """
        Add the backend class ``cls`` to the registry under ``cls.name``.

        A class without a ``name`` attribute is rejected with a warning.
        A name that is already registered is left untouched.
        """
        if not hasattr(cls, 'name'):
            logger.warning('Cannot register VCS %s', cls.__name__)
            return
        if cls.name not in self._registry:
            self._registry[cls.name] = cls
            logger.debug('Registered VCS backend: %s', cls.name)

    def unregister(self, cls=None, name=None):
        """
        Remove a backend from the registry, looked up by ``name`` first and
        by class otherwise.

        A warning is logged when neither lookup matches, which includes
        the case where a name or class was given but is not registered.
        """
        if name in self._registry:
            del self._registry[name]
        elif cls in self._registry.values():
            del self._registry[cls.name]
        else:
            logger.warning('Cannot unregister because no class or name given')

    def get_backend_name(self, location):
        """
        Return the name of the version control backend if found at given
        location, e.g. vcs.get_backend_name('/path/to/vcs/checkout')

        Returns None when no registered backend controls the location.
        """
        for vc_type in self._registry.values():
            if vc_type.controls_location(location):
                logger.debug('Determine that %s uses VCS: %s',
                             location, vc_type.name)
                return vc_type.name
        return None

    def get_backend(self, name):
        """
        Return the backend class registered under ``name`` (compared in
        lower case), or None when there is no such backend.
        """
        return self._registry.get(name.lower())

    def get_backend_from_location(self, location):
        """
        Return the backend class controlling ``location``, or None when no
        registered backend does.
        """
        vc_type = self.get_backend_name(location)
        if vc_type:
            return self.get_backend(vc_type)
        return None
# Shared module-level VcsSupport instance; exported through __all__.
vcs = VcsSupport()
class VersionControl(object):
name = ''
dirname = ''
# List of supported schemes for this Version Control
schemes = () # type: Tuple[str, ...]
# Iterable of environment variable names to pass to call_subprocess().
unset_environ = () # type: Tuple[str, ...]
default_arg_rev = None # type: Optional[str]
def __init__(self, url=None, *args, **kwargs):
    """
    Remember the repository URL, then continue cooperative initialisation.

    Args:
      url: the repository URL this object works with; may be None.
      *args, **kwargs: passed on unchanged to the next __init__ in the
        method resolution order.
    """
    # The URL is stored first so it is already set when the next
    # initialiser in the chain runs.
    self.url = url
    super(VersionControl, self).__init__(*args, **kwargs)
def get_base_rev_args(self, rev):
    """
    Build the base command arguments that select revision ``rev``.

    The base class provides no implementation.

    Args:
      rev: the name of a revision to install.  Cannot be None.

    Raises:
      NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def make_rev_options(self, rev=None, extra_args=None):
    """
    Create the RevOptions object tying a revision to this backend.

    Args:
      rev: the name of a revision to install.
      extra_args: a list of extra options.

    Returns: a new RevOptions bound to this object.
    """
    options = RevOptions(self, rev, extra_args=extra_args)
    return options
def _is_local_repository(self, repo):
    """
    Return True if ``repo`` looks like an absolute local path.

    posix absolute paths start with os.path.sep,
    win32 ones start with drive (like c:\\folder)

    The result is always a real bool.  Previously the drive string (or
    an empty string) could leak out as the return value; truthiness is
    unchanged, so existing ``if`` checks behave the same.

    Args:
      repo: the repository location string to inspect.
    """
    if repo.startswith(os.path.sep):
        return True
    # Only the drive part matters; the remainder of the path is unused.
    drive, _ = os.path.splitdrive(repo)
    return bool(drive)
def export(self, location):
    """
    Export the repository at the url to the destination location,
    i.e. download only the files, without any VCS metadata.

    The base class provides no implementation.

    Args:
      location: the destination to export into.

    Raises:
      NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def get_netloc_and_auth(self, netloc, scheme):
    """
    Work out the netloc to use for the repository URL, together with any
    auth information taken out of it.

    This base implementation leaves the netloc untouched and reports no
    credentials.  The hook exists mainly for the Subversion class to
    override, so that auth information can be provided via the --username
    and --password options instead of through the URL.  For other
    subclasses like Git without such an option, auth information must
    stay in the URL.

    Args:
      netloc: the original repository URL netloc.
      scheme: the repository URL's scheme without the vcs prefix.

    Returns: (netloc, (username, password)).
    """
    no_credentials = (None, None)
    return netloc, no_credentials
def get_url_rev_and_auth(self, url):
    """
    Split a ``<vcs>+<protocol>://...`` requirement URL into the plain
    repository URL, the requested revision and the auth info.

    The revision is whatever follows the last '@' in the URL path (None
    when the path has no '@').  The fragment is dropped from the
    returned URL.

    Returns: (url, rev, (username, password)).

    Raises:
      ValueError: if the URL scheme has no ``<vcs>+`` prefix.
    """
    pieces = urllib_parse.urlsplit(url)
    vcs_scheme = pieces[0]
    if '+' not in vcs_scheme:
        message = (
            "Sorry, {!r} is a malformed VCS url. "
            "The format is <vcs>+<protocol>://<url>, "
            "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp"
        ).format(url)
        raise ValueError(message)

    # Keep only what follows the first '+', i.e. drop the vcs prefix.
    scheme = vcs_scheme.partition('+')[2]
    netloc, user_pass = self.get_netloc_and_auth(pieces[1], scheme)

    path, query = pieces[2], pieces[3]
    head, marker, tail = path.rpartition('@')
    if marker:
        path, rev = head, tail
    else:
        rev = None

    plain_url = urllib_parse.urlunsplit((scheme, netloc, path, query, ''))
    return plain_url, rev, user_pass
def make_rev_args(self, username, password):
    """
    Build the RevOptions "extra arguments" to use in obtain().

    This base implementation ignores the credentials and contributes no
    arguments; a fresh empty list is returned on every call.
    """
    extra_args = list()
    return extra_args
def get_url_rev_options(self, url):
    """
    Turn a requirement URL into the pair (url, rev_options) to use in
    obtain() and in some cases export().

    The URL is parsed with get_url_rev_and_auth(); the credentials it
    yields are converted to extra arguments by make_rev_args() and
    combined with the revision by make_rev_options().
    """
    parsed_url, rev, (username, password) = self.get_url_rev_and_auth(url)
    auth_args = self.make_rev_args(username, password)
    return parsed_url, self.make_rev_options(rev, extra_args=auth_args)
def normalize_url(self, url):
    """
    Put a URL into a canonical form for comparison: percent-escapes are
    unquoted and trailing slashes are stripped.
    """
    unquoted = urllib_parse.unquote(url)
    return unquoted.rstrip('/')
def compare_urls(self, url1, url2):
    """
    Tell whether two repo URLs identify the same location, ignoring the
    incidental differences that normalize_url() removes.
    """
    left = self.normalize_url(url1)
    right = self.normalize_url(url2)
    return left == right
def fetch_new(self, dest, url, rev_options):
    """
    Fetch a revision from a repository for the first time, i.e. when
    nothing has been fetched from that repository yet.

    The base class provides no implementation.

    Args:
      dest: the directory to fetch the repository to.
      url: the repository URL to fetch from.
      rev_options: a RevOptions object.

    Raises:
      NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def switch(self, dest, url, rev_options):
    """
    Re-point the repo at ``dest`` so that it refers to ``url``.

    The base class provides no implementation.

    Args:
      dest: the repository directory.
      url: the URL the repository should point to.
      rev_options: a RevOptions object.

    Raises:
      NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def update(self, dest, url, rev_options):
    """
    Bring an already-existing repo to the state described by
    ``rev_options``.

    The base class provides no implementation.

    Args:
      dest: the repository directory.
      url: the repository URL.
      rev_options: a RevOptions object.

    Raises:
      NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def is_commit_id_equal(self, dest, name):
    """
    Tell whether the id of the current commit equals the given name.

    The base class provides no implementation.

    Args:
      dest: the repository directory.
      name: a string name.

    Raises:
      NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
def obtain(self, dest):
"""
Install or update in editable mode the package represented by this
VersionControl object.
Args:
dest: the repository directory in which to install or update.
"""
url, rev_options = self.get_url_rev_options(self.url)
if not os.path.exists(dest):
self.fetch_new(dest, url, rev_options)
return
rev_display = rev_options.to_display()
if self.is_repository_directory(dest):
existing_url = self.get_url(dest)
if self.compare_urls(existing_url, url):
logger.debug(
'%s in %s exists, and has correct URL (%s)',
self.repo_name.title(),
display_path(dest),
url,
)
if not self.is_commit_id_equal(dest, rev_options.rev):
Loading ...