Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

Version: 0.19.11 

/ tests / test_pack.py

# test_pack.py -- Tests for the handling of git packs.
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
# Copyright (C) 2008 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for Dulwich packs."""


from io import BytesIO
from hashlib import sha1
import os
import shutil
import tempfile
import zlib

from dulwich.errors import (
    ApplyDeltaError,
    ChecksumMismatch,
    )
from dulwich.file import (
    GitFile,
    )
from dulwich.object_store import (
    MemoryObjectStore,
    )
from dulwich.objects import (
    hex_to_sha,
    sha_to_hex,
    Commit,
    Tree,
    Blob,
    )
from dulwich.pack import (
    OFS_DELTA,
    REF_DELTA,
    MemoryPackIndex,
    Pack,
    PackData,
    apply_delta,
    create_delta,
    deltify_pack_objects,
    load_pack_index,
    UnpackedObject,
    read_zlib_chunks,
    write_pack_header,
    write_pack_index_v1,
    write_pack_index_v2,
    write_pack_object,
    write_pack,
    unpack_object,
    compute_file_sha,
    PackStreamReader,
    DeltaChainIterator,
    _delta_encode_size,
    _encode_copy_operation,
    )
from dulwich.tests import (
    TestCase,
    )
from dulwich.tests.utils import (
    make_object,
    build_pack,
    )

pack1_sha = b'bc63ddad95e7321ee734ea11a7a62d314e0d7481'

a_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = b'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = b'f18faa16531ac570a3fdc8c7ca16682548dafd12'


class PackTests(TestCase):
    """Base class for testing packs"""

    def setUp(self):
        super(PackTests, self).setUp()
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    datadir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), 'data/packs'))

    def get_pack_index(self, sha):
        """Returns a PackIndex from the datadir with the given sha"""
        return load_pack_index(
                os.path.join(self.datadir,
                             'pack-%s.idx' % sha.decode('ascii')))

    def get_pack_data(self, sha):
        """Returns a PackData object from the datadir with the given sha"""
        return PackData(
                os.path.join(
                        self.datadir, 'pack-%s.pack' % sha.decode('ascii')))

    def get_pack(self, sha):
        return Pack(
                os.path.join(self.datadir, 'pack-%s' % sha.decode('ascii')))

    def assertSucceeds(self, func, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)


class PackIndexTests(PackTests):
    """Class that tests the index of packfiles"""

    def test_object_index(self):
        """Tests that the correct object offset is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        self.assertRaises(KeyError, p.object_index, pack1_sha)
        self.assertEqual(p.object_index(a_sha), 178)
        self.assertEqual(p.object_index(tree_sha), 138)
        self.assertEqual(p.object_index(commit_sha), 12)

    def test_object_sha1(self):
        """Tests that the correct object offset is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        self.assertRaises(KeyError, p.object_sha1, 876)
        self.assertEqual(p.object_sha1(178), hex_to_sha(a_sha))
        self.assertEqual(p.object_sha1(138), hex_to_sha(tree_sha))
        self.assertEqual(p.object_sha1(12), hex_to_sha(commit_sha))

    def test_index_len(self):
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(p))

    def test_get_stored_checksum(self):
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(b'f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
                         sha_to_hex(p.get_stored_checksum()))
        self.assertEqual(b'721980e866af9a5f93ad674144e1459b8ba3e7b7',
                         sha_to_hex(p.get_pack_checksum()))

    def test_index_check(self):
        p = self.get_pack_index(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterentries(self):
        p = self.get_pack_index(pack1_sha)
        entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
        self.assertEqual([
            (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
            (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
            (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
        ], entries)

    def test_iter(self):
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))


class TestPackDeltas(TestCase):

    test_string1 = b'The answer was flailing in the wind'
    test_string2 = b'The answer was falling down the pipe'
    test_string3 = b'zzzzz'

    test_string_empty = b''
    test_string_big = b'Z' * 8192
    test_string_huge = b'Z' * 100000

    def _test_roundtrip(self, base, target):
        self.assertEqual(
                target,
                b''.join(apply_delta(base, create_delta(base, target))))

    def test_nochange(self):
        self._test_roundtrip(self.test_string1, self.test_string1)

    def test_nochange_huge(self):
        self._test_roundtrip(self.test_string_huge, self.test_string_huge)

    def test_change(self):
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self):
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_empty_to_big(self):
        self._test_roundtrip(self.test_string_empty, self.test_string_big)

    def test_empty_to_huge(self):
        self._test_roundtrip(self.test_string_empty, self.test_string_huge)

    def test_huge_copy(self):
        self._test_roundtrip(self.test_string_huge + self.test_string1,
                             self.test_string_huge + self.test_string2)

    def test_dest_overflow(self):
        self.assertRaises(ApplyDeltaError, apply_delta,
                          b'a'*0x10000, b'\x80\x80\x04\x80\x80\x04\x80' +
                          b'a'*0x10000)
        self.assertRaises(
            ApplyDeltaError,
            apply_delta, b'', b'\x00\x80\x02\xb0\x11\x11')

    def test_pypy_issue(self):
        # Test for https://github.com/jelmer/dulwich/issues/509 /
        # https://bitbucket.org/pypy/pypy/issues/2499/cpyext-pystring_asstring-doesnt-work
        chunks = [
            b'tree 03207ccf58880a748188836155ceed72f03d65d6\n'
            b'parent 408fbab530fd4abe49249a636a10f10f44d07a21\n'
            b'author Victor Stinner <victor.stinner@gmail.com> '
            b'1421355207 +0100\n'
            b'committer Victor Stinner <victor.stinner@gmail.com> '
            b'1421355207 +0100\n'
            b'\n'
            b'Backout changeset 3a06020af8cf\n'
            b'\nStreamWriter: close() now clears the reference to the '
            b'transport\n'
            b'\nStreamWriter now raises an exception if it is closed: '
            b'write(), writelines(),\n'
            b'write_eof(), can_write_eof(), get_extra_info(), drain().\n']
        delta = [
            b'\xcd\x03\xad\x03]tree ff3c181a393d5a7270cddc01ea863818a8621ca8\n'
            b'parent 20a103cc90135494162e819f98d0edfc1f1fba6b\x91]7\x0510738'
            b'\x91\x99@\x0b10738 +0100\x93\x04\x01\xc9']
        res = apply_delta(chunks, delta)
        expected = [
            b'tree ff3c181a393d5a7270cddc01ea863818a8621ca8\n'
            b'parent 20a103cc90135494162e819f98d0edfc1f1fba6b',
            b'\nauthor Victor Stinner <victor.stinner@gmail.com> 14213',
            b'10738',
            b' +0100\ncommitter Victor Stinner <victor.stinner@gmail.com> '
            b'14213',
            b'10738 +0100',
            b'\n\nStreamWriter: close() now clears the reference to the '
            b'transport\n\n'
            b'StreamWriter now raises an exception if it is closed: '
            b'write(), writelines(),\n'
            b'write_eof(), can_write_eof(), get_extra_info(), drain().\n']
        self.assertEqual(b''.join(expected), b''.join(res))


class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def test_create_pack(self):
        self.get_pack_data(pack1_sha).close()

    def test_from_file(self):
        path = os.path.join(self.datadir,
                            'pack-%s.pack' % pack1_sha.decode('ascii'))
        with open(path, 'rb') as f:
            PackData.from_file(f, os.path.getsize(path))

    def test_pack_len(self):
        with self.get_pack_data(pack1_sha) as p:
            self.assertEqual(3, len(p))

    def test_index_check(self):
        with self.get_pack_data(pack1_sha) as p:
            self.assertSucceeds(p.check)

    def test_iterobjects(self):
        with self.get_pack_data(pack1_sha) as p:
            commit_data = (
                    b'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
                    b'author James Westby <jw+debian@jameswestby.net> '
                    b'1174945067 +0100\n'
                    b'committer James Westby <jw+debian@jameswestby.net> '
                    b'1174945067 +0100\n'
                    b'\n'
                    b'Test commit\n')
            blob_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
            tree_data = b'100644 a\0' + hex_to_sha(blob_sha)
            actual = []
            for offset, type_num, chunks, crc32 in p.iterobjects():
                actual.append((offset, type_num, b''.join(chunks), crc32))
            self.assertEqual([
                (12, 1, commit_data, 3775879613),
                (138, 2, tree_data, 912998690),
                (178, 3, b'test 1\n', 1373561701)
                ], actual)

    def test_iterentries(self):
        with self.get_pack_data(pack1_sha) as p:
            entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
            self.assertEqual(set([
              (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
              (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
              (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
              ]), entries)

    def test_create_index_v1(self):
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, 'v1test.idx')
            p.create_index_v1(filename)
            idx1 = load_pack_index(filename)
            idx2 = self.get_pack_index(pack1_sha)
            self.assertEqual(idx1, idx2)

    def test_create_index_v2(self):
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, 'v2test.idx')
            p.create_index_v2(filename)
            idx1 = load_pack_index(filename)
            idx2 = self.get_pack_index(pack1_sha)
            self.assertEqual(idx1, idx2)

    def test_compute_file_sha(self):
        f = BytesIO(b'abcd1234wxyz')
        self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(),
                         compute_file_sha(f).hexdigest())
        self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(),
                         compute_file_sha(f, buffer_size=5).hexdigest())
        self.assertEqual(sha1(b'abcd1234').hexdigest(),
                         compute_file_sha(f, end_ofs=-4).hexdigest())
        self.assertEqual(sha1(b'1234wxyz').hexdigest(),
                         compute_file_sha(f, start_ofs=4).hexdigest())
        self.assertEqual(
            sha1(b'1234').hexdigest(),
            compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest())

    def test_compute_file_sha_short_file(self):
        f = BytesIO(b'abcd1234wxyz')
        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
        self.assertRaises(AssertionError, compute_file_sha, f, start_ofs=10,
                          end_ofs=-12)


class TestPack(PackTests):

    def test_len(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(3, len(p))

    def test_contains(self):
        with self.get_pack(pack1_sha) as p:
            self.assertTrue(tree_sha in p)

    def test_get(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))

    def test_iterobjects(self):
        with self.get_pack(pack1_sha) as p:
            expected = set([p[s] for s in [commit_sha, tree_sha, a_sha]])
            self.assertEqual(expected, set(list(p.iterobjects())))

    def test_pack_tuples(self):
        with self.get_pack(pack1_sha) as p:
            tuples = p.pack_tuples()
            expected = set(
                    [(p[s], None) for s in [commit_sha, tree_sha, a_sha]])
            self.assertEqual(expected, set(list(tuples)))
            self.assertEqual(expected, set(list(tuples)))
            self.assertEqual(3, len(tuples))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        with self.get_pack(pack1_sha) as p:
            obj = p[a_sha]
            self.assertEqual(obj.type_name, b'blob')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), a_sha)
            obj = p[tree_sha]
            self.assertEqual(obj.type_name, b'tree')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), tree_sha)
            obj = p[commit_sha]
            self.assertEqual(obj.type_name, b'commit')
            self.assertEqual(obj.sha().hexdigest().encode('ascii'), commit_sha)

    def test_copy(self):
        with self.get_pack(pack1_sha) as origpack:
            self.assertSucceeds(origpack.index.check)
            basename = os.path.join(self.tempdir, 'Elch')
            write_pack(basename, origpack.pack_tuples())

            with Pack(basename) as newpack:
                self.assertEqual(origpack, newpack)
                self.assertSucceeds(newpack.index.check)
                self.assertEqual(origpack.name(), newpack.name())
                self.assertEqual(origpack.index.get_pack_checksum(),
                                 newpack.index.get_pack_checksum())

                wrong_version = origpack.index.version != newpack.index.version
                orig_checksum = origpack.index.get_stored_checksum()
                new_checksum = newpack.index.get_stored_checksum()
                self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self):
        with self.get_pack(pack1_sha) as p:
            commit = p[commit_sha]
            self.assertEqual(b'James Westby <jw+debian@jameswestby.net>',
                             commit.author)
            self.assertEqual([], commit.parents)

    def _copy_pack(self, origpack):
        basename = os.path.join(self.tempdir, 'somepack')
        write_pack(basename, origpack.pack_tuples())
        return Pack(basename)

    def test_keep_no_message(self):
        with self.get_pack(pack1_sha) as p:
            p = self._copy_pack(p)

        with p:
            keepfile_name = p.keep()

        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))

        with open(keepfile_name, 'r') as f:
            buf = f.read()
            self.assertEqual('', buf)

    def test_keep_message(self):
        with self.get_pack(pack1_sha) as p:
            p = self._copy_pack(p)

        msg = b'some message'
        with p:
            keepfile_name = p.keep(msg)

        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))

        # and contain the right message, with a linefeed
        with open(keepfile_name, 'rb') as f:
            buf = f.read()
            self.assertEqual(msg + b'\n', buf)

    def test_name(self):
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(pack1_sha, p.name())

    def test_length_mismatch(self):
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            Pack.from_objects(data, index).check_length_and_checksum()

            data._file.seek(12)
            bad_file = BytesIO()
            write_pack_header(bad_file, 9999)
            bad_file.write(data._file.read())
            bad_file = BytesIO(bad_file.getvalue())
            bad_data = PackData('', file=bad_file)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.assertRaises(AssertionError, lambda: bad_pack.data)
            self.assertRaises(AssertionError,
                              lambda: bad_pack.check_length_and_checksum())

    def test_checksum_mismatch(self):
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            Pack.from_objects(data, index).check_length_and_checksum()

            data._file.seek(0)
            bad_file = BytesIO(data._file.read()[:-20] + (b'\xff' * 20))
            bad_data = PackData('', file=bad_file)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
            self.assertRaises(ChecksumMismatch, lambda:
                              bad_pack.check_length_and_checksum())

    def test_iterobjects_2(self):
        with self.get_pack(pack1_sha) as p:
            objs = dict((o.id, o) for o in p.iterobjects())
            self.assertEqual(3, len(objs))
            self.assertEqual(sorted(objs), sorted(p.index))
            self.assertTrue(isinstance(objs[a_sha], Blob))
            self.assertTrue(isinstance(objs[tree_sha], Tree))
            self.assertTrue(isinstance(objs[commit_sha], Commit))


class TestThinPack(PackTests):

    def setUp(self):
        super(TestThinPack, self).setUp()
        self.store = MemoryObjectStore()
        self.blobs = {}
        for blob in (b'foo', b'bar', b'foo1234', b'bar2468'):
            self.blobs[blob] = make_object(Blob, data=blob)
        self.store.add_object(self.blobs[b'foo'])
        self.store.add_object(self.blobs[b'bar'])

        # Build a thin pack. 'foo' is as an external reference, 'bar' an
        # internal reference.
        self.pack_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.pack_dir)
        self.pack_prefix = os.path.join(self.pack_dir, 'pack')

        with open(self.pack_prefix + '.pack', 'wb') as f:
            build_pack(f, [
                (REF_DELTA, (self.blobs[b'foo'].id, b'foo1234')),
                (Blob.type_num, b'bar'),
                (REF_DELTA, (self.blobs[b'bar'].id, b'bar2468'))],
                store=self.store)

        # Index the new pack.
        with self.make_pack(True) as pack:
            with PackData(pack._data_path) as data:
                data.pack = pack
                data.create_index(self.pack_prefix + '.idx')

        del self.store[self.blobs[b'bar'].id]

    def make_pack(self, resolve_ext_ref):
        return Pack(
            self.pack_prefix,
            resolve_ext_ref=self.store.get_raw if resolve_ext_ref else None)

    def test_get_raw(self):
        with self.make_pack(False) as p:
            self.assertRaises(
                KeyError, p.get_raw, self.blobs[b'foo1234'].id)
        with self.make_pack(True) as p:
            self.assertEqual(
                (3, b'foo1234'),
                p.get_raw(self.blobs[b'foo1234'].id))

    def test_get_raw_unresolved(self):
        with self.make_pack(False) as p:
            self.assertEqual(
                (7,
                 b'\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c',
                 [b'x\x9ccf\x9f\xc0\xccbhdl\x02\x00\x06f\x01l']),
                p.get_raw_unresolved(self.blobs[b'foo1234'].id))
        with self.make_pack(True) as p:
            self.assertEqual(
                (7,
                 b'\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c',
                 [b'x\x9ccf\x9f\xc0\xccbhdl\x02\x00\x06f\x01l']),
                p.get_raw_unresolved(self.blobs[b'foo1234'].id))

    def test_iterobjects(self):
        with self.make_pack(False) as p:
            self.assertRaises(KeyError, list, p.iterobjects())
        with self.make_pack(True) as p:
            self.assertEqual(
                sorted([self.blobs[b'foo1234'].id, self.blobs[b'bar'].id,
                        self.blobs[b'bar2468'].id]),
                sorted(o.id for o in p.iterobjects()))


class WritePackTests(TestCase):

    def test_write_pack_header(self):
        f = BytesIO()
        write_pack_header(f, 42)
        self.assertEqual(b'PACK\x00\x00\x00\x02\x00\x00\x00*',
                         f.getvalue())

    def test_write_pack_object(self):
        f = BytesIO()
        f.write(b'header')
        offset = f.tell()
        crc32 = write_pack_object(f, Blob.type_num, b'blob')
        self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xffffffff)

        f.write(b'x')  # unpack_object needs extra trailing data.
        f.seek(offset)
        unpacked, unused = unpack_object(f.read, compute_crc32=True)
        self.assertEqual(Blob.type_num, unpacked.pack_type_num)
        self.assertEqual(Blob.type_num, unpacked.obj_type_num)
        self.assertEqual([b'blob'], unpacked.decomp_chunks)
        self.assertEqual(crc32, unpacked.crc32)
        self.assertEqual(b'x', unused)

    def test_write_pack_object_sha(self):
        f = BytesIO()
        f.write(b'header')
        offset = f.tell()
        sha_a = sha1(b'foo')
        sha_b = sha_a.copy()
        write_pack_object(f, Blob.type_num, b'blob', sha=sha_a)
        self.assertNotEqual(sha_a.digest(), sha_b.digest())
        sha_b.update(f.getvalue()[offset:])
        self.assertEqual(sha_a.digest(), sha_b.digest())


pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')


class BaseTestPackIndexWriting(object):

    def assertSucceeds(self, func, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)

    def index(self, filename, entries, pack_checksum):
        raise NotImplementedError(self.index)

    def test_empty(self):
        idx = self.index('empty.idx', [], pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_large(self):
        entry1_sha = hex_to_sha('4e6388232ec39792661e2e75db8fb117fc869ce6')
        entry2_sha = hex_to_sha('e98f071751bd77f59967bfa671cd2caebdccc9a2')
        entries = [(entry1_sha, 0xf2972d0830529b87, 24),
                   (entry2_sha, (~0xf2972d0830529b87) & (2 ** 64 - 1), 92)]
        if not self._supports_large:
            self.assertRaises(TypeError, self.index, 'single.idx',
                              entries, pack_checksum)
            return
        idx = self.index('single.idx', entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(2, len(idx))
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(entries), len(actual_entries))
        for mine, actual in zip(entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertTrue(actual_crc is None)

    def test_single(self):
        entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
        my_entries = [(entry_sha, 178, 42)]
        idx = self.index('single.idx', my_entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(my_entries), len(actual_entries))
        for mine, actual in zip(my_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertTrue(actual_crc is None)


class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def index(self, filename, entries, pack_checksum):
        path = os.path.join(self.tempdir, filename)
        self.writeIndex(path, entries, pack_checksum)
        idx = load_pack_index(path)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.version, self._expected_version)
        return idx

    def writeIndex(self, filename, entries, pack_checksum):
        # FIXME: Write to BytesIO instead rather than hitting disk ?
        with GitFile(filename, "wb") as f:
            self._write_fn(f, entries, pack_checksum)


class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):

    def setUp(self):
        TestCase.setUp(self)
        self._has_crc32_checksum = True
        self._supports_large = True

    def index(self, filename, entries, pack_checksum):
        return MemoryPackIndex(entries, pack_checksum)

    def tearDown(self):
        TestCase.tearDown(self)


class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        self._has_crc32_checksum = False
        self._expected_version = 1
        self._supports_large = False
        self._write_fn = write_pack_index_v1

    def tearDown(self):
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)


class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):

    def setUp(self):
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        self._has_crc32_checksum = True
        self._supports_large = True
        self._expected_version = 2
        self._write_fn = write_pack_index_v2

    def tearDown(self):
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)


class ReadZlibTests(TestCase):

    decomp = (
      b'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
      b'parent None\n'
      b'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      b'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
      b'\n'
      b"Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    extra = b'nextobject'

    def setUp(self):
        super(ReadZlibTests, self).setUp()
        self.read = BytesIO(self.comp + self.extra).read
        self.unpacked = UnpackedObject(
                Tree.type_num, None, len(self.decomp), 0)

    def test_decompress_size(self):
        good_decomp_len = len(self.decomp)
        self.unpacked.decomp_len = -1
        self.assertRaises(ValueError, read_zlib_chunks, self.read,
                          self.unpacked)
        self.unpacked.decomp_len = good_decomp_len - 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          self.unpacked)
        self.unpacked.decomp_len = good_decomp_len + 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          self.unpacked)

    def test_decompress_truncated(self):
        read = BytesIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)

        read = BytesIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)

    def test_decompress_empty(self):
        unpacked = UnpackedObject(Tree.type_num, None, 0, None)
        comp = zlib.compress(b'')
        read = BytesIO(comp + self.extra).read
        unused = read_zlib_chunks(read, unpacked)
        self.assertEqual(b'', b''.join(unpacked.decomp_chunks))
        self.assertNotEqual(b'', unused)
        self.assertEqual(self.extra, unused + read())

    def test_decompress_no_crc32(self):
        self.unpacked.crc32 = None
        read_zlib_chunks(self.read, self.unpacked)
        self.assertEqual(None, self.unpacked.crc32)

    def _do_decompress_test(self, buffer_size, **kwargs):
        unused = read_zlib_chunks(self.read, self.unpacked,
                                  buffer_size=buffer_size, **kwargs)
        self.assertEqual(self.decomp, b''.join(self.unpacked.decomp_chunks))
        self.assertEqual(zlib.crc32(self.comp), self.unpacked.crc32)
        self.assertNotEqual(b'', unused)
        self.assertEqual(self.extra, unused + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)
        self.assertEqual(None, self.unpacked.comp_chunks)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)

    def test_decompress_include_comp(self):
        self._do_decompress_test(4096, include_comp=True)
        self.assertEqual(self.comp, b''.join(self.unpacked.comp_chunks))


class DeltifyTests(TestCase):

    def test_empty(self):
        self.assertEqual([], list(deltify_pack_objects([])))

    def test_single(self):
        b = Blob.from_string(b"foo")
        self.assertEqual(
            [(b.type_num, b.sha().digest(), None, b.as_raw_string())],
            list(deltify_pack_objects([(b, b"")])))

    def test_simple_delta(self):
        b1 = Blob.from_string(b"a" * 101)
        b2 = Blob.from_string(b"a" * 100)
        delta = create_delta(b1.as_raw_string(), b2.as_raw_string())
        self.assertEqual([
            (b1.type_num, b1.sha().digest(), None, b1.as_raw_string()),
            (b2.type_num, b2.sha().digest(), b1.sha().digest(), delta)
            ],
            list(deltify_pack_objects([(b1, b""), (b2, b"")])))


class TestPackStreamReader(TestCase):

    def test_read_objects_emtpy(self):
        f = BytesIO()
        build_pack(f, [])
        reader = PackStreamReader(f.read)
        self.assertEqual(0, len(list(reader.read_objects())))

    def test_read_objects(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
        ])
        reader = PackStreamReader(f.read)
        objects = list(reader.read_objects(compute_crc32=True))
        self.assertEqual(2, len(objects))

        unpacked_blob, unpacked_delta = objects

        self.assertEqual(entries[0][0], unpacked_blob.offset)
        self.assertEqual(Blob.type_num, unpacked_blob.pack_type_num)
        self.assertEqual(Blob.type_num, unpacked_blob.obj_type_num)
        self.assertEqual(None, unpacked_blob.delta_base)
        self.assertEqual(b'blob', b''.join(unpacked_blob.decomp_chunks))
        self.assertEqual(entries[0][4], unpacked_blob.crc32)

        self.assertEqual(entries[1][0], unpacked_delta.offset)
        self.assertEqual(OFS_DELTA, unpacked_delta.pack_type_num)
        self.assertEqual(None, unpacked_delta.obj_type_num)
        self.assertEqual(unpacked_delta.offset - unpacked_blob.offset,
                         unpacked_delta.delta_base)
        delta = create_delta(b'blob', b'blob1')
        self.assertEqual(delta, b''.join(unpacked_delta.decomp_chunks))
        self.assertEqual(entries[1][4], unpacked_delta.crc32)

    def test_read_objects_buffered(self):
        f = BytesIO()
        build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
        ])
        reader = PackStreamReader(f.read, zlib_bufsize=4)
        self.assertEqual(2, len(list(reader.read_objects())))

    def test_read_objects_empty(self):
        reader = PackStreamReader(BytesIO().read)
        self.assertEqual([], list(reader.read_objects()))


class TestPackIterator(DeltaChainIterator):

    _compute_crc32 = True

    def __init__(self, *args, **kwargs):
        super(TestPackIterator, self).__init__(*args, **kwargs)
        self._unpacked_offsets = set()

    def _result(self, unpacked):
        """Return entries in the same format as build_pack."""
        return (unpacked.offset, unpacked.obj_type_num,
                b''.join(unpacked.obj_chunks), unpacked.sha(), unpacked.crc32)

    def _resolve_object(self, offset, pack_type_num, base_chunks):
        assert offset not in self._unpacked_offsets, (
                'Attempted to re-inflate offset %i' % offset)
        self._unpacked_offsets.add(offset)
        return super(TestPackIterator, self)._resolve_object(
          offset, pack_type_num, base_chunks)


class DeltaChainIteratorTests(TestCase):

    def setUp(self):
        super(DeltaChainIteratorTests, self).setUp()
        self.store = MemoryObjectStore()
        self.fetched = set()

    def store_blobs(self, blobs_data):
        blobs = []
        for data in blobs_data:
            blob = make_object(Blob, data=data)
            blobs.append(blob)
            self.store.add_object(blob)
        return blobs

    def get_raw_no_repeat(self, bin_sha):
        """Wrapper around store.get_raw that doesn't allow repeat lookups."""
        hex_sha = sha_to_hex(bin_sha)
        self.assertFalse(hex_sha in self.fetched,
                         'Attempted to re-fetch object %s' % hex_sha)
        self.fetched.add(hex_sha)
        return self.store.get_raw(hex_sha)

    def make_pack_iter(self, f, thin=None):
        if thin is None:
            thin = bool(list(self.store))
        resolve_ext_ref = thin and self.get_raw_no_repeat or None
        data = PackData('test.pack', file=f)
        return TestPackIterator.for_pack_data(
          data, resolve_ext_ref=resolve_ext_ref)

    def assertEntriesMatch(self, expected_indexes, entries, pack_iter):
        expected = [entries[i] for i in expected_indexes]
        self.assertEqual(expected, list(pack_iter._walk_all_chains()))

    def test_no_deltas(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Commit.type_num, b'commit'),
            (Blob.type_num, b'blob'),
            (Tree.type_num, b'tree'),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ofs_deltas(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (0, b'blob2')),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ofs_deltas_chain(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ref_deltas(self):
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob1')),
            (Blob.type_num, (b'blob')),
            (REF_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))

    def test_ref_deltas_chain(self):
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (2, b'blob1')),
            (Blob.type_num, (b'blob')),
            (REF_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))

    def test_ofs_and_ref_deltas(self):
        # Deltas pending on this offset are popped before deltas depending on
        # this ref.
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob1')),
            (Blob.type_num, (b'blob')),
            (OFS_DELTA, (1, b'blob2')),
        ])
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))

    def test_mixed_chain(self):
        f = BytesIO()
        entries = build_pack(f, [
            (Blob.type_num, b'blob'),
            (REF_DELTA, (2, b'blob2')),
            (OFS_DELTA, (0, b'blob1')),
            (OFS_DELTA, (1, b'blob3')),
            (OFS_DELTA, (0, b'bob')),
        ])
        self.assertEntriesMatch([0, 2, 4, 1, 3], entries,
                                self.make_pack_iter(f))

    def test_long_chain(self):
        n = 100
        objects_spec = [(Blob.type_num, b'blob')]
        for i in range(n):
            objects_spec.append(
                    (OFS_DELTA, (i, b'blob' + str(i).encode('ascii'))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))

    def test_branchy_chain(self):
        n = 100
        objects_spec = [(Blob.type_num, b'blob')]
        for i in range(n):
            objects_spec.append(
                    (OFS_DELTA, (0, b'blob' + str(i).encode('ascii'))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))

    def test_ext_ref(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
                             store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (1, b'blob2')),
            (REF_DELTA, (blob.id, b'blob1')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([1, 0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain_degenerate(self):
        # Test a degenerate case where the sender is sending a REF_DELTA
        # object that expands to an object already in the repository.
        blob, = self.store_blobs([b'blob'])
        blob2, = self.store_blobs([b'blob2'])
        assert blob.id < blob2.id

        f = BytesIO()
        entries = build_pack(f, [
          (REF_DELTA, (blob.id, b'blob2')),
          (REF_DELTA, (0, b'blob3')),
          ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_multiple_times(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (blob.id, b'blob1')),
            (REF_DELTA, (blob.id, b'blob2')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_multiple_ext_refs(self):
        b1, b2 = self.store_blobs([b'foo', b'bar'])
        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (b1.id, b'foo1')),
            (REF_DELTA, (b2.id, b'bar2')),
        ], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)],
                         pack_iter.ext_refs())

    def test_bad_ext_ref_non_thin_pack(self):
        blob, = self.store_blobs([b'blob'])
        f = BytesIO()
        build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))], store=self.store)
        pack_iter = self.make_pack_iter(f, thin=False)
        try:
            list(pack_iter._walk_all_chains())
            self.fail()
        except KeyError as e:
            self.assertEqual(([blob.id],), e.args)

    def test_bad_ext_ref_thin_pack(self):
        b1, b2, b3 = self.store_blobs([b'foo', b'bar', b'baz'])
        f = BytesIO()
        build_pack(f, [
          (REF_DELTA, (1, b'foo99')),
          (REF_DELTA, (b1.id, b'foo1')),
          (REF_DELTA, (b2.id, b'bar2')),
          (REF_DELTA, (b3.id, b'baz3')),
          ], store=self.store)
        del self.store[b2.id]
        del self.store[b3.id]
        pack_iter = self.make_pack_iter(f)
        try:
            list(pack_iter._walk_all_chains())
            self.fail()
        except KeyError as e:
            self.assertEqual((sorted([b2.id, b3.id]),), (sorted(e.args[0]),))


class DeltaEncodeSizeTests(TestCase):

    def test_basic(self):
        self.assertEqual(b'\x00', _delta_encode_size(0))
        self.assertEqual(b'\x01', _delta_encode_size(1))
        self.assertEqual(b'\xfa\x01', _delta_encode_size(250))
        self.assertEqual(b'\xe8\x07', _delta_encode_size(1000))
        self.assertEqual(b'\xa0\x8d\x06', _delta_encode_size(100000))


class EncodeCopyOperationTests(TestCase):

    def test_basic(self):
        self.assertEqual(b'\x80', _encode_copy_operation(0, 0))
        self.assertEqual(b'\x91\x01\x0a', _encode_copy_operation(1, 10))
        self.assertEqual(b'\xb1\x64\xe8\x03',
                         _encode_copy_operation(100, 1000))
        self.assertEqual(b'\x93\xe8\x03\x01',
                         _encode_copy_operation(1000, 1))