Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
celery / backends / elasticsearch.py
Size: Mime:
# -*- coding: utf-8 -*-
"""Elasticsearch result store backend."""
from __future__ import absolute_import, unicode_literals

from datetime import datetime

from kombu.utils.encoding import bytes_to_str
from kombu.utils.url import _parse_url

from celery.exceptions import ImproperlyConfigured
from celery.five import items

from .base import KeyValueStoreBackend

try:
    import elasticsearch
except ImportError:
    elasticsearch = None  # noqa

__all__ = ('ElasticsearchBackend',)

# Message used for the ImproperlyConfigured error raised when the optional
# ``elasticsearch`` client library could not be imported.
E_LIB_MISSING = (
    'You need to install the elasticsearch library to use the '
    'Elasticsearch result backend.'
)


class ElasticsearchBackend(KeyValueStoreBackend):
    """Elasticsearch result store backend.

    Each result is stored as one document in ``index``/``doc_type``, with
    the payload under the ``result`` field and the write time under
    ``@timestamp``.

    When a URL is given, its scheme, host and port are taken from the URL
    and its path is read as ``<index>/<doc_type>``; any part that is
    missing falls back to the class-level default below.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`elasticsearch` is not available.
    """

    # Class-level defaults, used when neither the URL nor the app
    # configuration provides a value.
    index = 'celery'
    doc_type = 'backend'
    scheme = 'http'
    host = 'localhost'
    port = 9200
    es_retry_on_timeout = False
    es_timeout = 10
    es_max_retries = 3

    def __init__(self, url=None, *args, **kwargs):
        """Configure the backend from ``url`` and the app configuration.

        Arguments:
            url: optional backend URL; host/port select the server and the
                path selects ``<index>/<doc_type>``.
            *args, **kwargs: forwarded to the base backend.

        Raises:
            celery.exceptions.ImproperlyConfigured: if the
                ``elasticsearch`` library could not be imported.
        """
        super(ElasticsearchBackend, self).__init__(*args, **kwargs)
        self.url = url
        _get = self.app.conf.get

        if elasticsearch is None:
            raise ImproperlyConfigured(E_LIB_MISSING)

        index = doc_type = scheme = host = port = None

        if url:
            scheme, host, port, _, _, path, _ = _parse_url(url)  # noqa
            if path:
                path = path.strip('/')
                index, _, doc_type = path.partition('/')

        # Anything the URL did not supply falls back to the class default.
        self.index = index or self.index
        self.doc_type = doc_type or self.doc_type
        self.scheme = scheme or self.scheme
        self.host = host or self.host
        self.port = port or self.port

        # Only override a default when the setting is actually present, so
        # that an explicit falsy value (e.g. ``False`` or ``0``) is honored.
        es_retry_on_timeout = _get('elasticsearch_retry_on_timeout')
        if es_retry_on_timeout is not None:
            self.es_retry_on_timeout = es_retry_on_timeout

        es_timeout = _get('elasticsearch_timeout')
        if es_timeout is not None:
            self.es_timeout = es_timeout

        es_max_retries = _get('elasticsearch_max_retries')
        if es_max_retries is not None:
            self.es_max_retries = es_max_retries

        # The client is created lazily by the ``server`` property.
        self._server = None

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

        The milliseconds are formatted explicitly rather than by slicing
        ``isoformat()``, because ``isoformat()`` omits the fractional part
        when the microsecond component is zero and a slice would then cut
        into the seconds field.
        """
        now = datetime.utcnow()
        return '{0}.{1:03d}Z'.format(
            now.strftime('%Y-%m-%dT%H:%M:%S'),
            now.microsecond // 1000,
        )

    def get(self, key):
        """Return the stored ``result`` field for ``key``.

        Returns ``None`` when the document does not exist or the response
        does not have the expected ``found``/``_source``/``result`` shape.
        """
        try:
            res = self.server.get(
                index=self.index,
                doc_type=self.doc_type,
                id=key,
            )
        except elasticsearch.exceptions.NotFoundError:
            return None

        try:
            if res['found']:
                return res['_source']['result']
        except (TypeError, KeyError):
            # Unexpected response shape: treat as "no result".
            pass
        return None

    def set(self, key, value):
        """Store ``value`` under ``key`` together with a UTC timestamp.

        If the first write is rejected with a conflict, the same document
        body is indexed again with ``refresh=True`` so the new value
        replaces the existing document.
        """
        body = {
            'result': value,
            '@timestamp': self._utc_timestamp(),
        }
        try:
            self._index(id=key, body=body)
        except elasticsearch.exceptions.ConflictError:
            # Document already exists: overwrite it with the new body.
            # (``self.get`` returns only the result payload, not the
            # document, so it cannot be used to patch the stored document.)
            self._index(id=key, body=body, refresh=True)

    def _index(self, id, body, **kwargs):
        """Index ``body`` as document ``id`` in the configured index.

        Document id and body keys are converted with ``bytes_to_str``
        before being handed to the client.  Extra keyword arguments are
        passed through to the client's ``index`` call.
        """
        body = {bytes_to_str(k): v for k, v in items(body)}
        return self.server.index(
            id=bytes_to_str(id),
            index=self.index,
            doc_type=self.doc_type,
            body=body,
            **kwargs
        )

    def mget(self, keys):
        """Return the results for ``keys`` in order, one ``get`` per key."""
        return [self.get(key) for key in keys]

    def delete(self, key):
        """Delete the document stored under ``key``."""
        self.server.delete(index=self.index, doc_type=self.doc_type, id=key)

    def _get_server(self):
        """Connect to the Elasticsearch server."""
        # Only host and port are passed to the client; ``self.scheme`` is
        # stored by ``__init__`` but not used here.
        return elasticsearch.Elasticsearch(
            '%s:%s' % (self.host, self.port),
            retry_on_timeout=self.es_retry_on_timeout,
            max_retries=self.es_max_retries,
            timeout=self.es_timeout
        )

    @property
    def server(self):
        """Elasticsearch client, created on first access and then reused."""
        if self._server is None:
            self._server = self._get_server()
        return self._server