Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cronitor / cronitor / celery.py
Size: Mime:
import typing
import datetime
import humanize
import logging
from cronitor import State, Monitor
import cronitor
import functools
import shutil
import tempfile
import sys

logger = logging.getLogger(__name__)
try:
    import celery
    import celery.beat
    from celery.schedules import crontab, schedule, solar
    from celery.signals import beat_init, task_prerun, task_failure, task_success, task_retry

    if typing.TYPE_CHECKING:
        from typing import Dict, List, Union, Optional, Tuple
        import billiard.einfo
        from celery.worker.request import Request
except ImportError:
    logger.error("Cannot use the cronitor.celery module without celery installed")
    sys.exit(1)

# For the signals to properly register, they need to be top-level objects.
# Since they are defined dynamically in initialize(), we have to declare them up top,
# make them global, and override them.
celerybeat_startup = None
ping_monitor_before_task = None
ping_monitor_on_success = None
ping_monitor_on_failure = None
ping_monitor_on_retry = None


def get_headers_from_task(task):  # type: (celery.Task) -> Dict
    headers = task.request.headers or {}
    headers.update(task.request.get('properties', {}).get('application_headers', {}))
    return headers


def initialize(app, celerybeat_only=False, api_key=None):  # type: (celery.Celery, bool, Optional[str]) -> None
    if api_key:
        cronitor.api_key = api_key

    if celerybeat_only:
        cronitor.celerybeat_only = True

    global celerybeat_startup
    global ping_monitor_before_task
    global ping_monitor_on_success
    global ping_monitor_on_failure
    global ping_monitor_on_retry

    def celerybeat_startup(sender, **kwargs):  # type: (celery.beat.Service, Dict) -> None
        # To avoid recursion, since restarting celerybeat will result in this
        # signal being called again, we disconnect the signal.
        beat_init.disconnect(celerybeat_startup, dispatch_uid=1)

        # Must use the cached_property from scheduler so as not to re-open the shelve database
        scheduler = sender.scheduler  # type: celery.beat.Scheduler
        # Also need to use the property here, including for django-celery-beat
        schedules = scheduler.schedule
        monitors = []  # type: List[Dict[str, str]]

        add_periodic_task_deferred = []
        for name in schedules:
            if name.startswith('celery.'):
                continue
            entry = schedules[name]  # type: celery.beat.ScheduleEntry

            # ignore all celerybeat scheduled events with the Cronitor exclusion header
            headers = entry.options.pop('headers', {})
            if headers.get('x-cronitor-exclude') in (True, 'true', 'True'):
                logger.info("celerybeat entry '{}' ignored per exclusion header".format(name))
                continue

            item = entry.schedule  # type: celery.schedules.schedule
            if isinstance(item, crontab):
                cronitor_schedule = ('{0._orig_minute} {0._orig_hour} {0._orig_day_of_week} {0._orig_day_of_month} '
                                     '{0._orig_month_of_year}').format(item)
            elif isinstance(item, schedule):
                freq = item.run_every  # type: datetime.timedelta
                cronitor_schedule = 'every ' + humanize.precisedelta(freq)
            elif isinstance(item, solar):
                # We don't support solar schedules
                logger.warning("The cronitor-python celery module does not support "
                               "tasks using solar schedules. Task schedule '{}' will "
                               "not be monitored".format(name))
                continue
            else:
                logger.warning("The cronitor-python celery module does not support "
                               "schedules of type `{}`".format(type(item)))
                continue

            monitors.append({
                'type': 'job',
                'key': name,
                'schedule': cronitor_schedule,
            })

            headers.update({
                'x-cronitor-task-origin': 'celerybeat',
                'x-cronitor-celerybeat-name': name,
            })

            add_periodic_task_deferred.append(
                functools.partial(app.add_periodic_task,
                                  entry.schedule,
                                  # Setting headers in the signature
                                  # works better than in periodic task options
                                  app.tasks.get(entry.task).s().set(headers=headers),
                                  args=entry.args, kwargs=entry.kwargs,
                                  name=entry.name, **(entry.options or {}))
            )

        if isinstance(sender.scheduler, celery.beat.PersistentScheduler):
            # The celerybeat-schedule file with shelve gets corrupted really easily, so we need
            # to set up a tempfile instead.
            new_schedule = tempfile.NamedTemporaryFile()
            with open(sender.schedule_filename, 'rb') as current_schedule:
                shutil.copyfileobj(current_schedule, new_schedule)
            # We need to stop and restart celerybeat to get the task updates in place.
            # This isn't ideal, but seems to work.

            sender.stop()
            # Now, actually add all the periodic tasks to overwrite beat with the headers
            for task in add_periodic_task_deferred:
                task()
            # Then, restart celerybeat, on the new schedule file (copied from the old one)
            app.Beat(schedule=new_schedule.name).run()

        else:
            # For django-celery, etc., we don't need to stop and restart celerybeat
            for task in add_periodic_task_deferred:
                task()

        logger.debug("[Cronitor] creating monitors: %s", [m['key'] for m in monitors])
        Monitor.put(monitors)

    beat_init.connect(celerybeat_startup, dispatch_uid=1)

    @task_prerun.connect
    def ping_monitor_before_task(sender, **kwargs):  # type: (celery.Task, Dict) -> None
        headers = get_headers_from_task(sender)
        if 'x-cronitor-celerybeat-name' in headers:
            monitor = Monitor(headers['x-cronitor-celerybeat-name'])
        elif not cronitor.celerybeat_only:
            monitor = Monitor(sender.name)
        else:
            return

        monitor.ping(state=State.RUN, series=sender.request.id)

    @task_success.connect
    def ping_monitor_on_success(sender, **kwargs):  # type: (celery.Task, Dict) -> None
        headers = get_headers_from_task(sender)
        if 'x-cronitor-celerybeat-name' in headers:
            monitor = Monitor(headers['x-cronitor-celerybeat-name'])
        elif not cronitor.celerybeat_only:
            monitor = Monitor(sender.name)
        else:
            return

        monitor.ping(state=State.COMPLETE, series=sender.request.id)

    @task_failure.connect
    def ping_monitor_on_failure(sender,  # type: celery.Task
                                task_id,  # type: str
                                exception,  # type: Exception
                                args,  # type: Tuple
                                kwargs,  # type: Dict
                                traceback,
                                einfo,  # type: billiard.einfo.ExceptionInfo
                                **kwargs2  # type: Dict
                                ):
        headers = get_headers_from_task(sender)
        if 'x-cronitor-celerybeat-name' in headers:
            monitor = Monitor(headers['x-cronitor-celerybeat-name'])
        elif not cronitor.celerybeat_only:
            monitor = Monitor(sender.name)
        else:
            return

        monitor.ping(state=State.FAIL, series=sender.request.id, message=str(exception))

    @task_retry.connect
    def ping_monitor_on_retry(sender,  # type: celery.Task
                              request,  # type: celery.worker.request.Request
                              reason,  # type: Union[Exception, str]
                              einfo,  # type: billiard.einfo.ExceptionInfo
                              **kwargs,  # type: Dict
                              ):
        headers = get_headers_from_task(sender)
        if 'x-cronitor-celerybeat-name' in headers:
            monitor = Monitor(headers['x-cronitor-celerybeat-name'])
        elif not cronitor.celerybeat_only:
            monitor = Monitor(sender.name)
        else:
            return

        monitor.ping(state=State.FAIL, series=sender.request.id, message=str(reason))