Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

/ objects.py

# objects.py -- Access to base git objects
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.

"""Access to base git objects."""

import binascii
from io import BytesIO
from collections import namedtuple
import os
import posixpath
import stat
import sys
import warnings
import zlib
from hashlib import sha1

from dulwich.errors import (
from dulwich.file import GitFile

ZERO_SHA = b'0' * 40

# Header fields for commits
_TREE_HEADER = b'tree'
_PARENT_HEADER = b'parent'
_AUTHOR_HEADER = b'author'
_COMMITTER_HEADER = b'committer'
_ENCODING_HEADER = b'encoding'
_MERGETAG_HEADER = b'mergetag'
_GPGSIG_HEADER = b'gpgsig'

# Header fields for objects
_OBJECT_HEADER = b'object'
_TYPE_HEADER = b'type'
_TAG_HEADER = b'tag'
_TAGGER_HEADER = b'tagger'

S_IFGITLINK = 0o160000

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max


    """Check if a mode indicates a submodule.

    :param m: Mode to check
    :return: a ``boolean``
    return (stat.S_IFMT(m) == S_IFGITLINK)

def _decompress(string):
    dcomp = zlib.decompressobj()
    dcomped = dcomp.decompress(string)
    dcomped += dcomp.flush()
    return dcomped

def sha_to_hex(sha):
    """Takes a string and returns the hex of the sha within"""
    hexsha = binascii.hexlify(sha)
    assert len(hexsha) == 40, "Incorrect length of sha1 string: %d" % hexsha
    return hexsha

def hex_to_sha(hex):
    """Takes a hex sha and returns a binary sha"""
    assert len(hex) == 40, "Incorrect length of hexsha: %s" % hex
        return binascii.unhexlify(hex)
    except TypeError as exc:
        if not isinstance(hex, bytes):
        raise ValueError(exc.args[0])

def valid_hexsha(hex):
    if len(hex) != 40:
        return False
    except (TypeError, binascii.Error):
        return False
        return True

def hex_to_filename(path, hex):
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join accepts bytes or unicode, but all args must be of the same
    # type. Make sure that hex which is expected to be bytes, is the same type
    # as path.
    if getattr(path, 'encode', None) is not None:
        hex = hex.decode('ascii')
    dir = hex[:2]
    file = hex[2:]
    # Check from object dir
    return os.path.join(path, dir, file)

def filename_to_hex(filename):
    """Takes an object filename and returns its corresponding hex sha."""
    # grab the last (up to) two path components
    names = filename.rsplit(os.path.sep, 2)[-2:]
    errmsg = "Invalid object filename: %s" % filename
    assert len(names) == 2, errmsg
    base, rest = names
    assert len(base) == 2 and len(rest) == 38, errmsg
    hex = (base + rest).encode('ascii')
    return hex

def object_header(num_type, length):
    """Return an object header for the given numeric type and text length."""
    return (object_class(num_type).type_name +
            b' ' + str(length).encode('ascii') + b'\0')

def serializable_property(name, docstring=None):
    """A property that helps tracking whether serialization is necessary.
    def set(obj, value):
        setattr(obj, "_"+name, value)
        obj._needs_serialization = True

    def get(obj):
        return getattr(obj, "_"+name)
    return property(get, set, doc=docstring)

def object_class(type):
    """Get the object class corresponding to the given type.

    :param type: Either a type name string or a numeric type.
    :return: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    return _TYPE_MAP.get(type, None)

def check_hexsha(hex, error_msg):
    """Check if a string is a valid hex sha string.

    :param hex: Hex string to check
    :param error_msg: Error message to use in exception
    :raise ObjectFormatException: Raised when the string is not valid
    if not valid_hexsha(hex):
        raise ObjectFormatException("%s %s" % (error_msg, hex))

def check_identity(identity, error_msg):
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    :param identity: Identity string
    :param error_msg: Error message to use in exception
    email_start = identity.find(b'<')
    email_end = identity.find(b'>')
    if (email_start < 0 or email_end < 0 or email_end <= email_start
            or identity.find(b'<', email_start + 1) >= 0
            or identity.find(b'>', email_end + 1) >= 0
            or not identity.endswith(b'>')):
        raise ObjectFormatException(error_msg)

def check_time(time_seconds):
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    :param time_info: author/committer/tagger info

    # Prevent overflow error
    if time_seconds > MAX_TIME:
        raise ObjectFormatException(
            'Date field should not exceed %s' % MAX_TIME)

def git_line(*items):
    """Formats items into a space separated line."""
    return b' '.join(items) + b'\n'

class FixedSha(object):
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ('_hexsha', '_sha')

    def __init__(self, hexsha):
        if getattr(hexsha, 'encode', None) is not None:
            hexsha = hexsha.encode('ascii')
        if not isinstance(hexsha, bytes):
            raise TypeError('Expected bytes for hexsha, got %r' % hexsha)
        self._hexsha = hexsha
        self._sha = hex_to_sha(hexsha)

    def digest(self):
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self):
        """Return the hex SHA digest."""
        return self._hexsha.decode('ascii')

class ShaFile(object):
    """A git SHA file."""

    __slots__ = ('_chunked_text', '_sha', '_needs_serialization')

    def _parse_legacy_object_header(magic, f):
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b'\0', start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b' ', 1)
        size = int(size)  # sanity check
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException("Not a known type: %s" % type_name)
        return obj_class()

    def _parse_legacy_object(self, map):
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b'\0')
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")

    def as_legacy_object_chunks(self):
        """Return chunks representing the object in the experimental format.

        :return: List of strings
        compobj = zlib.compressobj()
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self):
        """Return string representing the object in the experimental format.
        return b''.join(self.as_legacy_object_chunks())

    def as_raw_chunks(self):
        """Return chunks with serialization of the object.

        :return: List of strings, not necessarily one per line
        if self._needs_serialization:
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        return self._chunked_text

    def as_raw_string(self):
        """Return raw string with serialization of the object.

        :return: String object
        return b''.join(self.as_raw_chunks())

    if sys.version_info[0] >= 3:
        def __bytes__(self):
            """Return raw string serialization of this object."""
            return self.as_raw_string()
        def __str__(self):
            """Return raw string serialization of this object."""
            return self.as_raw_string()

    def __hash__(self):
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self):
        """Return a string representing this object, fit for display."""
        return self.as_raw_string()

    def set_raw_string(self, text, sha=None):
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError('Expected bytes for text, got %r' % text)
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(self, chunks, sha=None):
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        if sha is None:
            self._sha = None
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    def _parse_object_header(magic, f):
        """Parse a new style object, creating it but not reading the file."""
Loading ...