Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

/ refs.py

# refs.py -- For dealing with git refs
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#


"""Ref handling.

"""
import errno
import os
import sys

from dulwich.errors import (
    PackedRefsException,
    RefFormatError,
    )
from dulwich.objects import (
    git_line,
    valid_hexsha,
    ZERO_SHA,
    )
from dulwich.file import (
    GitFile,
    ensure_dir_exists,
    )


# Marker that prefixes the contents of a symbolic ref; the text after it
# names the ref being pointed at (see parse_symref_value).
SYMREF = b'ref: '
# Name prefix for local branch refs.
LOCAL_BRANCH_PREFIX = b'refs/heads/'
# Name prefix for tag refs.
LOCAL_TAG_PREFIX = b'refs/tags/'
# Characters that check_ref_format rejects anywhere in a refname: DEL
# (octal 177), space, '~', '^', ':', '?', '*' and '['.
BAD_REF_CHARS = set(b'\177 ~^:?*[')
# The suffix '^{}'. NOTE(review): presumably marks the peeled value of an
# annotated tag; its use is not visible in this part of the file.
ANNOTATED_TAG_SUFFIX = b'^{}'


def parse_symref_value(contents):
    """Extract the destination from the contents of a symbolic ref.

    :param contents: Contents to parse; must begin with the symref marker
    :return: Destination, i.e. everything after the marker with trailing
        CR/LF characters removed
    :raises ValueError: if contents does not begin with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF):]
    return destination.rstrip(b'\r\n')


def check_ref_format(refname):
    """Tell whether a refname is well formed.

    The checks are modelled on the rules of git-check-ref-format[1] and are
    kept as separate steps so they can be compared against that list.

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    :param refname: The refname to check
    :return: True if refname is valid, False otherwise
    """
    # No path component may begin with a dot.
    if refname.startswith(b'.') or b'/.' in refname:
        return False
    # At least one slash is required.
    if b'/' not in refname:
        return False
    # Two consecutive dots are forbidden anywhere.
    if b'..' in refname:
        return False
    # Reject control characters (below octal 40) and the bad characters.
    # The one-byte slice is used for ord() so that the test works whether
    # iterating the refname yields integers or one-character strings.
    for pos, char in enumerate(refname):
        is_control = ord(refname[pos:pos + 1]) < 0o40
        if is_control or char in BAD_REF_CHARS:
            return False
    # The name may not end in a slash or a dot. It cannot be empty here
    # because it is already known to contain a slash.
    if refname.endswith((b'/', b'.')):
        return False
    if refname.endswith(b'.lock'):
        return False
    # Finally, neither the sequence '@{' nor a backslash may appear.
    return b'@{' not in refname and b'\\' not in refname


class RefsContainer(object):
    """A container for refs."""

    def __init__(self, logger=None):
        self._logger = logger

    def _log(self, ref, old_sha, new_sha, committer=None, timestamp=None,
             timezone=None, message=None):
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp,
                     timezone, message)

    def set_symbolic_ref(self, name, other, committer=None, timestamp=None,
                         timezone=None, message=None):
        """Make a ref point at another ref.

        :param name: Name of the ref to set
        :param other: Name of the ref to point at
        :param message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        :return: Dictionary mapping ref names to SHA1s

        :note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        :param name: Name of the ref to peel
        :return: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        return None

    def import_refs(self, base, other, committer=None, timestamp=None,
                    timezone=None, message=None, prune=False):
        if prune:
            to_delete = set(self.subkeys(base))
        else:
            to_delete = set()
        for name, value in other.items():
            self.set_if_equals(b'/'.join((base, name)), None, value,
                               message=message)
            if to_delete:
                try:
                    to_delete.remove(name)
                except KeyError:
                    pass
        for ref in to_delete:
            self.remove_if_equals(b'/'.join((base, ref)), None)

    def allkeys(self):
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def keys(self, base=None):
        """Refs present in this container.

        :param base: An optional base to return refs under.
        :return: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        else:
            return self.allkeys()

    def subkeys(self, base):
        """Refs present in this container under a base.

        :param base: The base to return refs under.
        :return: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        keys = set()
        base_len = len(base) + 1
        for refname in self.allkeys():
            if refname.startswith(base):
                keys.add(refname[base_len:])
        return keys

    def as_dict(self, base=None):
        """Return the contents of this container as a dictionary.

        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b''
        else:
            base = base.rstrip(b'/')
        for key in keys:
            try:
                ret[key] = self[(base + b'/' + key).strip(b'/')]
            except KeyError:
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name):
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        :param name: The name of the reference.
        :raises KeyError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (b'HEAD', b'refs/stash'):
            return
        if not name.startswith(b'refs/') or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        :param refname: The name of the reference
        :return: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name):
        """Read a loose reference and return its contents.

        :param name: the refname to read
        :return: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name):
        """Follow a reference name.

        Starting from name, each ref is read with read_ref; as long as the
        contents begin with the symref marker, the ref named after the
        marker is read next.

        :param name: The refname to start from
        :return: a tuple of (refnames, sha), where refnames are the names of
            references in the chain (starting with name itself) and sha is
            the contents read last, which is None or empty if that ref
            could not be read
        :raises KeyError: if more than five refs in a row were read
            successfully and the chain still has not ended
        """
        marker_len = len(SYMREF)
        refnames = []
        hops = 0
        # Pretend the starting name was itself read from a symref, so that
        # the loop below treats it like every later link in the chain.
        contents = SYMREF + name
        while contents.startswith(SYMREF):
            target = contents[marker_len:]
            refnames.append(target)
            contents = self.read_ref(target)
            if not contents:
                break
            hops += 1
            if hops > 5:
                raise KeyError(name)
        return refnames, contents

    def _follow(self, name):
        import warnings
        warnings.warn(
            "RefsContainer._follow is deprecated. Use RefsContainer.follow "
            "instead.", DeprecationWarning)
        refnames, contents = self.follow(name)
        if not refnames:
            return (None, contents)
        return (refnames[-1], contents)

    def __contains__(self, refname):
        if self.read_ref(refname):
            return True
        return False

    def __getitem__(self, name):
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(self, name, old_ref, new_ref, committer=None,
                      timestamp=None, timezone=None, message=None):
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        :param name: The refname to set.
        :param old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
        :param new_ref: The new sha the refname will refer to.
        :param message: Message for reflog
        :return: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(self, name, ref):
        """Add a new reference only if it does not already exist.

        :param name: Ref name
        :param ref: Ref value
        :param message: Message for reflog
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref):
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        :note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().
        :param name: The refname to set.
        :param ref: The new sha the refname will refer to.
        """
        self.set_if_equals(name, None, ref)

    def remove_if_equals(self, name, old_ref, committer=None,
                         timestamp=None, timezone=None, message=None):
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        :param name: The refname to delete.
        :param old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
        :param message: Message for reflog
        :return: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name):
        """Remove a refname.
Loading ...