##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Adapter registry tests
"""
import unittest
def _makeInterfaces():
    """Create and return a small family of interfaces for the tests.

    New interface classes are created on every call.

    Returns a 9-tuple ``(IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1)``:

    * ``IB1`` and ``IB2`` both extend ``IB0``; ``IB3`` and ``IB4`` extend
      both of them, listing the two bases in opposite order.
    * ``IF1`` extends ``IF0``.
    * ``IR1`` extends ``IR0``.
    """
    from zope.interface import Interface

    class IB0(Interface):
        pass

    class IB1(IB0):
        pass

    class IB2(IB0):
        pass

    class IB3(IB2, IB1):
        pass

    class IB4(IB1, IB2):
        pass

    class IF0(Interface):
        pass

    class IF1(IF0):
        pass

    class IR0(Interface):
        pass

    class IR1(IR0):
        pass

    interfaces = (IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1)
    return interfaces
class BaseAdapterRegistryTests(unittest.TestCase):
    """Tests for ``zope.interface.adapter.BaseAdapterRegistry``.

    The registry is exercised through a subclass whose ``LookupClass`` is a
    recording stub (see ``_getTargetClass``).

    Test methods are described with ``#`` comments rather than docstrings
    so that unittest's verbose output keeps showing the method names.
    """

    def _getTargetClass(self):
        """Return a ``BaseAdapterRegistry`` subclass using a stub lookup.

        The stub ``LookupClass`` records the arguments passed to
        ``changed`` / ``add_extendor`` / ``remove_extendor`` in its
        ``_changed`` and ``_extendors`` tuples.  It is also given a distinct
        placeholder object for every name listed in
        ``BaseAdapterRegistry._delegated``.
        """
        from zope.interface.adapter import BaseAdapterRegistry

        class _CUT(BaseAdapterRegistry):

            class LookupClass(object):
                # Class-level empty tuples; the methods below rebind them
                # on the instance.
                _changed = _extendors = ()

                def __init__(self, reg):
                    pass

                def changed(self, orig):
                    self._changed += (orig,)

                def add_extendor(self, provided):
                    self._extendors += (provided,)

                def remove_extendor(self, provided):
                    self._extendors = tuple(
                        x for x in self._extendors if x != provided)

        for name in BaseAdapterRegistry._delegated:
            setattr(_CUT.LookupClass, name, object())
        return _CUT

    def _makeOne(self):
        """Return a new instance of the class from ``_getTargetClass``."""
        return self._getTargetClass()()

    def test_lookup_delegation(self):
        # Every delegated name on the registry is the very object found
        # under the same name on its ``_v_lookup``.
        CUT = self._getTargetClass()
        registry = CUT()
        for name in CUT._delegated:
            self.assertIs(
                getattr(registry, name), getattr(registry._v_lookup, name))

    def test__generation_on_first_creation(self):
        registry = self._makeOne()
        # Bumped to 1 in BaseAdapterRegistry.__init__
        self.assertEqual(registry._generation, 1)

    def test__generation_after_calling_changed(self):
        registry = self._makeOne()
        orig = object()
        registry.changed(orig)
        # Bumped to 1 in BaseAdapterRegistry.__init__, then to 2 by the
        # explicit changed() call above.
        self.assertEqual(registry._generation, 2)
        self.assertEqual(registry._v_lookup._changed, (registry, orig,))

    def test__generation_after_changing___bases__(self):
        class _Base(object):
            pass
        registry = self._makeOne()
        registry.__bases__ = (_Base,)
        self.assertEqual(registry._generation, 2)

    def test_register(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB0], IR0, '', 'A1')
        self.assertEqual(registry.registered([IB0], IR0, ''), 'A1')
        self.assertEqual(len(registry._adapters), 2)  # order 0 and order 1
        self.assertEqual(registry._generation, 2)

    def test_register_with_invalid_name(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        with self.assertRaises(ValueError):
            registry.register([IB0], IR0, object(), 'A1')

    def test_register_with_value_None_unregisters(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([None], IR0, '', 'A1')
        registry.register([None], IR0, '', None)
        self.assertEqual(len(registry._adapters), 0)

    def test_register_with_same_value(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        _value = object()
        registry.register([None], IR0, '', _value)
        _before = registry._generation
        registry.register([None], IR0, '', _value)
        self.assertEqual(registry._generation, _before)  # skipped changed()

    def test_registered_empty(self):
        registry = self._makeOne()
        self.assertIsNone(registry.registered([None], None, ''))

    def test_registered_non_empty_miss(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB1], None, '', 'A1')
        self.assertIsNone(registry.registered([IB2], None, ''))

    def test_registered_non_empty_hit(self):
        registry = self._makeOne()
        registry.register([None], None, '', 'A1')
        self.assertEqual(registry.registered([None], None, ''), 'A1')

    def test_unregister_empty(self):
        registry = self._makeOne()
        registry.unregister([None], None, '')  # doesn't raise
        self.assertIsNone(registry.registered([None], None, ''))

    def test_unregister_non_empty_miss_on_required(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB1], None, '', 'A1')
        registry.unregister([IB2], None, '')  # doesn't raise
        self.assertEqual(registry.registered([IB1], None, ''), 'A1')

    def test_unregister_non_empty_miss_on_name(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB1], None, '', 'A1')
        registry.unregister([IB1], None, 'nonesuch')  # doesn't raise
        self.assertEqual(registry.registered([IB1], None, ''), 'A1')

    def test_unregister_with_value_not_None_miss(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        orig = object()
        nomatch = object()
        registry.register([IB1], None, '', orig)
        registry.unregister([IB1], None, '', nomatch)  # doesn't raise
        self.assertIs(registry.registered([IB1], None, ''), orig)

    def test_unregister_hit_clears_empty_subcomponents(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        one = object()
        another = object()
        registry.register([IB1, IB2], None, '', one)
        registry.register([IB1, IB3], None, '', another)
        self.assertIn(IB2, registry._adapters[2][IB1])
        self.assertIn(IB3, registry._adapters[2][IB1])
        registry.unregister([IB1, IB3], None, '', another)
        # Only the IB3 entry goes away; the sibling IB2 entry survives.
        self.assertIn(IB2, registry._adapters[2][IB1])
        self.assertNotIn(IB3, registry._adapters[2][IB1])

    def test_unsubscribe_empty(self):
        registry = self._makeOne()
        registry.unsubscribe([None], None, '')  # doesn't raise
        self.assertIsNone(registry.registered([None], None, ''))

    def test_unsubscribe_hit(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        orig = object()
        registry.subscribe([IB1], None, orig)
        registry.unsubscribe([IB1], None, orig)  # doesn't raise
        self.assertEqual(len(registry._subscribers), 0)

    def test_unsubscribe_after_multiple(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        first = object()
        second = object()
        third = object()
        fourth = object()
        registry.subscribe([IB1], None, first)
        registry.subscribe([IB1], None, second)
        registry.subscribe([IB1], IR0, third)
        registry.subscribe([IB1], IR0, fourth)
        # Remove in the reverse of the order they were added.
        registry.unsubscribe([IB1], IR0, fourth)
        registry.unsubscribe([IB1], IR0, third)
        registry.unsubscribe([IB1], None, second)
        registry.unsubscribe([IB1], None, first)
        self.assertEqual(len(registry._subscribers), 0)

    def test_unsubscribe_w_None_after_multiple(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        first = object()
        second = object()
        registry.subscribe([IB1], None, first)
        registry.subscribe([IB1], None, second)
        # No value given: a single call leaves no subscribers behind.
        registry.unsubscribe([IB1], None)
        self.assertEqual(len(registry._subscribers), 0)

    def test_unsubscribe_non_empty_miss_on_required(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.subscribe([IB1], None, 'A1')
        self.assertEqual(len(registry._subscribers), 2)
        registry.unsubscribe([IB2], None, '')  # doesn't raise
        self.assertEqual(len(registry._subscribers), 2)

    def test_unsubscribe_non_empty_miss_on_value(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.subscribe([IB1], None, 'A1')
        self.assertEqual(len(registry._subscribers), 2)
        registry.unsubscribe([IB1], None, 'A2')  # doesn't raise
        self.assertEqual(len(registry._subscribers), 2)

    def test_unsubscribe_with_value_not_None_miss(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        orig = object()
        nomatch = object()
        registry.subscribe([IB1], None, orig)
        registry.unsubscribe([IB1], None, nomatch)  # doesn't raise
        self.assertEqual(len(registry._subscribers), 2)

    def _instance_method_notify_target(self):
        """Bound-method subscriber fixture; fails the test if ever invoked."""
        self.fail("Example method, not intended to be called.")

    def test_unsubscribe_instance_method(self):
        # Each ``self._instance_method_notify_target`` access creates a new
        # bound-method object, so unsubscribe cannot rely on identity here.
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        self.assertEqual(len(registry._subscribers), 0)
        registry.subscribe([IB1], None, self._instance_method_notify_target)
        registry.unsubscribe([IB1], None, self._instance_method_notify_target)
        self.assertEqual(len(registry._subscribers), 0)
class LookupBaseFallbackTests(unittest.TestCase):
def _getTargetClass(self):
    """Return the ``LookupBaseFallback`` class from ``zope.interface.adapter``."""
    # Imported at call time rather than at module import.
    from zope.interface import adapter
    return adapter.LookupBaseFallback
def _makeOne(self, uc_lookup=None, uc_lookupAll=None,
             uc_subscriptions=None):
    """Return an instance of a target-class subclass with pluggable hooks.

    Each argument, when given, is installed as the matching
    ``_uncached_*`` method of the subclass and therefore receives the
    instance as its first positional argument.

    :param uc_lookup: replacement for ``_uncached_lookup``; the default
        accepts ``(required, provided, name)`` and returns ``None``.
    :param uc_lookupAll: replacement for ``_uncached_lookupAll``; the
        default raises ``NotImplementedError``.
    :param uc_subscriptions: replacement for ``_uncached_subscriptions``;
        the default raises ``NotImplementedError``.
    """
    def _lookup_nothing(self, required, provided, name):
        return None

    def _not_implemented(self, required, provided):
        raise NotImplementedError()

    lookup_hook = _lookup_nothing if uc_lookup is None else uc_lookup
    lookup_all_hook = (
        _not_implemented if uc_lookupAll is None else uc_lookupAll)
    subscriptions_hook = (
        _not_implemented if uc_subscriptions is None else uc_subscriptions)

    class Derived(self._getTargetClass()):
        _uncached_lookup = lookup_hook
        _uncached_lookupAll = lookup_all_hook
        _uncached_subscriptions = subscriptions_hook

    return Derived()
def test_lookup_w_invalid_name(self):
    # A non-string name must raise ValueError, and the uncached lookup
    # hook must never be reached.
    def _lookup(lb_self, required, provided, name):
        # The first parameter is deliberately not named ``self``: the hook
        # is installed as a method, so a ``self`` here would be the lookup
        # object and shadow the test case, turning ``self.fail`` into an
        # AttributeError instead of a proper test failure.
        self.fail("This should never be called")
    lb = self._makeOne(uc_lookup=_lookup)
    with self.assertRaises(ValueError):
        lb.lookup(('A',), 'B', object())
def test_lookup_miss_no_default(self):
    # When the uncached hook finds nothing and no default is supplied,
    # lookup() returns None after consulting the hook exactly once.
    _called_with = []

    def _lookup(self, required, provided, name):
        _called_with.append((required, provided, name))
        return None

    lb = self._makeOne(uc_lookup=_lookup)
    found = lb.lookup(('A',), 'B', 'C')
    # assertIsNone reports the offending value on failure, unlike
    # assertTrue(found is None).
    self.assertIsNone(found)
    self.assertEqual(_called_with, [(('A',), 'B', 'C')])
def test_lookup_miss_w_default(self):
    # When the uncached hook finds nothing, lookup() hands back the very
    # default object that was passed in.
    _called_with = []
    _default = object()

    def _lookup(self, required, provided, name):
        _called_with.append((required, provided, name))
        return None

    lb = self._makeOne(uc_lookup=_lookup)
    found = lb.lookup(('A',), 'B', 'C', _default)
    # assertIs shows both operands on failure, unlike assertTrue(x is y).
    self.assertIs(found, _default)
    self.assertEqual(_called_with, [(('A',), 'B', 'C')])
def test_lookup_not_cached(self):
    # A first lookup goes to the uncached hook and returns its result.
    _called_with = []
    a, b, c = object(), object(), object()
    # Results are handed out in order so the test can tell how many times
    # the hook was invoked.
    _results = [a, b, c]

    def _lookup(self, required, provided, name):
        _called_with.append((required, provided, name))
        return _results.pop(0)

    lb = self._makeOne(uc_lookup=_lookup)
    found = lb.lookup(('A',), 'B', 'C')
    # assertIs shows both operands on failure, unlike assertTrue(x is y).
    self.assertIs(found, a)
    self.assertEqual(_called_with, [(('A',), 'B', 'C')])
    self.assertEqual(_results, [b, c])
def test_lookup_cached(self):
    # A repeated lookup with identical arguments returns the first result
    # without invoking the uncached hook a second time.
    _called_with = []
    a, b, c = object(), object(), object()
    # Results are handed out in order so the test can tell how many times
    # the hook was invoked.
    _results = [a, b, c]

    def _lookup(self, required, provided, name):
        _called_with.append((required, provided, name))
        return _results.pop(0)

    lb = self._makeOne(uc_lookup=_lookup)
    lb.lookup(('A',), 'B', 'C')  # first call; its result is not inspected
    found = lb.lookup(('A',), 'B', 'C')
    # assertIs shows both operands on failure, unlike assertTrue(x is y).
    self.assertIs(found, a)
    # Exactly one hook invocation was recorded and one result consumed.
    self.assertEqual(_called_with, [(('A',), 'B', 'C')])
    self.assertEqual(_results, [b, c])
def test_lookup_not_cached_multi_required(self):
    # Same as the single-required first-lookup case, but with two required
    # specifications passed through to the uncached hook.
    _called_with = []
    a, b, c = object(), object(), object()
    # Results are handed out in order so the test can tell how many times
    # the hook was invoked.
    _results = [a, b, c]

    def _lookup(self, required, provided, name):
        _called_with.append((required, provided, name))
        return _results.pop(0)

    lb = self._makeOne(uc_lookup=_lookup)
    found = lb.lookup(('A', 'D'), 'B', 'C')
    # assertIs shows both operands on failure, unlike assertTrue(x is y).
    self.assertIs(found, a)
    self.assertEqual(_called_with, [(('A', 'D'), 'B', 'C')])
    self.assertEqual(_results, [b, c])
def test_lookup_cached_multi_required(self):
_called_with = []
a, b, c = object(), object(), object()
_results = [a, b, c]
def _lookup(self, required, provided, name):
Loading ...