# Repository URL to install this package:
# Version: 0.8.1
import asyncio
try:
from typing import Optional, Tuple
from supertenant.superbrain import ActionDefinition, ActionDict, PollKey, SpanID
except ImportError:
pass
from supertenant import consts
from supertenant.supermeter.scope_manager import scope_manager
from .consts import (
ACTION_DEF_KEY,
ACTION_KEY,
DAY_IN_NANOSECONDS,
DELAY_DURATION_KEY,
NANOSECONDS_IN_SECOND,
POLL_INTERVAL_KEY,
)
class AsyncActions(object):
    """Asyncio-based resolution of span actions (delay / wait-and-poll)."""

    # Maximum number of polls performed while the action stays ACTION_WAIT.
    # Kept as a class attribute so it can be overridden without editing
    # get_action (the original hard-coded 100000 with a "configurable" TODO).
    MAX_POLL_ATTEMPTS = 100000

    @staticmethod
    def _as_number(value):
        # type: (object) -> Optional[float]
        """Return ``value`` if it is a usable int/float, otherwise ``None``.

        Non-numeric values and NaN are mapped to ``None`` so that a malformed
        action definition is handled exactly like one where the field is
        missing, instead of raising ``TypeError`` later on.
        """
        # ``value == value`` is False only for NaN.
        if isinstance(value, (int, float)) and value == value:
            return value
        return None

    @classmethod
    async def get_action(cls, span_id, action, poll_key):
        # type: (Optional[SpanID], Optional[ActionDict], Optional[PollKey]) -> Tuple[Optional[int], Optional[ActionDefinition]]  # noqa: E501
        """Resolve ``action`` into the action code and definition to return.

        The action code is read from ``action[ACTION_KEY]`` and its definition
        from ``action[ACTION_DEF_KEY]``. Two action codes are handled here,
        and only when ``span_id`` is not ``None``:

        * ``consts.ACTION_DELAY`` with a numeric delay in the definition:
          sleeps for that delay (the value is divided by
          ``NANOSECONDS_IN_SECOND`` before sleeping) and then yields
          ``consts.ACTION_EXECUTE``. A delay that is negative or larger than
          ``DAY_IN_NANOSECONDS`` is ignored and
          ``(consts.ACTION_EXECUTE, None)`` is returned immediately.
        * ``consts.ACTION_WAIT`` with a ``poll_key`` and a numeric poll
          interval in the definition: repeatedly sleeps for the interval and
          calls ``scope_manager.poll_action(span_id, poll_key)`` until the
          action is no longer ``ACTION_WAIT``, the poll returns ``None``
          (treated as ``ACTION_EXECUTE``), or ``MAX_POLL_ATTEMPTS`` polls were
          made (turned into ``consts.ACTION_REJECT``).

        Any other combination is returned unchanged.

        :param span_id: span the action belongs to; ``None`` disables both
            the delay and the wait handling.
        :param action: mapping holding the action code and definition, or
            ``None``.
        :param poll_key: key passed to ``scope_manager.poll_action``; ``None``
            disables the wait handling.
        :return: ``(action_code, action_definition)``; both are ``None`` when
            ``action`` is ``None``.
        """
        curr_action = None  # type: Optional[int]
        curr_def = None  # type: Optional[ActionDefinition]
        poll_interval = None
        delay_duration = None

        if action is not None:
            curr_action = action.get(ACTION_KEY)  # type: ignore
            curr_def = action.get(ACTION_DEF_KEY)  # type: ignore
            # isinstance (rather than an exact type comparison) also accepts
            # dict subclasses such as OrderedDict.
            if isinstance(curr_def, dict):
                poll_interval = cls._as_number(curr_def.get(POLL_INTERVAL_KEY))
                delay_duration = cls._as_number(curr_def.get(DELAY_DURATION_KEY))

        if span_id is not None and curr_action == consts.ACTION_DELAY and delay_duration is not None:
            if not 0 <= delay_duration <= DAY_IN_NANOSECONDS:
                # Too long delay or negative, ignore action
                # TODO: Log
                return consts.ACTION_EXECUTE, None
            await asyncio.sleep(delay_duration / NANOSECONDS_IN_SECOND)
            curr_action = consts.ACTION_EXECUTE
        elif (
            span_id is not None
            and curr_action == consts.ACTION_WAIT
            and poll_key is not None
            and poll_interval is not None
        ):
            # The interval is taken from the initial definition only; it is
            # not re-read from definitions returned by later polls.
            poll_seconds = poll_interval / NANOSECONDS_IN_SECOND
            attempts = 0
            while curr_action == consts.ACTION_WAIT and attempts < cls.MAX_POLL_ATTEMPTS:
                await asyncio.sleep(poll_seconds)
                action = scope_manager.poll_action(span_id, poll_key)
                if action is not None:
                    curr_action = action.get(ACTION_KEY)  # type: ignore
                    curr_def = action.get(ACTION_DEF_KEY)  # type: ignore
                else:
                    # No answer from the poll: stop waiting and execute.
                    curr_action = consts.ACTION_EXECUTE
                attempts += 1
            if curr_action == consts.ACTION_WAIT:
                # Still waiting after the poll budget was used up.
                curr_action = consts.ACTION_REJECT

        return curr_action, curr_def