Repository URL to install this package:
|
Version:
0.8.1 ▾
|
try:
from typing import Any, Dict, Union
except ImportError:
pass
import six
from supertenant import consts
from supertenant.supermeter.data.base import BaseData
class CeleryData(BaseData):
    """Tag container for the Python Celery integration.

    Every ``set_*`` method stores its argument through ``set_tag`` under the
    matching ``consts.LABEL_CELERY_*`` key.  The two ``*_routing_data``
    methods move the routing-key and exchange values between the tags and a
    headers mapping.
    """

    def __init__(self, resource_type):
        # type: (str) -> None
        """Initialise the base data for ``resource_type``.

        The integration module passed to the base class is always
        ``consts.INTEGRATION_MODULE_PYTHON_CELERY``.
        """
        integration_module = consts.INTEGRATION_MODULE_PYTHON_CELERY
        super(CeleryData, self).__init__(resource_type, integration_module)

    def set_scheme(self, scheme):
        # type: (str) -> None
        """Store ``scheme`` under the broker-scheme tag."""
        self.set_tag(consts.LABEL_CELERY_BROKER_SCHEME, scheme)

    def set_host(self, host):
        # type: (str) -> None
        """Store ``host`` under the broker-host tag."""
        self.set_tag(consts.LABEL_CELERY_BROKER_HOST, host)

    def set_port(self, port):
        # type: (str) -> None
        """Store ``port`` under the broker-port tag."""
        self.set_tag(consts.LABEL_CELERY_BROKER_PORT, port)

    def set_task(self, task_name):
        # type: (str) -> None
        """Store ``task_name`` under the task tag."""
        self.set_tag(consts.LABEL_CELERY_TASK, task_name)

    def set_task_id(self, task_id):
        # type: (str) -> None
        """Store ``task_id`` under the task-id tag."""
        self.set_tag(consts.LABEL_CELERY_TASK_ID, task_id)

    def set_routing_key(self, routing_key):
        # type: (str) -> None
        """Store ``routing_key`` under the routing-key tag."""
        self.set_tag(consts.LABEL_CELERY_ROUTING_KEY, routing_key)

    def set_exchange(self, exchange):
        # type: (Union[str, object]) -> None
        """Store the exchange name under the exchange tag.

        A string is used as the name directly.  For any other object the
        ``name`` attribute is used, falling back to an empty string when the
        attribute is missing.  The chosen value is always passed through
        ``str`` before it is stored.
        """
        is_plain_name = isinstance(exchange, six.string_types)
        raw_name = exchange if is_plain_name else getattr(exchange, "name", "")
        exchange_name = str(raw_name)
        self.set_tag(consts.LABEL_CELERY_EXCHANGE, exchange_name)

    def set_success(self, success):
        # type: (str) -> None
        """Store ``success`` under the success tag."""
        self.set_tag(consts.LABEL_CELERY_SUCCESS, success)

    def set_retry_reason(self, reason):
        # type: (str) -> None
        """Store ``reason`` under the retry-reason tag."""
        self.set_tag(consts.LABEL_CELERY_RETRY_REASON, reason)

    def set_arg(self, n, v):
        # type: (int, Any) -> None
        """Store ``str(v)`` under the args label suffixed with ``.<n>``."""
        tag_name = "%s.%s" % (consts.LABEL_CELERY_ARGS, str(n))
        self.set_tag(tag_name, str(v))

    def set_kwarg(self, k, v):
        # type: (str, Any) -> None
        """Store ``str(v)`` under the kwargs label suffixed with ``.<k>``."""
        tag_name = "%s.%s" % (consts.LABEL_CELERY_KWARGS, k)
        self.set_tag(tag_name, str(v))

    def inject_routing_data(self, headers):
        # type: (Dict[str, str]) -> None
        """Copy the routing-key and exchange tags into ``headers``.

        ``headers`` is mutated in place.  A tag is written, keyed by its
        label constant, only when ``get_tag`` returns something other than
        ``None``.
        """
        routing_labels = (
            consts.LABEL_CELERY_ROUTING_KEY,
            consts.LABEL_CELERY_EXCHANGE,
        )
        for label in routing_labels:
            tag_value = self.get_tag(label)
            if tag_value is not None:
                headers[label] = tag_value

    def extract_routing_data(self, headers):
        # type: (Dict[str, str]) -> None
        """Load the routing key and exchange from ``headers`` into the tags.

        Each value is looked up by its label constant and, when present and
        not ``None``, handed to ``set_routing_key`` / ``set_exchange``
        respectively (routing key first).
        """
        label_setters = (
            (consts.LABEL_CELERY_ROUTING_KEY, self.set_routing_key),
            (consts.LABEL_CELERY_EXCHANGE, self.set_exchange),
        )
        for label, setter in label_setters:
            header_value = headers.get(label)
            if header_value is not None:
                setter(header_value)
class CeleryClientData(CeleryData):
    """Celery data preset with the client resource type and span type."""

    def __init__(self):
        # type: () -> None
        """Initialise as a Celery client resource with a client-request span."""
        resource_type = consts.RESOURCE_TYPE_CELERY_CLIENT
        super(CeleryClientData, self).__init__(resource_type)

        span_type = consts.SPAN_TYPE_CLIENT_REQUEST
        self.set_span_type(span_type)
class CeleryWorkerData(CeleryData):
    """Celery data preset with the worker resource type and span type."""

    def __init__(self):
        # type: () -> None
        """Initialise as a Celery worker resource with a server-request span."""
        resource_type = consts.RESOURCE_TYPE_CELERY_WORKER
        super(CeleryWorkerData, self).__init__(resource_type)

        span_type = consts.SPAN_TYPE_SERVER_REQUEST
        self.set_span_type(span_type)