Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
event-sourcing / src / event_sourcing / registry.py
Size: Mime:
import dataclasses
import functools
import logging
from typing import Any, Callable, Dict, Mapping, Optional, Tuple, TypeVar

from django.db import models, transaction

from .utils import errors


# Name the logger after this module (the ``__name__`` variable, not the literal
# string "__name__") so its records sit in the normal dotted-path logging
# hierarchy and can be configured per package.
logger = logging.getLogger(__name__)

# A sentinel object for telling the build process to remove a successfully run action
DELETE_KEY = object()


# Type variables for the two Django models involved in a build: the process
# being built and the events applied to it.
Process = TypeVar("Process", bound=models.Model)
Event = TypeVar("Event", bound=models.Model)
# The context type is left unconstrained.
Context = TypeVar("Context")
# A side effect is called with (process, event, context) and returns nothing.
SideEffect = Callable[[Process, Event, Context], None]
# A condition is called with the process and returns a bool.
Condition = Callable[[Process], bool]
# An event handler has the same call shape as a side effect.
EventHandler = Callable[[Process, Event, Context], None]


@dataclasses.dataclass
class ContextEmpty:
    """Field-less placeholder context for builds that have no context creator."""


class BaseProcessError(Exception):
    """
    Base class for errors that an action is expected to be able to raise.

    Process-specific modules should subclass this for their own expected
    failures. The class adds no behaviour of its own; it exists so that the
    registry can catch expected errors separately from unexpected ones.
    """


def create_process_builder(
    handler_mapping: Dict[str, Callable],
    *,
    context_creator: Optional[Callable] = None,
    get_status_from_context: Optional[Callable] = None,
    error_handler: Optional[Callable] = None,
) -> functools.partial:
    """
    Create a build function that processes the events of a given process.

    The returned partial wraps ``_build_process`` with the handler mapping
    and options bound, so callers only need to supply the process (and
    optionally ``run_actions``).

    :param handler_mapping:
        Dict of event_type to the handler for that event.

    :param context_creator:
        An optional callable, taking no arguments, that creates the context.

    :param get_status_from_context:
        An optional callable that takes the context and returns the
        process status to be set.

    :param error_handler:
        An optional callable invoked with ``(event, context, error)`` when
        an event handler raises. When it is given, those errors are passed
        to it instead of being re-raised.

    :returns:
        A ``functools.partial`` of ``_build_process``.
    """
    # Collected in the same order the options are declared so the partial's
    # keyword mapping reads the same as the signature.
    bound_options = {
        "context_creator": context_creator,
        "get_status_from_context": get_status_from_context,
        "error_handler": error_handler,
    }
    return functools.partial(_build_process, handler_mapping, **bound_options)


# Requirements:
# The build process doesn't throw on failure, but returns the context with errors
# Only runs when building flows for comparison


def _build_process(
    handler_mapping: Dict[str, Callable],
    process: models.Model,
    *,
    context_creator: Optional[Callable] = None,
    get_status_from_context: Optional[Callable] = None,
    error_handler: Optional[Callable] = None,
    run_actions: bool = True,
) -> object:
    """
    Build a process from its events, repeating until no side effect runs.

    Each pass creates a fresh context, applies every event to it, optionally
    derives the process status from the context, and saves the process.

    :param handler_mapping: Dict of event_type to the handler for that event.
    :param process: The process model whose events are applied.
    :param context_creator:
        Optional zero-argument callable producing the context for a pass;
        a ``ContextEmpty`` instance is used when it is not given.
    :param get_status_from_context:
        Optional callable mapping the context to the value stored on
        ``process.status``.
    :param error_handler:
        Optional callable forwarded to ``apply_events`` for handler errors.
    :param run_actions: Whether side effects may be run while applying events.

    :returns: The context produced by the final pass.
    """
    while True:
        # Every pass starts from a brand new context.
        if context_creator:
            context: object = context_creator()
        else:
            context = ContextEmpty()

        needs_another_pass = apply_events(
            process,
            context,
            handler_mapping,
            run_actions=run_actions,
            error_handler=error_handler,
        )

        # Set the status given which events have happened
        if get_status_from_context:
            process.status = get_status_from_context(context)  # type: ignore[attr-defined]
        process.save()

        # Go round again only if a side effect ran during this pass.
        if not needs_another_pass:
            return context


def apply_events(
    process: models.Model,
    context: object,
    handler_mapping: Dict,
    run_actions: bool = True,
    error_handler: Optional[Callable] = None,
) -> bool:
    """
    Apply every event of a process, in order, to the process and context.

    Events are read from ``process.events`` ordered by ``occurred_at`` then
    ``id``. Events whose type has no handler in ``handler_mapping`` are
    skipped.

    :param process: The process model whose events are applied.
    :param context: The context object handed to each handler.
    :param handler_mapping: Mapping of event_type to its handler.
    :param run_actions: Whether side effects may be run for the events.
    :param error_handler:
        Optional callable invoked with ``(event, context, error)`` when a
        handler raises; without it the error propagates.

    :returns:
        True if any actions were completed, meaning the process should
        rebuild again.
    """
    rebuild_needed = False

    ordered_events = process.events.order_by("occurred_at", "id")  # type: ignore[attr-defined]

    for event in ordered_events:
        if event.data is None:
            # Some early events have no data at all, while handlers rely on at least an
            # empty dict. Backfilling it here saves every handler from checking.
            event.data = {}
            event.save()

        handler = handler_mapping.get(event.event_type)
        if not handler:
            # No handler registered for this event type, so nothing to apply.
            continue

        # Let the handler update the process and context from the event.
        try:
            handler(process, event, context)
        except Exception as error:
            if not error_handler:
                raise
            error_handler(event, context, error)

        already_running = _is_running_side_effects.get(process, False)

        # A side-effect may invoke a command that triggers another build of this process
        # before the side-effect has finished, i.e. while it is still listed against the
        # event. Running it again from that nested build would invoke the command again,
        # trigger yet another build, and so on until the stack overflows. So side-effects
        # are skipped when they are already being run further up the call stack.
        if not run_actions or already_running:
            continue

        try:
            _is_running_side_effects[process] = True
            # Once a side effect has run in this pass, the side effects of later events
            # are left alone; the result already signals that a rebuild is needed.
            if not rebuild_needed:
                rebuild_needed = run_event_side_effects(handler, process, event, context)
        finally:
            del _is_running_side_effects[process]

    return rebuild_needed


def run_event_side_effects(
    handler: EventHandler, process: models.Model, event: Any, context: object
) -> bool:
    """
    For a given event, run any pending actions.

    The names of pending side effects are the keys of ``event.side_effects``.
    Each name is looked up in the ``side_effects`` mapping attached to the
    handler, which maps a name to a ``(side_effect, condition)`` pair. Names
    with no registered side effect are left untouched.

    Each side effect runs inside its own ``transaction.atomic()`` block. On
    success its key is removed from ``event.side_effects``; on failure the
    formatted error is stored under its key instead. The event is saved
    after every attempt.

    :param handler:
        The event handler; its optional ``side_effects`` attribute holds the
        registered side effects.
    :param process: The process passed through to the side effect.
    :param event: The event whose pending side effects are run.
    :param context: The context passed through to the side effect.

    :returns:
        True as soon as one side effect has completed and been removed,
        otherwise False.
    """
    if not event.side_effects:
        return False

    # A handler with nothing registered may have no ``side_effects`` attribute
    # (or a None one). Treat that as an empty registry so that pending names
    # are simply skipped instead of failing on ``None.get``.
    registered_side_effects: Mapping[str, Tuple[SideEffect, Condition]] = (
        getattr(handler, "side_effects", None) or {}
    )

    # Iterate over a snapshot of the names: the dict is modified below.
    for side_effect_name in list(event.side_effects):
        # Get the side effect, conditions pair
        side_effect, _ = registered_side_effects.get(side_effect_name, (None, None))
        if not side_effect:
            continue

        try:
            with transaction.atomic():
                side_effect(process, event, context)
            # The side effect completed successfully, so tag its key for removal.
            event.side_effects[side_effect_name] = DELETE_KEY
        except BaseProcessError as e:
            # These are errors in the process which we're expecting to happen.
            event.side_effects[side_effect_name] = errors.format_exc(
                exception=e, prefer_bare_message=True
            )
        except Exception as e:
            logger.exception("Unable to complete side effect in %s", process)
            event.side_effects[side_effect_name] = errors.format_exc(
                exception=e, prefer_bare_message=True
            )

        # Delete the successfully run actions. The sentinel is a unique object,
        # so it is matched by identity rather than equality.
        keys_to_delete = [
            key for key, value in event.side_effects.items() if value is DELETE_KEY
        ]
        for key in keys_to_delete:
            del event.side_effects[key]
        if not event.side_effects:
            del event.side_effects
        event.save()

        # Finally we'll signal to the process if any side effects have been run.
        if keys_to_delete:
            return True

    return False


# Module-level guard used by apply_events: an entry is set for a process while
# that process's side effects are being run and removed again afterwards, so a
# build started from inside a side effect can tell that side effects are
# already running higher up the call stack and must not be run again.
_is_running_side_effects: Dict[models.Model, bool] = {}
"""
Records whether events are currently being applied to a process.
"""