Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ worker / autoreload.py

# -*- coding: utf-8 -*-
"""
    celery.worker.autoreload
    ~~~~~~~~~~~~~~~~~~~~~~~~

    This module implements automatic module reloading
"""
from __future__ import absolute_import

import hashlib
import os
import select
import sys
import time

from collections import defaultdict
from threading import Event

from kombu.utils import eventio
from kombu.utils.encoding import ensure_bytes

from celery import bootsteps
from celery.five import items
from celery.platforms import ignore_errno
from celery.utils.imports import module_file
from celery.utils.log import get_logger
from celery.utils.threads import bgThread

from .components import Pool

try:                        # pragma: no cover
    import pyinotify
    _ProcessEvent = pyinotify.ProcessEvent
except ImportError:         # pragma: no cover
    pyinotify = None        # noqa
    _ProcessEvent = object  # noqa

__all__ = [
    'WorkerComponent', 'Autoreloader', 'Monitor', 'BaseMonitor',
    'StatMonitor', 'KQueueMonitor', 'InotifyMonitor', 'file_hash',
]

logger = get_logger(__name__)


class WorkerComponent(bootsteps.StartStopStep):
    label = 'Autoreloader'
    conditional = True
    requires = (Pool, )

    def __init__(self, w, autoreload=None, **kwargs):
        self.enabled = w.autoreload = autoreload
        w.autoreloader = None

    def create(self, w):
        w.autoreloader = self.instantiate(w.autoreloader_cls, w)
        return w.autoreloader if not w.use_eventloop else None

    def register_with_event_loop(self, w, hub):
        w.autoreloader.register_with_event_loop(hub)
        hub.on_close.add(w.autoreloader.on_event_loop_close)


def file_hash(filename, algorithm='md5'):
    hobj = hashlib.new(algorithm)
    with open(filename, 'rb') as f:
        for chunk in iter(lambda: f.read(2 ** 20), ''):
            hobj.update(ensure_bytes(chunk))
    return hobj.digest()


class BaseMonitor(object):

    def __init__(self, files,
                 on_change=None, shutdown_event=None, interval=0.5):
        self.files = files
        self.interval = interval
        self._on_change = on_change
        self.modify_times = defaultdict(int)
        self.shutdown_event = shutdown_event or Event()

    def start(self):
        raise NotImplementedError('Subclass responsibility')

    def stop(self):
        pass

    def on_change(self, modified):
        if self._on_change:
            return self._on_change(modified)

    def on_event_loop_close(self, hub):
        pass


class StatMonitor(BaseMonitor):
    """File change monitor based on the ``stat`` system call."""

    def _mtimes(self):
        return ((f, self._mtime(f)) for f in self.files)

    def _maybe_modified(self, f, mt):
        return mt is not None and self.modify_times[f] != mt

    def register_with_event_loop(self, hub):
        hub.call_repeatedly(2.0, self.find_changes)

    def find_changes(self):
        maybe_modified = self._maybe_modified
        modified = dict((f, mt) for f, mt in self._mtimes()
                        if maybe_modified(f, mt))
        if modified:
            self.on_change(modified)
            self.modify_times.update(modified)

    def start(self):
        while not self.shutdown_event.is_set():
            self.find_changes()
            time.sleep(self.interval)

    @staticmethod
    def _mtime(path):
        try:
            return os.stat(path).st_mtime
        except Exception:
            pass


class KQueueMonitor(BaseMonitor):
    """File change monitor based on BSD kernel event notifications"""

    def __init__(self, *args, **kwargs):
        super(KQueueMonitor, self).__init__(*args, **kwargs)
        self.filemap = dict((f, None) for f in self.files)
        self.fdmap = {}

    def register_with_event_loop(self, hub):
        if eventio.kqueue is not None:
            self._kq = eventio._kqueue()
            self.add_events(self._kq)
            self._kq.on_file_change = self.handle_event
            hub.add_reader(self._kq._kqueue, self._kq.poll, 0)

    def on_event_loop_close(self, hub):
        self.close(self._kq)

    def add_events(self, poller):
        for f in self.filemap:
            self.filemap[f] = fd = os.open(f, os.O_RDONLY)
            self.fdmap[fd] = f
            poller.watch_file(fd)

    def handle_event(self, events):
        self.on_change([self.fdmap[e.ident] for e in events])

    def start(self):
        self.poller = eventio.poll()
        self.add_events(self.poller)
        self.poller.on_file_change = self.handle_event
        while not self.shutdown_event.is_set():
            self.poller.poll(1)

    def close(self, poller):
        for f, fd in items(self.filemap):
            if fd is not None:
                poller.unregister(fd)
                with ignore_errno('EBADF'):  # pragma: no cover
                    os.close(fd)
        self.filemap.clear()
        self.fdmap.clear()

    def stop(self):
        self.close(self.poller)
        self.poller.close()


class InotifyMonitor(_ProcessEvent):
    """File change monitor based on Linux kernel `inotify` subsystem"""

    def __init__(self, modules, on_change=None, **kwargs):
        assert pyinotify
        self._modules = modules
        self._on_change = on_change
        self._wm = None
        self._notifier = None

    def register_with_event_loop(self, hub):
        self.create_notifier()
        hub.add_reader(self._wm.get_fd(), self.on_readable)

    def on_event_loop_close(self, hub):
        pass

    def on_readable(self):
        self._notifier.read_events()
        self._notifier.process_events()

    def create_notifier(self):
        self._wm = pyinotify.WatchManager()
        self._notifier = pyinotify.Notifier(self._wm, self)
        add_watch = self._wm.add_watch
        flags = pyinotify.IN_MODIFY | pyinotify.IN_ATTRIB
        for m in self._modules:
            add_watch(m, flags)

    def start(self):
        try:
            self.create_notifier()
            self._notifier.loop()
        finally:
            if self._wm:
                self._wm.close()
                # Notifier.close is called at the end of Notifier.loop
                self._wm = self._notifier = None

    def stop(self):
        pass

    def process_(self, event):
        self.on_change([event.path])

    process_IN_ATTRIB = process_IN_MODIFY = process_

    def on_change(self, modified):
        if self._on_change:
            return self._on_change(modified)


def default_implementation():
    if hasattr(select, 'kqueue') and eventio.kqueue is not None:
        return 'kqueue'
    elif sys.platform.startswith('linux') and pyinotify:
        return 'inotify'
    else:
        return 'stat'

implementations = {'kqueue': KQueueMonitor,
                   'inotify': InotifyMonitor,
                   'stat': StatMonitor}
Monitor = implementations[
    os.environ.get('CELERYD_FSNOTIFY') or default_implementation()]


class Autoreloader(bgThread):
    """Tracks changes in modules and fires reload commands"""
    Monitor = Monitor

    def __init__(self, controller, modules=None, monitor_cls=None, **options):
        super(Autoreloader, self).__init__()
        self.controller = controller
        app = self.controller.app
        self.modules = app.loader.task_modules if modules is None else modules
        self.options = options
        self._monitor = None
        self._hashes = None
        self.file_to_module = {}

    def on_init(self):
        files = self.file_to_module
        files.update(dict(
            (module_file(sys.modules[m]), m) for m in self.modules))

        self._monitor = self.Monitor(
            files, self.on_change,
            shutdown_event=self._is_shutdown, **self.options)
        self._hashes = dict([(f, file_hash(f)) for f in files])

    def register_with_event_loop(self, hub):
        if self._monitor is None:
            self.on_init()
        self._monitor.register_with_event_loop(hub)

    def on_event_loop_close(self, hub):
        if self._monitor is not None:
            self._monitor.on_event_loop_close(hub)

    def body(self):
        self.on_init()
        with ignore_errno('EINTR', 'EAGAIN'):
            self._monitor.start()

    def _maybe_modified(self, f):
        if os.path.exists(f):
            digest = file_hash(f)
            if digest != self._hashes[f]:
                self._hashes[f] = digest
                return True
        return False

    def on_change(self, files):
        modified = [f for f in files if self._maybe_modified(f)]
        if modified:
            names = [self.file_to_module[module] for module in modified]
            logger.info('Detected modified modules: %r', names)
            self._reload(names)

    def _reload(self, modules):
        self.controller.reload(modules, reload=True)

    def stop(self):
        if self._monitor:
            self._monitor.stop()