# test_server.py -- Tests for the git server
# Copyright (C) 2010 Google, Inc.
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Tests for the smart protocol server."""
from io import BytesIO
import os
import shutil
import tempfile
import sys
from dulwich.errors import (
GitProtocolError,
NotGitRepository,
UnexpectedCommandError,
HangupException,
)
from dulwich.object_store import (
MemoryObjectStore,
)
from dulwich.repo import (
MemoryRepo,
Repo,
)
from dulwich.server import (
Backend,
DictBackend,
FileSystemBackend,
MultiAckGraphWalkerImpl,
MultiAckDetailedGraphWalkerImpl,
PackHandler,
_split_proto_line,
serve_command,
_find_shallow,
_ProtocolGraphWalker,
ReceivePackHandler,
SingleAckGraphWalkerImpl,
UploadPackHandler,
update_server_info,
)
from dulwich.tests import TestCase
from dulwich.tests.utils import (
make_commit,
make_tag,
)
from dulwich.protocol import (
ZERO_SHA,
)
ONE = b'1' * 40
TWO = b'2' * 40
THREE = b'3' * 40
FOUR = b'4' * 40
FIVE = b'5' * 40
SIX = b'6' * 40
class TestProto(object):
def __init__(self):
self._output = []
self._received = {0: [], 1: [], 2: [], 3: []}
def set_output(self, output_lines):
self._output = output_lines
def read_pkt_line(self):
if self._output:
data = self._output.pop(0)
if data is not None:
return data.rstrip() + b'\n'
else:
# flush-pkt ('0000').
return None
else:
raise HangupException()
def write_sideband(self, band, data):
self._received[band].append(data)
def write_pkt_line(self, data):
self._received[0].append(data)
def get_received_line(self, band=0):
lines = self._received[band]
return lines.pop(0)
class TestGenericPackHandler(PackHandler):
def __init__(self):
PackHandler.__init__(self, Backend(), None)
@classmethod
def capabilities(cls):
return [b'cap1', b'cap2', b'cap3']
@classmethod
def required_capabilities(cls):
return [b'cap2']
class HandlerTestCase(TestCase):
def setUp(self):
super(HandlerTestCase, self).setUp()
self._handler = TestGenericPackHandler()
def assertSucceeds(self, func, *args, **kwargs):
try:
func(*args, **kwargs)
except GitProtocolError as e:
self.fail(e)
def test_capability_line(self):
self.assertEqual(
b' cap1 cap2 cap3',
self._handler.capability_line([b'cap1', b'cap2', b'cap3']))
def test_set_client_capabilities(self):
set_caps = self._handler.set_client_capabilities
self.assertSucceeds(set_caps, [b'cap2'])
self.assertSucceeds(set_caps, [b'cap1', b'cap2'])
# different order
self.assertSucceeds(set_caps, [b'cap3', b'cap1', b'cap2'])
# error cases
self.assertRaises(GitProtocolError, set_caps, [b'capxxx', b'cap2'])
self.assertRaises(GitProtocolError, set_caps, [b'cap1', b'cap3'])
# ignore innocuous but unknown capabilities
self.assertRaises(GitProtocolError, set_caps, [b'cap2', b'ignoreme'])
self.assertFalse(b'ignoreme' in self._handler.capabilities())
self._handler.innocuous_capabilities = lambda: (b'ignoreme',)
self.assertSucceeds(set_caps, [b'cap2', b'ignoreme'])
def test_has_capability(self):
self.assertRaises(GitProtocolError, self._handler.has_capability,
b'cap')
caps = self._handler.capabilities()
self._handler.set_client_capabilities(caps)
for cap in caps:
self.assertTrue(self._handler.has_capability(cap))
self.assertFalse(self._handler.has_capability(b'capxxx'))
class UploadPackHandlerTestCase(TestCase):
def setUp(self):
super(UploadPackHandlerTestCase, self).setUp()
self._repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b'/': self._repo})
self._handler = UploadPackHandler(
backend, [b'/', b'host=lolcathost'], TestProto())
def test_progress(self):
caps = self._handler.required_capabilities()
self._handler.set_client_capabilities(caps)
self._handler.progress(b'first message')
self._handler.progress(b'second message')
self.assertEqual(b'first message',
self._handler.proto.get_received_line(2))
self.assertEqual(b'second message',
self._handler.proto.get_received_line(2))
self.assertRaises(IndexError, self._handler.proto.get_received_line, 2)
def test_no_progress(self):
caps = list(self._handler.required_capabilities()) + [b'no-progress']
self._handler.set_client_capabilities(caps)
self._handler.progress(b'first message')
self._handler.progress(b'second message')
self.assertRaises(IndexError, self._handler.proto.get_received_line, 2)
def test_get_tagged(self):
refs = {
b'refs/tags/tag1': ONE,
b'refs/tags/tag2': TWO,
b'refs/heads/master': FOUR, # not a tag, no peeled value
}
# repo needs to peel this object
self._repo.object_store.add_object(make_commit(id=FOUR))
self._repo.refs._update(refs)
peeled = {
b'refs/tags/tag1': b'1234' * 10,
b'refs/tags/tag2': b'5678' * 10,
}
self._repo.refs._update_peeled(peeled)
caps = list(self._handler.required_capabilities()) + [b'include-tag']
self._handler.set_client_capabilities(caps)
self.assertEqual({b'1234' * 10: ONE, b'5678' * 10: TWO},
self._handler.get_tagged(refs, repo=self._repo))
# non-include-tag case
caps = self._handler.required_capabilities()
self._handler.set_client_capabilities(caps)
self.assertEqual({}, self._handler.get_tagged(refs, repo=self._repo))
class FindShallowTests(TestCase):
def setUp(self):
super(FindShallowTests, self).setUp()
self._store = MemoryObjectStore()
def make_commit(self, **attrs):
commit = make_commit(**attrs)
self._store.add_object(commit)
return commit
def make_linear_commits(self, n, message=b''):
commits = []
parents = []
for _ in range(n):
commits.append(self.make_commit(parents=parents, message=message))
parents = [commits[-1].id]
return commits
def assertSameElements(self, expected, actual):
self.assertEqual(set(expected), set(actual))
def test_linear(self):
c1, c2, c3 = self.make_linear_commits(3)
self.assertEqual((set([c3.id]), set([])),
_find_shallow(self._store, [c3.id], 1))
self.assertEqual((set([c2.id]), set([c3.id])),
_find_shallow(self._store, [c3.id], 2))
self.assertEqual((set([c1.id]), set([c2.id, c3.id])),
_find_shallow(self._store, [c3.id], 3))
self.assertEqual((set([]), set([c1.id, c2.id, c3.id])),
_find_shallow(self._store, [c3.id], 4))
def test_multiple_independent(self):
a = self.make_linear_commits(2, message=b'a')
b = self.make_linear_commits(2, message=b'b')
c = self.make_linear_commits(2, message=b'c')
heads = [a[1].id, b[1].id, c[1].id]
self.assertEqual((set([a[0].id, b[0].id, c[0].id]), set(heads)),
_find_shallow(self._store, heads, 2))
def test_multiple_overlapping(self):
# Create the following commit tree:
# 1--2
# \
# 3--4
c1, c2 = self.make_linear_commits(2)
c3 = self.make_commit(parents=[c1.id])
c4 = self.make_commit(parents=[c3.id])
# 1 is shallow along the path from 4, but not along the path from 2.
self.assertEqual((set([c1.id]), set([c1.id, c2.id, c3.id, c4.id])),
_find_shallow(self._store, [c2.id, c4.id], 3))
def test_merge(self):
c1 = self.make_commit()
c2 = self.make_commit()
c3 = self.make_commit(parents=[c1.id, c2.id])
self.assertEqual((set([c1.id, c2.id]), set([c3.id])),
_find_shallow(self._store, [c3.id], 2))
def test_tag(self):
c1, c2 = self.make_linear_commits(2)
tag = make_tag(c2, name=b'tag')
self._store.add_object(tag)
self.assertEqual((set([c1.id]), set([c2.id])),
_find_shallow(self._store, [tag.id], 2))
class TestUploadPackHandler(UploadPackHandler):
@classmethod
def required_capabilities(self):
return []
class ReceivePackHandlerTestCase(TestCase):
def setUp(self):
super(ReceivePackHandlerTestCase, self).setUp()
self._repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b'/': self._repo})
self._handler = ReceivePackHandler(
backend, [b'/', b'host=lolcathost'], TestProto())
def test_apply_pack_del_ref(self):
refs = {
b'refs/heads/master': TWO,
b'refs/heads/fake-branch': ONE}
self._repo.refs._update(refs)
update_refs = [[ONE, ZERO_SHA, b'refs/heads/fake-branch'], ]
self._handler.set_client_capabilities([b'delete-refs'])
status = self._handler._apply_pack(update_refs)
self.assertEqual(status[0][0], b'unpack')
self.assertEqual(status[0][1], b'ok')
self.assertEqual(status[1][0], b'refs/heads/fake-branch')
self.assertEqual(status[1][1], b'ok')
class ProtocolGraphWalkerEmptyTestCase(TestCase):
def setUp(self):
super(ProtocolGraphWalkerEmptyTestCase, self).setUp()
self._repo = MemoryRepo.init_bare([], {})
backend = DictBackend({b'/': self._repo})
self._walker = _ProtocolGraphWalker(
TestUploadPackHandler(backend, [b'/', b'host=lolcats'],
TestProto()),
self._repo.object_store, self._repo.get_peeled,
self._repo.refs.get_symrefs)
def test_empty_repository(self):
# The server should wait for a flush packet.
self._walker.proto.set_output([])
self.assertRaises(HangupException, self._walker.determine_wants, {})
self.assertEqual(None, self._walker.proto.get_received_line())
self._walker.proto.set_output([None])
self.assertEqual([], self._walker.determine_wants({}))
self.assertEqual(None, self._walker.proto.get_received_line())
class ProtocolGraphWalkerTestCase(TestCase):
Loading ...