Repository URL to install this package:
|
Version:
0.8.1 ▾
|
try:
from typing import Optional, Tuple
from supertenant.superbrain import ActionDict, PollKey, SpanID
except ImportError:
pass
from supertenant.supermeter.data.celery_data import CeleryData
from supertenant.supermeter.managers.basic_manager import BaseManager
class ClientWorkerManager(BaseManager):
def __init__(self):
# type: () -> None
super(ClientWorkerManager, self).__init__()
@classmethod
def open_span(cls, data):
# type: (CeleryData) -> Tuple[Optional[SpanID], Optional[ActionDict], Optional[PollKey]]
span_type = data.get_span_type()
if span_type is None:
raise Exception("expected a span_type")
return super(ClientWorkerManager, cls).open_span_with_type(span_type, data)