# test_porcelain.py -- porcelain tests
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Tests for dulwich.porcelain."""
from io import BytesIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
import os
import shutil
import tarfile
import tempfile
import time
from dulwich import porcelain
from dulwich.diff_tree import tree_changes
from dulwich.objects import (
Blob,
Tag,
Tree,
ZERO_SHA,
)
from dulwich.repo import (
NoIndexPresent,
Repo,
)
from dulwich.tests import (
TestCase,
)
from dulwich.tests.utils import (
build_commit_graph,
make_commit,
make_object,
)
class PorcelainTestCase(TestCase):
def setUp(self):
super(PorcelainTestCase, self).setUp()
self.test_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.test_dir)
self.repo = Repo.init(os.path.join(self.test_dir, 'repo'), mkdir=True)
self.addCleanup(self.repo.close)
class ArchiveTests(PorcelainTestCase):
"""Tests for the archive command."""
def test_simple(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"refs/heads/master"] = c3.id
out = BytesIO()
err = BytesIO()
porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out,
errstream=err)
self.assertEqual(b"", err.getvalue())
tf = tarfile.TarFile(fileobj=out)
self.addCleanup(tf.close)
self.assertEqual([], tf.getnames())
class UpdateServerInfoTests(PorcelainTestCase):
def test_simple(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"refs/heads/foo"] = c3.id
porcelain.update_server_info(self.repo.path)
self.assertTrue(os.path.exists(
os.path.join(self.repo.controldir(), 'info', 'refs')))
class CommitTests(PorcelainTestCase):
def test_custom_author(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"refs/heads/foo"] = c3.id
sha = porcelain.commit(
self.repo.path, message=b"Some message",
author=b"Joe <joe@example.com>",
committer=b"Bob <bob@example.com>")
self.assertTrue(isinstance(sha, bytes))
self.assertEqual(len(sha), 40)
def test_unicode(self):
c1, c2, c3 = build_commit_graph(
self.repo.object_store, [[1], [2, 1], [3, 1, 2]])
self.repo.refs[b"refs/heads/foo"] = c3.id
sha = porcelain.commit(
self.repo.path, message="Some message",
author="Joe <joe@example.com>",
committer="Bob <bob@example.com>")
self.assertTrue(isinstance(sha, bytes))
self.assertEqual(len(sha), 40)
class CloneTests(PorcelainTestCase):
def test_simple_local(self):
f1_1 = make_object(Blob, data=b'f1')
commit_spec = [[1], [2, 1], [3, 1, 2]]
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
2: [(b'f1', f1_1), (b'f2', f1_1)],
3: [(b'f1', f1_1), (b'f2', f1_1)], }
c1, c2, c3 = build_commit_graph(self.repo.object_store,
commit_spec, trees)
self.repo.refs[b"refs/heads/master"] = c3.id
self.repo.refs[b"refs/tags/foo"] = c3.id
target_path = tempfile.mkdtemp()
errstream = BytesIO()
self.addCleanup(shutil.rmtree, target_path)
r = porcelain.clone(self.repo.path, target_path,
checkout=False, errstream=errstream)
self.addCleanup(r.close)
self.assertEqual(r.path, target_path)
target_repo = Repo(target_path)
self.assertEqual(0, len(target_repo.open_index()))
self.assertEqual(c3.id, target_repo.refs[b'refs/tags/foo'])
self.assertTrue(b'f1' not in os.listdir(target_path))
self.assertTrue(b'f2' not in os.listdir(target_path))
c = r.get_config()
encoded_path = self.repo.path
if not isinstance(encoded_path, bytes):
encoded_path = encoded_path.encode('utf-8')
self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url'))
self.assertEqual(
b'+refs/heads/*:refs/remotes/origin/*',
c.get((b'remote', b'origin'), b'fetch'))
def test_simple_local_with_checkout(self):
f1_1 = make_object(Blob, data=b'f1')
commit_spec = [[1], [2, 1], [3, 1, 2]]
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
2: [(b'f1', f1_1), (b'f2', f1_1)],
3: [(b'f1', f1_1), (b'f2', f1_1)], }
c1, c2, c3 = build_commit_graph(self.repo.object_store,
commit_spec, trees)
self.repo.refs[b"refs/heads/master"] = c3.id
target_path = tempfile.mkdtemp()
errstream = BytesIO()
self.addCleanup(shutil.rmtree, target_path)
with porcelain.clone(self.repo.path, target_path,
checkout=True,
errstream=errstream) as r:
self.assertEqual(r.path, target_path)
with Repo(target_path) as r:
self.assertEqual(r.head(), c3.id)
self.assertTrue('f1' in os.listdir(target_path))
self.assertTrue('f2' in os.listdir(target_path))
def test_bare_local_with_checkout(self):
f1_1 = make_object(Blob, data=b'f1')
commit_spec = [[1], [2, 1], [3, 1, 2]]
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)],
2: [(b'f1', f1_1), (b'f2', f1_1)],
3: [(b'f1', f1_1), (b'f2', f1_1)], }
c1, c2, c3 = build_commit_graph(self.repo.object_store,
commit_spec, trees)
self.repo.refs[b"refs/heads/master"] = c3.id
target_path = tempfile.mkdtemp()
errstream = BytesIO()
self.addCleanup(shutil.rmtree, target_path)
with porcelain.clone(
self.repo.path, target_path, bare=True,
errstream=errstream) as r:
self.assertEqual(r.path, target_path)
with Repo(target_path) as r:
r.head()
self.assertRaises(NoIndexPresent, r.open_index)
self.assertFalse(b'f1' in os.listdir(target_path))
self.assertFalse(b'f2' in os.listdir(target_path))
def test_no_checkout_with_bare(self):
f1_1 = make_object(Blob, data=b'f1')
commit_spec = [[1]]
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}
(c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
self.repo.refs[b"refs/heads/master"] = c1.id
self.repo.refs[b"HEAD"] = c1.id
target_path = tempfile.mkdtemp()
errstream = BytesIO()
self.addCleanup(shutil.rmtree, target_path)
self.assertRaises(
ValueError, porcelain.clone, self.repo.path,
target_path, checkout=True, bare=True, errstream=errstream)
def test_no_head_no_checkout(self):
f1_1 = make_object(Blob, data=b'f1')
commit_spec = [[1]]
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}
(c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
self.repo.refs[b"refs/heads/master"] = c1.id
target_path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, target_path)
errstream = BytesIO()
r = porcelain.clone(
self.repo.path, target_path, checkout=True, errstream=errstream)
r.close()
def test_no_head_no_checkout_outstream_errstream_autofallback(self):
f1_1 = make_object(Blob, data=b'f1')
commit_spec = [[1]]
trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]}
(c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees)
self.repo.refs[b"refs/heads/master"] = c1.id
target_path = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, target_path)
errstream = porcelain.NoneStream()
r = porcelain.clone(
self.repo.path, target_path, checkout=True, errstream=errstream)
r.close()
class InitTests(TestCase):
def test_non_bare(self):
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
porcelain.init(repo_dir)
def test_bare(self):
repo_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, repo_dir)
porcelain.init(repo_dir, bare=True)
class AddTests(PorcelainTestCase):
def test_add_default_paths(self):
# create a file for initial commit
fullpath = os.path.join(self.repo.path, 'blah')
with open(fullpath, 'w') as f:
f.write("\n")
porcelain.add(repo=self.repo.path, paths=[fullpath])
porcelain.commit(repo=self.repo.path, message=b'test',
author=b'test <email>', committer=b'test <email>')
# Add a second test file and a file in a directory
with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
f.write("\n")
os.mkdir(os.path.join(self.repo.path, 'adir'))
with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f:
f.write("\n")
cwd = os.getcwd()
try:
os.chdir(self.repo.path)
porcelain.add(self.repo.path)
finally:
os.chdir(cwd)
# Check that foo was added and nothing in .git was modified
index = self.repo.open_index()
self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo'])
def test_add_default_paths_subdir(self):
os.mkdir(os.path.join(self.repo.path, 'foo'))
with open(os.path.join(self.repo.path, 'blah'), 'w') as f:
f.write("\n")
with open(os.path.join(self.repo.path, 'foo', 'blie'), 'w') as f:
f.write("\n")
cwd = os.getcwd()
try:
os.chdir(os.path.join(self.repo.path, 'foo'))
porcelain.add(repo=self.repo.path)
porcelain.commit(repo=self.repo.path, message=b'test',
author=b'test <email>',
committer=b'test <email>')
finally:
os.chdir(cwd)
index = self.repo.open_index()
self.assertEqual(sorted(index), [b'foo/blie'])
def test_add_file(self):
fullpath = os.path.join(self.repo.path, 'foo')
with open(fullpath, 'w') as f:
f.write("BAR")
porcelain.add(self.repo.path, paths=[fullpath])
self.assertIn(b"foo", self.repo.open_index())
def test_add_ignored(self):
with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f:
f.write("foo")
with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
f.write("BAR")
with open(os.path.join(self.repo.path, 'bar'), 'w') as f:
f.write("BAR")
(added, ignored) = porcelain.add(self.repo.path, paths=[
os.path.join(self.repo.path, "foo"),
os.path.join(self.repo.path, "bar")])
self.assertIn(b"bar", self.repo.open_index())
self.assertEqual(set(['bar']), set(added))
self.assertEqual(set(['foo']), ignored)
def test_add_file_absolute_path(self):
# Absolute paths are (not yet) supported
with open(os.path.join(self.repo.path, 'foo'), 'w') as f:
f.write("BAR")
porcelain.add(self.repo, paths=[os.path.join(self.repo.path, "foo")])
self.assertIn(b"foo", self.repo.open_index())
def test_add_not_in_repo(self):
with open(os.path.join(self.test_dir, 'foo'), 'w') as f:
f.write("BAR")
self.assertRaises(
ValueError,
porcelain.add, self.repo,
paths=[os.path.join(self.test_dir, "foo")])
self.assertRaises(
ValueError,
porcelain.add, self.repo,
paths=["../foo"])
self.assertEqual([], list(self.repo.open_index()))
def test_add_file_clrf_conversion(self):
# Set the right configuration to the repo
Loading ...