Repository URL to install this package:
|
Version:
0.8.1 ▾
|
try:
from typing import Dict, Optional, Tuple
except ImportError:
pass
from os import environ
from supertenant import consts
from supertenant.superbrain import (
ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS,
ENV_KEY_SERVERLESS,
ActionDict,
Brain,
PollKey,
SpanID,
)
class BrainlessBrain(Brain):
    """Stand-in ``Brain`` whose operations all succeed without doing any work.

    Every span request is answered with ``consts.ACTION_EXECUTE``, the task
    and span bookkeeping calls simply report success, and log messages and
    environment updates are discarded. The only state kept is an
    auto-incrementing task counter plus two settings read from the process
    environment when the instance is constructed.
    """

    def __init__(self):
        # type: () -> None
        """Read the circuit-breaker list and serverless flag from the environment."""
        self._auto_inc_task_id = 0

        # Comma-separated, case-folded list of integration names. Each entry
        # is trimmed and blank entries are dropped: "".split(",") yields [""],
        # which would otherwise make the empty string look like a configured
        # integration, and "a, b" would otherwise store " b" instead of "b".
        raw_breakers = environ.get(ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS, "")
        stripped = (entry.strip() for entry in raw_breakers.lower().split(","))
        self._integration_circuit_breakers = set(
            entry for entry in stripped if entry
        )

        # Any of these case-insensitive spellings turns the flag on.
        self._serverless = environ.get(ENV_KEY_SERVERLESS, "").lower() in (
            "y",
            "yes",
            "true",
            "t",
            "1",
        )

    def init_lib(self):
        # type: () -> bool
        """Report successful initialization; nothing is actually set up."""
        return True

    def open_span(self, task_id, span_type, input):
        # type: (int, int, Dict[str, str]) -> Tuple[Optional[SpanID], Optional[ActionDict], Optional[PollKey]]
        """Open a span that is always allowed to execute.

        All arguments are ignored. Returns a ``SpanID`` built from an empty
        list, an action dict whose ``"Action"`` is ``consts.ACTION_EXECUTE``
        with an empty ``"Definition"``, and ``None`` as the poll key.
        """
        return SpanID([]), {"Action": consts.ACTION_EXECUTE, "Definition": {}}, None

    def poll_action(self, poll_key, span_id):
        # type: (int, SpanID) -> Optional[ActionDict]
        """Return an execute action with an empty definition, ignoring both arguments."""
        return {"Action": consts.ACTION_EXECUTE, "Definition": {}}

    def close_span(self, span_id, data=None):
        # type: (SpanID, Optional[Dict[str, str]]) -> bool
        """Report success; the span id and data are ignored."""
        return True

    def create_task(self, parent_task_id, task_id, task_type):
        # type: (Optional[int], int, int) -> bool
        """Report success; no task is recorded."""
        return True

    def create_task_auto_inc(self, parent_task_id, task_type):
        # type: (Optional[int], int) -> int
        """Return the next task id from this instance's counter.

        The counter starts at 0 and is incremented before being returned, so
        the first call yields 1. Both arguments are ignored.
        """
        self._auto_inc_task_id += 1
        return self._auto_inc_task_id

    def task_done(self, task_id):
        # type: (int) -> bool
        """Report success; the task id is ignored."""
        return True

    def log(self, level, msg):
        # type: (int, str) -> None
        """Discard the log message."""
        pass

    def get_config(self):
        # type: () -> Optional[str]
        """Return ``None``; this implementation holds no configuration."""
        return None

    def set_env(self, key, value):
        # type: (str, str) -> None
        """Ignore the key/value pair."""
        pass

    def shutdown(self, timeout_ms=0):
        # type: (int) -> bool
        """Report success immediately; the timeout is ignored."""
        return True

    def is_serverless(self):
        # type: () -> bool
        """Return the serverless flag captured from the environment at construction."""
        return self._serverless

    def is_integration_circuit_breaker_enabled(self, integration_module):
        # type: (str) -> bool
        """Return whether ``integration_module`` was listed in the environment.

        The configured names are stored lower-cased, while the lookup compares
        the argument exactly as given.
        NOTE(review): presumably callers pass lower-case module names -- confirm.
        """
        return integration_module in self._integration_circuit_breakers