Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / __init__.py
Size: Mime:
import sys
import traceback
from os import environ

try:
    from supertenant.supermeter._version import __version__  # type: ignore
except ImportError:
    __version__ = "develop"

try:
    from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

    if TYPE_CHECKING:  # we only do this when running mypy
        from supertenant.superbrain import Brain
        from supertenant.supermeter.brainless_brain import BrainlessBrain
except ImportError:
    pass

# Name of the environment variable that, when set to a truthy value ("t", "1", "true", "y", "yes"),
# makes init() call the brain's init_lib() immediately.
ENV_KEY_EAGER_LOAD = "SUPERTENANT_SUPERMETER_EAGER_LOAD"
# Module-level brain instance; None until init() or _set_brain() assigns one.
_brain = None  # type: Optional[Union[Brain, BrainlessBrain]]


def _get_brain():
    # type: () -> Optional[Union[Brain, BrainlessBrain]]
    """Return the module-level brain, or None if none has been assigned yet."""
    # Reading a module global does not require a ``global`` declaration.
    return _brain


def _set_brain(b):
    # type: (Union[Brain, BrainlessBrain]) -> None
    """Replace the module-level brain with ``b``."""
    global _brain  # rebinding the module global, so the declaration is required
    _brain = b


def log_fatal(msg):
    # type: (str) -> None
    """Write ``msg`` to stderr as a ``[supertenant-supermeter] FATAL`` line.

    Best effort: any exception raised while formatting or writing is swallowed, so this
    function never raises.
    """
    # CU-86bwvkkbd: python logging is deliberately not used here. Using it may implicitly create
    # a new logging handler, and once the user's code adds its own handler the logs would be
    # duplicated.
    try:
        line = "[supertenant-supermeter] FATAL: %s\n" % (msg,)  # py2.7: no f-strings
        sys.stderr.write(line)
    except Exception:
        return


def init(config_file_path=None, eager_load=False):
    # type: (Optional[str], bool) -> bool
    """Create the module-level brain and, when requested, initialize the library right away.

    A ``BrainlessBrain`` is created when the ``SUPERTENANT_BRAINLESS`` environment variable is
    truthy; otherwise a ``Brain`` is built from ``__version__`` and ``config_file_path``.

    The brain's ``init_lib()`` is called immediately when ``eager_load`` is true, when the
    environment variable named by ``ENV_KEY_EAGER_LOAD`` is truthy, or when the brain reports
    ``is_serverless()``.

    Returns:
        The result of ``init_lib()`` when it is called, True when it is not called, and
        False when any exception is raised (the error is reported via ``log_fatal`` and the
        traceback is printed).
    """
    global _brain
    truthy = ("t", "1", "true", "y", "yes")
    try:
        brainless = environ.get("SUPERTENANT_BRAINLESS", "f").lower() in truthy
        if brainless:
            from supertenant.supermeter.brainless_brain import BrainlessBrain

            _brain = BrainlessBrain()
        else:
            from supertenant.superbrain import Brain

            _brain = Brain(__version__, config_file_path)

        # Short-circuit order matters: is_serverless() is only consulted when neither the
        # argument nor the environment variable asked for eager loading.
        load_now = (
            eager_load
            or environ.get(ENV_KEY_EAGER_LOAD, "f").lower() in truthy
            or _brain.is_serverless()
        )
        if not load_now:
            return True
        return _brain.init_lib()
    except Exception as exc:
        log_fatal("error when initializing brain: %r" % (exc,))  # py2.7: f-string
        traceback.print_exc()
        return False


_instrument_called = False


def instrument():
    # type: () -> bool
    """Import the integration hook modules so that the supported packages get instrumented.

    The instrumentation pass runs at most once: after it has been started, every later call
    returns False without doing anything. Integrations whose circuit breaker is enabled on the
    brain are skipped (and reported through the supermeter logger); a hook that fails to import
    is reported via ``log_fatal`` and does not stop the remaining hooks.

    Returns:
        True when the pass completed, False when instrument() already ran or when the pass
        itself raised.

    Raises:
        AssertionError: if no brain is set, i.e. init() was not called or did not succeed.
    """
    global _instrument_called
    if _instrument_called:
        return False
    brain = _get_brain()
    if brain is None:
        # The one-shot flag is intentionally NOT consumed here: nothing has been instrumented
        # yet, so a later call made after a successful init() must still be able to run.
        raise AssertionError("instrument() called before calling init() and/or checking init() returned True")
    _instrument_called = True

    try:
        import importlib

        # each import instruments the relevant package, so we import for the side effect.
        import supertenant.supermeter.task_manager  # noqa: F401
        from supertenant import consts
        from supertenant.supermeter.logger import log_integration_module_skipped_circuit_breaker

        instrumented_modules_defs = [
            # (module_const, module_name)
            (consts.INTEGRATION_MODULE_PYTHON_FLASK, "flask"),
            (consts.INTEGRATION_MODULE_PYTHON_CELERY, "celery.hooks"),
            (consts.INTEGRATION_MODULE_PYTHON_PSYCOPG2, "psycopg2.psycopg2_inst"),
            (consts.INTEGRATION_MODULE_PYTHON_PYMONGO, "pymongo.pymongo_inst"),
            # CU-860t33gba removed boto integration for now, until we make sure it supports
            # boto3 1.17+ and python 2.7.
            # (consts.INTEGRATION_MODULE_PYTHON_BOTO3, "boto3.boto3_inst"),
        ]
        # Hooks below are only registered on interpreter versions that can import them.
        if sys.version_info[0:3] >= (3, 5, 3):
            if sys.version_info[0:3] >= (3, 6, 0):
                # Django 3.2+ require Python 3.6+
                instrumented_modules_defs.append((consts.INTEGRATION_MODULE_PYTHON_DJANGO, "django.middleware_py3"))
            instrumented_modules_defs.append((consts.INTEGRATION_MODULE_PYTHON_FASTAPI, "fastapi.fastapi_inst_py3"))
        if sys.version_info[0:3] >= (3, 7, 0):
            instrumented_modules_defs.append((consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "aiohttp.client_py3"))
            instrumented_modules_defs.append((consts.INTEGRATION_MODULE_PYTHON_AIOHTTP, "aiohttp.server_py3"))
            instrumented_modules_defs.append((consts.INTEGRATION_MODULE_PYTHON_PYMYSQL, "pymysql.pymysql_inst_py3"))
        if brain.is_serverless():
            instrumented_modules_defs.append((consts.INTEGRATION_MODULE_PYTHON_PROCESSTASK, "processtask"))

        # This is a test hook to make sure we fail gracefully.
        if environ.get("SUPERTENANT_SUPERMETER_FAULT_HOOK", "f").lower() in ("t", "1", "true", "y", "yes"):
            instrumented_modules_defs.append(("fault_hook", "non_existent_hook"))

        for module_const, module_name in instrumented_modules_defs:
            if brain.is_integration_circuit_breaker_enabled(module_const):
                log_integration_module_skipped_circuit_breaker(module_const)
                continue
            try:
                # importlib.import_module is the documented replacement for calling
                # __import__ directly; only the import side effect is needed here.
                importlib.import_module("supertenant.supermeter.managers." + module_name)
            except Exception as e:
                # A single broken hook must not prevent the remaining hooks from loading.
                log_fatal(
                    "failed to import a hook %s: %r"
                    % (
                        module_name,
                        e,
                    )
                )
                traceback.print_exc()

    except Exception as e:
        log_fatal("instrumentation failed: %r" % (e,))  # py2.7: f-string
        traceback.print_exc()
        return False
    return True


_init_and_instrument_called = False


def init_and_instrument(config_file_path=None, eager_load=False):
    # type: (Optional[str], bool) -> bool
    """Run init() and, if it succeeds, instrument(); only the first call does any work.

    Both arguments are forwarded to init() unchanged.

    Returns:
        The result of instrument() when init() succeeded. False when init() failed (a fatal
        line is logged) or when this function has already been called.
    """
    global _init_and_instrument_called
    if _init_and_instrument_called:
        return False
    # Mark as called before doing the work, so a failed attempt is not retried.
    _init_and_instrument_called = True

    if init(config_file_path, eager_load):
        return instrument()
    log_fatal("initialization failed, skipping instrumentation")
    return False


# AUTOWRAPT hook:
_load_called = False


def _load(*args, **kwargs):
    # type: (List[Any], Dict[str, Any]) -> None
    """Auto-load entry point: run init() then instrument() once, never raising.

    All positional and keyword arguments are accepted and ignored. Only the first call does
    any work; later calls return immediately. Any exception is reported via ``log_fatal`` and
    its traceback is printed instead of being propagated to the caller.
    """
    global _load_called
    if _load_called:
        return
    _load_called = True
    try:
        if init():
            instrument()
        else:
            log_fatal("autoload initialization failed, skipping instrumentation")
    except Exception as exc:
        log_fatal("error when autoloading: %r" % (exc,))
        traceback.print_exc()