Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / django-celery   python

Repository URL to install this package:

Version: 3.1.17 

/ backends / database.py

from __future__ import absolute_import, unicode_literals

from celery import current_app
from celery.backends.base import BaseDictBackend
from celery.utils.timeutils import maybe_timedelta

from ..models import TaskMeta, TaskSetMeta


class DatabaseBackend(BaseDictBackend):
    """The database backend.

    Using Django models to store task state.

    """
    TaskModel = TaskMeta
    TaskSetModel = TaskSetMeta

    expires = current_app.conf.CELERY_TASK_RESULT_EXPIRES
    create_django_tables = True

    subpolling_interval = 0.5

    def _store_result(self, task_id, result, status,
                      traceback=None, request=None):
        """Store return value and status of an executed task."""
        self.TaskModel._default_manager.store_result(
            task_id, result, status,
            traceback=traceback, children=self.current_task_children(request),
        )
        return result

    def _save_group(self, group_id, result):
        """Store the result of an executed group."""
        self.TaskSetModel._default_manager.store_result(group_id, result)
        return result

    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id."""
        return self.TaskModel._default_manager.get_task(task_id).to_dict()

    def _restore_group(self, group_id):
        """Get group metadata for a group by id."""
        meta = self.TaskSetModel._default_manager.restore_taskset(group_id)
        if meta:
            return meta.to_dict()

    def _delete_group(self, group_id):
        self.TaskSetModel._default_manager.delete_taskset(group_id)

    def _forget(self, task_id):
        try:
            self.TaskModel._default_manager.get(task_id=task_id).delete()
        except self.TaskModel.DoesNotExist:
            pass

    def cleanup(self):
        """Delete expired metadata."""
        expires = maybe_timedelta(self.expires)
        for model in self.TaskModel, self.TaskSetModel:
            model._default_manager.delete_expired(expires)