Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / managers / celery / catalog.py
Size: Mime:
from __future__ import absolute_import

try:
    from typing import Any, Dict, Optional, Tuple
except ImportError:
    pass

import celery

from supertenant.supermeter.scope_manager import Span

_SPAN_KEY = "_supertenant_spans"


def get_task_id(headers, body):
    # type: (Optional[Dict[str, str]], Any) -> Optional[str]
    id = None  # type: Optional[str]
    if headers is not None:
        id = headers.get("id", None)
    if id is None and isinstance(body, dict):
        # version 1: https://docs.celeryq.dev/en/stable/internals/protocol.html#version-1
        id = body.get("id", None)
    return id


def task_catalog_push(task, task_id, span, is_consumer):
    # type: (celery.Task[Any, Any], str, Span, bool) -> None
    if not hasattr(task, _SPAN_KEY):
        catalog = {}  # type: Dict[Tuple[str, bool], Span]
        setattr(task, _SPAN_KEY, catalog)
    else:
        catalog = getattr(task, _SPAN_KEY)

    key = (task_id, is_consumer)
    catalog[key] = span


def task_catalog_pop(task, task_id, is_consumer):
    # type: (celery.Task[Any, Any], str, bool) -> Optional[Span]
    catalog = getattr(task, _SPAN_KEY, None)
    if catalog is None:
        return None

    key = (task_id, is_consumer)
    return catalog.pop(key, None)  # type: ignore


def task_catalog_get(task, task_id, is_consumer):
    # type: (celery.Task[Any, Any], str, bool) -> Optional[Span]
    catalog = getattr(task, _SPAN_KEY, None)
    if catalog is None:
        return None

    key = (task_id, is_consumer)
    return catalog.get(key, None)  # type: ignore