Repository URL to install this package:
|
Version:
0.8.1 ▾
|
from __future__ import absolute_import
try:
from typing import Any, Dict, Optional, Tuple
except ImportError:
pass
import celery
from supertenant.supermeter.scope_manager import Span
# Name of the attribute, set on a task object, under which the task's span
# catalog (a dict keyed by ``(task_id, is_consumer)``) is stored.
_SPAN_KEY = "_supertenant_spans"
def get_task_id(headers, body):
    # type: (Optional[Dict[str, str]], Any) -> Optional[str]
    """Return the task id carried by a message, or None if there is none.

    The id is looked up in ``headers`` first. When it is not found there and
    ``body`` is a dict, the ``"id"`` entry of the body is used instead.

    :param headers: message headers, or None when there are none.
    :param body: message body; only consulted when it is a dict.
    :returns: the task id, or None when neither place provides one.
    """
    # Named ``task_id`` rather than ``id`` so the builtin is not shadowed.
    task_id = None  # type: Optional[str]
    if headers is not None:
        task_id = headers.get("id")
    if task_id is None and isinstance(body, dict):
        # version 1: https://docs.celeryq.dev/en/stable/internals/protocol.html#version-1
        task_id = body.get("id")
    return task_id
def task_catalog_push(task, task_id, span, is_consumer):
    # type: (celery.Task[Any, Any], str, Span, bool) -> None
    """Store ``span`` on ``task`` under the key ``(task_id, is_consumer)``.

    Spans are kept in a dict held on the task object as the attribute named
    by ``_SPAN_KEY``. That dict is created the first time a span is pushed,
    and an entry already stored under the same key is overwritten.
    """
    # Make sure the task carries a catalog before looking it up.
    if not hasattr(task, _SPAN_KEY):
        setattr(task, _SPAN_KEY, {})
    spans = getattr(task, _SPAN_KEY)  # type: Dict[Tuple[str, bool], Span]
    spans[(task_id, is_consumer)] = span
def task_catalog_pop(task, task_id, is_consumer):
    # type: (celery.Task[Any, Any], str, bool) -> Optional[Span]
    """Remove and return the span stored for ``(task_id, is_consumer)``.

    Returns None when ``task`` has no span catalog (the ``_SPAN_KEY``
    attribute is missing or None) or when the catalog has no such entry.
    """
    spans = getattr(task, _SPAN_KEY, None)
    if spans is not None:
        return spans.pop((task_id, is_consumer), None)  # type: ignore
    return None
def task_catalog_get(task, task_id, is_consumer):
    # type: (celery.Task[Any, Any], str, bool) -> Optional[Span]
    """Return the span stored for ``(task_id, is_consumer)`` without removing it.

    Returns None when ``task`` has no span catalog (the ``_SPAN_KEY``
    attribute is missing or None) or when the catalog has no such entry.
    """
    spans = getattr(task, _SPAN_KEY, None)
    if spans is not None:
        return spans.get((task_id, is_consumer), None)  # type: ignore
    return None