Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / django-celery   python

Repository URL to install this package:

Version: 3.1.17 

/ tests / test_backends / test_database.py

from __future__ import absolute_import, unicode_literals

import celery

from datetime import timedelta

from celery import current_app
from celery import states
from celery.result import AsyncResult
from celery.task import PeriodicTask
from celery.utils import gen_unique_id

from djcelery.app import app
from djcelery.backends.database import DatabaseBackend
from djcelery.utils import now
from djcelery.tests.utils import unittest


class SomeClass(object):

    def __init__(self, data):
        self.data = data


class MyPeriodicTask(PeriodicTask):
    name = 'c.u.my-periodic-task-244'
    run_every = timedelta(seconds=1)

    def run(self, **kwargs):
        return 42


class TestDatabaseBackend(unittest.TestCase):

    def test_backend(self):
        b = DatabaseBackend(app=app)
        tid = gen_unique_id()

        self.assertEqual(b.get_status(tid), states.PENDING)
        self.assertIsNone(b.get_result(tid))

        b.mark_as_done(tid, 42)
        self.assertEqual(b.get_status(tid), states.SUCCESS)
        self.assertEqual(b.get_result(tid), 42)

        tid2 = gen_unique_id()
        result = {'foo': 'baz', 'bar': SomeClass(12345)}
        b.mark_as_done(tid2, result)
        # is serialized properly.
        rindb = b.get_result(tid2)
        self.assertEqual(rindb.get('foo'), 'baz')
        self.assertEqual(rindb.get('bar').data, 12345)

        tid3 = gen_unique_id()
        try:
            raise KeyError('foo')
        except KeyError as exception:
            b.mark_as_failure(tid3, exception)

        self.assertEqual(b.get_status(tid3), states.FAILURE)
        self.assertIsInstance(b.get_result(tid3), KeyError)

    def test_forget(self):
        b = DatabaseBackend(app=app)
        tid = gen_unique_id()
        b.mark_as_done(tid, {'foo': 'bar'})
        x = AsyncResult(tid)
        self.assertEqual(x.result.get('foo'), 'bar')
        x.forget()
        if celery.VERSION[0:3] == (3, 1, 10):
            # bug in 3.1.10 means result did not clear cache after forget.
            x._cache = None
        self.assertIsNone(x.result)

    def test_group_store(self):
        b = DatabaseBackend(app=app)
        tid = gen_unique_id()

        self.assertIsNone(b.restore_group(tid))

        result = {'foo': 'baz', 'bar': SomeClass(12345)}
        b.save_group(tid, result)
        rindb = b.restore_group(tid)
        self.assertIsNotNone(rindb)
        self.assertEqual(rindb.get('foo'), 'baz')
        self.assertEqual(rindb.get('bar').data, 12345)
        b.delete_group(tid)
        self.assertIsNone(b.restore_group(tid))

    def test_cleanup(self):
        b = DatabaseBackend(app=app)
        b.TaskModel._default_manager.all().delete()
        ids = [gen_unique_id() for _ in range(3)]
        for i, res in enumerate((16, 32, 64)):
            b.mark_as_done(ids[i], res)

        self.assertEqual(b.TaskModel._default_manager.count(), 3)

        then = now() - current_app.conf.CELERY_TASK_RESULT_EXPIRES * 2
        # Have to avoid save() because it applies the auto_now=True.
        b.TaskModel._default_manager.filter(task_id__in=ids[:-1]) \
                                    .update(date_done=then)

        b.cleanup()
        self.assertEqual(b.TaskModel._default_manager.count(), 1)