Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, and NuGet packages.

beebox / crossover   deb

Repository URL to install this package:

Version: 18.5.0-1 

/ opt / cxoffice / lib / python / cxobservable.py

# (c) Copyright 2010. CodeWeavers, Inc.

"""Provides a framework for classes that have observable properties."""

# Sentinel to pass as the event to add_observer() / remove_observer() in order
# to mean "every event the object can emit".
ANY_EVENT = None


class Object(object):
    """Provides a framework for a class that has observable properties.

    The derived classes should have an observable_events class attribute that
    defines the set of events this class can emit. Events can be strings or
    any other type of hashable object.
    """

    # This is the list of observable events
    observable_events = frozenset()


    def __init__(self):
        # Maps each event (or ANY_EVENT) to a dictionary of
        #   observer key -> (observer, observer_data)
        # where the observer key is computed by _observer_key().
        self._observers = {}


    @staticmethod
    def _observer_key(observer):
        """Returns the key identifying observer in the observer tables.

        Accessing obj.method builds a new bound method object every time, so
        id(obj.method) is not the same in add_observer() and
        remove_observer(). Bound methods are therefore identified by the
        instance and the function they wrap. Any other callable is identified
        by its own id.
        """
        target = getattr(observer, '__self__', None)
        function = getattr(observer, '__func__', None)
        if target is not None and function is not None:
            # The registered observer keeps both objects alive so their ids
            # cannot be reused while the observer is registered.
            return (id(target), id(function))
        return id(observer)


    @staticmethod
    def _event_sequence(events):
        """Returns events as an iterable, wrapping a single event in a tuple."""
        # Strings are iterable on Python 3 but always denote a single event,
        # not a sequence of one-character events.
        if isinstance(events, (str, bytes)) or not hasattr(events, '__iter__'):
            return (events,)
        return events


    def _check_observable(self, event):
        """Raises TypeError if event is neither ANY_EVENT nor an event of
        this class.
        """
        # Compare against the sentinel by identity: a plain truth test would
        # also let invalid falsy events such as '' or 0 through.
        if event is not ANY_EVENT and event not in self.observable_events:
            raise TypeError("class %s has no %s observable event" %
                            (self.__class__, repr(event)))


    def add_observer(self, events, observer, observer_data=None):
        """Adds an event observer for the current object.

        events can either be a single event or a sequence of events the
          observer is interested in. To subscribe to all the events from
          this object, specify the ANY_EVENT value.

        observer_data is any data the observer would like to be given back
          when it gets called.

        observer is a callable. Its signature must be of the form:
            observer(obj, event, observer_data, *args)
          Where obj is the object the event happens on, event is the event that
          was emitted, observer_data is the data specified above, and *args is
          an optional set of extra data specific to the event. Observers must
          not raise exceptions.

        Adding an observer that is already registered for an event only
        replaces its observer_data.

        Raises TypeError if one of the events is not an observable event of
        this class. The events preceding it in the sequence stay registered.
        """
        key = self._observer_key(observer)
        for event in self._event_sequence(events):
            self._check_observable(event)
            self._observers.setdefault(event, {})[key] = (observer,
                                                          observer_data)


    def remove_observer(self, events, observer):
        """Removes the specified observer for the specified events.

        events can either be a single event, a sequence of events or
          ANY_EVENT, as for add_observer().

        No exception is raised if the observer has already been removed.
        Raises TypeError if one of the events is not an observable event of
        this class.
        """
        key = self._observer_key(observer)
        for event in self._event_sequence(events):
            self._check_observable(event)
            # Just assume a missing observer has been removed already to
            # simplify error handling
            self._observers.get(event, {}).pop(key, None)


    def emit_event(self, event, *args):
        """Emits the specified event.

        The observers registered for this specific event are called first,
        then those registered for ANY_EVENT. The extra arguments are passed
        on to the observers as is.

        Note that observers will be called in the same thread as emit_event().

        Raises TypeError if event is not an observable event of this class.
        """
        if event not in self.observable_events:
            raise TypeError("class %s has no %s observable event" %
                            (self.__class__, repr(event)))
        for observer_event in (event, ANY_EVENT):
            # Iterate over a snapshot so observers can add or remove
            # observers (including themselves) while being notified. Changes
            # made by an observer only take effect for the next emission.
            # values() is used because itervalues() is Python 2 only.
            registered = list(self._observers.get(observer_event, {}).values())
            for (observer, observer_data) in registered:
                observer(self, event, observer_data, *args)