Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
dulwich / tests / test_fastexport.py
Size: Mime:
# test_fastexport.py -- Fast export/import functionality
# Copyright (C) 2010 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

from io import BytesIO
import stat


from dulwich.object_store import (
    MemoryObjectStore,
    )
from dulwich.objects import (
    Blob,
    Commit,
    Tree,
    ZERO_SHA,
    )
from dulwich.repo import (
    MemoryRepo,
    )
from dulwich.tests import (
    SkipTest,
    TestCase,
    )
from dulwich.tests.utils import (
    build_commit_graph,
    )


class GitFastExporterTests(TestCase):
    """Tests for the GitFastExporter tests."""

    def setUp(self):
        super(GitFastExporterTests, self).setUp()
        self.store = MemoryObjectStore()
        self.stream = BytesIO()
        try:
            from dulwich.fastexport import GitFastExporter
        except ImportError:
            raise SkipTest("python-fastimport not available")
        self.fastexporter = GitFastExporter(self.stream, self.store)

    def test_emit_blob(self):
        b = Blob()
        b.data = b"fooBAR"
        self.fastexporter.emit_blob(b)
        self.assertEqual(b'blob\nmark :1\ndata 6\nfooBAR\n',
                         self.stream.getvalue())

    def test_emit_commit(self):
        b = Blob()
        b.data = b"FOO"
        t = Tree()
        t.add(b"foo", stat.S_IFREG | 0o644, b.id)
        c = Commit()
        c.committer = c.author = b"Jelmer <jelmer@host>"
        c.author_time = c.commit_time = 1271345553
        c.author_timezone = c.commit_timezone = 0
        c.message = b"msg"
        c.tree = t.id
        self.store.add_objects([(b, None), (t, None), (c, None)])
        self.fastexporter.emit_commit(c, b"refs/heads/master")
        self.assertEqual(b"""blob
mark :1
data 3
FOO
commit refs/heads/master
mark :2
author Jelmer <jelmer@host> 1271345553 +0000
committer Jelmer <jelmer@host> 1271345553 +0000
data 3
msg
M 644 :1 foo
""", self.stream.getvalue())


class GitImportProcessorTests(TestCase):
    """Tests for the GitImportProcessor tests."""

    def setUp(self):
        super(GitImportProcessorTests, self).setUp()
        self.repo = MemoryRepo()
        try:
            from dulwich.fastexport import GitImportProcessor
        except ImportError:
            raise SkipTest("python-fastimport not available")
        self.processor = GitImportProcessor(self.repo)

    def test_reset_handler(self):
        from fastimport import commands
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        cmd = commands.ResetCommand(b"refs/heads/foo", c1.id)
        self.processor.reset_handler(cmd)
        self.assertEqual(c1.id, self.repo.get_refs()[b"refs/heads/foo"])
        self.assertEqual(c1.id, self.processor.last_commit)

    def test_reset_handler_marker(self):
        from fastimport import commands
        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
        self.processor.markers[b'10'] = c1.id
        cmd = commands.ResetCommand(b"refs/heads/foo", b':10')
        self.processor.reset_handler(cmd)
        self.assertEqual(c1.id, self.repo.get_refs()[b"refs/heads/foo"])

    def test_reset_handler_default(self):
        from fastimport import commands
        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
        cmd = commands.ResetCommand(b"refs/heads/foo", None)
        self.processor.reset_handler(cmd)
        self.assertEqual(ZERO_SHA, self.repo.get_refs()[b"refs/heads/foo"])

    def test_commit_handler(self):
        from fastimport import commands
        cmd = commands.CommitCommand(
                b"refs/heads/foo",  b"mrkr",
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                b"FOO", None, [], [])
        self.processor.commit_handler(cmd)
        commit = self.repo[self.processor.last_commit]
        self.assertEqual(b"Jelmer <jelmer@samba.org>", commit.author)
        self.assertEqual(b"Jelmer <jelmer@samba.org>", commit.committer)
        self.assertEqual(b"FOO", commit.message)
        self.assertEqual([], commit.parents)
        self.assertEqual(432432432.0, commit.commit_time)
        self.assertEqual(432432432.0, commit.author_time)
        self.assertEqual(3600, commit.commit_timezone)
        self.assertEqual(3600, commit.author_timezone)
        self.assertEqual(commit, self.repo[b"refs/heads/foo"])

    def test_commit_handler_markers(self):
        from fastimport import commands
        [c1, c2, c3] = build_commit_graph(self.repo.object_store,
                                          [[1], [2], [3]])
        self.processor.markers[b'10'] = c1.id
        self.processor.markers[b'42'] = c2.id
        self.processor.markers[b'98'] = c3.id
        cmd = commands.CommitCommand(
                b"refs/heads/foo",  b"mrkr",
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                b"FOO", b':10', [b':42', b':98'], [])
        self.processor.commit_handler(cmd)
        commit = self.repo[self.processor.last_commit]
        self.assertEqual(c1.id, commit.parents[0])
        self.assertEqual(c2.id, commit.parents[1])
        self.assertEqual(c3.id, commit.parents[2])

    def test_import_stream(self):
        markers = self.processor.import_stream(BytesIO(b"""blob
mark :1
data 11
text for a

commit refs/heads/master
mark :2
committer Joe Foo <joe@foo.com> 1288287382 +0000
data 20
<The commit message>
M 100644 :1 a

"""))
        self.assertEqual(2, len(markers))
        self.assertTrue(isinstance(self.repo[markers[b"1"]], Blob))
        self.assertTrue(isinstance(self.repo[markers[b"2"]], Commit))

    def test_file_add(self):
        from fastimport import commands
        cmd = commands.BlobCommand(b"23", b"data")
        self.processor.blob_handler(cmd)
        cmd = commands.CommitCommand(
                b"refs/heads/foo", b"mrkr",
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                b"FOO", None, [],
                [commands.FileModifyCommand(b"path", 0o100644, b":23", None)])
        self.processor.commit_handler(cmd)
        commit = self.repo[self.processor.last_commit]
        self.assertEqual([
            (b'path', 0o100644, b'6320cd248dd8aeaab759d5871f8781b5c0505172')],
            self.repo[commit.tree].items())

    def simple_commit(self):
        from fastimport import commands
        cmd = commands.BlobCommand(b"23", b"data")
        self.processor.blob_handler(cmd)
        cmd = commands.CommitCommand(
                b"refs/heads/foo", b"mrkr",
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                b"FOO", None, [],
                [commands.FileModifyCommand(b"path", 0o100644, b":23", None)])
        self.processor.commit_handler(cmd)
        commit = self.repo[self.processor.last_commit]
        return commit

    def make_file_commit(self, file_cmds):
        """Create a trivial commit with the specified file commands.

        :param file_cmds: File commands to run.
        :return: The created commit object
        """
        from fastimport import commands
        cmd = commands.CommitCommand(
                b"refs/heads/foo", b"mrkr",
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                (b"Jelmer", b"jelmer@samba.org", 432432432.0, 3600),
                b"FOO", None, [], file_cmds)
        self.processor.commit_handler(cmd)
        return self.repo[self.processor.last_commit]

    def test_file_copy(self):
        from fastimport import commands
        self.simple_commit()
        commit = self.make_file_commit(
                [commands.FileCopyCommand(b"path", b"new_path")])
        self.assertEqual([
                (b'new_path', 0o100644,
                 b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
                (b'path', 0o100644,
                 b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
                ], self.repo[commit.tree].items())

    def test_file_move(self):
        from fastimport import commands
        self.simple_commit()
        commit = self.make_file_commit(
                [commands.FileRenameCommand(b"path", b"new_path")])
        self.assertEqual([
                (b'new_path', 0o100644,
                 b'6320cd248dd8aeaab759d5871f8781b5c0505172'),
                ], self.repo[commit.tree].items())

    def test_file_delete(self):
        from fastimport import commands
        self.simple_commit()
        commit = self.make_file_commit([commands.FileDeleteCommand(b"path")])
        self.assertEqual([], self.repo[commit.tree].items())

    def test_file_deleteall(self):
        from fastimport import commands
        self.simple_commit()
        commit = self.make_file_commit([commands.FileDeleteAllCommand()])
        self.assertEqual([], self.repo[commit.tree].items())