Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / django-celery   python

Repository URL to install this package:

Version: 3.1.17 

/ tests / test_snapshot.py

from __future__ import absolute_import, unicode_literals

from datetime import datetime
from itertools import count
from time import time

from celery import states
from celery.events import Event as _Event
from celery.events.state import State, Worker, Task
from celery.utils import gen_unique_id

from djcelery import celery
from djcelery import snapshot
from djcelery import models
from djcelery.utils import make_aware
from djcelery.tests.utils import unittest
from djcelery.compat import unicode


_ids = count(0)
_clock = count(1)


def Event(*args, **kwargs):
    kwargs.setdefault('clock', next(_clock))
    kwargs.setdefault('local_received', time())
    return _Event(*args, **kwargs)


def create_task(worker, **kwargs):
    d = dict(uuid=gen_unique_id(),
             name='djcelery.unittest.task{0}'.format(next(_ids)),
             worker=worker)
    return Task(**dict(d, **kwargs))


class test_Camera(unittest.TestCase):
    Camera = snapshot.Camera

    def setUp(self):
        self.state = State()
        self.cam = self.Camera(self.state)

    def test_constructor(self):
        cam = self.Camera(State())
        self.assertTrue(cam.state)
        self.assertTrue(cam.freq)
        self.assertTrue(cam.cleanup_freq)
        self.assertTrue(cam.logger)

    def test_get_heartbeat(self):
        worker = Worker(hostname='fuzzie')
        self.assertIsNone(self.cam.get_heartbeat(worker))
        t1 = time()
        t2 = time()
        t3 = time()
        for t in t1, t2, t3:
            worker.event('heartbeat', t, t, {})
        self.state.workers[worker.hostname] = worker
        self.assertEqual(self.cam.get_heartbeat(worker),
                         make_aware(datetime.fromtimestamp(t3)))

    def test_handle_worker(self):
        worker = Worker(hostname='fuzzie')
        worker.event('online', time(), time(), {})
        self.cam._last_worker_write.clear()
        m = self.cam.handle_worker((worker.hostname, worker))
        self.assertTrue(m)
        self.assertTrue(m.hostname)
        self.assertTrue(m.last_heartbeat)
        self.assertTrue(m.is_alive())
        self.assertEqual(unicode(m), unicode(m.hostname))
        self.assertTrue(repr(m))

    def test_handle_task_received(self):
        worker = Worker(hostname='fuzzie')
        worker.event('oneline', time(), time(), {})
        self.cam.handle_worker((worker.hostname, worker))

        task = create_task(worker)
        task.event('received', time(), time(), {})
        self.assertEqual(task.state, 'RECEIVED')
        mt = self.cam.handle_task((task.uuid, task))
        self.assertEqual(mt.name, task.name)
        self.assertTrue(unicode(mt))
        self.assertTrue(repr(mt))
        mt.eta = celery.now()
        self.assertIn('eta', unicode(mt))
        self.assertIn(mt, models.TaskState.objects.active())

    def test_handle_task(self):
        worker1 = Worker(hostname='fuzzie')
        worker1.event('online', time(), time(), {})
        mw = self.cam.handle_worker((worker1.hostname, worker1))
        task1 = create_task(worker1)
        task1.event('received', time(), time(), {})
        mt = self.cam.handle_task((task1.uuid, task1))
        self.assertEqual(mt.worker, mw)

        worker2 = Worker(hostname=None)
        task2 = create_task(worker2)
        task2.event('received', time(), time(), {})
        mt = self.cam.handle_task((task2.uuid, task2))
        self.assertIsNone(mt.worker)

        task1.event('succeeded', time(), time(), {'result': 42})
        self.assertEqual(task1.state, states.SUCCESS)
        self.assertEqual(task1.result, 42)
        mt = self.cam.handle_task((task1.uuid, task1))
        self.assertEqual(mt.name, task1.name)
        self.assertEqual(mt.result, 42)

        task3 = create_task(worker1, name=None)
        task3.event('revoked', time(), time(), {})
        mt = self.cam.handle_task((task3.uuid, task3))
        self.assertIsNone(mt)

    def assertExpires(self, dec, expired, tasks=10):
        worker = Worker(hostname='fuzzie')
        worker.event('online', time(), time(), {})
        for total in range(tasks):
            task = create_task(worker)
            task.event('received', time() - dec, time() - dec, {})
            task.event('succeeded', time() - dec, time() - dec, {'result': 42})
            self.assertTrue(task.name)
            self.assertTrue(self.cam.handle_task((task.uuid, task)))
        self.assertEqual(self.cam.on_cleanup(), expired)

    def test_on_cleanup_expires(self, dec=332000):
        self.assertExpires(dec, 10)

    def test_on_cleanup_does_not_expire_new(self, dec=0):
        self.assertExpires(dec, 0)

    def test_on_shutter(self):
        state = self.state
        cam = self.cam

        ws = ['worker1.ex.com', 'worker2.ex.com', 'worker3.ex.com']
        uus = [gen_unique_id() for i in range(50)]

        events = [Event('worker-online', hostname=ws[0]),
                  Event('worker-online', hostname=ws[1]),
                  Event('worker-online', hostname=ws[2]),
                  Event('task-received',
                        uuid=uus[0], name='A', hostname=ws[0]),
                  Event('task-started',
                        uuid=uus[0], name='A', hostname=ws[0]),
                  Event('task-received',
                        uuid=uus[1], name='B', hostname=ws[1]),
                  Event('task-revoked',
                        uuid=uus[2], name='C', hostname=ws[2])]

        for event in events:
            event['local_received'] = time()
            state.event(event)
        cam.on_shutter(state)

        for host in ws:
            worker = models.WorkerState.objects.get(hostname=host)
            self.assertTrue(worker.is_alive())

        t1 = models.TaskState.objects.get(task_id=uus[0])
        self.assertEqual(t1.state, 'STARTED')
        self.assertEqual(t1.name, 'A')
        t2 = models.TaskState.objects.get(task_id=uus[1])
        self.assertEqual(t2.state, 'RECEIVED')
        t3 = models.TaskState.objects.get(task_id=uus[2])
        self.assertEqual(t3.state, 'REVOKED')

        events = [Event('task-succeeded',
                        uuid=uus[0], hostname=ws[0], result=42),
                  Event('task-failed',
                        uuid=uus[1], exception="KeyError('foo')",
                        hostname=ws[1]),
                  Event('worker-offline', hostname=ws[0])]
        list(map(state.event, events))
        cam._last_worker_write.clear()
        cam.on_shutter(state)

        w1 = models.WorkerState.objects.get(hostname=ws[0])
        self.assertFalse(w1.is_alive())

        t1 = models.TaskState.objects.get(task_id=uus[0])
        self.assertEqual(t1.state, 'SUCCESS')
        self.assertEqual(t1.result, '42')
        self.assertEqual(t1.worker, w1)

        t2 = models.TaskState.objects.get(task_id=uus[1])
        self.assertEqual(t2.state, 'FAILURE')
        self.assertEqual(t2.result, "KeyError('foo')")
        self.assertEqual(t2.worker.hostname, ws[1])

        cam.on_shutter(state)