Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

/ objects.py

# objects.py -- Access to base git objects
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Access to base git objects."""

import binascii
from io import BytesIO
from collections import namedtuple
import os
import posixpath
import stat
import sys
import warnings
import zlib
from hashlib import sha1

from dulwich.errors import (
    ChecksumMismatch,
    NotBlobError,
    NotCommitError,
    NotTagError,
    NotTreeError,
    ObjectFormatException,
    EmptyFileException,
    )
from dulwich.file import GitFile


ZERO_SHA = b'0' * 40

# Header fields for commits
_TREE_HEADER = b'tree'
_PARENT_HEADER = b'parent'
_AUTHOR_HEADER = b'author'
_COMMITTER_HEADER = b'committer'
_ENCODING_HEADER = b'encoding'
_MERGETAG_HEADER = b'mergetag'
_GPGSIG_HEADER = b'gpgsig'

# Header fields for objects
_OBJECT_HEADER = b'object'
_TYPE_HEADER = b'type'
_TAG_HEADER = b'tag'
_TAGGER_HEADER = b'tagger'


S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"


def S_ISGITLINK(m):
    """Check if a mode indicates a submodule.

    :param m: Mode to check
    :return: a ``boolean``
    """
    return (stat.S_IFMT(m) == S_IFGITLINK)


def _decompress(string):
    dcomp = zlib.decompressobj()
    dcomped = dcomp.decompress(string)
    dcomped += dcomp.flush()
    return dcomped


def sha_to_hex(sha):
    """Takes a string and returns the hex of the sha within"""
    hexsha = binascii.hexlify(sha)
    assert len(hexsha) == 40, "Incorrect length of sha1 string: %d" % hexsha
    return hexsha


def hex_to_sha(hex):
    """Takes a hex sha and returns a binary sha"""
    assert len(hex) == 40, "Incorrect length of hexsha: %s" % hex
    try:
        return binascii.unhexlify(hex)
    except TypeError as exc:
        if not isinstance(hex, bytes):
            raise
        raise ValueError(exc.args[0])


def valid_hexsha(hex):
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
    except (TypeError, binascii.Error):
        return False
    else:
        return True


def hex_to_filename(path, hex):
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join accepts bytes or unicode, but all args must be of the same
    # type. Make sure that hex which is expected to be bytes, is the same type
    # as path.
    if getattr(path, 'encode', None) is not None:
        hex = hex.decode('ascii')
    dir = hex[:2]
    file = hex[2:]
    # Check from object dir
    return os.path.join(path, dir, file)


def filename_to_hex(filename):
    """Takes an object filename and returns its corresponding hex sha."""
    # grab the last (up to) two path components
    names = filename.rsplit(os.path.sep, 2)[-2:]
    errmsg = "Invalid object filename: %s" % filename
    assert len(names) == 2, errmsg
    base, rest = names
    assert len(base) == 2 and len(rest) == 38, errmsg
    hex = (base + rest).encode('ascii')
    hex_to_sha(hex)
    return hex


def object_header(num_type, length):
    """Return an object header for the given numeric type and text length."""
    return (object_class(num_type).type_name +
            b' ' + str(length).encode('ascii') + b'\0')


def serializable_property(name, docstring=None):
    """A property that helps tracking whether serialization is necessary.
    """
    def set(obj, value):
        setattr(obj, "_"+name, value)
        obj._needs_serialization = True

    def get(obj):
        return getattr(obj, "_"+name)
    return property(get, set, doc=docstring)


def object_class(type):
    """Get the object class corresponding to the given type.

    :param type: Either a type name string or a numeric type.
    :return: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type, None)


def check_hexsha(hex, error_msg):
    """Check if a string is a valid hex sha string.

    :param hex: Hex string to check
    :param error_msg: Error message to use in exception
    :raise ObjectFormatException: Raised when the string is not valid
    """
    if not valid_hexsha(hex):
        raise ObjectFormatException("%s %s" % (error_msg, hex))


def check_identity(identity, error_msg):
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    :param identity: Identity string
    :param error_msg: Error message to use in exception
    """
    email_start = identity.find(b'<')
    email_end = identity.find(b'>')
    if (email_start < 0 or email_end < 0 or email_end <= email_start
            or identity.find(b'<', email_start + 1) >= 0
            or identity.find(b'>', email_end + 1) >= 0
            or not identity.endswith(b'>')):
        raise ObjectFormatException(error_msg)


def check_time(time_seconds):
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    :param time_info: author/committer/tagger info

    """
    # Prevent overflow error
    if time_seconds > MAX_TIME:
        raise ObjectFormatException(
            'Date field should not exceed %s' % MAX_TIME)


def git_line(*items):
    """Formats items into a space separated line."""
    return b' '.join(items) + b'\n'


class FixedSha(object):
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ('_hexsha', '_sha')

    def __init__(self, hexsha):
        if getattr(hexsha, 'encode', None) is not None:
            hexsha = hexsha.encode('ascii')
        if not isinstance(hexsha, bytes):
            raise TypeError('Expected bytes for hexsha, got %r' % hexsha)
        self._hexsha = hexsha
        self._sha = hex_to_sha(hexsha)

    def digest(self):
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self):
        """Return the hex SHA digest."""
        return self._hexsha.decode('ascii')


class ShaFile(object):
    """A git SHA file."""

    __slots__ = ('_chunked_text', '_sha', '_needs_serialization')

    @staticmethod
    def _parse_legacy_object_header(magic, f):
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b'\0', start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b' ', 1)
        size = int(size)  # sanity check
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException("Not a known type: %s" % type_name)
        return obj_class()

    def _parse_legacy_object(self, map):
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b'\0')
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        self.set_raw_string(text[header_end+1:])

    def as_legacy_object_chunks(self):
        """Return chunks representing the object in the experimental format.

        :return: List of strings
        """
        compobj = zlib.compressobj()
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self):
        """Return string representing the object in the experimental format.
        """
        return b''.join(self.as_legacy_object_chunks())

    def as_raw_chunks(self):
        """Return chunks with serialization of the object.

        :return: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        return self._chunked_text

    def as_raw_string(self):
        """Return raw string with serialization of the object.

        :return: String object
        """
        return b''.join(self.as_raw_chunks())

    if sys.version_info[0] >= 3:
        def __bytes__(self):
            """Return raw string serialization of this object."""
            return self.as_raw_string()
    else:
        def __str__(self):
            """Return raw string serialization of this object."""
            return self.as_raw_string()

    def __hash__(self):
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self):
        """Return a string representing this object, fit for display."""
        return self.as_raw_string()

    def set_raw_string(self, text, sha=None):
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError('Expected bytes for text, got %r' % text)
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(self, chunks, sha=None):
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(magic, f):
        """Parse a new style object, creating it but not reading the file."""
Loading ...