Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

beebox / crossover   deb

Repository URL to install this package:

Version: 18.5.0-1 

/ opt / cxoffice / lib / python / cxfsnotifier.py

# (c) Copyright 2010, 2015. CodeWeavers, Inc.

"""Monitors the filesystem and notifies the registered observers of any change
to their paths of interest."""

import os
import sys
import threading
import time

import cxlog


# These are the events you may receive
CREATED = 'created'
DELETED = 'deleted'
MODIFIED = 'modified'


#####
#
# Fallback notifier implementation
#
#####

class _Path(object):
    """This is a helper class that monitors a single path."""

    def __init__(self, path):
        self._path = path
        self._dentries = set()
        self._mtime = None
        self._observers = None


    def add_observer(self, observer, observer_data=None):
        """Adds the specified observer to this path.

        See cxfsnotifier.add_observer() for more details.
        """
        if self._observers is None:
            self._observers = {}
            # Now is the time to establish a baseline for path
            try:
                self._mtime = os.path.getmtime(self._path)
                self._dentries = set(os.listdir(self._path))
            except OSError:
                # We already have suitable defaults for these fields
                pass
        self._observers[id(observer)] = observer, observer_data


    def remove_observer(self, observer):
        """Removes the specified observer from this path.

        See cxfsnotifier.remove_observer() for more details.
        """
        try:
            del self._observers[id(observer)]
        except KeyError:
            # Just assume this observer has been removed already to
            # simplify error handling
            pass


    def _emit_event(self, event, path):
        """This is a helper for poll() which emits the specified event to all
        the observers.
        """
        for (observer, observer_data) in self._observers.itervalues():
            observer(event, path, observer_data)


    def poll(self):
        """Checks this path object for changes and emits the events if any."""
        try:
            mtime = os.path.getmtime(self._path)
        except OSError:
            mtime = None
        if mtime == self._mtime:
            # path has not changed
            return
        old_mtime = self._mtime
        old_dentries = self._dentries
        self._mtime = mtime
        try:
            self._dentries = set(os.listdir(self._path))
        except OSError:
            self._dentries = set()

        # Try to send the events in a logical order
        if old_mtime != None:
            if old_dentries:
                # Notify observers of the deletion of subfolders and files
                # before notifying them of the deletion of the parent directory
                for dentry in old_dentries.difference(self._dentries):
                    self._emit_event(DELETED, os.path.join(self._path, dentry))
                if not self._dentries:
                    # This is not a folder anymore
                    self._emit_event(DELETED, self._path)
            elif mtime is None:
                self._emit_event(DELETED, self._path)

        if mtime != None:
            if self._dentries:
                if not old_dentries:
                    # Notify observers of the directory creation, before
                    # sending the notifications for its content
                    self._emit_event(CREATED, self._path)
                for dentry in self._dentries.difference(old_dentries):
                    self._emit_event(CREATED, os.path.join(self._path, dentry))
                # Don't send MODIFIED events for directories
            elif old_mtime is None or old_dentries:
                self._emit_event(CREATED, self._path)
            else:
                self._emit_event(MODIFIED, self._path)


class PollingFSNotifier(object):
    """The fallback filesystem notifier implementation.

    It works by polling the specified paths at regular intervals.
    """

    def __init__(self):
        self.interval = 2
        self._paths = {}
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self.worker)
        self._thread.setDaemon(True)
        self._thread.start()


    def add_observer(self, path, observer, observer_data=None):
        """See cxfsnotifier.add_observer()."""
        self._lock.acquire()
        try:
            if path not in self._paths:
                self._paths[path] = _Path(path)
            self._paths[path].add_observer(observer, observer_data)
        finally:
            self._lock.release()


    def remove_observer(self, path, observer):
        """See cxfsnotifier.remove_observer()."""
        self._lock.acquire()
        try:
            try:
                self._paths[path].remove_observer(observer)
            except KeyError:
                # Just assume this observer has been removed already to
                # simplify error handling
                pass
        finally:
            self._lock.release()


    def poll(self, path):
        """See cxfsnotifier.poll()."""
        self._lock.acquire()
        try:
            pathobj = self._paths.get(path, None)
            if pathobj:
                pathobj.poll()
        finally:
            self._lock.release()


    def worker(self):
        """This method implements the polling loop and runs in the background
        thread.
        """
        while True:
            time.sleep(self.interval)
            self._lock.acquire()
            try:
                for pathobj in self._paths.itervalues():
                    pathobj.poll()
            finally:
                self._lock.release()


#####
#
# Implementation registration and selection
#
#####

# Candidate notifier implementations
_NOTIFIERS = {}

# The selected notifier implementation
_NOTIFIER = None


def add(cls, priority):
    """Registers a filesystem notifier implementation class.

    The implementation with the highest priority (a positive integer) will be
    selected, providing that the object instantiation succeeds. Otherwise the
    implementation with the next highest priority will be used.

    It is an error to register an implementation after observers have been
    added.
    """
    if _NOTIFIER:
        raise AssertionError('A cxfsnotifier implementation has already been selected')
    _NOTIFIERS[cls] = priority


# Register the fallback implementation
add(PollingFSNotifier, 0)


def get():
    """Returns the filesystem notifier implementation."""
    # pylint: disable=W0603
    global _NOTIFIER
    if not _NOTIFIER:
        for cls in sorted(_NOTIFIERS, key=_NOTIFIERS.__getitem__, reverse=True):
            try:
                _NOTIFIER = cls()
                break
            except Exception: # pylint: disable=W0703
                exception = sys.exc_info()[1]
                cxlog.log("%s is not available: %s" % (cls, str(exception)))
    return _NOTIFIER


#####
#
# Wrappers for the notifier implementations
#
#####

def add_observer(path, observer, observer_data=None):
    """Adds an event observer for the specified path.

    path is the path to monitor. This can either be a file or a directory. It
      can also be a nonexistent path, the observer will then be notified when
      and if it gets created. Also note that paths are not canonicalized. This
      means two observers may in fact be watching the same filesystem objects
      but just with two different paths. They will still each receive the
      relevant events, each for its own path. If path points to a directory
      the observer receives events for the creation and deletion of files and
      folders in that directory.

    observer_data is any data the observer would like to be given back
      when it gets called.

    observer is a callable. Its signature must be of the form:
        observer(event, path, observer_data)
      Where event is one of CREATED, DELETED or MODIFIED, path is the relevant
      path and observer_data is the data that was specified in add_observer().
      Observers must not raise exceptions.
    """
    get().add_observer(path, observer, observer_data)


def remove_observer(path, observer):
    """Removes the specified observer for the specified path.

    No exception is raised if the observer has already been removed.
    """
    get().add_observer(path, observer)


def poll(path):
    """Synchronously checks whether there are any events to report for the
    specified path.

    Note that because this is done synchronously the observers will be called
    in the current thread.
    """
    get().poll(path)


#####
#
# Some test code
#
#####

def main():
    class FakeNotifier(object):
        def __init__(self):
            raise Exception('bah')

    class FakeNotifier2(object):
        def __init__(self):
            raise Exception('bah')

    add(FakeNotifier, 2)
    add(FakeNotifier2, -1)

    def test_observer(event, path, _data):
        # We need the parentheses for Python 3
        # pylint: disable=C0325
        print("observer: %s: %s" % (event, path))

    def stop_observer(event, path, _data):
        # We need the parentheses for Python 3
        # pylint: disable=C0325
        print("stopper: %s: %s" % (event, path))
        if path == 'stop':
            print("Exiting")
            sys.exit(0)

    add_observer(os.getcwd(), test_observer)
    add_observer("new", test_observer)
    add_observer("stop", stop_observer)
    time.sleep(60)

if __name__ == '__main__':
    main()