Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / data / celery_data.py
Size: Mime:
try:
    from typing import Any, Dict, Union
except ImportError:
    pass

import six

from supertenant import consts
from supertenant.supermeter.data.base import BaseData


class CeleryData(BaseData):
    def __init__(self, resource_type):
        # type: (str) -> None
        super(CeleryData, self).__init__(resource_type, consts.INTEGRATION_MODULE_PYTHON_CELERY)

    def set_scheme(self, scheme):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_BROKER_SCHEME, scheme)

    def set_host(self, host):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_BROKER_HOST, host)

    def set_port(self, port):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_BROKER_PORT, port)

    def set_task(self, task_name):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_TASK, task_name)

    def set_task_id(self, task_id):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_TASK_ID, task_id)

    def set_routing_key(self, routing_key):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_ROUTING_KEY, routing_key)

    def set_exchange(self, exchange):
        # type: (Union[str, object]) -> None
        if isinstance(exchange, six.string_types):
            exchange_str = str(exchange)
        else:
            exchange_str = str(getattr(exchange, "name", ""))

        self.set_tag(consts.LABEL_CELERY_EXCHANGE, exchange_str)

    def set_success(self, success):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_SUCCESS, success)

    def set_retry_reason(self, reason):
        # type: (str) -> None
        self.set_tag(consts.LABEL_CELERY_RETRY_REASON, reason)

    def set_arg(self, n, v):
        # type: (int, Any) -> None
        self.set_tag("%s.%s" % (consts.LABEL_CELERY_ARGS, str(n)), str(v))

    def set_kwarg(self, k, v):
        # type: (str, Any) -> None
        self.set_tag("%s.%s" % (consts.LABEL_CELERY_KWARGS, k), str(v))

    def inject_routing_data(self, headers):
        # type: (Dict[str, str]) -> None
        routing_key = self.get_tag(consts.LABEL_CELERY_ROUTING_KEY)
        if routing_key is not None:
            headers[consts.LABEL_CELERY_ROUTING_KEY] = routing_key
        exchange = self.get_tag(consts.LABEL_CELERY_EXCHANGE)
        if exchange is not None:
            headers[consts.LABEL_CELERY_EXCHANGE] = exchange

    def extract_routing_data(self, headers):
        # type: (Dict[str, str]) -> None
        routing_key = headers.get(consts.LABEL_CELERY_ROUTING_KEY)
        if routing_key is not None:
            self.set_routing_key(routing_key)
        exchange = headers.get(consts.LABEL_CELERY_EXCHANGE)
        if exchange is not None:
            self.set_exchange(exchange)


class CeleryClientData(CeleryData):
    def __init__(self):
        # type: () -> None
        super(CeleryClientData, self).__init__(consts.RESOURCE_TYPE_CELERY_CLIENT)
        self.set_span_type(consts.SPAN_TYPE_CLIENT_REQUEST)


class CeleryWorkerData(CeleryData):
    def __init__(self):
        # type: () -> None
        super(CeleryWorkerData, self).__init__(consts.RESOURCE_TYPE_CELERY_WORKER)
        self.set_span_type(consts.SPAN_TYPE_SERVER_REQUEST)