Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

/ tests / test_walk.py

# test_walk.py -- Tests for commit walking functionality.
# Copyright (C) 2010 Google, Inc.
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for commit walking functionality."""

from itertools import (
    permutations,
    )

from dulwich.diff_tree import (
    CHANGE_MODIFY,
    CHANGE_RENAME,
    TreeChange,
    RenameDetector,
    )
from dulwich.errors import (
    MissingCommitError,
    )
from dulwich.object_store import (
    MemoryObjectStore,
    )
from dulwich.objects import (
    Commit,
    Blob,
    )
from dulwich.walk import (
    ORDER_TOPO,
    WalkEntry,
    Walker,
    _topo_reorder
    )
from dulwich.tests import TestCase
from dulwich.tests.utils import (
    F,
    make_object,
    make_tag,
    build_commit_graph,
    )


class TestWalkEntry(object):

    def __init__(self, commit, changes):
        self.commit = commit
        self.changes = changes

    def __repr__(self):
        return '<TestWalkEntry commit=%s, changes=%r>' % (
          self.commit.id, self.changes)

    def __eq__(self, other):
        if not isinstance(other, WalkEntry) or self.commit != other.commit:
            return False
        if self.changes is None:
            return True
        return self.changes == other.changes()


class WalkerTest(TestCase):

    def setUp(self):
        super(WalkerTest, self).setUp()
        self.store = MemoryObjectStore()

    def make_commits(self, commit_spec, **kwargs):
        times = kwargs.pop('times', [])
        attrs = kwargs.pop('attrs', {})
        for i, t in enumerate(times):
            attrs.setdefault(i + 1, {})['commit_time'] = t
        return build_commit_graph(self.store, commit_spec, attrs=attrs,
                                  **kwargs)

    def make_linear_commits(self, num_commits, **kwargs):
        commit_spec = []
        for i in range(1, num_commits + 1):
            c = [i]
            if i > 1:
                c.append(i - 1)
            commit_spec.append(c)
        return self.make_commits(commit_spec, **kwargs)

    def assertWalkYields(self, expected, *args, **kwargs):
        walker = Walker(self.store, *args, **kwargs)
        expected = list(expected)
        for i, entry in enumerate(expected):
            if isinstance(entry, Commit):
                expected[i] = TestWalkEntry(entry, None)
        actual = list(walker)
        self.assertEqual(expected, actual)

    def test_tag(self):
        c1, c2, c3 = self.make_linear_commits(3)
        t2 = make_tag(target=c2)
        self.store.add_object(t2)
        self.assertWalkYields([c2, c1], [t2.id])

    def test_linear(self):
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c1], [c1.id])
        self.assertWalkYields([c2, c1], [c2.id])
        self.assertWalkYields([c3, c2, c1], [c3.id])
        self.assertWalkYields([c3, c2, c1], [c3.id, c1.id])
        self.assertWalkYields([c3, c2], [c3.id], exclude=[c1.id])
        self.assertWalkYields([c3, c2], [c3.id, c1.id], exclude=[c1.id])
        self.assertWalkYields([c3], [c3.id, c1.id], exclude=[c2.id])

    def test_missing(self):
        cs = list(reversed(self.make_linear_commits(20)))
        self.assertWalkYields(cs, [cs[0].id])

        # Exactly how close we can get to a missing commit depends on our
        # implementation (in particular the choice of _MAX_EXTRA_COMMITS), but
        # we should at least be able to walk some history in a broken repo.
        del self.store[cs[-1].id]
        for i in range(1, 11):
            self.assertWalkYields(cs[:i], [cs[0].id], max_entries=i)
        self.assertRaises(MissingCommitError, Walker, self.store, [cs[-1].id])

    def test_branch(self):
        c1, x2, x3, y4 = self.make_commits([[1], [2, 1], [3, 2], [4, 1]])
        self.assertWalkYields([x3, x2, c1], [x3.id])
        self.assertWalkYields([y4, c1], [y4.id])
        self.assertWalkYields([y4, x2, c1], [y4.id, x2.id])
        self.assertWalkYields([y4, x2], [y4.id, x2.id], exclude=[c1.id])
        self.assertWalkYields([y4, x3], [y4.id, x3.id], exclude=[x2.id])
        self.assertWalkYields([y4], [y4.id], exclude=[x3.id])
        self.assertWalkYields([x3, x2], [x3.id], exclude=[y4.id])

    def test_merge(self):
        c1, c2, c3, c4 = self.make_commits([[1], [2, 1], [3, 1], [4, 2, 3]])
        self.assertWalkYields([c4, c3, c2, c1], [c4.id])
        self.assertWalkYields([c3, c1], [c3.id])
        self.assertWalkYields([c2, c1], [c2.id])
        self.assertWalkYields([c4, c3], [c4.id], exclude=[c2.id])
        self.assertWalkYields([c4, c2], [c4.id], exclude=[c3.id])

    def test_reverse(self):
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c1, c2, c3], [c3.id], reverse=True)

    def test_max_entries(self):
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c3, c2, c1], [c3.id], max_entries=3)
        self.assertWalkYields([c3, c2], [c3.id], max_entries=2)
        self.assertWalkYields([c3], [c3.id], max_entries=1)

    def test_reverse_after_max_entries(self):
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c1, c2, c3], [c3.id], max_entries=3,
                              reverse=True)
        self.assertWalkYields([c2, c3], [c3.id], max_entries=2, reverse=True)
        self.assertWalkYields([c3], [c3.id], max_entries=1, reverse=True)

    def test_changes_one_parent(self):
        blob_a1 = make_object(Blob, data=b'a1')
        blob_a2 = make_object(Blob, data=b'a2')
        blob_b2 = make_object(Blob, data=b'b2')
        c1, c2 = self.make_linear_commits(
            2, trees={1: [(b'a', blob_a1)],
                      2: [(b'a', blob_a2), (b'b', blob_b2)]})
        e1 = TestWalkEntry(c1, [TreeChange.add((b'a', F, blob_a1.id))])
        e2 = TestWalkEntry(
                c2,
                [TreeChange(CHANGE_MODIFY, (b'a', F, blob_a1.id),
                                           (b'a', F, blob_a2.id)),
                 TreeChange.add((b'b', F, blob_b2.id))])
        self.assertWalkYields([e2, e1], [c2.id])

    def test_changes_multiple_parents(self):
        blob_a1 = make_object(Blob, data=b'a1')
        blob_b2 = make_object(Blob, data=b'b2')
        blob_a3 = make_object(Blob, data=b'a3')
        c1, c2, c3 = self.make_commits(
            [[1], [2], [3, 1, 2]],
            trees={1: [(b'a', blob_a1)], 2: [(b'b', blob_b2)],
                   3: [(b'a', blob_a3), (b'b', blob_b2)]})
        # a is a modify/add conflict and b is not conflicted.
        changes = [[
                TreeChange(CHANGE_MODIFY,
                           (b'a', F, blob_a1.id), (b'a', F, blob_a3.id)),
                TreeChange.add((b'a', F, blob_a3.id)),
        ]]
        self.assertWalkYields([TestWalkEntry(c3, changes)], [c3.id],
                              exclude=[c1.id, c2.id])

    def test_path_matches(self):
        walker = Walker(None, [], paths=[b'foo', b'bar', b'baz/quux'])
        self.assertTrue(walker._path_matches(b'foo'))
        self.assertTrue(walker._path_matches(b'foo/a'))
        self.assertTrue(walker._path_matches(b'foo/a/b'))
        self.assertTrue(walker._path_matches(b'bar'))
        self.assertTrue(walker._path_matches(b'baz/quux'))
        self.assertTrue(walker._path_matches(b'baz/quux/a'))

        self.assertFalse(walker._path_matches(None))
        self.assertFalse(walker._path_matches(b'oops'))
        self.assertFalse(walker._path_matches(b'fool'))
        self.assertFalse(walker._path_matches(b'baz'))
        self.assertFalse(walker._path_matches(b'baz/quu'))

    def test_paths(self):
        blob_a1 = make_object(Blob, data=b'a1')
        blob_b2 = make_object(Blob, data=b'b2')
        blob_a3 = make_object(Blob, data=b'a3')
        blob_b3 = make_object(Blob, data=b'b3')
        c1, c2, c3 = self.make_linear_commits(
            3, trees={1: [(b'a', blob_a1)],
                      2: [(b'a', blob_a1), (b'x/b', blob_b2)],
                      3: [(b'a', blob_a3), (b'x/b', blob_b3)]})

        self.assertWalkYields([c3, c2, c1], [c3.id])
        self.assertWalkYields([c3, c1], [c3.id], paths=[b'a'])
        self.assertWalkYields([c3, c2], [c3.id], paths=[b'x/b'])

        # All changes are included, not just for requested paths.
        changes = [
            TreeChange(CHANGE_MODIFY, (b'a', F, blob_a1.id),
                       (b'a', F, blob_a3.id)),
            TreeChange(CHANGE_MODIFY, (b'x/b', F, blob_b2.id),
                       (b'x/b', F, blob_b3.id)),
        ]
        self.assertWalkYields([TestWalkEntry(c3, changes)], [c3.id],
                              max_entries=1, paths=[b'a'])

    def test_paths_subtree(self):
        blob_a = make_object(Blob, data=b'a')
        blob_b = make_object(Blob, data=b'b')
        c1, c2, c3 = self.make_linear_commits(
            3, trees={1: [(b'x/a', blob_a)],
                      2: [(b'b', blob_b), (b'x/a', blob_a)],
                      3: [(b'b', blob_b), (b'x/a', blob_a), (b'x/b', blob_b)]})
        self.assertWalkYields([c2], [c3.id], paths=[b'b'])
        self.assertWalkYields([c3, c1], [c3.id], paths=[b'x'])

    def test_paths_max_entries(self):
        blob_a = make_object(Blob, data=b'a')
        blob_b = make_object(Blob, data=b'b')
        c1, c2 = self.make_linear_commits(
            2, trees={1: [(b'a', blob_a)],
                      2: [(b'a', blob_a), (b'b', blob_b)]})
        self.assertWalkYields([c2], [c2.id], paths=[b'b'], max_entries=1)
        self.assertWalkYields([c1], [c1.id], paths=[b'a'], max_entries=1)

    def test_paths_merge(self):
        blob_a1 = make_object(Blob, data=b'a1')
        blob_a2 = make_object(Blob, data=b'a2')
        blob_a3 = make_object(Blob, data=b'a3')
        x1, y2, m3, m4 = self.make_commits(
            [[1], [2], [3, 1, 2], [4, 1, 2]],
            trees={1: [(b'a', blob_a1)],
                   2: [(b'a', blob_a2)],
                   3: [(b'a', blob_a3)],
                   4: [(b'a', blob_a1)]})  # Non-conflicting
        self.assertWalkYields([m3, y2, x1], [m3.id], paths=[b'a'])
        self.assertWalkYields([y2, x1], [m4.id], paths=[b'a'])

    def test_changes_with_renames(self):
        blob = make_object(Blob, data=b'blob')
        c1, c2 = self.make_linear_commits(
            2, trees={1: [(b'a', blob)], 2: [(b'b', blob)]})
        entry_a = (b'a', F, blob.id)
        entry_b = (b'b', F, blob.id)
        changes_without_renames = [TreeChange.delete(entry_a),
                                   TreeChange.add(entry_b)]
        changes_with_renames = [TreeChange(CHANGE_RENAME, entry_a, entry_b)]
        self.assertWalkYields(
          [TestWalkEntry(c2, changes_without_renames)], [c2.id], max_entries=1)
        detector = RenameDetector(self.store)
        self.assertWalkYields(
          [TestWalkEntry(c2, changes_with_renames)], [c2.id], max_entries=1,
          rename_detector=detector)

    def test_follow_rename(self):
        blob = make_object(Blob, data=b'blob')
        names = [b'a', b'a', b'b', b'b', b'c', b'c']

        trees = dict((i + 1, [(n, blob, F)]) for i, n in enumerate(names))
        c1, c2, c3, c4, c5, c6 = self.make_linear_commits(6, trees=trees)
        self.assertWalkYields([c5], [c6.id], paths=[b'c'])

        def e(n):
            return (n, F, blob.id)
        self.assertWalkYields(
            [TestWalkEntry(c5, [TreeChange(CHANGE_RENAME, e(b'b'), e(b'c'))]),
             TestWalkEntry(c3, [TreeChange(CHANGE_RENAME, e(b'a'), e(b'b'))]),
             TestWalkEntry(c1, [TreeChange.add(e(b'a'))])],
            [c6.id], paths=[b'c'], follow=True)

    def test_follow_rename_remove_path(self):
        blob = make_object(Blob, data=b'blob')
        _, _, _, c4, c5, c6 = self.make_linear_commits(
            6, trees={1: [(b'a', blob), (b'c', blob)],
                      2: [],
                      3: [],
                      4: [(b'b', blob)],
                      5: [(b'a', blob)],
                      6: [(b'c', blob)]})

        def e(n):
            return (n, F, blob.id)
        # Once the path changes to b, we aren't interested in a or c anymore.
        self.assertWalkYields(
            [TestWalkEntry(c6, [TreeChange(CHANGE_RENAME, e(b'a'), e(b'c'))]),
             TestWalkEntry(c5, [TreeChange(CHANGE_RENAME, e(b'b'), e(b'a'))]),
             TestWalkEntry(c4, [TreeChange.add(e(b'b'))])],
            [c6.id], paths=[b'c'], follow=True)

    def test_since(self):
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c3, c2, c1], [c3.id], since=-1)
        self.assertWalkYields([c3, c2, c1], [c3.id], since=0)
        self.assertWalkYields([c3, c2], [c3.id], since=1)
        self.assertWalkYields([c3, c2], [c3.id], since=99)
        self.assertWalkYields([c3, c2], [c3.id], since=100)
        self.assertWalkYields([c3], [c3.id], since=101)
        self.assertWalkYields([c3], [c3.id], since=199)
        self.assertWalkYields([c3], [c3.id], since=200)
        self.assertWalkYields([], [c3.id], since=201)
        self.assertWalkYields([], [c3.id], since=300)

    def test_until(self):
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([], [c3.id], until=-1)
        self.assertWalkYields([c1], [c3.id], until=0)
        self.assertWalkYields([c1], [c3.id], until=1)
        self.assertWalkYields([c1], [c3.id], until=99)
        self.assertWalkYields([c2, c1], [c3.id], until=100)
        self.assertWalkYields([c2, c1], [c3.id], until=101)
Loading ...