# Repository URL to install this package: (not captured in this listing)
# Version: 0.8.1
# `typing` may be unavailable at runtime (hence the ImportError fallback below); the names are
# otherwise only needed by the `# type:` comments used throughout this module.
try:
    from typing import Any, Dict, List, Optional, Tuple, Union, cast  # noqa: F401 (flake8 complains)
except ImportError:
    Any = None

    def cast(_, x):  # type: ignore
        """Stand-in for typing.cast: ignore the target type and hand back the value untouched."""
        return x
else:
    # TODO: Fix types in action definition in proto/brain to avoid Any type here
    ActionDefinition = Dict[str, Any]
    ActionDict = Dict[str, Union[ActionDefinition, int]]
import json
import sys
import threading
from os import environ
from supertenant import consts
# Env var holding a comma-separated list of integration modules whose circuit breaker is forced on
# (read once, lower-cased, in Brain.__init__).
ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS = "SUPERTENANT_SUPERMETER_INTEGRATION_CIRCUIT_BREAKERS"
# Env var read in Brain.__init__; the values y/yes/true/t/1 (any case) mark the process as serverless.
ENV_KEY_SERVERLESS = "SUPERTENANT_SUPERBRAIN_SERVERLESS" # same env key used in superbrain
class SpanID:
    """
    Handle for an open span.

    Holds the native span identifier together with an optional companion object. Brain.open_span
    passes the OpenSpanResult array as `action` so that it stays referenced for as long as the
    SpanID is alive.
    """

    def __init__(self, ffi_array, action=None):
        # type: (List[Any], Optional[Any]) -> None
        self._ffi_array, self._action = ffi_array, action

    def __repr__(self):
        # type: () -> str
        # Only the span identifier is shown; the companion object is an implementation detail.
        return "%r" % (self._ffi_array,)
# py2.7: use enum.Enum with py3.4+
# Until then both are plain int aliases; PollKey is used by Brain.open_span to wrap the
# LieutenantKey field of the native OpenSpanResult.
Action = int
PollKey = int
def _bytes_dict_to_char_pp(ffi, input):
# type: (Any, Dict[str, str]) -> Tuple[Any, List[Any]]
input_arr = []
for k, v in input.items():
input_arr.append(ffi.new("char[]", str(k).encode("utf-8", "ignore")))
input_arr.append(ffi.new("char[]", str(v).encode("utf-8", "ignore")))
# we must return input_arr otherwise cPython may free it
return ffi.new("char *[]", input_arr), input_arr
# Using threading.local to keep action mapping to avoid locking on change
# Each thread keeps its own mapping in the `_st_action_mapping` attribute, which
# Brain.update_config() fills from the JSON returned by the brain.
config_holder = threading.local()
# CU-86bwvf72a pypy 2.7 5.10 comes w/ cffi 1.11 that doesn't have a release() function.
def _cffi_release(cffi, ptr):
# type: (Any, Any) -> None
if hasattr(cffi, "release"):
cffi.release(ptr)
class Brain:
    """
    Fail-safe wrapper around the native `_superbrain` cffi module.

    The native library is loaded lazily by init_lib(). When a native call raises, when Init or
    OpenSpan report failure, or when the circuit breaker is enabled, `_lib` is reset to None and
    the public methods fall back to neutral return values (None / False / 0).

    The bare `except:` clauses (marked `# noqa: E722`) are kept on purpose: any error raised while
    talking to the native library disables the brain rather than reaching the caller.
    """

    def __init__(self, supermeter_version, config_file_path=None):
        # type: (str, Optional[str]) -> None
        """
        Record configuration only; the native library is not loaded until init_lib() runs.

        :param supermeter_version: version string handed to the brain's Init call.
        :param config_file_path: optional config file path handed to Init (NULL when falsy).
        """
        self.supermeter_version = supermeter_version
        self.config_file_path = config_file_path
        self._lib = None  # type: Optional[Any]
        self._ffi = None  # type: Optional[Any]
        self._initialized = False
        # env vars requested through set_env() before the library was available
        self._env_vars = []  # type: List[Tuple[str, str]]
        self._integration_circuit_breakers = set(
            self._parse_integration_circuit_breakers(environ.get(ENV_KEY_INTEGRATION_CIRCUIT_BREAKERS, ""))
        )
        self._serverless = environ.get(ENV_KEY_SERVERLESS, "").lower() in (
            "y",
            "yes",
            "true",
            "t",
            "1",
        )
        self._lock = threading.Lock()

    @staticmethod
    def _parse_integration_circuit_breakers(raw):
        # type: (str) -> List[str]
        """
        Split a comma-separated module list into lower-cased, stripped names.

        Empty entries are dropped so that an unset/empty env var does not yield the module name "".
        """
        stripped = (name.strip() for name in raw.lower().split(","))
        return [name for name in stripped if name]

    def init_lib(self):
        # type: () -> bool
        """
        initialization could happen from 2 requests at the same time and therefore should be thread-safe.
        the locking mechanism ensures the initialization only takes places once
        the _initialized is used to avoid the locking mechanism once the initialization is completed.

        Returns True when the native library is loaded and usable, False otherwise.
        """
        if self._initialized:
            return self._lib is not None
        with self._lock:
            if self._initialized:
                # previous thread already handled the init, avoid running twice
                return self._lib is not None
            self._lib = None
            self._ffi = None
            try:
                from _superbrain import ffi, lib  # type: ignore

                self._lib = lib
                self._ffi = cast(Any, ffi)
            except ImportError:
                import traceback

                sys.stderr.write("[supertenant-supermeter] fatal error: Brain.init_lib failed on ImportError:\n")
                traceback.print_exc()
            else:
                try:
                    self._init_native()
                except Exception:
                    # A failure here (e.g. _set_env cleared `_lib` before Init was reached) must not
                    # escape to the caller: disable the brain, as every other method does.
                    import traceback

                    sys.stderr.write("[supertenant-supermeter] fatal error: Brain.init_lib failed:\n")
                    traceback.print_exc()
                    self._lib = None
            self._initialized = True
            return self._lib is not None

    def _init_native(self):
        # type: () -> None
        """Forward env vars to the brain and call its Init; clears `_lib` when Init reports failure."""
        # CU-860t13kwj - set all env vars set in Python
        # py2.7 (need to do if/else in separate lines because of mypy3 vs. mypy2.7 checks)
        if sys.version_info >= (3, 0):
            iteritems = environ.items
        else:
            iteritems = environ.iteritems
        for key, value in iteritems():
            if key.startswith(("SUPERTENANT_", "SUPERBRAIN_")):
                self._set_env(key, value)
        # Now set the explicit env vars asked by code
        for key, value in self._env_vars:
            self._set_env(key, value)

        ffi = self._ffi  # type: Any
        platform = "python"
        platform_version = sys.version
        sm_version_ptr = ffi.new("char[]", self.supermeter_version.encode("utf-8", "ignore"))
        if self.config_file_path:
            config_file_ptr = ffi.new("char[]", self.config_file_path.encode("utf-8", "ignore"))
        else:
            config_file_ptr = ffi.NULL
        platform_ptr = ffi.new("char[]", platform.encode("utf-8", "ignore"))
        platform_version_ptr = ffi.new("char[]", platform_version.encode("utf-8", "ignore"))
        if self._lib.Init(sm_version_ptr, config_file_ptr, platform_ptr, platform_version_ptr):  # type: ignore
            argv = getattr(sys, "argv", "<argv_not_found_in_sys_module>")
            self.update_config()
            self.log(
                consts.BRAIN_LOG_LEVEL_INFO,
                "Brain initialized, cmd: %s" % str(argv),
            )
        else:
            # no real point in logging here since we can't ship the log if brain failed to init.
            self._lib = None

    def update_config(self):
        # type: () -> bool
        """
        Fetch the action-config JSON from the brain and store it in this thread's `config_holder`.

        Starts with a 16 KiB buffer; when the brain reports a larger required size the buffer is
        re-allocated and the call retried (10 attempts in total). Returns True on success.
        """
        if self.is_circuit_breaker_enabled():
            return False
        if self._lib is None or self._ffi is None:  # don't try to initialize
            self.log(consts.BRAIN_LOG_LEVEL_WARN, "update_config called before init_lib")
            return False
        try:
            alloc_size = 16 * 1024
            buffer = self._ffi.new("char[]", alloc_size)
            count_ptr = self._ffi.new("int*")
            count_ptr[0] = alloc_size
            max_attempts = 10
            while not self._lib.GetActionConfigJson(count_ptr, buffer):
                count = self._ffi.unpack(count_ptr, 1)[0]
                _cffi_release(self._ffi, buffer)
                if count <= alloc_size:
                    # the buffer was big enough, so the failure is not a size problem
                    _cffi_release(self._ffi, count_ptr)
                    self.log(
                        consts.BRAIN_LOG_LEVEL_WARN,
                        "GetActionConfigJson returned false (count=%d alloc_size=%d)" % (count, alloc_size),
                    )
                    return False
                max_attempts -= 1
                if max_attempts <= 0:
                    _cffi_release(self._ffi, count_ptr)
                    self.log(
                        consts.BRAIN_LOG_LEVEL_WARN,
                        "GetActionConfigJson returned false (count=%d alloc_size=%d) after 10 retries"
                        % (count, alloc_size),
                    )
                    return False
                # retry with the size the brain asked for
                alloc_size = count_ptr[0]
                buffer = self._ffi.new("char[]", alloc_size)
            action_config = self._ffi.string(buffer)
            _cffi_release(self._ffi, buffer)
            _cffi_release(self._ffi, count_ptr)
            config_holder._st_action_mapping = json.loads(action_config)
            return True
        except Exception as e:
            self.log(consts.BRAIN_LOG_LEVEL_ERROR, "Exception in update config: %s" % str(e))
            return False

    def shutdown(self, timeout_ms=0):
        # type: (int) -> bool
        """Detach from the native library and call its Shutdown; returns False when it was never loaded."""
        if self._lib is None or self._ffi is None:  # don't try to initialize just to shut down
            return False
        lib = self._lib
        self._lib = None
        # NOTE(review): `_lib` was cleared just above, so is_circuit_breaker_enabled() always returns
        # False here and Shutdown is always called. Confirm whether the breaker was meant to be
        # consulted before clearing `_lib`.
        if self.is_circuit_breaker_enabled():
            return False
        return lib.Shutdown(timeout_ms)  # type: ignore

    def is_serverless(self):
        # type: () -> bool
        """Before init: the value derived from the environment; afterwards: whatever the brain reports."""
        if not self._initialized:
            return self._serverless
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return False
        if self.is_circuit_breaker_enabled():
            return False
        return self._lib.IsServerless()  # type: ignore

    def enable_circuit_breaker(self):
        # type: () -> None
        """Tell the brain (if loaded) to enable its circuit breaker, then permanently disable this wrapper."""
        with self._lock:
            if self._initialized and self._lib is not None:
                self._lib.EnableCircuitBreaker()
            # marking as initialized prevents init_lib() from loading the library later
            self._initialized = True
            self._lib = None
            self._ffi = None

    def is_circuit_breaker_enabled(self):
        # type: () -> bool
        """Ask the brain whether its circuit breaker is on; False when the library is not loaded."""
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return False
        return self._lib.IsCircuitBreakerEnabled()  # type: ignore

    def is_integration_circuit_breaker_enabled(self, integration_module):
        # type: (str) -> bool
        """True when the module is listed in the env-var override or the brain reports its breaker as enabled."""
        if integration_module in self._integration_circuit_breakers:
            return True
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return False
        integration_module_ptr = self._ffi.new("char[]", integration_module.encode("utf-8", "ignore"))
        return self._lib.IsIntegrationCircuitBreakerEnabled(integration_module_ptr)  # type: ignore

    def _get_action(self, action_ref):
        # type: (str) -> ActionDict
        """
        Resolve an action reference through this thread's action mapping.

        "0" always resolves to the default Execute action. An unknown reference triggers one
        update_config() refresh; if it is still unknown the default Execute action is returned.
        """
        default_action = {"Action": consts.ACTION_EXECUTE, "Definition": {}}  # type: ActionDict
        if action_ref == "0":
            return default_action

        mapping = getattr(config_holder, "_st_action_mapping", None)
        if mapping is not None and action_ref in mapping:
            return mapping[action_ref]

        # Getting new action mapping from brain and storing it in thread local
        update_success = self.update_config()
        # getattr: this thread may never have stored a mapping (e.g. every update so far failed)
        mapping = getattr(config_holder, "_st_action_mapping", None)
        if update_success and mapping is not None and action_ref in mapping:
            return mapping[action_ref]

        # If we got an unknown action and it was still unknown after getting a new mapping from brain it's a
        # bug.
        # It can happen if config was not updated before action and changed again before update_config -
        # if that happened request action is lost in brain.
        self.log(
            consts.BRAIN_LOG_LEVEL_ERROR,
            "config updated (%s)but still not recognizing action (%s). Defaulting to Execute."
            % (mapping, action_ref),
        )
        return default_action

    def open_span(self, task_id, span_type, input):
        # type: (int, int, Dict[str, str]) -> Tuple[Optional[SpanID], Optional[ActionDict], Optional[PollKey]]
        """
        Open a span for `task_id`.

        Returns (SpanID, action, PollKey) on success and (None, None, None) otherwise. A raised
        error or a failed OpenSpan call disables the brain.
        """
        if not self.init_lib():
            return None, None, None
        if self.is_circuit_breaker_enabled():
            return None, None, None
        try:
            arr_ptr, arr = _bytes_dict_to_char_pp(self._ffi, input)
            arr_len = len(arr)
            result = self._ffi.new("OpenSpanResult[]", 1)  # type: ignore
            success = self._lib.OpenSpan(task_id, span_type, arr_len, arr_ptr, result)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return None, None, None
        if not success:
            self._lib = None
            return None, None, None
        action_ref = str(result[0].Action)
        # TODO: The part of the function that gets action from mapping is almost duplicated in poll_action
        # TODO: replace 0 with execute const
        action = self._get_action(action_ref)
        if action is None:
            return None, None, None
        # result must be kept by SpanID object, or else result is freed before we need it again
        return (
            SpanID(result[0].SpanID, result),
            action,
            PollKey(result[0].LieutenantKey),
        )

    def poll_action(self, poll_key, span_id):
        # type: (int, SpanID) -> Optional[ActionDict]
        """Poll the brain for the current action of an open span; None when the brain is unavailable."""
        if not self.init_lib():
            return None
        if self.is_circuit_breaker_enabled():
            return None
        try:
            result = self._ffi.new("OpenSpanResult[]", 1)  # type: ignore
            result[0].SpanID = span_id._ffi_array
            result[0].Action = consts.ACTION_EXECUTE
            result[0].LieutenantKey = poll_key
            action_res = self._lib.PollSpanAction(result)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return None
        action_ref = str(action_res)
        # TODO: replace 0 with execute const
        return self._get_action(action_ref)

    def close_span(self, span_id, data=None):
        # type: (SpanID, Optional[Dict[str, str]]) -> bool
        """Close a span, optionally attaching `data` as key/value strings; returns the brain's result."""
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False
        try:
            if data is not None:
                # `arr` must stay referenced until CloseSpan returns
                arr_ptr, arr = _bytes_dict_to_char_pp(self._ffi, data)
                arr_len = len(arr)
            else:
                arr_len = 0
                arr_ptr = self._ffi.NULL  # type: ignore
            return self._lib.CloseSpan(span_id._ffi_array, arr_len, arr_ptr)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    def get_span_label(self, span_id, key):
        # type: (SpanID, str) -> Optional[str]
        """Read label `key` of a span (2048-byte buffer); None when missing or the brain is unavailable."""
        if not self.init_lib():
            return None
        if self.is_circuit_breaker_enabled():
            return None
        ffi, lib = self._ffi, self._lib
        if ffi is None or lib is None or span_id is None or key is None:
            # mostly for type checking, but also just to be safe
            return None
        try:
            alloc_size = 2048
            buffer = ffi.new("char[]", alloc_size)
            try:
                success = lib.GetSpanLabel(span_id._ffi_array, key.encode("utf-8", "ignore"), alloc_size, buffer)
                if not success:
                    return None
                label = ffi.string(buffer)
            finally:
                # release on every path, including the "label not found" one
                _cffi_release(ffi, buffer)
            return str(label.decode("utf-8", "ignore"))
        except:  # noqa: E722
            self._lib = None
            return None

    def create_task(self, parent_task_id, task_id, task_type):
        # type: (Optional[int], int, int) -> bool
        """Register a task with an explicit id; a missing parent is sent as 0."""
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False
        if parent_task_id is None:
            parent_task_id = 0
        try:
            return self._lib.CreateTask(parent_task_id, task_id, task_type)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    def create_task_auto_inc(self, parent_task_id, task_type):
        # type: (Optional[int], int) -> int
        """Register a task and return the id chosen by the brain; 0 when the brain is unavailable."""
        if not self.init_lib():
            return 0
        if self.is_circuit_breaker_enabled():
            return 0
        if parent_task_id is None:
            parent_task_id = 0
        try:
            return self._lib.CreateTaskAutoInc(parent_task_id, task_type)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return 0

    def task_done(self, task_id):
        # type: (int) -> bool
        """Report a task as finished; returns the brain's result, or False when unavailable."""
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False
        try:
            return self._lib.FinishTask(task_id)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    # TODO support fields when Go binding will support sending different types.
    def log(self, level, msg):
        # type: (int, str) -> None
        """Send a log line through the brain; silently dropped when the library is not loaded."""
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return
        if self.is_circuit_breaker_enabled():
            return
        try:
            self._lib.Log(level, self._ffi.new("char[]", msg.encode("utf-8", "ignore")))
        except:  # noqa: E722
            self._lib = None

    def get_config(self):
        # type: () -> Optional[str]
        """Return the brain's configuration string, or None when unavailable."""
        if self._lib is None or self._ffi is None:  # don't try to initialize
            return None
        if self.is_circuit_breaker_enabled():
            return None
        try:
            c_str = self._lib.GetConfig()
            # NOTE(review): the value is returned exactly as ffi.string() produced it; on Python 3
            # that is presumably bytes rather than str - confirm what callers expect.
            config = self._ffi.string(c_str)  # type: str
            self._lib.FreeCString(c_str)
            return config
        except:  # noqa: E722
            self._lib = None
            return None

    def set_label_on_shared_data(self, task_id, key, value):
        # type: (int, str, str) -> bool
        """Set label `key`=`value` on the shared data of `task_id`; returns the brain's result."""
        if not self.init_lib():
            return False
        if self.is_circuit_breaker_enabled():
            return False
        if self._ffi is None:
            return False
        try:
            key_cstr = self._ffi.new("char[]", key.encode("utf-8", "ignore"))
            value_cstr = self._ffi.new("char[]", value.encode("utf-8", "ignore"))
            return self._lib.SetLabelOnSharedData(task_id, key_cstr, value_cstr)  # type: ignore
        except:  # noqa: E722
            self._lib = None
            return False

    def set_env(self, key, value):
        # type: (str, str) -> None
        """Set an env var inside the brain, or queue it for init_lib() when the library is not loaded."""
        if self._lib is None or self._ffi is None:
            self._env_vars.append((key, value))
        else:
            self._set_env(key, value)

    def _set_env(self, key, value):
        # type: (str, str) -> bool
        """Call the brain's Setenv; returns False (and disables the brain on error) when it cannot."""
        if self.is_circuit_breaker_enabled():
            return False
        try:
            key_cstr = self._ffi.new("char[]", key.encode("utf-8", "ignore"))  # type: ignore
            value_cstr = self._ffi.new("char[]", value.encode("utf-8", "ignore"))  # type: ignore
            self._lib.Setenv(key_cstr, value_cstr)  # type: ignore
            return True
        except:  # noqa: E722
            self._lib = None
            return False