Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / brainless_brain.py
Size: Mime:
try:
    from typing import Dict, Optional, Tuple
except ImportError:
    pass

from os import environ

from supertenant import consts
from supertenant.superbrain import (
    ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS,
    ENV_KEY_SERVERLESS,
    ActionDict,
    Brain,
    PollKey,
    SpanID,
)


class BrainlessBrain(Brain):
    """``Brain`` implementation that makes no decisions of its own.

    Every span is answered with ``consts.ACTION_EXECUTE`` and every
    lifecycle call reports success. The only state kept is a local task-id
    counter plus two settings read from the process environment once, at
    construction time:

    * ``ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS`` - comma-separated integration
      names (compared in lower case) for which the circuit breaker is on.
    * ``ENV_KEY_SERVERLESS`` - truthy flag (``y``/``yes``/``true``/``t``/``1``,
      case-insensitive).
    """

    def __init__(self):
        # type: () -> None
        """Initialise the counter and snapshot the environment settings."""
        self._auto_inc_task_id = 0

        raw_breakers = environ.get(ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS, "")
        # "".split(",") yields [""], and "a, b" yields " b": strip each entry
        # and drop blanks so an unset/empty variable gives an empty set and
        # whitespace around names does not prevent a match.
        self._integration_circuit_breakers = set(
            name.strip() for name in raw_breakers.lower().split(",") if name.strip()
        )

        # Strip so values such as "true " or "1\n" are still recognised.
        self._serverless = environ.get(ENV_KEY_SERVERLESS, "").strip().lower() in (
            "y",
            "yes",
            "true",
            "t",
            "1",
        )

    def init_lib(self):
        # type: () -> bool
        """Nothing to initialise; always returns True."""
        return True

    def open_span(self, task_id, span_type, input):
        # type: (int, int, Dict[str, str]) -> Tuple[Optional[SpanID], Optional[ActionDict], Optional[PollKey]]
        """Open a span; all arguments are ignored.

        Returns a ``(span_id, action, poll_key)`` triple: a ``SpanID`` built
        from an empty list, an ActionDict whose action is ``ACTION_EXECUTE``
        with an empty definition, and ``None`` as the poll key.
        """
        return SpanID([]), {"Action": consts.ACTION_EXECUTE, "Definition": {}}, None

    def poll_action(self, poll_key, span_id):
        # type: (int, SpanID) -> Optional[ActionDict]
        """Return an ``ACTION_EXECUTE`` ActionDict regardless of the arguments."""
        return {"Action": consts.ACTION_EXECUTE, "Definition": {}}

    def close_span(self, span_id, data=None):
        # type: (SpanID, Optional[Dict[str, str]]) -> bool
        """No-op; always returns True."""
        return True

    def create_task(self, parent_task_id, task_id, task_type):
        # type: (Optional[int], int, int) -> bool
        """No-op; always returns True."""
        return True

    def create_task_auto_inc(self, parent_task_id, task_type):
        # type: (Optional[int], int) -> int
        """Return the next task id from a per-instance counter (first id is 1).

        The arguments are ignored. The increment is a plain ``+=`` with no
        locking in this class.
        """
        self._auto_inc_task_id += 1
        return self._auto_inc_task_id

    def task_done(self, task_id):
        # type: (int) -> bool
        """No-op; always returns True."""
        return True

    def log(self, level, msg):
        # type: (int, str) -> None
        """Discard the message."""
        pass

    def get_config(self):
        # type: () -> Optional[str]
        """There is no configuration to report; always returns None."""
        return None

    def set_env(self, key, value):
        # type: (str, str) -> None
        """No-op; the key/value pair is not stored."""
        pass

    def shutdown(self, timeout_ms=0):
        # type: (int) -> bool
        """Nothing to tear down; always returns True."""
        return True

    def is_serverless(self):
        # type: () -> bool
        """Return the serverless flag captured from the environment in ``__init__``."""
        return self._serverless

    def is_integration_circuit_breaker_enabled(self, integration_module):
        # type: (str) -> bool
        """Return True if ``integration_module`` was listed in the environment.

        The configured names are stored lower-cased and stripped; the lookup
        is an exact match against ``integration_module`` as given.
        """
        return integration_module in self._integration_circuit_breakers