Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / zope.interface   python

Repository URL to install this package:

Version: 4.6.0 

/ interface / tests / test_adapter.py

##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Adapter registry tests
"""
import unittest


def _makeInterfaces():
    from zope.interface import Interface

    class IB0(Interface): pass
    class IB1(IB0): pass
    class IB2(IB0): pass
    class IB3(IB2, IB1): pass
    class IB4(IB1, IB2): pass

    class IF0(Interface): pass
    class IF1(IF0): pass

    class IR0(Interface): pass
    class IR1(IR0): pass

    return IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1


class BaseAdapterRegistryTests(unittest.TestCase):

    def _getTargetClass(self):
        from zope.interface.adapter import BaseAdapterRegistry
        class _CUT(BaseAdapterRegistry):
            class LookupClass(object):
                _changed = _extendors = ()
                def __init__(self, reg):
                    pass
                def changed(self, orig):
                    self._changed += (orig,)
                def add_extendor(self, provided):
                    self._extendors += (provided,)
                def remove_extendor(self, provided):
                    self._extendors = tuple([x for x in self._extendors
                                                    if x != provided])
        for name in BaseAdapterRegistry._delegated:
            setattr(_CUT.LookupClass, name, object())
        return _CUT

    def _makeOne(self):
        return self._getTargetClass()()

    def test_lookup_delegation(self):
        CUT = self._getTargetClass()
        registry = CUT()
        for name in CUT._delegated:
            self.assertTrue(
                getattr(registry, name) is getattr(registry._v_lookup, name))

    def test__generation_on_first_creation(self):
        registry = self._makeOne()
        # Bumped to 1 in BaseAdapterRegistry.__init__
        self.assertEqual(registry._generation, 1)

    def test__generation_after_calling_changed(self):
        registry = self._makeOne()
        orig = object()
        registry.changed(orig)
        # Bumped to 1 in BaseAdapterRegistry.__init__
        self.assertEqual(registry._generation, 2)
        self.assertEqual(registry._v_lookup._changed, (registry, orig,))

    def test__generation_after_changing___bases__(self):
        class _Base(object): pass
        registry = self._makeOne()
        registry.__bases__ = (_Base,)
        self.assertEqual(registry._generation, 2)

    def test_register(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB0], IR0, '', 'A1')
        self.assertEqual(registry.registered([IB0], IR0, ''), 'A1')
        self.assertEqual(len(registry._adapters), 2) #order 0 and order 1
        self.assertEqual(registry._generation, 2)

    def test_register_with_invalid_name(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        with self.assertRaises(ValueError):
            registry.register([IB0], IR0, object(), 'A1')

    def test_register_with_value_None_unregisters(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([None], IR0, '', 'A1')
        registry.register([None], IR0, '', None)
        self.assertEqual(len(registry._adapters), 0)

    def test_register_with_same_value(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        _value = object()
        registry.register([None], IR0, '', _value)
        _before = registry._generation
        registry.register([None], IR0, '', _value)
        self.assertEqual(registry._generation, _before) # skipped changed()

    def test_registered_empty(self):
        registry = self._makeOne()
        self.assertEqual(registry.registered([None], None, ''), None)

    def test_registered_non_empty_miss(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB1], None, '', 'A1')
        self.assertEqual(registry.registered([IB2], None, ''), None)

    def test_registered_non_empty_hit(self):
        registry = self._makeOne()
        registry.register([None], None, '', 'A1')
        self.assertEqual(registry.registered([None], None, ''), 'A1')

    def test_unregister_empty(self):
        registry = self._makeOne()
        registry.unregister([None], None, '') #doesn't raise
        self.assertEqual(registry.registered([None], None, ''), None)

    def test_unregister_non_empty_miss_on_required(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB1], None, '', 'A1')
        registry.unregister([IB2], None, '') #doesn't raise
        self.assertEqual(registry.registered([IB1], None, ''), 'A1')

    def test_unregister_non_empty_miss_on_name(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.register([IB1], None, '', 'A1')
        registry.unregister([IB1], None, 'nonesuch') #doesn't raise
        self.assertEqual(registry.registered([IB1], None, ''), 'A1')

    def test_unregister_with_value_not_None_miss(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        orig = object()
        nomatch = object()
        registry.register([IB1], None, '', orig)
        registry.unregister([IB1], None, '', nomatch) #doesn't raise
        self.assertTrue(registry.registered([IB1], None, '') is orig)

    def test_unregister_hit_clears_empty_subcomponents(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        one = object()
        another = object()
        registry.register([IB1, IB2], None, '', one)
        registry.register([IB1, IB3], None, '', another)
        self.assertTrue(IB2 in registry._adapters[2][IB1])
        self.assertTrue(IB3 in registry._adapters[2][IB1])
        registry.unregister([IB1, IB3], None, '', another)
        self.assertTrue(IB2 in registry._adapters[2][IB1])
        self.assertFalse(IB3 in registry._adapters[2][IB1])

    def test_unsubscribe_empty(self):
        registry = self._makeOne()
        registry.unsubscribe([None], None, '') #doesn't raise
        self.assertEqual(registry.registered([None], None, ''), None)

    def test_unsubscribe_hit(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        orig = object()
        registry.subscribe([IB1], None, orig)
        registry.unsubscribe([IB1], None, orig) #doesn't raise
        self.assertEqual(len(registry._subscribers), 0)

    def test_unsubscribe_after_multiple(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        first = object()
        second = object()
        third = object()
        fourth = object()
        registry.subscribe([IB1], None, first)
        registry.subscribe([IB1], None, second)
        registry.subscribe([IB1], IR0, third)
        registry.subscribe([IB1], IR0, fourth)
        registry.unsubscribe([IB1], IR0, fourth)
        registry.unsubscribe([IB1], IR0, third)
        registry.unsubscribe([IB1], None, second)
        registry.unsubscribe([IB1], None, first)
        self.assertEqual(len(registry._subscribers), 0)

    def test_unsubscribe_w_None_after_multiple(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        first = object()
        second = object()
        third = object()
        registry.subscribe([IB1], None, first)
        registry.subscribe([IB1], None, second)
        registry.unsubscribe([IB1], None)
        self.assertEqual(len(registry._subscribers), 0)

    def test_unsubscribe_non_empty_miss_on_required(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.subscribe([IB1], None, 'A1')
        self.assertEqual(len(registry._subscribers), 2)
        registry.unsubscribe([IB2], None, '') #doesn't raise
        self.assertEqual(len(registry._subscribers), 2)

    def test_unsubscribe_non_empty_miss_on_value(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        registry.subscribe([IB1], None, 'A1')
        self.assertEqual(len(registry._subscribers), 2)
        registry.unsubscribe([IB1], None, 'A2') #doesn't raise
        self.assertEqual(len(registry._subscribers), 2)

    def test_unsubscribe_with_value_not_None_miss(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        orig = object()
        nomatch = object()
        registry.subscribe([IB1], None, orig)
        registry.unsubscribe([IB1], None, nomatch) #doesn't raise
        self.assertEqual(len(registry._subscribers), 2)
    
    def _instance_method_notify_target(self):
        self.fail("Example method, not intended to be called.")
    
    def test_unsubscribe_instance_method(self):
        IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces()
        registry = self._makeOne()
        self.assertEqual(len(registry._subscribers), 0)
        registry.subscribe([IB1], None, self._instance_method_notify_target)
        registry.unsubscribe([IB1], None, self._instance_method_notify_target)
        self.assertEqual(len(registry._subscribers), 0)


class LookupBaseFallbackTests(unittest.TestCase):

    def _getTargetClass(self):
        from zope.interface.adapter import LookupBaseFallback
        return LookupBaseFallback

    def _makeOne(self, uc_lookup=None, uc_lookupAll=None,
                 uc_subscriptions=None):
        if uc_lookup is None:
            def uc_lookup(self, required, provided, name):
                pass
        if uc_lookupAll is None:
            def uc_lookupAll(self, required, provided):
                raise NotImplementedError()
        if uc_subscriptions is None:
            def uc_subscriptions(self, required, provided):
                raise NotImplementedError()
        class Derived(self._getTargetClass()):
            _uncached_lookup = uc_lookup
            _uncached_lookupAll = uc_lookupAll
            _uncached_subscriptions = uc_subscriptions
        return Derived()

    def test_lookup_w_invalid_name(self):
        def _lookup(self, required, provided, name):
            self.fail("This should never be called")
        lb = self._makeOne(uc_lookup=_lookup)
        with self.assertRaises(ValueError):
            lb.lookup(('A',), 'B', object())

    def test_lookup_miss_no_default(self):
        _called_with = []
        def _lookup(self, required, provided, name):
            _called_with.append((required, provided, name))
            return None
        lb = self._makeOne(uc_lookup=_lookup)
        found = lb.lookup(('A',), 'B', 'C')
        self.assertTrue(found is None)
        self.assertEqual(_called_with, [(('A',), 'B', 'C')])

    def test_lookup_miss_w_default(self):
        _called_with = []
        _default = object()
        def _lookup(self, required, provided, name):
            _called_with.append((required, provided, name))
            return None
        lb = self._makeOne(uc_lookup=_lookup)
        found = lb.lookup(('A',), 'B', 'C', _default)
        self.assertTrue(found is _default)
        self.assertEqual(_called_with, [(('A',), 'B', 'C')])

    def test_lookup_not_cached(self):
        _called_with = []
        a, b, c = object(), object(), object()
        _results = [a, b, c]
        def _lookup(self, required, provided, name):
            _called_with.append((required, provided, name))
            return _results.pop(0)
        lb = self._makeOne(uc_lookup=_lookup)
        found = lb.lookup(('A',), 'B', 'C')
        self.assertTrue(found is a)
        self.assertEqual(_called_with, [(('A',), 'B', 'C')])
        self.assertEqual(_results, [b, c])

    def test_lookup_cached(self):
        _called_with = []
        a, b, c = object(), object(), object()
        _results = [a, b, c]
        def _lookup(self, required, provided, name):
            _called_with.append((required, provided, name))
            return _results.pop(0)
        lb = self._makeOne(uc_lookup=_lookup)
        found = lb.lookup(('A',), 'B', 'C')
        found = lb.lookup(('A',), 'B', 'C')
        self.assertTrue(found is a)
        self.assertEqual(_called_with, [(('A',), 'B', 'C')])
        self.assertEqual(_results, [b, c])

    def test_lookup_not_cached_multi_required(self):
        _called_with = []
        a, b, c = object(), object(), object()
        _results = [a, b, c]
        def _lookup(self, required, provided, name):
            _called_with.append((required, provided, name))
            return _results.pop(0)
        lb = self._makeOne(uc_lookup=_lookup)
        found = lb.lookup(('A', 'D'), 'B', 'C')
        self.assertTrue(found is a)
        self.assertEqual(_called_with, [(('A', 'D'), 'B', 'C')])
        self.assertEqual(_results, [b, c])

    def test_lookup_cached_multi_required(self):
        _called_with = []
        a, b, c = object(), object(), object()
        _results = [a, b, c]
        def _lookup(self, required, provided, name):
Loading ...