Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

hemamaps / celery   python

Repository URL to install this package:

Version: 3.1.23 

/ tests / backends / test_cassandra.py

from __future__ import absolute_import

import socket

from pickle import loads, dumps

from celery import states
from celery.exceptions import ImproperlyConfigured
from celery.tests.case import (
    AppCase, Mock, mock_module, depends_on_current_app,
)


class Object(object):
    pass


def install_exceptions(mod):
    # py3k: cannot catch exceptions not ineheriting from BaseException.

    class NotFoundException(Exception):
        pass

    class TException(Exception):
        pass

    class InvalidRequestException(Exception):
        pass

    class UnavailableException(Exception):
        pass

    class TimedOutException(Exception):
        pass

    class AllServersUnavailable(Exception):
        pass

    mod.NotFoundException = NotFoundException
    mod.TException = TException
    mod.InvalidRequestException = InvalidRequestException
    mod.TimedOutException = TimedOutException
    mod.UnavailableException = UnavailableException
    mod.AllServersUnavailable = AllServersUnavailable


class test_CassandraBackend(AppCase):

    def setup(self):
        self.app.conf.update(
            CASSANDRA_SERVERS=['example.com'],
            CASSANDRA_KEYSPACE='keyspace',
            CASSANDRA_COLUMN_FAMILY='columns',
        )

    def test_init_no_pycassa(self):
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            prev, mod.pycassa = mod.pycassa, None
            try:
                with self.assertRaises(ImproperlyConfigured):
                    mod.CassandraBackend(app=self.app)
            finally:
                mod.pycassa = prev

    def test_init_with_and_without_LOCAL_QUROM(self):
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            cons = mod.pycassa.ConsistencyLevel = Object()
            cons.LOCAL_QUORUM = 'foo'

            self.app.conf.CASSANDRA_READ_CONSISTENCY = 'LOCAL_FOO'
            self.app.conf.CASSANDRA_WRITE_CONSISTENCY = 'LOCAL_FOO'

            mod.CassandraBackend(app=self.app)
            cons.LOCAL_FOO = 'bar'
            mod.CassandraBackend(app=self.app)

            # no servers raises ImproperlyConfigured
            with self.assertRaises(ImproperlyConfigured):
                self.app.conf.CASSANDRA_SERVERS = None
                mod.CassandraBackend(
                    app=self.app, keyspace='b', column_family='c',
                )

    @depends_on_current_app
    def test_reduce(self):
        with mock_module('pycassa'):
            from celery.backends.cassandra import CassandraBackend
            self.assertTrue(loads(dumps(CassandraBackend(app=self.app))))

    def test_get_task_meta_for(self):
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            mod.Thrift = Mock()
            install_exceptions(mod.Thrift)
            x = mod.CassandraBackend(app=self.app)
            Get_Column = x._get_column_family = Mock()
            get_column = Get_Column.return_value = Mock()
            get = get_column.get
            META = get.return_value = {
                'task_id': 'task_id',
                'status': states.SUCCESS,
                'result': '1',
                'date_done': 'date',
                'traceback': '',
                'children': None,
            }
            x.decode = Mock()
            x.detailed_mode = False
            meta = x._get_task_meta_for('task_id')
            self.assertEqual(meta['status'], states.SUCCESS)

            x.detailed_mode = True
            row = get.return_value = Mock()
            row.values.return_value = [Mock()]
            x.decode.return_value = META
            meta = x._get_task_meta_for('task_id')
            self.assertEqual(meta['status'], states.SUCCESS)
            x.decode.return_value = Mock()

            x.detailed_mode = False
            get.side_effect = KeyError()
            meta = x._get_task_meta_for('task_id')
            self.assertEqual(meta['status'], states.PENDING)

            calls = [0]
            end = [10]

            def work_eventually(*arg):
                try:
                    if calls[0] > end[0]:
                        return META
                    raise socket.error()
                finally:
                    calls[0] += 1
            get.side_effect = work_eventually
            x._retry_timeout = 10
            x._retry_wait = 0.01
            meta = x._get_task_meta_for('task')
            self.assertEqual(meta['status'], states.SUCCESS)

            x._retry_timeout = 0.1
            calls[0], end[0] = 0, 100
            with self.assertRaises(socket.error):
                x._get_task_meta_for('task')

    def test_store_result(self):
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            mod.Thrift = Mock()
            install_exceptions(mod.Thrift)
            x = mod.CassandraBackend(app=self.app)
            Get_Column = x._get_column_family = Mock()
            cf = Get_Column.return_value = Mock()
            x.detailed_mode = False
            x._store_result('task_id', 'result', states.SUCCESS)
            self.assertTrue(cf.insert.called)

            cf.insert.reset()
            x.detailed_mode = True
            x._store_result('task_id', 'result', states.SUCCESS)
            self.assertTrue(cf.insert.called)

    def test_process_cleanup(self):
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            x = mod.CassandraBackend(app=self.app)
            x._column_family = None
            x.process_cleanup()

            x._column_family = True
            x.process_cleanup()
            self.assertIsNone(x._column_family)

    def test_get_column_family(self):
        with mock_module('pycassa'):
            from celery.backends import cassandra as mod
            mod.pycassa = Mock()
            install_exceptions(mod.pycassa)
            x = mod.CassandraBackend(app=self.app)
            self.assertTrue(x._get_column_family())
            self.assertIsNotNone(x._column_family)
            self.assertIs(x._get_column_family(), x._column_family)