Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / rook   python

Repository URL to install this package:

Version: 0.1.176 

/ augs / aug.py

"""This module implements the Aug class."""
import uuid

from rook.logger import logger
from rook.processor.error import Error
from rook.processor.namespaces.container_namespace import ContainerNamespace
from rook.processor.namespaces.stack_namespace import StackNamespace
from rook.processor.namespaces.python_utils_namespace import PythonUtilsNamespace
from rook.processor.namespaces.trace_namespace import TraceNamespace
from rook.processor.namespaces.process_state_namespace import ProcessStateNamespace
from rook.user_warnings import UserWarnings

from rook.exceptions import RookRuleMaxExecutionTimeReached, RookRuleRateLimited, RookGlobalRuleRateLimited
from rook.augs.aug_rate_limiter import AugRateLimiter
from rook.config import RateLimiter
import rook.utils as utils


class Aug(object):
    """The Aug class is the skeleton that holds together all the components that define a modification to the application.

    This class brings together the following elements:
    - location - specifies when to run the modification.
    - extractor - specifies attributes to extract from the application's state before evaluating the modification.
    - condition - specifies an optional filter as to when to run the modification.
    - action - specifies the modification to preform.
    """

    def __init__(self, aug_id, extractor, condition, action, global_rate_limit=AugRateLimiter(None, None),
                 max_aug_execution_time_ns=0, rate_limits_ns=None):
        """Build an Aug object from the individual elements."""
        self.aug_id = aug_id
        self._extractor = extractor
        self._condition = condition
        self._action = action
        self._max_aug_time = max_aug_execution_time_ns
        self._enabled = True
        self._global_rate_limiter = global_rate_limit

        if rate_limits_ns:
            self._rate_limiter = AugRateLimiter(rate_limits_ns[0], rate_limits_ns[1])
        else:
            self._rate_limiter = AugRateLimiter(None, None)

    def execute(self, frame_namespace, extracted, output):
        """Called by the trigger service to run the extractor, condition and action."""
        if not self._enabled:
            return

        if output.is_user_messages_queue_full():
            logger.warn("Skipping aug ({}) execution because the queue is full".format(self.aug_id))
            return

        now_ns = utils.get_most_accurate_time_stamp_nano_seconds()
        rate_limits_keys = self._allow(now_ns)

        if Aug._enforce_rate_limits(rate_limits_keys):
            return

        try:
            if self._extractor:
                self._extractor.execute(frame_namespace, extracted)

            store = ContainerNamespace({})

            namespace = ContainerNamespace({
                'frame': frame_namespace,
                'stack': StackNamespace(frame_namespace),
                'extracted': ContainerNamespace(extracted),
                'store': store,
                'temp': ContainerNamespace({}),
                'python': PythonUtilsNamespace(),
                'utils': PythonUtilsNamespace(),
                'trace': TraceNamespace(),
                'state': ProcessStateNamespace(),
            })

            if not self._condition or self._condition.evaluate(namespace, extracted):
                msg_id = uuid.uuid4().hex

                logger.info("Executing aug-\t%s (msg ID %s)", self.aug_id, msg_id)
                self._action.execute(self.aug_id, msg_id, namespace, output)
        finally:
            duration = max(utils.get_most_accurate_time_stamp_nano_seconds() - now_ns, RateLimiter.MIN_RATE_LIMIT_VALUE_NS)

            if 0 < self._max_aug_time < duration:
                UserWarnings.set_error(Error(exc=RookRuleMaxExecutionTimeReached()))
                self._enabled = False

            self._record_rate_limits(rate_limits_keys, duration)

    def _allow(self, now_ns):
        return self._rate_limiter.allow(now_ns), self._global_rate_limiter.allow(now_ns)

    @staticmethod
    def _enforce_rate_limits(rate_limits_keys):
        if rate_limits_keys[0] is None:
            UserWarnings.send_warning(Error(exc=RookRuleRateLimited()))
            return True

        if rate_limits_keys[1] is None:
            UserWarnings.send_warning(Error(exc=RookGlobalRuleRateLimited()))
            return True

        return False

    def _record_rate_limits(self, rate_limits_keys, duration):
        self._rate_limiter.record(rate_limits_keys[0], duration)
        self._global_rate_limiter.record(rate_limits_keys[1], duration)