# refs.py -- For dealing with git refs
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Ref handling.
"""
import errno
import os
import sys
from dulwich.errors import (
PackedRefsException,
RefFormatError,
)
from dulwich.objects import (
git_line,
valid_hexsha,
ZERO_SHA,
)
from dulwich.file import (
GitFile,
ensure_dir_exists,
)
# Marker that prefixes the contents of a symbolic ref; what follows it is the
# name of the ref being pointed at (see parse_symref_value()).
SYMREF = b'ref: '
# Name prefixes under which local branches and tags are stored.
LOCAL_BRANCH_PREFIX = b'refs/heads/'
LOCAL_TAG_PREFIX = b'refs/tags/'
# Bytes that check_ref_format() rejects anywhere in a refname: DEL (octal
# 177), space, and the characters ~ ^ : ? * [
BAD_REF_CHARS = set(b'\177 ~^:?*[')
# NOTE(review): presumably the suffix that marks the peeled (dereferenced)
# value of an annotated tag; its use is not visible in this part of the file.
ANNOTATED_TAG_SUFFIX = b'^{}'
def parse_symref_value(contents):
    """Extract the destination of a symbolic ref from its raw contents.

    :param contents: Raw contents of the ref, expected to start with the
        symref marker (SYMREF)
    :return: The name of the ref being pointed at, with any trailing
        carriage return / newline bytes removed
    :raises ValueError: if the contents do not start with the symref marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF):]
    return destination.rstrip(b'\r\n')
def check_ref_format(refname):
    """Tell whether a refname is well formed.

    The checks follow the rules listed by git-check-ref-format[1] and are
    applied in the same order as that document.

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    :param refname: The refname to check (bytes)
    :return: True if refname is valid, False otherwise
    """
    # No path component may start with a dot.
    if refname.startswith(b'.') or b'/.' in refname:
        return False
    # At least one slash is required; this also rules out the empty name.
    if b'/' not in refname:
        return False
    if b'..' in refname:
        return False
    # Reject control characters (below octal 40) and the explicitly banned
    # bytes. The one-byte slice gives ord() a length-1 bytes object no matter
    # what type iterating over refname yields.
    if any(ord(refname[pos:pos + 1]) < 0o40 or char in BAD_REF_CHARS
           for pos, char in enumerate(refname)):
        return False
    if refname.endswith((b'/', b'.')):
        return False
    if refname.endswith(b'.lock'):
        return False
    # Neither the reflog syntax '@{' nor a backslash may appear anywhere.
    return b'@{' not in refname and b'\\' not in refname
class RefsContainer(object):
"""A container for refs."""
def __init__(self, logger=None):
self._logger = logger
def _log(self, ref, old_sha, new_sha, committer=None, timestamp=None,
timezone=None, message=None):
if self._logger is None:
return
if message is None:
return
self._logger(ref, old_sha, new_sha, committer, timestamp,
timezone, message)
def set_symbolic_ref(self, name, other, committer=None, timestamp=None,
                     timezone=None, message=None):
    """Turn a ref into a symbolic ref pointing at another ref.

    Not implemented in this base class.

    :param name: Name of the ref to set
    :param other: Name of the ref it should point at
    :param committer: Optional; not interpreted by this base class
    :param timestamp: Optional; not interpreted by this base class
    :param timezone: Optional; not interpreted by this base class
    :param message: Optional message
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError(self.set_symbolic_ref)
def get_packed_refs(self):
    """Return the contents of the packed-refs file.

    Not implemented in this base class.

    :return: Dictionary mapping ref names to SHA1s
    :note: Implementations return an empty dictionary when there is no
        packed-refs file.
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError(self.get_packed_refs)
def get_peeled(self, name):
    """Look up the cached peeled value of a ref, if there is one.

    This base implementation keeps no cache and always returns None.

    :param name: Name of the ref to peel
    :return: The peeled value of the ref. If the ref is known not to point
        to a tag, this is the SHA the ref refers to. If the ref may point to
        a tag but no cached information is available, None.
    """
    return None
def import_refs(self, base, other, committer=None, timestamp=None,
                timezone=None, message=None, prune=False):
    """Import a set of refs underneath a common base.

    Every entry of ``other`` is written unconditionally to ``base/<name>``.
    With ``prune`` set, refs that exist under ``base`` beforehand but are
    not present in ``other`` are removed afterwards.

    :param base: Base ref name under which the refs are stored
    :param other: Dictionary mapping ref names (relative to base) to values
    :param committer: Passed on to every set and remove operation
    :param timestamp: Passed on to every set and remove operation
    :param timezone: Passed on to every set and remove operation
    :param message: Passed on to every set and remove operation
    :param prune: Whether to remove refs under base missing from other
    """
    # Only query the existing refs when pruning was actually requested.
    to_delete = set(self.subkeys(base)) if prune else set()
    # Forward the complete set of logging arguments, not just the message,
    # and use the same ones for the removals.
    log_args = dict(committer=committer, timestamp=timestamp,
                    timezone=timezone, message=message)
    for name, value in other.items():
        self.set_if_equals(b'/'.join((base, name)), None, value, **log_args)
        # A ref that was just imported must survive the pruning below.
        to_delete.discard(name)
    for ref in to_delete:
        self.remove_if_equals(b'/'.join((base, ref)), None, **log_args)
def allkeys(self):
    """Return every ref present in this container.

    Not implemented in this base class.

    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError(self.allkeys)
def keys(self, base=None):
    """List the refs in this container, optionally below a base.

    :param base: An optional base to return refs under. When given, the
        result comes from subkeys(); otherwise from allkeys().
    :return: An unsorted set of valid refs in this container, including
        packed refs.
    """
    if base is None:
        return self.allkeys()
    return self.subkeys(base)
def subkeys(self, base):
    """Refs present in this container under a base.

    Only refs inside the ``base`` hierarchy are returned: with base
    ``refs/heads``, ``refs/heads/master`` matches but ``refs/headsup/x``
    does not. A trailing slash on ``base`` is ignored. An empty base
    returns every ref name unchanged.

    :param base: The base to return refs under.
    :return: A set of valid refs in this container under the base; the base
        prefix is stripped from the ref names returned.
    """
    # Match on "<base>/" rather than "<base>" so that sibling names sharing
    # the same leading characters are not picked up, and normalise a
    # trailing slash so it is not counted twice when stripping the prefix.
    prefix = base.rstrip(b'/')
    if prefix:
        prefix += b'/'
    prefix_len = len(prefix)
    return {refname[prefix_len:] for refname in self.allkeys()
            if refname.startswith(prefix)}
def as_dict(self, base=None):
    """Build a dictionary of the refs in this container.

    :param base: Optional base; when given, only refs returned by
        keys(base) are included and the dictionary keys are relative to it.
    :return: Dictionary mapping ref names to the values obtained by looking
        them up on this container. Refs whose lookup raises KeyError are
        left out.
    """
    names = self.keys(base)
    prefix = b'' if base is None else base.rstrip(b'/')
    resolved = {}
    for name in names:
        # Stripping slashes removes the leading '/' left by an empty prefix.
        full_name = (prefix + b'/' + name).strip(b'/')
        try:
            value = self[full_name]
        except KeyError:
            continue  # Unable to resolve
        resolved[name] = value
    return resolved
def _check_refname(self, name):
"""Ensure a refname is valid and lives in refs or is HEAD.
HEAD is not a valid refname according to git-check-ref-format, but this
class needs to be able to touch HEAD. Also, check_ref_format expects
refnames without the leading 'refs/', but this class requires that
so it cannot touch anything outside the refs dir (or HEAD).
:param name: The name of the reference.
:raises KeyError: if a refname is not HEAD or is otherwise not valid.
"""
if name in (b'HEAD', b'refs/stash'):
return
if not name.startswith(b'refs/') or not check_ref_format(name[5:]):
raise RefFormatError(name)
def read_ref(self, refname):
    """Read a reference without following any references.

    The loose ref is tried first; when it yields nothing (None or empty
    contents), the packed refs are consulted instead.

    :param refname: The name of the reference
    :return: The contents of the ref file, or None if it does
        not exist.
    """
    loose = self.read_loose_ref(refname)
    if loose:
        return loose
    return self.get_packed_refs().get(refname, None)
def read_loose_ref(self, name):
    """Return the contents of a loose reference.

    Not implemented in this base class.

    :param name: the refname to read
    :return: The contents of the ref file, or None if it does
        not exist.
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError(self.read_loose_ref)
def follow(self, name):
    """Resolve a reference name through any chain of symbolic refs.

    :param name: The refname to start from
    :return: a tuple of (refnames, sha), where refnames are the names of
        the references in the chain, starting with name itself, and sha is
        the contents of the last ref read (None or empty if that ref could
        not be read)
    :raises KeyError: once a sixth ref in the chain has been read with
        non-empty contents
    """
    marker_len = len(SYMREF)
    chain = []
    hops = 0
    # Treat the starting name as if it were the target of a symref so the
    # loop handles it like every later link in the chain.
    value = SYMREF + name
    while value.startswith(SYMREF):
        target = value[marker_len:]
        chain.append(target)
        value = self.read_ref(target)
        if not value:
            break
        hops += 1
        if hops > 5:
            raise KeyError(name)
    return chain, value
def _follow(self, name):
import warnings
warnings.warn(
"RefsContainer._follow is deprecated. Use RefsContainer.follow "
"instead.", DeprecationWarning)
refnames, contents = self.follow(name)
if not refnames:
return (None, contents)
return (refnames[-1], contents)
def __contains__(self, refname):
if self.read_ref(refname):
return True
return False
def __getitem__(self, name):
"""Get the SHA1 for a reference name.
This method follows all symbolic references.
"""
_, sha = self.follow(name)
if sha is None:
raise KeyError(name)
return sha
def set_if_equals(self, name, old_ref, new_ref, committer=None,
                  timestamp=None, timezone=None, message=None):
    """Point a refname at new_ref, but only if it currently equals old_ref.

    Implementations follow all symbolic references where that applies to
    the subclass; the method can be used as an atomic compare-and-swap.
    Not implemented in this base class.

    :param name: The refname to set.
    :param old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
    :param new_ref: The new sha the refname will refer to.
    :param committer: Optional; not interpreted by this base class
    :param timestamp: Optional; not interpreted by this base class
    :param timezone: Optional; not interpreted by this base class
    :param message: Message for reflog
    :return: True if the set was successful, False otherwise.
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError(self.set_if_equals)
def add_if_new(self, name, ref):
    """Create a reference, but only if it does not exist yet.

    Not implemented in this base class.

    :param name: Ref name
    :param ref: Ref value
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError(self.add_if_new)
def __setitem__(self, name, ref):
"""Set a reference name to point to the given SHA1.
This method follows all symbolic references if applicable for the
subclass.
:note: This method unconditionally overwrites the contents of a
reference. To update atomically only if the reference has not
changed, use set_if_equals().
:param name: The refname to set.
:param ref: The new sha the refname will refer to.
"""
self.set_if_equals(name, None, ref)
def remove_if_equals(self, name, old_ref, committer=None,
                     timestamp=None, timezone=None, message=None):
    """Delete a refname, but only if it currently equals old_ref.

    Symbolic references are not followed, even where the subclass
    otherwise would; the method can be used as an atomic
    compare-and-delete. Not implemented in this base class.

    :param name: The refname to delete.
    :param old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
    :param committer: Optional; not interpreted by this base class
    :param timestamp: Optional; not interpreted by this base class
    :param timezone: Optional; not interpreted by this base class
    :param message: Message for reflog
    :return: True if the delete was successful, False otherwise.
    :raises NotImplementedError: always, in this base class
    """
    raise NotImplementedError(self.remove_if_equals)
def __delitem__(self, name):
"""Remove a refname.
Loading ...