Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from __future__ import annotations

import datetime as dt
import enum
import typing
from typing import Callable, Dict, Generic, Iterable, List, Optional, Tuple, TypeVar, cast

from django.db import models

from .utils import item, localtime


SIDE_EFFECTS_KEY = "side_effects"

DEFAULT_EVENTS_RELATED_NAME = "events"
DEFAULT_EVENT_TYPE_NAME = "event_type"

EventType = TypeVar("EventType", bound=enum.Enum)
EventHandlerCallable = Callable[["BaseProcess", "BaseProcessEvent", object], None]


class CallableEvent(typing.Protocol):
    def __call__(
        self,
        event_type: enum.Enum,
        data: dict | None = None,
        occurred_at: dt.datetime | None = None,
        **kwargs: object,
    ) -> BaseProcessEvent: ...


def _get_event_data_to_compare(data: Dict) -> Dict:
    # Side effects in data means get_or_create depends on the side effect statuses changing
    # To avoid this, we can expand the data and not compare the side_effects when get_or_creating
    # TODO: store side_effects in a separate field/model rather than in the data as a long term fix
    return {f"data__{key}": value for key, value in data.items() if key != SIDE_EFFECTS_KEY}


def _record_event(
    event_handlers: Dict[EventType, EventHandlerCallable],
    process: BaseProcess,
    event_type: EventType,
    *,
    data: Optional[Dict] = None,
    user: Optional[models.Model] = None,
    occurred_at: Optional[dt.datetime] = None,
    events_related_name: Optional[str] = None,
    event_type_name: Optional[str] = None,
    filter_on_data: bool = True,
    filter_on_occurred_at: bool = False,
    unique_together: Optional[Iterable[str]] = None,
) -> BaseProcessEvent:
    """
    Record an event and tag with any pending side effects found on the event_handler.
    """

    def _meets_conditions(conditions: List[Callable]) -> bool:
        return all(condition(process) for condition in conditions)

    # set defaults
    occurred_at = occurred_at or localtime.now()
    data = data or {}

    # default values for event and process values
    events_related_name = events_related_name or DEFAULT_EVENTS_RELATED_NAME
    event_type_name = event_type_name or DEFAULT_EVENT_TYPE_NAME

    # We can use get_or_create to prevent duplication of events with the same data.
    # So that we don't re-create events when we retry failed ones
    get_or_create_params: Dict = {event_type_name: event_type}
    events_query_set = getattr(process, events_related_name)

    if unique_together:
        all_fields = {"occurred_at": occurred_at, "data": data, **get_or_create_params}
        filtering_fields, default_fields = item.split(all_fields, unique_together)

        event, created = events_query_set.update_or_create(
            **filtering_fields, defaults=default_fields
        )
    else:
        if filter_on_data:
            get_or_create_params.update(_get_event_data_to_compare(data))
        if user:
            get_or_create_params["support_user"] = user
        if filter_on_occurred_at:
            get_or_create_params["occurred_at"] = occurred_at

        event, created = events_query_set.get_or_create(
            **get_or_create_params,
            defaults={"occurred_at": occurred_at, "data": data},
        )

    # Create a list of side effects from the mapping in the data layer. These will run when
    # we build the event. The value represents the status:
    # None - not run
    # {str} - error message
    # The key is deleted once the side effect is successfully completed
    if created and (handler := event_handlers.get(event_type)):
        side_effects = getattr(handler, SIDE_EFFECTS_KEY, {})
        event.side_effects = {
            name: None
            for name, (_, conditions) in side_effects.items()
            if _meets_conditions(conditions)
        }

    event.save()
    return event


class BaseProcessEventType(models.base.ModelBase):
    """
    Ensures that any subclass of `BaseProcessEvent` defines an `event_type` CharField.

    Functionality that relies on this field being present will be added to the base class,
    so it's important that we force developers to define it on their event models.
    """

    def __new__(
        mcs, name: str, bases: Tuple[models.base.ModelBase, ...], attrs: Dict
    ) -> BaseProcessEventType:
        model_meta = attrs.get("Meta", None)
        if (
            model_meta is not None
            and not getattr(model_meta, "abstract", False)
            and not getattr(model_meta, "proxy", False)
        ):
            fields = {k: v for (k, v) in attrs.items() if isinstance(v, models.Field)}

            if "event_type" not in fields:
                raise RuntimeError("Must define 'event_type' field on 'BaseProcessEvent'!")
            elif not isinstance(fields["event_type"], models.CharField):
                raise RuntimeError("ProcessEvent 'event_type' must be a 'CharField'!")

        return super().__new__(mcs, name, bases, attrs)


class BaseProcessEvent(models.Model, metaclass=BaseProcessEventType):
    """
    Defines an interface for an ProcessEvent's side effects.

    Any subclass must define at least one of the two JSON model fields:
        - `side_effects`    (preferred)
        - `data`            (used for storing actual event data but side effects are tacked on)

    The need to deserialize data from event data dicts to more domain-specific objects
    has made it desirable to have the `side_effects` record separate from the event
    data itself. For backwards compatibility with existing ProcessEvent subclasses,
    we've defined a Parent class that can be applied to said subclasses without changing
    their functionality or table schema.

    TODO:   We've added a metaclass, so enforce that the event models has defined either
            `data` or `side_effects` there.
    """

    class Meta:
        abstract = True

    @property
    def side_effects(self) -> object:
        if hasattr(self, "_side_effects"):
            return self._side_effects

        if hasattr(self, "data"):
            if self.data is None:
                return None

            if not (isinstance(self.data, dict)):
                raise TypeError("Invalid 'data' attribute; must be JSON serialisable or None.")

            return self.data.get("side_effects")

        raise TypeError("Invalid subclass; must define 'data' or '_side_effects' model field.")

    @side_effects.setter
    def side_effects(self, val: Dict) -> None:
        self._side_effects: Optional[Dict]
        if hasattr(self, "_side_effects"):
            self._side_effects = val
        elif hasattr(self, "data"):
            self.data[SIDE_EFFECTS_KEY] = val

        self.save()

    @side_effects.deleter
    def side_effects(self) -> None:
        if hasattr(self, "_side_effects"):
            self._side_effects = None
        elif hasattr(self, "data"):
            if self.data is None:
                return

            del self.data[SIDE_EFFECTS_KEY]

        self.save()


class BaseProcessType(models.base.ModelBase):
    """
    Ensures that any subclass of `BaseProcessType` have their own event_handler_registry
    """

    def __new__(mcs, *args: object, **kwargs: object) -> BaseProcessType:
        obj = super().__new__(mcs, *args, **kwargs)
        setattr(obj, "event_handler_registry", {})
        return cast(BaseProcessType, obj)


class BaseProcess(Generic[EventType], models.Model, metaclass=BaseProcessType):
    """
    Abstract class that all processes should inherit from.

    The purpose of this is to standardise and simply the correct process/event functionality.
    Namely replacing the rest of the functionality in this module and 'domain.registry'.
    """

    event_handler_registry: Dict[EventType, EventHandlerCallable] = {}

    created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
    updated_at: models.DateTimeField = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

    @classmethod
    def handles_event(cls, event_type: EventType) -> Callable:
        def handler_decorator(func: Callable) -> Callable:
            cls.event_handler_registry[event_type] = func
            return func

        return handler_decorator

    @classmethod
    def has_side_effect(
        cls, action: Callable, conditions: List[Callable] | None = None
    ) -> Callable:
        if conditions is None:
            conditions = []

        def handler_decorator(func: Callable) -> Callable:
            side_effects = getattr(func, "side_effects", {})
            side_effects[action.__name__] = (action, conditions)
            setattr(func, "side_effects", side_effects)

            return func

        return handler_decorator

    def record_event(
        self,
        event_type: EventType,
        data: Optional[Dict] = None,
        occurred_at: Optional[dt.datetime] = None,
        **kwargs: object,
    ) -> BaseProcessEvent:
        return _record_event(
            self.event_handler_registry,
            self,
            event_type,
            data=data,
            occurred_at=occurred_at,
        )

    def build(self) -> BaseProcess:
        """
        TODO:   Implement. Subclasses should not have to override this.
        """
        raise NotImplementedError