Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

/ tests / test_porcelain.py

# test_porcelain.py -- porcelain tests
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for dulwich.porcelain."""

from io import BytesIO
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
import os
import shutil
import tarfile
import tempfile
import time

from dulwich import porcelain
from dulwich.diff_tree import tree_changes
from dulwich.objects import (
    Blob,
    Tag,
    Tree,
    ZERO_SHA,
    )
from dulwich.repo import (
    NoIndexPresent,
    Repo,
    )
from dulwich.tests import (
    TestCase,
    )
from dulwich.tests.utils import (
    build_commit_graph,
    make_commit,
    make_object,
    )


class PorcelainTestCase(TestCase):

    def setUp(self):
        super(PorcelainTestCase, self).setUp()
        self.test_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.repo = Repo.init(os.path.join(self.test_dir, 'repo'), mkdir=True)
        self.addCleanup(self.repo.close)


class ArchiveTests(PorcelainTestCase):
    """Tests for the archive command."""

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/master"] = c3.id
        out = BytesIO()
        err = BytesIO()
        porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out,
                          errstream=err)
        self.assertEqual(b"", err.getvalue())
        tf = tarfile.TarFile(fileobj=out)
        self.addCleanup(tf.close)
        self.assertEqual([], tf.getnames())


class UpdateServerInfoTests(PorcelainTestCase):

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/foo"] = c3.id
        porcelain.update_server_info(self.repo.path)
        self.assertTrue(os.path.exists(
                os.path.join(self.repo.controldir(), 'info', 'refs')))


class CommitTests(PorcelainTestCase):

    def test_custom_author(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/foo"] = c3.id
        sha = porcelain.commit(
                self.repo.path, message=b"Some message",
                author=b"Joe <joe@example.com>",
                committer=b"Bob <bob@example.com>")
        self.assertTrue(isinstance(sha, bytes))
        self.assertEqual(len(sha), 40)

    def test_unicode(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/foo"] = c3.id
        sha = porcelain.commit(
                self.repo.path, message="Some message",
                author="Joe <joe@example.com>",
                committer="Bob <bob@example.com>")
        self.assertTrue(isinstance(sha, bytes))
        self.assertEqual(len(sha), 40)


class CloneTests(PorcelainTestCase):

    def test_simple_local(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1], [2, 1], [3, 1, 2]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
                 2: [(b'f1', f1_1), (b'f2', f1_1)],
                 3: [(b'f1', f1_1), (b'f2', f1_1)], }

        c1, c2, c3 = build_commit_graph(self.repo.object_store,
                                        commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c3.id
        self.repo.refs[b"refs/tags/foo"] = c3.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        r = porcelain.clone(self.repo.path, target_path,
                            checkout=False, errstream=errstream)
        self.addCleanup(r.close)
        self.assertEqual(r.path, target_path)
        target_repo = Repo(target_path)
        self.assertEqual(0, len(target_repo.open_index()))
        self.assertEqual(c3.id, target_repo.refs[b'refs/tags/foo'])
        self.assertTrue(b'f1' not in os.listdir(target_path))
        self.assertTrue(b'f2' not in os.listdir(target_path))
        c = r.get_config()
        encoded_path = self.repo.path
        if not isinstance(encoded_path, bytes):
            encoded_path = encoded_path.encode('utf-8')
        self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url'))
        self.assertEqual(
            b'+refs/heads/*:refs/remotes/origin/*',
            c.get((b'remote', b'origin'), b'fetch'))

    def test_simple_local_with_checkout(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1], [2, 1], [3, 1, 2]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
                 2: [(b'f1', f1_1), (b'f2', f1_1)],
                 3: [(b'f1', f1_1), (b'f2', f1_1)], }

        c1, c2, c3 = build_commit_graph(self.repo.object_store,
                                        commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c3.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        with porcelain.clone(self.repo.path, target_path,
                             checkout=True,
                             errstream=errstream) as r:
            self.assertEqual(r.path, target_path)
        with Repo(target_path) as r:
            self.assertEqual(r.head(), c3.id)
        self.assertTrue('f1' in os.listdir(target_path))
        self.assertTrue('f2' in os.listdir(target_path))

    def test_bare_local_with_checkout(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1], [2, 1], [3, 1, 2]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
                 2: [(b'f1', f1_1), (b'f2', f1_1)],
                 3: [(b'f1', f1_1), (b'f2', f1_1)], }

        c1, c2, c3 = build_commit_graph(self.repo.object_store,
                                        commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c3.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        with porcelain.clone(
                self.repo.path, target_path, bare=True,
                errstream=errstream) as r:
            self.assertEqual(r.path, target_path)
        with Repo(target_path) as r:
            r.head()
            self.assertRaises(NoIndexPresent, r.open_index)
        self.assertFalse(b'f1' in os.listdir(target_path))
        self.assertFalse(b'f2' in os.listdir(target_path))

    def test_no_checkout_with_bare(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}

        (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c1.id
        self.repo.refs[b"HEAD"] = c1.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        self.assertRaises(
            ValueError, porcelain.clone, self.repo.path,
            target_path, checkout=True, bare=True, errstream=errstream)

    def test_no_head_no_checkout(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}

        (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c1.id
        target_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, target_path)
        errstream = BytesIO()
        r = porcelain.clone(
            self.repo.path, target_path, checkout=True, errstream=errstream)
        r.close()

    def test_no_head_no_checkout_outstream_errstream_autofallback(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}

        (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c1.id
        target_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, target_path)
        errstream = porcelain.NoneStream()
        r = porcelain.clone(
            self.repo.path, target_path, checkout=True, errstream=errstream)
        r.close()


class InitTests(TestCase):

    def test_non_bare(self):
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        porcelain.init(repo_dir)

    def test_bare(self):
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        porcelain.init(repo_dir, bare=True)


class AddTests(PorcelainTestCase):

    def test_add_default_paths(self):
        # create a file for initial commit
        fullpath = os.path.join(self.repo.path, 'blah')
        with open(fullpath, 'w') as f:
            f.write("\n")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        porcelain.commit(repo=self.repo.path, message=b'test',
                         author=b'test <email>', committer=b'test <email>')

        # Add a second test file and a file in a directory
        with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
            f.write("\n")
        os.mkdir(os.path.join(self.repo.path, 'adir'))
        with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f:
            f.write("\n")
        cwd = os.getcwd()
        try:
            os.chdir(self.repo.path)
            porcelain.add(self.repo.path)
        finally:
            os.chdir(cwd)

        # Check that foo was added and nothing in .git was modified
        index = self.repo.open_index()
        self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo'])

    def test_add_default_paths_subdir(self):
        os.mkdir(os.path.join(self.repo.path, 'foo'))
        with open(os.path.join(self.repo.path, 'blah'), 'w') as f:
            f.write("\n")
        with open(os.path.join(self.repo.path, 'foo', 'blie'), 'w') as f:
            f.write("\n")

        cwd = os.getcwd()
        try:
            os.chdir(os.path.join(self.repo.path, 'foo'))
            porcelain.add(repo=self.repo.path)
            porcelain.commit(repo=self.repo.path, message=b'test',
                             author=b'test <email>',
                             committer=b'test <email>')
        finally:
            os.chdir(cwd)

        index = self.repo.open_index()
        self.assertEqual(sorted(index), [b'foo/blie'])

    def test_add_file(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(self.repo.path, paths=[fullpath])
        self.assertIn(b"foo", self.repo.open_index())

    def test_add_ignored(self):
        with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f:
            f.write("foo")
        with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
            f.write("BAR")
        with open(os.path.join(self.repo.path, 'bar'), 'w') as f:
            f.write("BAR")
        (added, ignored) = porcelain.add(self.repo.path, paths=[
            os.path.join(self.repo.path, "foo"),
            os.path.join(self.repo.path, "bar")])
        self.assertIn(b"bar", self.repo.open_index())
        self.assertEqual(set(['bar']), set(added))
        self.assertEqual(set(['foo']), ignored)

    def test_add_file_absolute_path(self):
        # Absolute paths are (not yet) supported
        with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
            f.write("BAR")
        porcelain.add(self.repo, paths=[os.path.join(self.repo.path, "foo")])
        self.assertIn(b"foo", self.repo.open_index())

    def test_add_not_in_repo(self):
        with open(os.path.join(self.test_dir, 'foo'), 'w') as f:
            f.write("BAR")
        self.assertRaises(
            ValueError,
            porcelain.add, self.repo,
            paths=[os.path.join(self.test_dir, "foo")])
        self.assertRaises(
            ValueError,
            porcelain.add, self.repo,
            paths=["../foo"])
        self.assertEqual([], list(self.repo.open_index()))

    def test_add_file_clrf_conversion(self):
        # Set the right configuration to the repo
Loading ...