##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interface object implementation
"""
from __future__ import generators
import sys
from types import MethodType
from types import FunctionType
import warnings
import weakref
from zope.interface.exceptions import Invalid
from zope.interface.ro import ro
# Bit flags for "accepts *args" and "accepts **kwargs".  The values match
# ``inspect.CO_VARARGS`` / ``inspect.CO_VARKEYWORDS``; presumably they are
# tested against a code object's ``co_flags`` further down in this module
# (usage not visible here -- confirm).
CO_VARARGS = 4
CO_VARKEYWORDS = 8
# Key under which ``invariant`` and ``taggedValue`` store their data in the
# local namespace of the calling frame.
TAGGED_DATA = '__interface_tagged_values__'
# Unique sentinel object returned by ``invariant`` and ``taggedValue``.
_decorator_non_return = object()
def invariant(call):
    """Record ``call`` as an invariant in the caller's namespace.

    The callable is appended to the ``'invariants'`` list held inside the
    ``TAGGED_DATA`` mapping of the *calling* frame's locals.  Both the
    mapping and the list are created on first use.

    Returns the ``_decorator_non_return`` sentinel, not ``call``.
    """
    # Must be looked up directly here: frame 1 is whoever called us.
    caller_locals = sys._getframe(1).f_locals
    tagged = caller_locals.setdefault(TAGGED_DATA, {})
    tagged.setdefault('invariants', []).append(call)
    return _decorator_non_return
def taggedValue(key, value):
    """Attach a tagged value to an interface at definition time.

    ``value`` is stored under ``key`` in the ``TAGGED_DATA`` mapping kept in
    the *calling* frame's local namespace; the mapping is created if it
    does not exist yet.

    Returns the ``_decorator_non_return`` sentinel.
    """
    # Must be looked up directly here: frame 1 is whoever called us.
    caller_locals = sys._getframe(1).f_locals
    caller_locals.setdefault(TAGGED_DATA, {})[key] = value
    return _decorator_non_return
class Element(object):
    """
    Default implementation of `zope.interface.interfaces.IElement`.

    An element carries a name, a documentation string and a private
    mapping of arbitrary "tagged values".
    """

    # We can't say this yet because we don't have enough
    # infrastructure in place.
    #
    #implements(IElement)

    def __init__(self, __name__, __doc__=''):
        """Create an element.

        If ``__doc__`` is empty and ``__name__`` contains a space, the single
        argument is taken to be the documentation: it becomes ``__doc__``
        and the name is set to ``None``.
        """
        if not __doc__ and __name__.find(' ') >= 0:
            __doc__ = __name__
            __name__ = None

        self.__name__ = __name__
        self.__doc__ = __doc__
        self.__tagged_values = {}

    def getName(self):
        """ Returns the name of the object. """
        return self.__name__

    def getDoc(self):
        """ Returns the documentation for the object. """
        return self.__doc__

    def getTaggedValue(self, tag):
        """ Returns the value associated with 'tag'.

        Raises KeyError if no value has been set for 'tag'.
        """
        return self.__tagged_values[tag]

    def queryTaggedValue(self, tag, default=None):
        """ Returns the value associated with 'tag', or 'default' if the
        tag has not been set. """
        return self.__tagged_values.get(tag, default)

    def getTaggedValueTags(self):
        """ Returns a list of all tags. """
        # Return a real list (a snapshot) rather than ``dict.keys()``: on
        # Python 3 the latter is a live view of the private mapping, which
        # contradicts the documented return type and breaks callers that
        # set tags while iterating.
        return list(self.__tagged_values)

    def setTaggedValue(self, tag, value):
        """ Associates 'value' with 'tag'. """
        self.__tagged_values[tag] = value
class SpecificationBasePy(object):
    """Pure-Python membership tests shared by specifications.

    Every test is answered by looking something up in an ``_implied``
    mapping, which instances are expected to provide.
    """

    def providedBy(self, ob):
        """Is the interface implemented by an object
        """
        # ``providedBy`` here is the module-level function, not this method.
        return self in providedBy(ob)._implied

    def implementedBy(self, cls):
        """Test whether the specification is implemented by a class or factory.

        Raise TypeError if argument is neither a class nor a callable.
        """
        # ``implementedBy`` here is the module-level function.
        return self in implementedBy(cls)._implied

    def isOrExtends(self, interface):
        """Is the interface the same as or extend the given interface
        """
        return interface in self._implied

    # Calling a specification is shorthand for ``isOrExtends``.
    __call__ = isOrExtends


# Use the Python implementation unless the optional C optimizations are
# importable, in which case their class replaces it.
SpecificationBase = SpecificationBasePy
try:
    from zope.interface._zope_interface_coptimizations import SpecificationBase
except ImportError:
    pass
# Sentinel distinguishing "no alternate supplied" from ``alternate=None``.
_marker = object()


class InterfaceBasePy(object):
    """Base class that wants to be replaced with a C base :)
    """

    def __call__(self, obj, alternate=_marker):
        """Adapt an object to the interface

        The object's own ``__conform__`` hook (if any) is consulted first,
        then ``self.__adapt__``.  When neither yields an adapter,
        ``alternate`` is returned if one was passed; otherwise TypeError
        is raised.
        """
        conform_hook = getattr(obj, '__conform__', None)
        if conform_hook is not None:
            adapted = self._call_conform(conform_hook)
            if adapted is not None:
                return adapted

        adapted = self.__adapt__(obj)
        if adapted is not None:
            return adapted

        if alternate is _marker:
            raise TypeError("Could not adapt", obj, self)
        return alternate

    def __adapt__(self, obj):
        """Adapt an object to the receiver

        Returns ``obj`` itself when it already provides this interface,
        otherwise the first non-None result of calling each entry of
        ``adapter_hooks`` with ``(self, obj)``, otherwise None.
        """
        if self.providedBy(obj):
            return obj

        for hook in adapter_hooks:
            candidate = hook(self, obj)
            if candidate is not None:
                return candidate

        return None


# Use the Python implementation unless the optional C optimizations are
# importable, in which case their class replaces it.
InterfaceBase = InterfaceBasePy
try:
    from zope.interface._zope_interface_coptimizations import InterfaceBase
except ImportError:
    pass

# Callables tried in order by ``InterfaceBasePy.__adapt__``; replaced by the
# C module's list when the optimizations are available.
adapter_hooks = []
try:
    from zope.interface._zope_interface_coptimizations import adapter_hooks
except ImportError:
    pass
class Specification(SpecificationBase):
    """Specifications

    An interface specification is used to track interface declarations
    and component registrations.

    This class is a base class for both interfaces themselves and for
    interface specifications (declarations).

    Specifications are mutable.  If you reassign their bases, their
    relations with other specifications are adjusted accordingly.
    """

    # Copy some base class methods for speed
    isOrExtends = SpecificationBase.isOrExtends
    providedBy = SpecificationBase.providedBy

    def __init__(self, bases=()):
        """Create a specification with the given base specifications."""
        # ``_implied`` and ``dependents`` must exist before ``__bases__`` is
        # assigned: the property setter calls ``changed``, which uses both.
        self._implied = {}
        self.dependents = weakref.WeakKeyDictionary()
        self.__bases__ = tuple(bases)

    def subscribe(self, dependent):
        """Count one more subscription for ``dependent``."""
        current = self.dependents.get(dependent, 0)
        self.dependents[dependent] = current + 1

    def unsubscribe(self, dependent):
        """Drop one subscription for ``dependent``.

        The entry is removed when its count reaches zero.  Raises KeyError
        if ``dependent`` is not currently subscribed.
        """
        remaining = self.dependents.get(dependent, 0) - 1
        if remaining < 0:
            raise KeyError(dependent)
        if remaining == 0:
            del self.dependents[dependent]
        else:
            self.dependents[dependent] = remaining

    def __setBases(self, bases):
        """Setter for ``__bases__``: move our subscriptions and recompute."""
        # Unregister ourselves as a dependent of our old bases
        for old_base in self.__bases__:
            old_base.unsubscribe(self)

        # Register ourselves as a dependent of our new bases
        self.__dict__['__bases__'] = bases
        for new_base in bases:
            new_base.subscribe(self)

        self.changed(self)

    # Reads come straight from the instance dict (empty tuple until first
    # assignment); writes go through ``__setBases``.
    __bases__ = property(
        lambda self: self.__dict__.get('__bases__', ()),
        __setBases,
        )

    def changed(self, originally_changed):
        """We, or something we depend on, have changed

        Drops the attribute cache, rebuilds ``_implied``, ``__sro__`` and
        ``__iro__`` from the resolution order, then forwards the
        notification to every dependent.
        """
        # Throw away the cache filled by ``get``.
        try:
            del self._v_attrs
        except AttributeError:
            pass

        implied = self._implied
        implied.clear()

        resolution = ro(self)
        try:
            if Interface not in resolution:
                resolution.append(Interface)
        except NameError:
            pass  # defining Interface itself

        self.__sro__ = tuple(resolution)
        self.__iro__ = tuple(
            spec for spec in resolution if isinstance(spec, InterfaceClass)
        )

        # We directly imply our ancestors:
        implied.update((spec, ()) for spec in resolution)

        # Now, advise our dependents of change.  Iterate over a snapshot
        # because ``dependents`` is a weak mapping.
        for dependent in tuple(self.dependents.keys()):
            dependent.changed(originally_changed)

    def interfaces(self):
        """Return an iterator for the interfaces in the specification.

        Each interface is yielded once, in the order first encountered
        while walking the bases.
        """
        already_yielded = set()
        for base in self.__bases__:
            for iface in base.interfaces():
                if iface in already_yielded:
                    continue
                already_yielded.add(iface)
                yield iface

    def extends(self, interface, strict=True):
        """Does the specification extend the given interface?

        Test whether an interface in the specification extends the
        given interface.  With ``strict`` true (the default) the
        specification does not count as extending itself.
        """
        if interface not in self._implied:
            return False
        if not strict:
            return True
        return self != interface

    def weakref(self, callback=None):
        """Return a weak reference to this specification."""
        # ``weakref`` below is the module; this method does not shadow it
        # inside its own body.
        return weakref.ref(self, callback)

    def get(self, name, default=None):
        """Query for an attribute description

        Results are cached in ``_v_attrs``.  Returns ``default`` when no
        interface in ``__iro__`` describes ``name`` directly.
        """
        try:
            cache = self._v_attrs
        except AttributeError:
            cache = self._v_attrs = {}

        description = cache.get(name)
        if description is not None:
            return description

        for iface in self.__iro__:
            description = iface.direct(name)
            if description is not None:
                cache[name] = description
                return description

        return default
class InterfaceClass(Element, InterfaceBase, Specification):
"""Prototype (scarecrow) Interfaces Implementation."""
# We can't say this yet because we don't have enough
# infrastructure in place.
#
#implements(IInterface)
def __init__(self, name, bases=(), attrs=None, __doc__=None,
__module__=None):
if attrs is None:
attrs = {}
if __module__ is None:
__module__ = attrs.get('__module__')
if isinstance(__module__, str):
del attrs['__module__']
else:
try:
# Figure out what module defined the interface.
# This is how cPython figures out the module of
# a class, but of course it does it in C. :-/
__module__ = sys._getframe(1).f_globals['__name__']
except (AttributeError, KeyError): # pragma: no cover
pass
self.__module__ = __module__
d = attrs.get('__doc__')
if d is not None:
if not isinstance(d, Attribute):
if __doc__ is None:
__doc__ = d
del attrs['__doc__']
if __doc__ is None:
__doc__ = ''
Element.__init__(self, name, __doc__)
tagged_data = attrs.pop(TAGGED_DATA, None)
if tagged_data is not None:
for key, val in tagged_data.items():
self.setTaggedValue(key, val)
Loading ...