# (c) Copyright 2010, 2015. CodeWeavers, Inc.
"""Monitors the filesystem and notifies the registered observers of any change
to their paths of interest."""
import os
import sys
import threading
import time
import cxlog
# Event types passed as the first argument of the observer callbacks
CREATED = 'created'
DELETED = 'deleted'
MODIFIED = 'modified'
#####
#
# Fallback notifier implementation
#
#####
class _Path(object):
    """This is a helper class that monitors a single path.

    It remembers the modification time and, for directories, the set of
    entry names seen at the last check. poll() compares them with the
    current state and reports the differences to the registered observers.
    """

    def __init__(self, path):
        """Creates the monitor for path; no filesystem access happens yet."""
        self._path = path
        # Names of the directory entries seen at the last check. This stays
        # empty when listing the path fails (e.g. a file or a missing path).
        self._dentries = set()
        # Modification time seen at the last check, None if it could not be
        # obtained (typically because the path does not exist).
        self._mtime = None
        # Maps id(observer) to an (observer, observer_data) tuple. None until
        # the first observer is added.
        self._observers = None

    def add_observer(self, observer, observer_data=None):
        """Adds the specified observer to this path.

        Adding an observer that is already registered replaces its
        observer_data.

        See cxfsnotifier.add_observer() for more details.
        """
        if self._observers is None:
            self._observers = {}
            # Now is the time to establish a baseline for path
            try:
                self._mtime = os.path.getmtime(self._path)
                self._dentries = set(os.listdir(self._path))
            except OSError:
                # We already have suitable defaults for these fields
                pass
        self._observers[id(observer)] = observer, observer_data

    def remove_observer(self, observer):
        """Removes the specified observer from this path.

        No exception is raised if the observer is not registered, including
        when no observer was ever added.

        See cxfsnotifier.remove_observer() for more details.
        """
        if self._observers is None:
            # No observer was ever added so there is nothing to remove
            return
        try:
            del self._observers[id(observer)]
        except KeyError:
            # Just assume this observer has been removed already to
            # simplify error handling
            pass

    def _emit_event(self, event, path):
        """This is a helper for poll() which emits the specified event to all
        the observers.

        Each observer is called as observer(event, path, observer_data).
        """
        if not self._observers:
            return
        # values() works on both Python 2 and Python 3 (itervalues() does
        # not exist on Python 3). Iterate over a snapshot so the dictionary
        # can be modified while the observers are being called.
        for (observer, observer_data) in list(self._observers.values()):
            observer(event, path, observer_data)

    def poll(self):
        """Checks this path object for changes and emits the events if any.

        Nothing is emitted when the modification time is the same as at the
        last check. Otherwise the recorded state is updated and DELETED
        events are sent before CREATED / MODIFIED ones.
        """
        try:
            mtime = os.path.getmtime(self._path)
        except OSError:
            mtime = None
        if mtime == self._mtime:
            # path has not changed
            return

        old_mtime = self._mtime
        old_dentries = self._dentries
        self._mtime = mtime
        try:
            self._dentries = set(os.listdir(self._path))
        except OSError:
            self._dentries = set()

        # Try to send the events in a logical order
        if old_mtime is not None:
            if old_dentries:
                # Notify observers of the deletion of subfolders and files
                # before notifying them of the deletion of the parent directory
                for dentry in old_dentries.difference(self._dentries):
                    self._emit_event(DELETED, os.path.join(self._path, dentry))
                if not self._dentries:
                    # This is not a folder anymore
                    # NOTE(review): an existing but now empty directory also
                    # ends up here since it has no entries either
                    self._emit_event(DELETED, self._path)
            elif mtime is None:
                self._emit_event(DELETED, self._path)

        if mtime is not None:
            if self._dentries:
                if not old_dentries:
                    # Notify observers of the directory creation, before
                    # sending the notifications for its content
                    self._emit_event(CREATED, self._path)
                for dentry in self._dentries.difference(old_dentries):
                    self._emit_event(CREATED, os.path.join(self._path, dentry))
                # Don't send MODIFIED events for directories
            elif old_mtime is None or old_dentries:
                self._emit_event(CREATED, self._path)
            else:
                self._emit_event(MODIFIED, self._path)
class PollingFSNotifier(object):
    """The fallback filesystem notifier implementation.

    It works by polling the specified paths at regular intervals from a
    background daemon thread which is started by the constructor.
    """

    def __init__(self):
        """Initializes the notifier state and starts the polling thread."""
        # Number of seconds the worker sleeps between two polling passes
        self.interval = 2
        # Maps each monitored path to the _Path object tracking it
        self._paths = {}
        # Protects _paths and the _Path objects it contains
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self.worker)
        # Use the daemon attribute rather than the deprecated setDaemon()
        self._thread.daemon = True
        # Only start the thread once all the fields it uses are set
        self._thread.start()

    def add_observer(self, path, observer, observer_data=None):
        """See cxfsnotifier.add_observer()."""
        with self._lock:
            if path not in self._paths:
                self._paths[path] = _Path(path)
            self._paths[path].add_observer(observer, observer_data)

    def remove_observer(self, path, observer):
        """See cxfsnotifier.remove_observer()."""
        with self._lock:
            try:
                self._paths[path].remove_observer(observer)
            except KeyError:
                # Just assume this observer has been removed already to
                # simplify error handling
                pass

    def poll(self, path):
        """See cxfsnotifier.poll()."""
        with self._lock:
            pathobj = self._paths.get(path)
            if pathobj is not None:
                pathobj.poll()

    def worker(self):
        """This method implements the polling loop and runs in the background
        thread.

        It never returns: it sleeps for self.interval seconds and then polls
        every registered path while holding the lock. An exception raised
        while polling propagates and ends the loop.
        """
        while True:
            time.sleep(self.interval)
            with self._lock:
                # values() works on both Python 2 and Python 3 (itervalues()
                # does not exist on Python 3)
                for pathobj in self._paths.values():
                    pathobj.poll()
#####
#
# Implementation registration and selection
#
#####
# Candidate notifier implementations: maps each registered class to its
# priority (see add())
_NOTIFIERS = {}
# The selected notifier implementation instance, None until get() has
# successfully instantiated one
_NOTIFIER = None
def add(cls, priority):
    """Registers cls as a candidate filesystem notifier implementation.

    priority is an integer: get() tries the candidates from the highest
    priority to the lowest and keeps the first one that can be instantiated.

    Registering an implementation once one has been selected is an error and
    raises an AssertionError.
    """
    if not _NOTIFIER:
        _NOTIFIERS.update({cls: priority})
        return
    raise AssertionError('A cxfsnotifier implementation has already been selected')
# Register the fallback implementation with priority 0 so that get() tries
# any implementation registered with a higher priority first
add(PollingFSNotifier, 0)
def get():
    """Returns the selected filesystem notifier implementation.

    On the first call the registered classes are tried by decreasing
    priority until one can be instantiated. Failures are logged and the
    next candidate is tried. None is returned if no candidate worked.
    """
    # pylint: disable=W0603
    global _NOTIFIER
    if _NOTIFIER:
        return _NOTIFIER

    candidates = sorted(_NOTIFIERS.items(), key=lambda item: item[1],
                        reverse=True)
    for cls, _priority in candidates:
        try:
            _NOTIFIER = cls()
        except Exception as exception:  # pylint: disable=W0703
            cxlog.log("%s is not available: %s" % (cls, str(exception)))
            continue
        break
    return _NOTIFIER
#####
#
# Wrappers for the notifier implementations
#
#####
def add_observer(path, observer, observer_data=None):
    """Registers observer to be notified of the events concerning path.

    path may be a file, a directory, or a path that does not exist yet. In
    the latter case the observer is notified when and if it gets created.
    For a directory the observer also receives events for the creation and
    deletion of the files and folders it contains.

    Paths are not canonicalized: observers watching the same filesystem
    object through different paths each receive the relevant events, each
    for its own path.

    observer is a callable with the following signature:
        observer(event, path, observer_data)
    where event is one of CREATED, DELETED or MODIFIED, path is the path
    the event is about, and observer_data is the value given here, passed
    back unchanged.

    Observers must not raise exceptions.
    """
    notifier = get()
    notifier.add_observer(path, observer, observer_data)
def remove_observer(path, observer):
    """Removes the specified observer for the specified path.

    path and observer must be the same values that were given to
    add_observer().

    No exception is raised if the observer has already been removed.
    """
    # Forward to the implementation's remove_observer(): calling its
    # add_observer() here would register the observer again instead
    get().remove_observer(path, observer)
def poll(path):
    """Checks right away whether there are events to report for path.

    The check is synchronous, so the observers are called from the current
    thread before this function returns.
    """
    notifier = get()
    notifier.poll(path)
#####
#
# Some test code
#
#####
def main():
    """Runs a manual test of the notifier for up to 60 seconds.

    Two implementations that cannot be instantiated are registered to
    exercise the selection code in get(). Observers are then added for the
    current directory and for the 'new' and 'stop' relative paths, and the
    events they receive are printed. The test ends early when an event is
    received for the 'stop' path.
    """
    class FakeNotifier(object):
        """A high priority implementation that cannot be instantiated."""
        def __init__(self):
            raise Exception('bah')

    class FakeNotifier2(object):
        """A low priority implementation that cannot be instantiated."""
        def __init__(self):
            raise Exception('bah')

    add(FakeNotifier, 2)
    add(FakeNotifier2, -1)

    # Observers may be called from the notifier's own thread, where
    # sys.exit() would only end that thread and leave the process running.
    # So signal the main thread instead and let it return.
    stop_requested = threading.Event()

    def test_observer(event, path, _data):
        # We need the parentheses for Python 3
        # pylint: disable=C0325
        print("observer: %s: %s" % (event, path))

    def stop_observer(event, path, _data):
        # We need the parentheses for Python 3
        # pylint: disable=C0325
        print("stopper: %s: %s" % (event, path))
        if path == 'stop':
            print("Exiting")
            stop_requested.set()

    add_observer(os.getcwd(), test_observer)
    add_observer("new", test_observer)
    add_observer("stop", stop_observer)

    # Return as soon as the stop observer fires, or after 60 seconds
    stop_requested.wait(60)


if __name__ == '__main__':
    main()