Repository URL to install this package:
|
Version:
0.1.3 ▾
|
import dataclasses
import functools
import logging
from typing import Any, Callable, Dict, Mapping, Optional, Tuple, TypeVar
from django.db import models, transaction
from .utils import errors
# Name the logger after this module (not the literal string "__name__") so that its records
# can be filtered and configured through the normal dotted-module logger hierarchy.
logger = logging.getLogger(__name__)

# A sentinel object for telling the build process to remove a successfully run action
DELETE_KEY = object()
# Type variables for the Django models that represent a process and one of its events.
Process = TypeVar("Process", bound=models.Model)
Event = TypeVar("Event", bound=models.Model)
# The context object passed to handlers and side effects; its type is unconstrained.
Context = TypeVar("Context")
# A side effect is called with (process, event, context) and returns nothing.
SideEffect = Callable[[Process, Event, Context], None]
# A condition is called with a process and returns a bool.
Condition = Callable[[Process], bool]
# An event handler is called with (process, event, context) and returns nothing.
EventHandler = Callable[[Process, Event, Context], None]
@dataclasses.dataclass
class ContextEmpty:
    """Field-less placeholder context, used when no ``context_creator`` is supplied."""
class BaseProcessError(Exception):
    """
    An *expected* error raised by an action while a process is being built.

    Process-specific modules should define their own subclasses. The shared
    base class exists only so that such errors can be caught in one place,
    separately from unexpected exceptions.
    """
def create_process_builder(
    handler_mapping: Dict[str, Callable],
    *,
    context_creator: Optional[Callable] = None,
    get_status_from_context: Optional[Callable] = None,
    error_handler: Optional[Callable] = None,
) -> functools.partial:
    """
    Create a build function that processes the events of a given process.

    The returned callable is ``_build_process`` with ``handler_mapping`` and the
    keyword options below already bound; the caller supplies the process (and,
    optionally, ``run_actions``).

    :param handler_mapping:
        Dict of event_type to the handler for that event.
    :param context_creator: A callable that creates the initial context.
    :param get_status_from_context:
        An optional callable that takes the context and returns the
        process status to be set.
    :param error_handler:
        A function that is called when an event handler raises.
        Specifying this argument stops those errors from propagating.
    :returns: A ``functools.partial`` wrapping ``_build_process``.
    """
    bound_options = {
        "context_creator": context_creator,
        "get_status_from_context": get_status_from_context,
        "error_handler": error_handler,
    }
    return functools.partial(_build_process, handler_mapping, **bound_options)
# Requirements:
# Building a process doesn't throw when it fails, but returns the context with errors
# Only runs when building flows for comparison
def _build_process(
    handler_mapping: Dict[str, Callable],
    process: models.Model,
    *,
    context_creator: Optional[Callable] = None,
    get_status_from_context: Optional[Callable] = None,
    error_handler: Optional[Callable] = None,
    run_actions: bool = True,
) -> object:
    """
    Build ``process`` by applying its events, repeating until no side effects run.

    Each pass starts from a fresh context, applies every event through
    ``apply_events``, optionally derives the process status from the context,
    and saves the process. If the pass ran any side effects the whole build is
    repeated.

    :param handler_mapping: Dict of event_type to the handler for that event.
    :param process: The process model instance being built.
    :param context_creator:
        Callable producing a new context; ``ContextEmpty`` is used when omitted.
    :param get_status_from_context:
        Optional callable mapping the context to the status to set on the process.
    :param error_handler: Passed through to ``apply_events``.
    :param run_actions: Whether side effects may be run while applying events.
    :returns: The context produced by the final pass.
    """
    while True:
        # A rebuild never reuses the previous pass's context.
        context: object = ContextEmpty() if not context_creator else context_creator()

        needs_another_pass = apply_events(
            process,
            context,
            handler_mapping,
            run_actions=run_actions,
            error_handler=error_handler,
        )

        # Set the status given which events have happened
        if get_status_from_context:
            process.status = get_status_from_context(context)  # type: ignore[attr-defined]
        # NOTE(review): the source this was recovered from had lost its indentation, so it is
        # unclear whether this save was nested under the status check above. It is kept
        # unconditional so changes made to the process by handlers are persisted - confirm.
        process.save()

        # Only rebuild when side effects have been run during this pass.
        if not needs_another_pass:
            return context
def apply_events(
    process: models.Model,
    context: object,
    handler_mapping: Dict,
    run_actions: bool = True,
    error_handler: Optional[Callable] = None,
) -> bool:
    """
    Apply every event of ``process`` in order, building up the process and context.

    Events are read from ``process.events`` ordered by ``occurred_at`` then ``id``.
    For each event that has a handler in ``handler_mapping`` the handler is called,
    and then (when allowed) the event's pending side effects are run.

    :param process: The process whose events are applied.
    :param context: The context object handed to every handler and side effect.
    :param handler_mapping: Dict of event_type to the handler for that event.
    :param run_actions: When False, side effects are never run.
    :param error_handler:
        Called with ``(event, context, error)`` when a handler raises. Without it
        the exception propagates.
    :returns:
        True if any actions were completed, meaning the process should
        rebuild again.
    """
    any_side_effect_ran = False

    for event in process.events.order_by("occurred_at", "id"):  # type: ignore[attr-defined]
        if event.data is None:
            # Some early events have no data at all, while handlers expect at least an
            # empty dict. Backfilling here avoids special-casing it in every handler.
            event.data = {}
            event.save()

        handler = handler_mapping.get(event.event_type)
        if not handler:
            # No handler registered for this event type: nothing to apply or run.
            continue

        # Update the process and context with data from the event
        try:
            handler(process, event, context)
        except Exception as error:
            if not error_handler:
                raise
            error_handler(event, context, error)

        # A side effect may invoke a command that triggers another build of this same
        # process. The side effect has not finished at that point, so it is still listed
        # on the event; running it again would invoke the command again, trigger yet
        # another build, and recurse until the stack overflows. So side effects are
        # skipped when they are already being run further up the call stack.
        nested_in_side_effect = _is_running_side_effects.get(process, False)
        if nested_in_side_effect or not run_actions:
            continue

        try:
            _is_running_side_effects[process] = True
            # Once a side effect has run in this pass, the side effects of later events
            # are not attempted here; the caller is told to rebuild instead.
            if not any_side_effect_ran:
                any_side_effect_ran = run_event_side_effects(handler, process, event, context)
        finally:
            del _is_running_side_effects[process]

    return any_side_effect_ran
def run_event_side_effects(
    handler: EventHandler, process: models.Model, event: Any, context: object
) -> bool:
    """
    For a given event, run any pending side effects.

    The names of pending side effects are the keys of ``event.side_effects``. Each name
    is looked up in the ``side_effects`` mapping attached to ``handler``, which maps a
    name to a ``(side_effect, condition)`` pair. Names with no registered side effect
    are left on the event untouched.

    Each side effect runs inside its own ``transaction.atomic()`` block. On success its
    key is removed from ``event.side_effects``; on failure the key's value is replaced
    with the formatted error. Exceptions raised by a side effect are never propagated.

    :param handler:
        The event handler; its optional ``side_effects`` attribute is the registry.
    :param process: The process the event belongs to.
    :param event: The event whose pending side effects should be run.
    :param context: The context passed on to each side effect.
    :returns: True if any side effects have been run successfully, else False.
    """
    if not event.side_effects:
        return False

    # A handler that never registered side effects has no ``side_effects`` attribute.
    # Fall back to an empty registry rather than failing on ``None.get``.
    registered_side_effects: Mapping[str, Tuple[SideEffect, Condition]] = (
        getattr(handler, "side_effects", None) or {}
    )

    # Iterate over a snapshot of the names: the values are reassigned inside the loop.
    for side_effect_name in list(event.side_effects):
        side_effect, _condition = registered_side_effects.get(side_effect_name, (None, None))
        if not side_effect:
            continue

        try:
            with transaction.atomic():
                side_effect(process, event, context)
        except BaseProcessError as e:
            # These are errors in the process which we're expecting to happen.
            event.side_effects[side_effect_name] = errors.format_exc(
                exception=e, prefer_bare_message=True
            )
        except Exception as e:
            logger.exception("Unable to complete side effect in %s", process)
            event.side_effects[side_effect_name] = errors.format_exc(
                exception=e, prefer_bare_message=True
            )
        else:
            # Completed successfully: tag the entry now and remove it after the loop.
            event.side_effects[side_effect_name] = DELETE_KEY

    # Delete the successfully run actions. The sentinel is matched by identity.
    completed = [name for name, outcome in event.side_effects.items() if outcome is DELETE_KEY]
    for name in completed:
        del event.side_effects[name]

    if not event.side_effects:
        # Nothing left pending, so drop the attribute entirely.
        # NOTE(review): presumably handled by a deleter on the event model - confirm.
        del event.side_effects

    event.save()

    # Finally we'll signal to the process if any side effects have been run.
    return bool(completed)
# Module-level registry keyed by process. `apply_events` sets an entry to True just before
# running an event's side effects and deletes it again afterwards, so an entry is present
# only while side effects for that process are in progress. `apply_events` checks it to
# avoid re-running side effects when one of them triggers a nested build of the process.
_is_running_side_effects: Dict[models.Model, bool] = {}
"""
Records whether events are currently being applied to a process.
"""