Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
supermeter / superbrain / __init__.py
Size: Mime:
# The typing names are used by the "# type:" comments in this module and by cast().
# When the typing module cannot be imported, minimal runtime stand-ins are defined instead.
try:
    from typing import Any, Dict, List, Optional, Tuple, Union, cast  # noqa: F401 (flake8 complains)

    # TODO: Fix types in action definition in proto/brain to avoid Any type here
    ActionDefinition = Dict[str, Any]
    # An action as handled by Brain._get_action, e.g. {"Action": <int>, "Definition": {...}}.
    ActionDict = Dict[str, Union[ActionDefinition, int]]
except ImportError:
    # Brain.init_lib evaluates cast(Any, ffi) at runtime, so both names must exist.
    Any = None

    def cast(_, x):  # type: ignore
        # No-op replacement for typing.cast: ignores the type and returns the value unchanged.
        return x


import json
import sys
import threading
from os import environ

from supertenant import consts

# Read by Brain.__init__: a comma-separated list of integration names (lower-cased before use).
ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS = "SUPERTENANT_SUPERMETER_INTEGRATION_CIRCUIT_BREAKERS"
# Read by Brain.__init__: "y", "yes", "true", "t" or "1" (case-insensitive) turn the serverless flag on.
ENV_KEY_SERVERLESS = "SUPERTENANT_SUPERBRAIN_SERVERLESS"  # same env key used in superbrain


class SpanID:
    """Span identifier handed out by the native library, plus an optional companion object.

    ``ffi_array`` is the identifier itself. ``action`` is stored untouched next to it;
    ``Brain.open_span`` passes the cffi result buffer there so it stays referenced for
    as long as the ``SpanID`` does.
    """

    def __init__(self, ffi_array, action=None):
        # type: (List[Any], Optional[Any]) -> None
        self._action = action
        self._ffi_array = ffi_array

    def __repr__(self):
        # type: () -> str
        # Same text as repr() of the wrapped identifier.
        return "%r" % (self._ffi_array,)


# py2.7: use enum.Enum with py3.4+
# Until then both are plain aliases of int; Brain.open_span also calls PollKey(...) as a constructor.
Action = int
PollKey = int


def _bytes_dict_to_char_pp(ffi, input):
    # type: (Any, Dict[str, str]) -> Tuple[Any, List[Any]]
    input_arr = []
    for k, v in input.items():
        input_arr.append(ffi.new("char[]", str(k).encode("utf-8", "ignore")))
        input_arr.append(ffi.new("char[]", str(v).encode("utf-8", "ignore")))
    # we must return input_arr otherwise cPython may free it
    return ffi.new("char *[]", input_arr), input_arr


# Using threading.local to keep action mapping to avoid locking on change
# Brain.update_config stores the parsed mapping on `_st_action_mapping`; Brain._get_action reads it back.
config_holder = threading.local()


# CU-86bwvf72a pypy 2.7 5.10 comes w/ cffi 1.11 that doesn't have a release() function.
def _cffi_release(cffi, ptr):
    # type: (Any, Any) -> None
    if hasattr(cffi, "release"):
        cffi.release(ptr)


class Brain:
    def __init__(self, supermeter_version, config_file_path=None):
        # type: (str, Optional[str]) -> None
        self.supermeter_version = supermeter_version
        self.config_file_path = config_file_path
        self._lib = None  # type: Optional[Any]
        self._ffi = None  # type: Optional[Any]
        self._initialized = False
        self._env_vars = []  # type: List[Tuple[str, str]]
        self._integration_circuit_breakers = set(
            environ.get(ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS, "").lower().split(",")
        )
        self._serverless = environ.get(ENV_KEY_SERVERLESS, "").lower() in (
            "y",
            "yes",
            "true",
            "t",
            "1",
        )
        self._lock = threading.Lock()

    def init_lib(self):
        # type: () -> bool
        """
        initialization could happen from 2 requests at the same time and therefore should be thread-safe.
        the locking mechanism ensures the initialization only takes places once
        the _initialized is used to avoid the locking mechanism once the initialization is completed.
        """
        if self._initialized:
            return self._lib is not None

        with self._lock:
            if self._initialized:
                # previous thread already handled the init, avoid running twice
                return self._lib is not None
            self._lib = None
            self._ffi = None
            try:
                from _superbrain import ffi, lib  # type: ignore

                self._lib = lib
                self._ffi = cast(Any, ffi)
            except ImportError:
                import traceback

                sys.stderr.write("[supertenant-supermeter] fatal error: Brain.init_lib failed on ImportError:\n")
                traceback.print_exc()
            else:
                # CU-860t13kwj - set all env vars set in Python
                # py2.7 (need to do if/else in separate lines because of mypy3 vs. mypy2.7 checks)
                if sys.version_info >= (3, 0):
                    iteritems = environ.items
                else:
                    iteritems = environ.iteritems
                for key, value in iteritems():
                    if key.startswith("SUPERTENANT_") or key.startswith("SUPERBRAIN_"):
                        self._set_env(key, value)
                # Now set the explicit env vars asked by code
                for key, value in self._env_vars:
                    self._set_env(key, value)

                platform = "python"
                platform_version = sys.version
                sm_version_ptr = self._ffi.new("char[]", self.supermeter_version.encode("utf-8", "ignore"))
                if self.config_file_path:
                    config_file_ptr = self._ffi.new("char[]", self.config_file_path.encode("utf-8", "ignore"))
                else:
                    config_file_ptr = self._ffi.NULL
                platform_ptr = self._ffi.new("char[]", platform.encode("utf-8", "ignore"))
                platform_version_ptr = self._ffi.new("char[]", platform_version.encode("utf-8", "ignore"))
                if self._lib.Init(sm_version_ptr, config_file_ptr, platform_ptr, platform_version_ptr):
                    argv = getattr(sys, "argv", "<argv_not_found_in_sys_module>")
                    self.update_config()
                    self.log(
                        consts.BRAIN_LOG_LEVEL_INFO,
                        "Brain initialized, cmd: %s" % str(argv),
                    )
                else:
                    # no real point in logging here since we can't ship the log if brain failed to init.
                    self._lib = None
            self._initialized = True
            return self._lib is not None

    def update_config(self):
        # type: () -> bool
        if self.is_circuit_breaker_enabled():
            return False
        if self._lib is None or self._ffi is None:  # don't try to initialize
            self.log(consts.BRAIN_LOG_LEVEL_WARN, "update_config called before init_lib")
            return False

        try:
            alloc_size = 16 * 1024
            buffer = self._ffi.new("char[]", alloc_size)
            count_ptr = self._ffi.new("int*")
            count_ptr[0] = alloc_size
            max_attempts = 10

            while True:
                result = self._lib.GetActionConfigJson(count_ptr, buffer)
                if not result:
                    count = self._ffi.unpack(count_ptr, 1)[0]
                    _cffi_release(self._ffi, buffer)
                    if count <= alloc_size:
                        _cffi_release(self._ffi, count_ptr)
                        self.log(
                            consts.BRAIN_LOG_LEVEL_WARN,
                            "GetActionConfigJson returned false (count=%d alloc_size=%d)" % (count, alloc_size),
                        )
                        return False
                    max_attempts -= 1
                    if max_attempts <= 0:
                        _cffi_release(self._ffi, count_ptr)
                        self.log(
                            consts.BRAIN_LOG_LEVEL_WARN,
                            "GetActionConfigJson returned false (count=%d alloc_size=%d) after 10 retries"
                            % (count, alloc_size),
                        )
                        return False
                    alloc_size = count_ptr[0]
                    buffer = self._ffi.new("char[]", alloc_size)
                else:
                    break

            action_config = self._ffi.string(buffer)
            _cffi_release(self._ffi, buffer)
            _cffi_release(self._ffi, count_ptr)
            action_config_json = json.loads(action_config)
            config_holder._st_action_mapping = action_config_json
            return True
        except Exception as e:
            self.log(consts.BRAIN_LOG_LEVEL_ERROR, "Exception in update config: %s" % str(e))
        return False

    def shutdown(self, timeout_ms=0):
        # type: (int) -> bool
        if self._lib is None or self._ffi is None:  # don't try to initialize just to shut down
            return False
        lib = self._lib
        self._lib = None
        if self.is_circuit_breaker_enabled():
            return False
        return lib.Shutdown(timeout_ms)  # type: ignore

    def is_serverless(self):
        # type: () -> bool
        if not self._initialized:
            return self._serverless
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return False
        if self.is_circuit_breaker_enabled():
            return False
        return self._lib.IsServerless()  # type: ignore

    def enable_circuit_breaker(self):
        # type: () -> None
        with self._lock:
            if self._initialized and self._lib is not None:
                self._lib.EnableCircuitBreaker()
            self._initialized = True
            self._lib = None
            self._ffi = None

    def is_circuit_breaker_enabled(self):
        # type: () -> bool
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return False
        return self._lib.IsCircuitBreakerEnabled()  # type: ignore

    def is_integration_circuit_breaker_enabled(self, integration_module):
        # type: (str) -> bool
        if integration_module in self._integration_circuit_breakers:
            return True
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return False
        integration_module_ptr = self._ffi.new("char[]", integration_module.encode("utf-8", "ignore"))
        return self._lib.IsIntegrationCircuitBreakerEnabled(integration_module_ptr)  # type: ignore

    def _get_action(self, action_ref):
        # type: (str) -> ActionDict
        action = {"Action": consts.ACTION_EXECUTE, "Definition": {}}  # type: ActionDict
        if action_ref != "0":
            if (
                getattr(config_holder, "_st_action_mapping", None) is not None
                and action_ref in config_holder._st_action_mapping
            ):
                action = config_holder._st_action_mapping[action_ref]
            else:
                # Getting new action mapping from brain and storing it in thread local
                update_success = self.update_config()
                if (
                    update_success
                    and getattr(config_holder, "_st_action_mapping", None) is not None
                    and action_ref in config_holder._st_action_mapping
                ):
                    action = config_holder._st_action_mapping[action_ref]
                else:
                    # If we got an unknown action and it was still unknown after getting a new mapping from brain it's a
                    # bug.
                    # It can happen if config was not updated before action and changed again before update_config -
                    # if that happened request action is lost in brain.
                    self.log(
                        consts.BRAIN_LOG_LEVEL_ERROR,
                        "config updated (%s)but still not recognizing action (%s). Defaulting to Execute."
                        % (config_holder._st_action_mapping, action_ref),
                    )
        return action

    def open_span(self, task_id, span_type, input):
        # type: (int, int, Dict[str, str]) -> Tuple[Optional[SpanID], Optional[ActionDict], Optional[PollKey]]
        if not self.init_lib():
            return None, None, None
        if self.is_circuit_breaker_enabled():
            return None, None, None
        try:
            # if self._lib.IsCircuitBreakerEnabled():
            #     return None, None, None
            arr_ptr, arr = _bytes_dict_to_char_pp(self._ffi, input)
            arr_len = len(arr)
            result = self._ffi.new("OpenSpanResult[]", 1)  # type: ignore
            success = self._lib.OpenSpan(task_id, span_type, arr_len, arr_ptr, result)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return None, None, None

        if success:
            action_ref = str(result[0].Action)
            # TODO: The part of the function that gets action from mapping is almost duplicated in poll_action
            # TODO: replace 0 with execute const
            action = self._get_action(action_ref)
            if action is None:
                return None, None, None
            # result must be kept by SpanID object, or else result is freed before we need it again
            return (
                SpanID(result[0].SpanID, result),
                action,
                PollKey(result[0].LieutenantKey),
            )
        else:
            self._lib = None
            return None, None, None

    def poll_action(self, poll_key, span_id):
        # type: (int, SpanID) -> Optional[ActionDict]
        if not self.init_lib():
            return None
        if self.is_circuit_breaker_enabled():
            return None
        try:
            result = self._ffi.new("OpenSpanResult[]", 1)  # type: ignore
            result[0].SpanID = span_id._ffi_array
            result[0].Action = consts.ACTION_EXECUTE
            result[0].LieutenantKey = poll_key
            action_res = self._lib.PollSpanAction(result)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return None

        action_ref = str(action_res)
        # TODO: replace 0 with execute const
        return self._get_action(action_ref)

    def close_span(self, span_id, data=None):
        # type: (SpanID, Optional[Dict[str, str]]) -> bool
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False
        try:
            if data is not None:
                arr_ptr, arr = _bytes_dict_to_char_pp(self._ffi, data)
                arr_len = len(arr)
            else:
                arr_len = 0
                arr_ptr = self._ffi.NULL  # type: ignore
            return self._lib.CloseSpan(span_id._ffi_array, arr_len, arr_ptr)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    def get_span_label(self, span_id, key):
        # type: (SpanID, str) -> Optional[str]
        if not self.init_lib():
            return None
        if self.is_circuit_breaker_enabled():
            return None

        if self._ffi is None or self._lib is None or span_id is None or key is None:
            # mostly for type checking, but also just to be safe
            return None

        try:
            alloc_size = 2048
            buffer = self._ffi.new("char[]", alloc_size)
            success = self._lib.GetSpanLabel(span_id._ffi_array, key.encode("utf-8", "ignore"), alloc_size, buffer)
            if not success:
                return None

            label = self._ffi.string(buffer)
            _cffi_release(self._ffi, buffer)
            return str(label.decode("utf-8", "ignore"))
        except:  # noqa: E722
            self._lib = None
            return None

    def create_task(self, parent_task_id, task_id, task_type):
        # type: (Optional[int], int, int) -> bool
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False
        if parent_task_id is None:
            parent_task_id = 0
        try:
            return self._lib.CreateTask(parent_task_id, task_id, task_type)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    def create_task_auto_inc(self, parent_task_id, task_type):
        # type: (Optional[int], int) -> int
        if not self.init_lib():
            return 0
        if self.is_circuit_breaker_enabled():
            return 0

        if parent_task_id is None:
            parent_task_id = 0
        try:
            return self._lib.CreateTaskAutoInc(parent_task_id, task_type)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return 0

    def task_done(self, task_id):
        # type: (int) -> bool
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False

        try:
            return self._lib.FinishTask(task_id)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    # TODO support fields when Go binding will support sending different types.
    def log(self, level, msg):
        # type: (int, str) -> None
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return
        if self.is_circuit_breaker_enabled():
            return
        try:
            self._lib.Log(level, self._ffi.new("char[]", msg.encode("utf-8", "ignore")))
        except:  # noqa: E722
            self._lib = None

    def get_config(self):
        # type: () -> Optional[str]
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return None
        if self.is_circuit_breaker_enabled():
            return None

        try:
            c_str = self._lib.GetConfig()
            config = self._ffi.string(c_str)  # type: str
            self._lib.FreeCString(c_str)
            return config
        except:  # noqa: E722
            self._lib = None
            return None

    def set_label_on_shared_data(self, task_id, key, value):
        # type: (int, str, str) -> bool
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False

        if self._ffi is None:
            return False

        try:
            key_cstr = self._ffi.new("char[]", key.encode("utf-8", "ignore"))
            value_cstr = self._ffi.new("char[]", value.encode("utf-8", "ignore"))
            return self._lib.SetLabelOnSharedData(task_id, key_cstr, value_cstr)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    def set_env(self, key, value):
        # type: (str, str) -> None
        if self._lib is None or self._ffi is None:
            self._env_vars.append((key, value))
        else:
            self._set_env(key, value)

    def _set_env(self, key, value):
        # type: (str, str) -> bool
        if self.is_circuit_breaker_enabled():
            return False
        try:
            key_cstr = self._ffi.new("char[]", key.encode("utf-8", "ignore"))  # type: ignore
            value_cstr = self._ffi.new("char[]", value.encode("utf-8", "ignore"))  # type: ignore
            self._lib.Setenv(key_cstr, value_cstr)  # type: ignore
            return True
        except:  # noqa: E722
            self._lib = None
            return False