Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / dulwich   python

Repository URL to install this package:

Version: 0.19.11 

/ tests / test_porcelain.py

# test_porcelain.py -- porcelain tests
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for dulwich.porcelain."""

from io import BytesIO
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
import os
import shutil
import tarfile
import tempfile
import time

from dulwich import porcelain
from dulwich.diff_tree import tree_changes
from dulwich.objects import (
    Blob,
    Tag,
    Tree,
    ZERO_SHA,
    )
from dulwich.repo import (
    NoIndexPresent,
    Repo,
    )
from dulwich.tests import (
    TestCase,
    )
from dulwich.tests.utils import (
    build_commit_graph,
    make_commit,
    make_object,
    )


class PorcelainTestCase(TestCase):

    def setUp(self):
        super(PorcelainTestCase, self).setUp()
        self.test_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.test_dir)
        self.repo = Repo.init(os.path.join(self.test_dir, 'repo'), mkdir=True)
        self.addCleanup(self.repo.close)


class ArchiveTests(PorcelainTestCase):
    """Tests for the archive command."""

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/master"] = c3.id
        out = BytesIO()
        err = BytesIO()
        porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out,
                          errstream=err)
        self.assertEqual(b"", err.getvalue())
        tf = tarfile.TarFile(fileobj=out)
        self.addCleanup(tf.close)
        self.assertEqual([], tf.getnames())


class UpdateServerInfoTests(PorcelainTestCase):

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/foo"] = c3.id
        porcelain.update_server_info(self.repo.path)
        self.assertTrue(os.path.exists(
                os.path.join(self.repo.controldir(), 'info', 'refs')))


class CommitTests(PorcelainTestCase):

    def test_custom_author(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/foo"] = c3.id
        sha = porcelain.commit(
                self.repo.path, message=b"Some message",
                author=b"Joe <joe@example.com>",
                committer=b"Bob <bob@example.com>")
        self.assertTrue(isinstance(sha, bytes))
        self.assertEqual(len(sha), 40)

    def test_unicode(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"refs/heads/foo"] = c3.id
        sha = porcelain.commit(
                self.repo.path, message="Some message",
                author="Joe <joe@example.com>",
                committer="Bob <bob@example.com>")
        self.assertTrue(isinstance(sha, bytes))
        self.assertEqual(len(sha), 40)


class CloneTests(PorcelainTestCase):

    def test_simple_local(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1], [2, 1], [3, 1, 2]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
                 2: [(b'f1', f1_1), (b'f2', f1_1)],
                 3: [(b'f1', f1_1), (b'f2', f1_1)], }

        c1, c2, c3 = build_commit_graph(self.repo.object_store,
                                        commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c3.id
        self.repo.refs[b"refs/tags/foo"] = c3.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        r = porcelain.clone(self.repo.path, target_path,
                            checkout=False, errstream=errstream)
        self.addCleanup(r.close)
        self.assertEqual(r.path, target_path)
        target_repo = Repo(target_path)
        self.assertEqual(0, len(target_repo.open_index()))
        self.assertEqual(c3.id, target_repo.refs[b'refs/tags/foo'])
        self.assertTrue(b'f1' not in os.listdir(target_path))
        self.assertTrue(b'f2' not in os.listdir(target_path))
        c = r.get_config()
        encoded_path = self.repo.path
        if not isinstance(encoded_path, bytes):
            encoded_path = encoded_path.encode('utf-8')
        self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url'))
        self.assertEqual(
            b'+refs/heads/*:refs/remotes/origin/*',
            c.get((b'remote', b'origin'), b'fetch'))

    def test_simple_local_with_checkout(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1], [2, 1], [3, 1, 2]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
                 2: [(b'f1', f1_1), (b'f2', f1_1)],
                 3: [(b'f1', f1_1), (b'f2', f1_1)], }

        c1, c2, c3 = build_commit_graph(self.repo.object_store,
                                        commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c3.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        with porcelain.clone(self.repo.path, target_path,
                             checkout=True,
                             errstream=errstream) as r:
            self.assertEqual(r.path, target_path)
        with Repo(target_path) as r:
            self.assertEqual(r.head(), c3.id)
        self.assertTrue('f1' in os.listdir(target_path))
        self.assertTrue('f2' in os.listdir(target_path))

    def test_bare_local_with_checkout(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1], [2, 1], [3, 1, 2]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
                 2: [(b'f1', f1_1), (b'f2', f1_1)],
                 3: [(b'f1', f1_1), (b'f2', f1_1)], }

        c1, c2, c3 = build_commit_graph(self.repo.object_store,
                                        commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c3.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        with porcelain.clone(
                self.repo.path, target_path, bare=True,
                errstream=errstream) as r:
            self.assertEqual(r.path, target_path)
        with Repo(target_path) as r:
            r.head()
            self.assertRaises(NoIndexPresent, r.open_index)
        self.assertFalse(b'f1' in os.listdir(target_path))
        self.assertFalse(b'f2' in os.listdir(target_path))

    def test_no_checkout_with_bare(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}

        (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c1.id
        self.repo.refs[b"HEAD"] = c1.id
        target_path = tempfile.mkdtemp()
        errstream = BytesIO()
        self.addCleanup(shutil.rmtree, target_path)
        self.assertRaises(
            ValueError, porcelain.clone, self.repo.path,
            target_path, checkout=True, bare=True, errstream=errstream)

    def test_no_head_no_checkout(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}

        (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c1.id
        target_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, target_path)
        errstream = BytesIO()
        r = porcelain.clone(
            self.repo.path, target_path, checkout=True, errstream=errstream)
        r.close()

    def test_no_head_no_checkout_outstream_errstream_autofallback(self):
        f1_1 = make_object(Blob, data=b'f1')
        commit_spec = [[1]]
        trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}

        (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
        self.repo.refs[b"refs/heads/master"] = c1.id
        target_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, target_path)
        errstream = porcelain.NoneStream()
        r = porcelain.clone(
            self.repo.path, target_path, checkout=True, errstream=errstream)
        r.close()


class InitTests(TestCase):

    def test_non_bare(self):
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        porcelain.init(repo_dir)

    def test_bare(self):
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        porcelain.init(repo_dir, bare=True)


class AddTests(PorcelainTestCase):

    def test_add_default_paths(self):
        # create a file for initial commit
        fullpath = os.path.join(self.repo.path, 'blah')
        with open(fullpath, 'w') as f:
            f.write("\n")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        porcelain.commit(repo=self.repo.path, message=b'test',
                         author=b'test <email>', committer=b'test <email>')

        # Add a second test file and a file in a directory
        with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
            f.write("\n")
        os.mkdir(os.path.join(self.repo.path, 'adir'))
        with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f:
            f.write("\n")
        cwd = os.getcwd()
        try:
            os.chdir(self.repo.path)
            porcelain.add(self.repo.path)
        finally:
            os.chdir(cwd)

        # Check that foo was added and nothing in .git was modified
        index = self.repo.open_index()
        self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo'])

    def test_add_default_paths_subdir(self):
        os.mkdir(os.path.join(self.repo.path, 'foo'))
        with open(os.path.join(self.repo.path, 'blah'), 'w') as f:
            f.write("\n")
        with open(os.path.join(self.repo.path, 'foo', 'blie'), 'w') as f:
            f.write("\n")

        cwd = os.getcwd()
        try:
            os.chdir(os.path.join(self.repo.path, 'foo'))
            porcelain.add(repo=self.repo.path)
            porcelain.commit(repo=self.repo.path, message=b'test',
                             author=b'test <email>',
                             committer=b'test <email>')
        finally:
            os.chdir(cwd)

        index = self.repo.open_index()
        self.assertEqual(sorted(index), [b'foo/blie'])

    def test_add_file(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(self.repo.path, paths=[fullpath])
        self.assertIn(b"foo", self.repo.open_index())

    def test_add_ignored(self):
        with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f:
            f.write("foo")
        with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
            f.write("BAR")
        with open(os.path.join(self.repo.path, 'bar'), 'w') as f:
            f.write("BAR")
        (added, ignored) = porcelain.add(self.repo.path, paths=[
            os.path.join(self.repo.path, "foo"),
            os.path.join(self.repo.path, "bar")])
        self.assertIn(b"bar", self.repo.open_index())
        self.assertEqual(set(['bar']), set(added))
        self.assertEqual(set(['foo']), ignored)

    def test_add_file_absolute_path(self):
        # Absolute paths are (not yet) supported
        with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
            f.write("BAR")
        porcelain.add(self.repo, paths=[os.path.join(self.repo.path, "foo")])
        self.assertIn(b"foo", self.repo.open_index())

    def test_add_not_in_repo(self):
        with open(os.path.join(self.test_dir, 'foo'), 'w') as f:
            f.write("BAR")
        self.assertRaises(
            ValueError,
            porcelain.add, self.repo,
            paths=[os.path.join(self.test_dir, "foo")])
        self.assertRaises(
            ValueError,
            porcelain.add, self.repo,
            paths=["../foo"])
        self.assertEqual([], list(self.repo.open_index()))

    def test_add_file_clrf_conversion(self):
        # Set the right configuration to the repo
        c = self.repo.get_config()
        c.set("core", "autocrlf", "input")
        c.write_to_path()

        # Add a file with CRLF line-ending
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'wb') as f:
            f.write(b"line1\r\nline2")
        porcelain.add(self.repo.path, paths=[fullpath])

        # The line-endings should have been converted to LF
        index = self.repo.open_index()
        self.assertIn(b"foo", index)

        entry = index[b"foo"]
        blob = self.repo[entry.sha]
        self.assertEqual(blob.data, b"line1\nline2")


class RemoveTests(PorcelainTestCase):

    def test_remove_file(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(self.repo.path, paths=[fullpath])
        porcelain.commit(repo=self.repo, message=b'test',
                         author=b'test <email>',
                         committer=b'test <email>')
        self.assertTrue(os.path.exists(os.path.join(self.repo.path, 'foo')))
        cwd = os.getcwd()
        try:
            os.chdir(self.repo.path)
            porcelain.remove(self.repo.path, paths=["foo"])
        finally:
            os.chdir(cwd)
        self.assertFalse(os.path.exists(os.path.join(self.repo.path, 'foo')))

    def test_remove_file_staged(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        cwd = os.getcwd()
        try:
            os.chdir(self.repo.path)
            porcelain.add(self.repo.path, paths=[fullpath])
            self.assertRaises(Exception, porcelain.rm, self.repo.path,
                              paths=["foo"])
        finally:
            os.chdir(cwd)


class LogTests(PorcelainTestCase):

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id
        outstream = StringIO()
        porcelain.log(self.repo.path, outstream=outstream)
        self.assertEqual(3, outstream.getvalue().count("-" * 50))

    def test_max_entries(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id
        outstream = StringIO()
        porcelain.log(self.repo.path, outstream=outstream, max_entries=1)
        self.assertEqual(1, outstream.getvalue().count("-" * 50))


class ShowTests(PorcelainTestCase):

    def test_nolist(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id
        outstream = StringIO()
        porcelain.show(self.repo.path, objects=c3.id, outstream=outstream)
        self.assertTrue(outstream.getvalue().startswith("-" * 50))

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id
        outstream = StringIO()
        porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream)
        self.assertTrue(outstream.getvalue().startswith("-" * 50))

    def test_blob(self):
        b = Blob.from_string(b"The Foo\n")
        self.repo.object_store.add_object(b)
        outstream = StringIO()
        porcelain.show(self.repo.path, objects=[b.id], outstream=outstream)
        self.assertEqual(outstream.getvalue(), "The Foo\n")

    def test_commit_no_parent(self):
        a = Blob.from_string(b"The Foo\n")
        ta = Tree()
        ta.add(b"somename", 0o100644, a.id)
        ca = make_commit(tree=ta.id)
        self.repo.object_store.add_objects([(a, None), (ta, None), (ca, None)])
        outstream = StringIO()
        porcelain.show(self.repo.path, objects=[ca.id], outstream=outstream)
        self.assertMultiLineEqual(outstream.getvalue(), """\
--------------------------------------------------
commit: 344da06c1bb85901270b3e8875c988a027ec087d
Author: Test Author <test@nodomain.com>
Committer: Test Committer <test@nodomain.com>
Date:   Fri Jan 01 2010 00:00:00 +0000

Test message.

diff --git /dev/null b/somename
new mode 100644
index 0000000..ea5c7bf 100644
--- /dev/null
+++ b/somename
@@ -0,0 +1 @@
+The Foo
""")

    def test_commit_with_change(self):
        a = Blob.from_string(b"The Foo\n")
        ta = Tree()
        ta.add(b"somename", 0o100644, a.id)
        ca = make_commit(tree=ta.id)
        b = Blob.from_string(b"The Bar\n")
        tb = Tree()
        tb.add(b"somename", 0o100644, b.id)
        cb = make_commit(tree=tb.id, parents=[ca.id])
        self.repo.object_store.add_objects(
            [(a, None), (b, None), (ta, None), (tb, None),
             (ca, None), (cb, None)])
        outstream = StringIO()
        porcelain.show(self.repo.path, objects=[cb.id], outstream=outstream)
        self.assertMultiLineEqual(outstream.getvalue(), """\
--------------------------------------------------
commit: 2c6b6c9cb72c130956657e1fdae58e5b103744fa
Author: Test Author <test@nodomain.com>
Committer: Test Committer <test@nodomain.com>
Date:   Fri Jan 01 2010 00:00:00 +0000

Test message.

diff --git a/somename b/somename
index ea5c7bf..fd38bcb 100644
--- a/somename
+++ b/somename
@@ -1 +1 @@
-The Foo
+The Bar
""")


class SymbolicRefTests(PorcelainTestCase):

    def test_set_wrong_symbolic_ref(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id

        self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path,
                          b'foobar')

    def test_set_force_wrong_symbolic_ref(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id

        porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True)

        # test if we actually changed the file
        with self.repo.get_named_file('HEAD') as f:
            new_ref = f.read()
        self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n')

    def test_set_symbolic_ref(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id

        porcelain.symbolic_ref(self.repo.path, b'master')

    def test_set_symbolic_ref_other_than_master(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]],
                attrs=dict(refs='develop'))
        self.repo.refs[b"HEAD"] = c3.id
        self.repo.refs[b"refs/heads/develop"] = c3.id

        porcelain.symbolic_ref(self.repo.path, b'develop')

        # test if we actually changed the file
        with self.repo.get_named_file('HEAD') as f:
            new_ref = f.read()
        self.assertEqual(new_ref, b'ref: refs/heads/develop\n')


class DiffTreeTests(PorcelainTestCase):

    def test_empty(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id
        outstream = BytesIO()
        porcelain.diff_tree(self.repo.path, c2.tree, c3.tree,
                            outstream=outstream)
        self.assertEqual(outstream.getvalue(), b"")


class CommitTreeTests(PorcelainTestCase):

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        b = Blob()
        b.data = b"foo the bar"
        t = Tree()
        t.add(b"somename", 0o100644, b.id)
        self.repo.object_store.add_object(t)
        self.repo.object_store.add_object(b)
        sha = porcelain.commit_tree(
            self.repo.path, t.id, message=b"Withcommit.",
            author=b"Joe <joe@example.com>",
            committer=b"Jane <jane@example.com>")
        self.assertTrue(isinstance(sha, bytes))
        self.assertEqual(len(sha), 40)


class RevListTests(PorcelainTestCase):

    def test_simple(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        outstream = BytesIO()
        porcelain.rev_list(
            self.repo.path, [c3.id], outstream=outstream)
        self.assertEqual(
            c3.id + b"\n" +
            c2.id + b"\n" +
            c1.id + b"\n",
            outstream.getvalue())


class TagCreateTests(PorcelainTestCase):

    def test_annotated(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id

        porcelain.tag_create(self.repo.path, b"tryme", b'foo <foo@bar.com>',
                             b'bar', annotated=True)

        tags = self.repo.refs.as_dict(b"refs/tags")
        self.assertEqual(list(tags.keys()), [b"tryme"])
        tag = self.repo[b'refs/tags/tryme']
        self.assertTrue(isinstance(tag, Tag))
        self.assertEqual(b"foo <foo@bar.com>", tag.tagger)
        self.assertEqual(b"bar", tag.message)
        self.assertLess(time.time() - tag.tag_time, 5)

    def test_unannotated(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id

        porcelain.tag_create(self.repo.path, b"tryme", annotated=False)

        tags = self.repo.refs.as_dict(b"refs/tags")
        self.assertEqual(list(tags.keys()), [b"tryme"])
        self.repo[b'refs/tags/tryme']
        self.assertEqual(list(tags.values()), [self.repo.head()])

    def test_unannotated_unicode(self):
        c1, c2, c3 = build_commit_graph(
                self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
        self.repo.refs[b"HEAD"] = c3.id

        porcelain.tag_create(self.repo.path, "tryme", annotated=False)

        tags = self.repo.refs.as_dict(b"refs/tags")
        self.assertEqual(list(tags.keys()), [b"tryme"])
        self.repo[b'refs/tags/tryme']
        self.assertEqual(list(tags.values()), [self.repo.head()])


class TagListTests(PorcelainTestCase):

    def test_empty(self):
        tags = porcelain.tag_list(self.repo.path)
        self.assertEqual([], tags)

    def test_simple(self):
        self.repo.refs[b"refs/tags/foo"] = b"aa" * 20
        self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20
        tags = porcelain.tag_list(self.repo.path)

        self.assertEqual([b"bar/bla", b"foo"], tags)


class TagDeleteTests(PorcelainTestCase):

    def test_simple(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo[b"HEAD"] = c1.id
        porcelain.tag_create(self.repo, b'foo')
        self.assertTrue(b"foo" in porcelain.tag_list(self.repo))
        porcelain.tag_delete(self.repo, b'foo')
        self.assertFalse(b"foo" in porcelain.tag_list(self.repo))


class ResetTests(PorcelainTestCase):

    def test_hard_head(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(self.repo.path, paths=[fullpath])
        porcelain.commit(self.repo.path, message=b"Some message",
                         committer=b"Jane <jane@example.com>",
                         author=b"John <john@example.com>")

        with open(os.path.join(self.repo.path, 'foo'), 'wb') as f:
            f.write(b"OOH")

        porcelain.reset(self.repo, "hard", b"HEAD")

        index = self.repo.open_index()
        changes = list(tree_changes(self.repo,
                       index.commit(self.repo.object_store),
                       self.repo[b'HEAD'].tree))

        self.assertEqual([], changes)

    def test_hard_commit(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(self.repo.path, paths=[fullpath])
        sha = porcelain.commit(self.repo.path, message=b"Some message",
                               committer=b"Jane <jane@example.com>",
                               author=b"John <john@example.com>")

        with open(fullpath, 'wb') as f:
            f.write(b"BAZ")
        porcelain.add(self.repo.path, paths=[fullpath])
        porcelain.commit(self.repo.path, message=b"Some other message",
                         committer=b"Jane <jane@example.com>",
                         author=b"John <john@example.com>")

        porcelain.reset(self.repo, "hard", sha)

        index = self.repo.open_index()
        changes = list(tree_changes(self.repo,
                       index.commit(self.repo.object_store),
                       self.repo[sha].tree))

        self.assertEqual([], changes)


class PushTests(PorcelainTestCase):

    def test_simple(self):
        """
        Basic test of porcelain push where self.repo is the remote.  First
        clone the remote, commit a file to the clone, then push the changes
        back to the remote.
        """
        outstream = BytesIO()
        errstream = BytesIO()

        porcelain.commit(repo=self.repo.path, message=b'init',
                         author=b'author <email>',
                         committer=b'committer <email>')

        # Setup target repo cloned from temp test repo
        clone_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, clone_path)
        target_repo = porcelain.clone(self.repo.path, target=clone_path,
                                      errstream=errstream)
        try:
            self.assertEqual(target_repo[b'HEAD'], self.repo[b'HEAD'])
        finally:
            target_repo.close()

        # create a second file to be pushed back to origin
        handle, fullpath = tempfile.mkstemp(dir=clone_path)
        os.close(handle)
        porcelain.add(repo=clone_path, paths=[fullpath])
        porcelain.commit(repo=clone_path, message=b'push',
                         author=b'author <email>',
                         committer=b'committer <email>')

        # Setup a non-checked out branch in the remote
        refs_path = b"refs/heads/foo"
        new_id = self.repo[b'HEAD'].id
        self.assertNotEqual(new_id, ZERO_SHA)
        self.repo.refs[refs_path] = new_id

        # Push to the remote
        porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path,
                       outstream=outstream, errstream=errstream)

        # Check that the target and source
        with Repo(clone_path) as r_clone:
            self.assertEqual({
                b'HEAD': new_id,
                b'refs/heads/foo': r_clone[b'HEAD'].id,
                b'refs/heads/master': new_id,
                }, self.repo.get_refs())
            self.assertEqual(r_clone[b'HEAD'].id, self.repo[refs_path].id)

            # Get the change in the target repo corresponding to the add
            # this will be in the foo branch.
            change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree,
                                       self.repo[b'refs/heads/foo'].tree))[0]
            self.assertEqual(os.path.basename(fullpath),
                             change.new.path.decode('ascii'))

    def test_delete(self):
        """Basic test of porcelain push, removing a branch.
        """
        outstream = BytesIO()
        errstream = BytesIO()

        porcelain.commit(repo=self.repo.path, message=b'init',
                         author=b'author <email>',
                         committer=b'committer <email>')

        # Setup target repo cloned from temp test repo
        clone_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, clone_path)
        target_repo = porcelain.clone(self.repo.path, target=clone_path,
                                      errstream=errstream)
        target_repo.close()

        # Setup a non-checked out branch in the remote
        refs_path = b"refs/heads/foo"
        new_id = self.repo[b'HEAD'].id
        self.assertNotEqual(new_id, ZERO_SHA)
        self.repo.refs[refs_path] = new_id

        # Push to the remote
        porcelain.push(clone_path, self.repo.path, b":" + refs_path,
                       outstream=outstream, errstream=errstream)

        self.assertEqual({
            b'HEAD': new_id,
            b'refs/heads/master': new_id,
            }, self.repo.get_refs())


class PullTests(PorcelainTestCase):

    def setUp(self):
        super(PullTests, self).setUp()
        # create a file for initial commit
        handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
        os.close(handle)
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test',
                         author=b'test <email>',
                         committer=b'test <email>')

        # Setup target repo
        self.target_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.target_path)
        target_repo = porcelain.clone(self.repo.path, target=self.target_path,
                                      errstream=BytesIO())
        target_repo.close()

        # create a second file to be pushed
        handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
        os.close(handle)
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test2',
                         author=b'test2 <email>',
                         committer=b'test2 <email>')

        self.assertTrue(b'refs/heads/master' in self.repo.refs)
        self.assertTrue(b'refs/heads/master' in target_repo.refs)

    def test_simple(self):
        outstream = BytesIO()
        errstream = BytesIO()

        # Pull changes into the cloned repo
        porcelain.pull(self.target_path, self.repo.path, b'refs/heads/master',
                       outstream=outstream, errstream=errstream)

        # Check the target repo for pushed changes
        with Repo(self.target_path) as r:
            self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id)

    def test_no_refspec(self):
        outstream = BytesIO()
        errstream = BytesIO()

        # Pull changes into the cloned repo
        porcelain.pull(self.target_path, self.repo.path, outstream=outstream,
                       errstream=errstream)

        # Check the target repo for pushed changes
        with Repo(self.target_path) as r:
            self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id)


class StatusTests(PorcelainTestCase):

    def test_empty(self):
        results = porcelain.status(self.repo)
        self.assertEqual(
            {'add': [], 'delete': [], 'modify': []},
            results.staged)
        self.assertEqual([], results.unstaged)

    def test_status_base(self):
        """Integration test for `status` functionality."""

        # Commit a dummy file then modify it
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write('origstuff')

        porcelain.add(repo=self.repo.path, paths=[fullpath])
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')

        # modify access and modify time of path
        os.utime(fullpath, (0, 0))

        with open(fullpath, 'wb') as f:
            f.write(b'stuff')

        # Make a dummy file and stage it
        filename_add = 'bar'
        fullpath = os.path.join(self.repo.path, filename_add)
        with open(fullpath, 'w') as f:
            f.write('stuff')
        porcelain.add(repo=self.repo.path, paths=fullpath)

        results = porcelain.status(self.repo)

        self.assertEqual(results.staged['add'][0],
                         filename_add.encode('ascii'))
        self.assertEqual(results.unstaged, [b'foo'])

    def test_status_all(self):
        del_path = os.path.join(self.repo.path, 'foo')
        mod_path = os.path.join(self.repo.path, 'bar')
        add_path = os.path.join(self.repo.path, 'baz')
        us_path = os.path.join(self.repo.path, 'blye')
        ut_path = os.path.join(self.repo.path, 'blyat')
        with open(del_path, 'w') as f:
            f.write('origstuff')
        with open(mod_path, 'w') as f:
            f.write('origstuff')
        with open(us_path, 'w') as f:
            f.write('origstuff')
        porcelain.add(repo=self.repo.path, paths=[del_path, mod_path, us_path])
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')
        porcelain.remove(self.repo.path, [del_path])
        with open(add_path, 'w') as f:
            f.write('origstuff')
        with open(mod_path, 'w') as f:
            f.write('more_origstuff')
        with open(us_path, 'w') as f:
            f.write('more_origstuff')
        porcelain.add(repo=self.repo.path, paths=[add_path, mod_path])
        with open(us_path, 'w') as f:
            f.write('\norigstuff')
        with open(ut_path, 'w') as f:
            f.write('origstuff')
        results = porcelain.status(self.repo.path)
        self.assertDictEqual(
            {'add': [b'baz'], 'delete': [b'foo'], 'modify': [b'bar']},
            results.staged)
        self.assertListEqual(results.unstaged, [b'blye'])
        self.assertListEqual(results.untracked, ['blyat'])

    def test_status_crlf_mismatch(self):
        # First make a commit as if the file has been added on a Linux system
        # or with core.autocrlf=True
        file_path = os.path.join(self.repo.path, 'crlf')
        with open(file_path, 'wb') as f:
            f.write(b'line1\nline2')
        porcelain.add(repo=self.repo.path, paths=[file_path])
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')

        # Then update the file as if it was created by CGit on a Windows
        # system with core.autocrlf=true
        with open(file_path, 'wb') as f:
            f.write(b'line1\r\nline2')

        results = porcelain.status(self.repo)
        self.assertDictEqual(
            {'add': [], 'delete': [], 'modify': []},
            results.staged)
        self.assertListEqual(results.unstaged, [b'crlf'])
        self.assertListEqual(results.untracked, [])

    def test_status_crlf_convert(self):
        # First make a commit as if the file has been added on a Linux system
        # or with core.autocrlf=True
        file_path = os.path.join(self.repo.path, 'crlf')
        with open(file_path, 'wb') as f:
            f.write(b'line1\nline2')
        porcelain.add(repo=self.repo.path, paths=[file_path])
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')

        # Then update the file as if it was created by CGit on a Windows
        # system with core.autocrlf=true
        with open(file_path, 'wb') as f:
            f.write(b'line1\r\nline2')

        # TODO: It should be set automatically by looking at the configuration
        c = self.repo.get_config()
        c.set("core", "autocrlf", True)
        c.write_to_path()

        results = porcelain.status(self.repo)
        self.assertDictEqual(
            {'add': [], 'delete': [], 'modify': []},
            results.staged)
        self.assertListEqual(results.unstaged, [])
        self.assertListEqual(results.untracked, [])

    def test_get_tree_changes_add(self):
        """Unit test for get_tree_changes add."""

        # Make a dummy file, stage
        filename = 'bar'
        fullpath = os.path.join(self.repo.path, filename)
        with open(fullpath, 'w') as f:
            f.write('stuff')
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')

        filename = 'foo'
        fullpath = os.path.join(self.repo.path, filename)
        with open(fullpath, 'w') as f:
            f.write('stuff')
        porcelain.add(repo=self.repo.path, paths=fullpath)
        changes = porcelain.get_tree_changes(self.repo.path)

        self.assertEqual(changes['add'][0], filename.encode('ascii'))
        self.assertEqual(len(changes['add']), 1)
        self.assertEqual(len(changes['modify']), 0)
        self.assertEqual(len(changes['delete']), 0)

    def test_get_tree_changes_modify(self):
        """Unit test for get_tree_changes modify."""

        # Make a dummy file, stage, commit, modify
        filename = 'foo'
        fullpath = os.path.join(self.repo.path, filename)
        with open(fullpath, 'w') as f:
            f.write('stuff')
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')
        with open(fullpath, 'w') as f:
            f.write('otherstuff')
        porcelain.add(repo=self.repo.path, paths=fullpath)
        changes = porcelain.get_tree_changes(self.repo.path)

        self.assertEqual(changes['modify'][0], filename.encode('ascii'))
        self.assertEqual(len(changes['add']), 0)
        self.assertEqual(len(changes['modify']), 1)
        self.assertEqual(len(changes['delete']), 0)

    def test_get_tree_changes_delete(self):
        """Unit test for get_tree_changes delete."""

        # Make a dummy file, stage, commit, remove
        filename = 'foo'
        fullpath = os.path.join(self.repo.path, filename)
        with open(fullpath, 'w') as f:
            f.write('stuff')
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')
        cwd = os.getcwd()
        try:
            os.chdir(self.repo.path)
            porcelain.remove(repo=self.repo.path, paths=[filename])
        finally:
            os.chdir(cwd)
        changes = porcelain.get_tree_changes(self.repo.path)

        self.assertEqual(changes['delete'][0], filename.encode('ascii'))
        self.assertEqual(len(changes['add']), 0)
        self.assertEqual(len(changes['modify']), 0)
        self.assertEqual(len(changes['delete']), 1)

    def test_get_untracked_paths(self):
        with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f:
            f.write('ignored\n')
        with open(os.path.join(self.repo.path, 'ignored'), 'w') as f:
            f.write('blah\n')
        with open(os.path.join(self.repo.path, 'notignored'), 'w') as f:
            f.write('blah\n')
        self.assertEqual(
            set(['ignored', 'notignored', '.gitignore']),
            set(porcelain.get_untracked_paths(self.repo.path, self.repo.path,
                                              self.repo.open_index())))
        self.assertEqual(set(['.gitignore', 'notignored']),
                         set(porcelain.status(self.repo).untracked))
        self.assertEqual(set(['.gitignore', 'notignored', 'ignored']),
                         set(porcelain.status(self.repo, ignored=True)
                             .untracked))

    def test_get_untracked_paths_nested(self):
        with open(os.path.join(self.repo.path, 'notignored'), 'w') as f:
            f.write('blah\n')
        subrepo = Repo.init(os.path.join(self.repo.path, 'nested'), mkdir=True)
        with open(os.path.join(subrepo.path, 'another'), 'w') as f:
            f.write('foo\n')

        self.assertEqual(
            set(['notignored']),
            set(porcelain.get_untracked_paths(self.repo.path, self.repo.path,
                                              self.repo.open_index())))
        self.assertEqual(
            set(['another']),
            set(porcelain.get_untracked_paths(subrepo.path, subrepo.path,
                                              subrepo.open_index())))


# TODO(jelmer): Add test for dulwich.porcelain.daemon


class UploadPackTests(PorcelainTestCase):
    """Tests for upload_pack."""

    def test_upload_pack(self):
        outf = BytesIO()
        exitcode = porcelain.upload_pack(
                self.repo.path, BytesIO(b"0000"), outf)
        outlines = outf.getvalue().splitlines()
        self.assertEqual([b"0000"], outlines)
        self.assertEqual(0, exitcode)


class ReceivePackTests(PorcelainTestCase):
    """Tests for receive_pack."""

    def test_receive_pack(self):
        filename = 'foo'
        fullpath = os.path.join(self.repo.path, filename)
        with open(fullpath, 'w') as f:
            f.write('stuff')
        porcelain.add(repo=self.repo.path, paths=fullpath)
        self.repo.do_commit(message=b'test status',
                            author=b'author <email>',
                            committer=b'committer <email>',
                            author_timestamp=1402354300,
                            commit_timestamp=1402354300, author_timezone=0,
                            commit_timezone=0)
        outf = BytesIO()
        exitcode = porcelain.receive_pack(
                self.repo.path, BytesIO(b"0000"), outf)
        outlines = outf.getvalue().splitlines()
        self.assertEqual([
            b'0091319b56ce3aee2d489f759736a79cc552c9bb86d9 HEAD\x00 report-status '  # noqa: E501
            b'delete-refs quiet ofs-delta side-band-64k '
            b'no-done symref=HEAD:refs/heads/master',
           b'003f319b56ce3aee2d489f759736a79cc552c9bb86d9 refs/heads/master',
            b'0000'], outlines)
        self.assertEqual(0, exitcode)


class BranchListTests(PorcelainTestCase):

    def test_standard(self):
        self.assertEqual(set([]), set(porcelain.branch_list(self.repo)))

    def test_new_branch(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo[b"HEAD"] = c1.id
        porcelain.branch_create(self.repo, b"foo")
        self.assertEqual(
            set([b"master", b"foo"]),
            set(porcelain.branch_list(self.repo)))


class BranchCreateTests(PorcelainTestCase):

    def test_branch_exists(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo[b"HEAD"] = c1.id
        porcelain.branch_create(self.repo, b"foo")
        self.assertRaises(KeyError, porcelain.branch_create, self.repo, b"foo")
        porcelain.branch_create(self.repo, b"foo", force=True)

    def test_new_branch(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo[b"HEAD"] = c1.id
        porcelain.branch_create(self.repo, b"foo")
        self.assertEqual(
            set([b"master", b"foo"]),
            set(porcelain.branch_list(self.repo)))


class BranchDeleteTests(PorcelainTestCase):

    def test_simple(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo[b"HEAD"] = c1.id
        porcelain.branch_create(self.repo, b'foo')
        self.assertTrue(b"foo" in porcelain.branch_list(self.repo))
        porcelain.branch_delete(self.repo, b'foo')
        self.assertFalse(b"foo" in porcelain.branch_list(self.repo))

    def test_simple_unicode(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo[b"HEAD"] = c1.id
        porcelain.branch_create(self.repo, 'foo')
        self.assertTrue(b"foo" in porcelain.branch_list(self.repo))
        porcelain.branch_delete(self.repo, 'foo')
        self.assertFalse(b"foo" in porcelain.branch_list(self.repo))


class FetchTests(PorcelainTestCase):

    def test_simple(self):
        outstream = BytesIO()
        errstream = BytesIO()

        # create a file for initial commit
        handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
        os.close(handle)
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test',
                         author=b'test <email>',
                         committer=b'test <email>')

        # Setup target repo
        target_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, target_path)
        target_repo = porcelain.clone(self.repo.path, target=target_path,
                                      errstream=errstream)

        # create a second file to be pushed
        handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
        os.close(handle)
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test2',
                         author=b'test2 <email>',
                         committer=b'test2 <email>')

        self.assertFalse(self.repo[b'HEAD'].id in target_repo)
        target_repo.close()

        # Fetch changes into the cloned repo
        porcelain.fetch(target_path, self.repo.path,
                        outstream=outstream, errstream=errstream)

        # Assert that fetch updated the local image of the remote
        self.assert_correct_remote_refs(
            target_repo.get_refs(), self.repo.get_refs())

        # Check the target repo for pushed changes
        with Repo(target_path) as r:
            self.assertTrue(self.repo[b'HEAD'].id in r)

    def test_with_remote_name(self):
        remote_name = b'origin'
        outstream = BytesIO()
        errstream = BytesIO()

        # create a file for initial commit
        handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
        os.close(handle)
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test',
                         author=b'test <email>',
                         committer=b'test <email>')

        # Setup target repo
        target_path = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, target_path)
        target_repo = porcelain.clone(self.repo.path, target=target_path,
                                      errstream=errstream)

        # Capture current refs
        target_refs = target_repo.get_refs()

        # create a second file to be pushed
        handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
        os.close(handle)
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.commit(repo=self.repo.path, message=b'test2',
                         author=b'test2 <email>',
                         committer=b'test2 <email>')

        self.assertFalse(self.repo[b'HEAD'].id in target_repo)
        target_repo.close()

        # Fetch changes into the cloned repo
        porcelain.fetch(target_path, self.repo.path, remote_name=remote_name,
                        outstream=outstream, errstream=errstream)

        # Assert that fetch updated the local image of the remote
        self.assert_correct_remote_refs(
            target_repo.get_refs(), self.repo.get_refs())

        # Check the target repo for pushed changes, as well as updates
        # for the refs
        with Repo(target_path) as r:
            self.assertTrue(self.repo[b'HEAD'].id in r)
            self.assertNotEqual(self.repo.get_refs(), target_refs)

    def assert_correct_remote_refs(
            self, local_refs, remote_refs, remote_name=b'origin'):
        """Assert that known remote refs corresponds to actual remote refs."""
        local_ref_prefix = b'refs/heads'
        remote_ref_prefix = b'refs/remotes/' + remote_name

        locally_known_remote_refs = {
            k[len(remote_ref_prefix) + 1:]: v for k, v in local_refs.items()
            if k.startswith(remote_ref_prefix)}

        normalized_remote_refs = {
            k[len(local_ref_prefix) + 1:]: v for k, v in remote_refs.items()
            if k.startswith(local_ref_prefix)}

        self.assertEqual(locally_known_remote_refs, normalized_remote_refs)


class RepackTests(PorcelainTestCase):

    def test_empty(self):
        porcelain.repack(self.repo)

    def test_simple(self):
        handle, fullpath = tempfile.mkstemp(dir=self.repo.path)
        os.close(handle)
        porcelain.add(repo=self.repo.path, paths=fullpath)
        porcelain.repack(self.repo)


class LsTreeTests(PorcelainTestCase):

    def test_empty(self):
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')

        f = StringIO()
        porcelain.ls_tree(self.repo, b"HEAD", outstream=f)
        self.assertEqual(f.getvalue(), "")

    def test_simple(self):
        # Commit a dummy file then modify it
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write('origstuff')

        porcelain.add(repo=self.repo.path, paths=[fullpath])
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')

        f = StringIO()
        porcelain.ls_tree(self.repo, b"HEAD", outstream=f)
        self.assertEqual(
                f.getvalue(),
                '100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tfoo\n')

    def test_recursive(self):
        # Create a directory then write a dummy file in it
        dirpath = os.path.join(self.repo.path, 'adir')
        filepath = os.path.join(dirpath, 'afile')
        os.mkdir(dirpath)
        with open(filepath, 'w') as f:
            f.write('origstuff')
        porcelain.add(repo=self.repo.path, paths=[filepath])
        porcelain.commit(repo=self.repo.path, message=b'test status',
                         author=b'author <email>',
                         committer=b'committer <email>')
        f = StringIO()
        porcelain.ls_tree(self.repo, b"HEAD", outstream=f)
        self.assertEqual(
                f.getvalue(),
                '40000 tree b145cc69a5e17693e24d8a7be0016ed8075de66d\tadir\n')
        f = StringIO()
        porcelain.ls_tree(self.repo, b"HEAD", outstream=f, recursive=True)
        self.assertEqual(
                f.getvalue(),
                '40000 tree b145cc69a5e17693e24d8a7be0016ed8075de66d\tadir\n'
                '100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tadir'
                '/afile\n')


class LsRemoteTests(PorcelainTestCase):

    def test_empty(self):
        self.assertEqual({}, porcelain.ls_remote(self.repo.path))

    def test_some(self):
        cid = porcelain.commit(repo=self.repo.path, message=b'test status',
                               author=b'author <email>',
                               committer=b'committer <email>')

        self.assertEqual({
            b'refs/heads/master': cid,
            b'HEAD': cid},
            porcelain.ls_remote(self.repo.path))


class LsFilesTests(PorcelainTestCase):

    def test_empty(self):
        self.assertEqual([], list(porcelain.ls_files(self.repo)))

    def test_simple(self):
        # Commit a dummy file then modify it
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write('origstuff')

        porcelain.add(repo=self.repo.path, paths=[fullpath])
        self.assertEqual([b'foo'], list(porcelain.ls_files(self.repo)))


class RemoteAddTests(PorcelainTestCase):

    def test_new(self):
        porcelain.remote_add(
            self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich')
        c = self.repo.get_config()
        self.assertEqual(
            c.get((b'remote', b'jelmer'), b'url'),
            b'git://jelmer.uk/code/dulwich')

    def test_exists(self):
        porcelain.remote_add(
            self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich')
        self.assertRaises(porcelain.RemoteExists, porcelain.remote_add,
                          self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich')


class CheckIgnoreTests(PorcelainTestCase):

    def test_check_ignored(self):
        with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f:
            f.write('foo')
        foo_path = os.path.join(self.repo.path, 'foo')
        with open(foo_path, 'w') as f:
            f.write('BAR')
        bar_path = os.path.join(self.repo.path, 'bar')
        with open(bar_path, 'w') as f:
            f.write('BAR')
        self.assertEqual(
            ['foo'],
            list(porcelain.check_ignore(self.repo, [foo_path])))
        self.assertEqual(
            [], list(porcelain.check_ignore(self.repo, [bar_path])))

    def test_check_added_abs(self):
        path = os.path.join(self.repo.path, 'foo')
        with open(path, 'w') as f:
            f.write('BAR')
        self.repo.stage(['foo'])
        with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f:
            f.write('foo\n')
        self.assertEqual(
            [], list(porcelain.check_ignore(self.repo, [path])))
        self.assertEqual(
            ['foo'],
            list(porcelain.check_ignore(self.repo, [path], no_index=True)))

    def test_check_added_rel(self):
        with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
            f.write('BAR')
        self.repo.stage(['foo'])
        with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f:
            f.write('foo\n')
        cwd = os.getcwd()
        os.mkdir(os.path.join(self.repo.path, 'bar'))
        os.chdir(os.path.join(self.repo.path, 'bar'))
        try:
            self.assertEqual(
                list(porcelain.check_ignore(self.repo, ['../foo'])), [])
            self.assertEqual(['../foo'], list(
               porcelain.check_ignore(self.repo, ['../foo'], no_index=True)))
        finally:
            os.chdir(cwd)


class UpdateHeadTests(PorcelainTestCase):

    def test_set_to_branch(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo.refs[b"refs/heads/blah"] = c1.id
        porcelain.update_head(self.repo, "blah")
        self.assertEqual(c1.id, self.repo.head())
        self.assertEqual(b'ref: refs/heads/blah',
                         self.repo.refs.read_ref(b'HEAD'))

    def test_set_to_branch_detached(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo.refs[b"refs/heads/blah"] = c1.id
        porcelain.update_head(self.repo, "blah", detached=True)
        self.assertEqual(c1.id, self.repo.head())
        self.assertEqual(c1.id, self.repo.refs.read_ref(b'HEAD'))

    def test_set_to_commit_detached(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo.refs[b"refs/heads/blah"] = c1.id
        porcelain.update_head(self.repo, c1.id, detached=True)
        self.assertEqual(c1.id, self.repo.head())
        self.assertEqual(c1.id, self.repo.refs.read_ref(b'HEAD'))

    def test_set_new_branch(self):
        [c1] = build_commit_graph(self.repo.object_store, [[1]])
        self.repo.refs[b"refs/heads/blah"] = c1.id
        porcelain.update_head(self.repo, "blah", new_branch="bar")
        self.assertEqual(c1.id, self.repo.head())
        self.assertEqual(b'ref: refs/heads/bar',
                         self.repo.refs.read_ref(b'HEAD'))


class MailmapTests(PorcelainTestCase):

    def test_no_mailmap(self):
        self.assertEqual(
            b'Jelmer Vernooij <jelmer@samba.org>',
            porcelain.check_mailmap(
                self.repo, b'Jelmer Vernooij <jelmer@samba.org>'))

    def test_mailmap_lookup(self):
        with open(os.path.join(self.repo.path, '.mailmap'), 'wb') as f:
            f.write(b"""\
Jelmer Vernooij <jelmer@debian.org>
""")
        self.assertEqual(
            b'Jelmer Vernooij <jelmer@debian.org>',
            porcelain.check_mailmap(
                self.repo, b'Jelmer Vernooij <jelmer@samba.org>'))


class FsckTests(PorcelainTestCase):

    def test_none(self):
        self.assertEqual(
                [],
                list(porcelain.fsck(self.repo)))

    def test_git_dir(self):
        obj = Tree()
        a = Blob()
        a.data = b"foo"
        obj.add(b".git", 0o100644, a.id)
        self.repo.object_store.add_objects(
            [(a, None), (obj, None)])
        self.assertEqual(
                [(obj.id, 'invalid name .git')],
                [(sha, str(e)) for (sha, e) in porcelain.fsck(self.repo)])


class DescribeTests(PorcelainTestCase):

    def test_no_commits(self):
        self.assertRaises(KeyError, porcelain.describe, self.repo.path)

    def test_single_commit(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        sha = porcelain.commit(
                self.repo.path, message=b"Some message",
                author=b"Joe <joe@example.com>",
                committer=b"Bob <bob@example.com>")
        self.assertEqual(
                'g{}'.format(sha[:7].decode('ascii')),
                porcelain.describe(self.repo.path))

    def test_tag(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        porcelain.commit(
                self.repo.path, message=b"Some message",
                author=b"Joe <joe@example.com>",
                committer=b"Bob <bob@example.com>")
        porcelain.tag_create(self.repo.path, b"tryme", b'foo <foo@bar.com>',
                             b'bar', annotated=True)
        self.assertEqual(
                "tryme",
                porcelain.describe(self.repo.path))

    def test_tag_and_commit(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        porcelain.commit(
                self.repo.path, message=b"Some message",
                author=b"Joe <joe@example.com>",
                committer=b"Bob <bob@example.com>")
        porcelain.tag_create(self.repo.path, b"tryme", b'foo <foo@bar.com>',
                             b'bar', annotated=True)
        with open(fullpath, 'w') as f:
            f.write("BAR2")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        sha = porcelain.commit(
                self.repo.path, message=b"Some message",
                author=b"Joe <joe@example.com>",
                committer=b"Bob <bob@example.com>")
        self.assertEqual(
                'tryme-1-g{}'.format(sha[:7].decode('ascii')),
                porcelain.describe(self.repo.path))


class HelperTests(PorcelainTestCase):

    def test_path_to_tree_path_base(self):
        self.assertEqual(
            b'bar', porcelain.path_to_tree_path('/home/foo', '/home/foo/bar'))
        self.assertEqual(b'bar', porcelain.path_to_tree_path('.', './bar'))
        self.assertEqual(b'bar', porcelain.path_to_tree_path('.', 'bar'))
        cwd = os.getcwd()
        self.assertEqual(
            b'bar', porcelain.path_to_tree_path('.', os.path.join(cwd, 'bar')))
        self.assertEqual(b'bar', porcelain.path_to_tree_path(cwd, 'bar'))

    def test_path_to_tree_path_syntax(self):
        self.assertEqual(b'bar', porcelain.path_to_tree_path(b'.', './bar'))
        self.assertEqual(b'bar', porcelain.path_to_tree_path('.', b'./bar'))
        self.assertEqual(b'bar', porcelain.path_to_tree_path(b'.', b'./bar'))

    def test_path_to_tree_path_error(self):
        with self.assertRaises(ValueError):
            porcelain.path_to_tree_path('/home/foo/', '/home/bar/baz')

    def test_path_to_tree_path_rel(self):
        cwd = os.getcwd()
        os.mkdir(os.path.join(self.repo.path, 'foo'))
        os.mkdir(os.path.join(self.repo.path, 'foo/bar'))
        try:
            os.chdir(os.path.join(self.repo.path, 'foo/bar'))
            self.assertEqual(b'bar/baz', porcelain.path_to_tree_path(
                '..', 'baz'))
            self.assertEqual(b'bar/baz', porcelain.path_to_tree_path(
                os.path.join(os.getcwd(), '..'),
                os.path.join(os.getcwd(), 'baz')))
            self.assertEqual(b'bar/baz', porcelain.path_to_tree_path(
                '..', os.path.join(os.getcwd(), 'baz')))
            self.assertEqual(b'bar/baz', porcelain.path_to_tree_path(
                os.path.join(os.getcwd(), '..'), 'baz'))
        finally:
            os.chdir(cwd)


class GetObjectBypathTests(PorcelainTestCase):

    def test_simple(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        porcelain.commit(
                self.repo.path, message=b"Some message",
                author=b"Joe <joe@example.com>",
                committer=b"Bob <bob@example.com>")
        self.assertEqual(
            b"BAR",
            porcelain.get_object_by_path(self.repo, 'foo').data)

    def test_missing(self):
        self.assertRaises(
            KeyError,
            porcelain.get_object_by_path, self.repo, 'foo')


class WriteTreeTests(PorcelainTestCase):

    def test_simple(self):
        fullpath = os.path.join(self.repo.path, 'foo')
        with open(fullpath, 'w') as f:
            f.write("BAR")
        porcelain.add(repo=self.repo.path, paths=[fullpath])
        self.assertEqual(
            b'd2092c8a9f311f0311083bf8d177f2ca0ab5b241',
            porcelain.write_tree(self.repo))