# Package index metadata (scrape residue, not code): version 0.1.3
from __future__ import annotations
import datetime as dt
import enum
import typing
from typing import Callable, Dict, Generic, Iterable, List, Optional, Tuple, TypeVar, cast
from django.db import models
from .utils import item, localtime
SIDE_EFFECTS_KEY = "side_effects"
DEFAULT_EVENTS_RELATED_NAME = "events"
DEFAULT_EVENT_TYPE_NAME = "event_type"
EventType = TypeVar("EventType", bound=enum.Enum)
EventHandlerCallable = Callable[["BaseProcess", "BaseProcessEvent", object], None]
class CallableEvent(typing.Protocol):
    """Structural interface for a callable that records a process event.

    Mirrors the keyword portion of ``_record_event``'s signature (minus its
    leading positional arguments); any additional keyword arguments are
    accepted via ``**kwargs`` and presumably forwarded to ``_record_event``
    — confirm against the concrete callables bound to this protocol.
    """

    def __call__(
        self,
        event_type: enum.Enum,
        data: dict | None = None,
        occurred_at: dt.datetime | None = None,
        **kwargs: object,
    ) -> BaseProcessEvent: ...
def _get_event_data_to_compare(data: Dict) -> Dict:
# Side effects in data means get_or_create depends on the side effect statuses changing
# To avoid this, we can expand the data and not compare the side_effects when get_or_creating
# TODO: store side_effects in a separate field/model rather than in the data as a long term fix
return {f"data__{key}": value for key, value in data.items() if key != SIDE_EFFECTS_KEY}
def _record_event(
    event_handlers: Dict[EventType, EventHandlerCallable],
    process: BaseProcess,
    event_type: EventType,
    *,
    data: Optional[Dict] = None,
    user: Optional[models.Model] = None,
    occurred_at: Optional[dt.datetime] = None,
    events_related_name: Optional[str] = None,
    event_type_name: Optional[str] = None,
    filter_on_data: bool = True,
    filter_on_occurred_at: bool = False,
    unique_together: Optional[Iterable[str]] = None,
) -> BaseProcessEvent:
    """
    Record an event and tag with any pending side effects found on the event_handler.

    :param event_handlers: mapping of event type -> handler function; a handler's
        ``side_effects`` attribute (if present) seeds the new event's pending
        side effects.
    :param process: the process model instance the event is recorded against.
    :param event_type: stored on the event under the field named ``event_type_name``.
    :param data: event payload dict; defaults to ``{}``.
    :param user: if given, matched/stored as ``support_user``. NOTE(review): only
        applied in the ``get_or_create`` branch, not when ``unique_together`` is
        given — confirm whether that asymmetry is intentional.
    :param occurred_at: event timestamp; defaults to ``localtime.now()``.
    :param events_related_name: related name from process to its events
        (default ``"events"``).
    :param event_type_name: event model field holding the type (default ``"event_type"``).
    :param filter_on_data: include expanded ``data__<key>`` lookups when deduplicating.
    :param filter_on_occurred_at: include ``occurred_at`` in the dedup lookup.
    :param unique_together: if given, switches to ``update_or_create`` keyed on
        exactly these fields, updating the remaining fields.
    :returns: the existing or newly created event instance.
    """
    def _meets_conditions(conditions: List[Callable]) -> bool:
        # A side effect is only scheduled when every condition passes for this process.
        return all(condition(process) for condition in conditions)
    # set defaults
    occurred_at = occurred_at or localtime.now()
    data = data or {}
    # default values for event and process values
    events_related_name = events_related_name or DEFAULT_EVENTS_RELATED_NAME
    event_type_name = event_type_name or DEFAULT_EVENT_TYPE_NAME
    # We can use get_or_create to prevent duplication of events with the same data.
    # So that we don't re-create events when we retry failed ones
    get_or_create_params: Dict = {event_type_name: event_type}
    events_query_set = getattr(process, events_related_name)
    if unique_together:
        # Dedup key is exactly the `unique_together` fields; everything else
        # (including data/occurred_at) is overwritten on an existing row.
        all_fields = {"occurred_at": occurred_at, "data": data, **get_or_create_params}
        filtering_fields, default_fields = item.split(all_fields, unique_together)
        event, created = events_query_set.update_or_create(
            **filtering_fields, defaults=default_fields
        )
    else:
        # Dedup key is event type plus whichever optional lookups are enabled.
        if filter_on_data:
            # Expanded data__<key> lookups; side_effects excluded (see helper).
            get_or_create_params.update(_get_event_data_to_compare(data))
        if user:
            get_or_create_params["support_user"] = user
        if filter_on_occurred_at:
            get_or_create_params["occurred_at"] = occurred_at
        event, created = events_query_set.get_or_create(
            **get_or_create_params,
            defaults={"occurred_at": occurred_at, "data": data},
        )
    # Create a list of side effects from the mapping in the data layer. These will run when
    # we build the event. The value represents the status:
    # None - not run
    # {str} - error message
    # The key is deleted once the side effect is successfully completed
    if created and (handler := event_handlers.get(event_type)):
        side_effects = getattr(handler, SIDE_EFFECTS_KEY, {})
        event.side_effects = {
            name: None
            for name, (_, conditions) in side_effects.items()
            if _meets_conditions(conditions)
        }
        event.save()
    return event
class BaseProcessEventType(models.base.ModelBase):
    """
    Ensures that any subclass of `BaseProcessEvent` defines an `event_type` CharField.

    Functionality that relies on this field being present will be added to the base
    class, so it's important that we force developers to define it on their event
    models.
    """

    def __new__(
        mcs, name: str, bases: Tuple[models.base.ModelBase, ...], attrs: Dict
    ) -> BaseProcessEventType:
        model_meta = attrs.get("Meta", None)
        # Only concrete models are validated; abstract and proxy models (and
        # classes without a Meta) are exempt.
        is_concrete = (
            model_meta is not None
            and not getattr(model_meta, "abstract", False)
            and not getattr(model_meta, "proxy", False)
        )
        if is_concrete:
            event_type_field = attrs.get("event_type")
            if not isinstance(event_type_field, models.Field):
                raise RuntimeError("Must define 'event_type' field on 'BaseProcessEvent'!")
            if not isinstance(event_type_field, models.CharField):
                raise RuntimeError("ProcessEvent 'event_type' must be a 'CharField'!")
        return super().__new__(mcs, name, bases, attrs)
class BaseProcessEvent(models.Model, metaclass=BaseProcessEventType):
    """
    Defines an interface for a ProcessEvent's side effects.

    Any subclass must define at least one of the two JSON model fields:
    - `side_effects` (preferred)
    - `data` (used for storing actual event data but side effects are tacked on)

    The need to deserialize data from event data dicts to more domain-specific objects
    has made it desirable to have the `side_effects` record separate from the event
    data itself. For backwards compatibility with existing ProcessEvent subclasses,
    we've defined a Parent class that can be applied to said subclasses without changing
    their functionality or table schema.

    TODO: We've added a metaclass, so enforce that the event models has defined either
    `data` or `side_effects` there.
    """

    class Meta:
        abstract = True

    @property
    def side_effects(self) -> object:
        """Return the side-effect mapping from `_side_effects` or `data`, or None.

        :raises TypeError: if `data` is present but not a dict, or if the
            subclass defines neither `data` nor `_side_effects`.
        """
        if hasattr(self, "_side_effects"):
            return self._side_effects
        if hasattr(self, "data"):
            if self.data is None:
                return None
            if not isinstance(self.data, dict):
                raise TypeError("Invalid 'data' attribute; must be JSON serialisable or None.")
            # Use the shared constant (same value as the literal previously used here)
            # for consistency with the setter/deleter and `_record_event`.
            return self.data.get(SIDE_EFFECTS_KEY)
        raise TypeError("Invalid subclass; must define 'data' or '_side_effects' model field.")

    @side_effects.setter
    def side_effects(self, val: Dict) -> None:
        # NOTE(review): the `_side_effects` branch intentionally does not save,
        # matching the original behaviour; callers (e.g. `_record_event`) call
        # `save()` themselves. Silently a no-op if neither field exists.
        self._side_effects: Optional[Dict]
        if hasattr(self, "_side_effects"):
            self._side_effects = val
        elif hasattr(self, "data"):
            self.data[SIDE_EFFECTS_KEY] = val
            self.save()

    @side_effects.deleter
    def side_effects(self) -> None:
        if hasattr(self, "_side_effects"):
            self._side_effects = None
        elif hasattr(self, "data"):
            if self.data is None:
                return
            # Bug fix: `del self.data[SIDE_EFFECTS_KEY]` raised KeyError when no
            # side effects had ever been recorded; deletion is now idempotent.
            if SIDE_EFFECTS_KEY in self.data:
                del self.data[SIDE_EFFECTS_KEY]
                self.save()
class BaseProcessType(models.base.ModelBase):
    """
    Ensures that any subclass of `BaseProcessType` gets its own, independent
    `event_handler_registry` dict rather than sharing the base class's.
    """

    def __new__(mcs, *args: object, **kwargs: object) -> BaseProcessType:
        new_class = super().__new__(mcs, *args, **kwargs)
        new_class.event_handler_registry = {}
        return cast(BaseProcessType, new_class)
class BaseProcess(Generic[EventType], models.Model, metaclass=BaseProcessType):
    """
    Abstract class that all processes should inherit from.

    The purpose of this is to standardise and simply the correct process/event
    functionality. Namely replacing the rest of the functionality in this module
    and 'domain.registry'.
    """

    # Per-subclass mapping of event type -> handler; the metaclass replaces
    # this with a fresh dict for every subclass, so this default is never shared.
    event_handler_registry: Dict[EventType, EventHandlerCallable] = {}

    created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
    updated_at: models.DateTimeField = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

    @classmethod
    def handles_event(cls, event_type: EventType) -> Callable:
        """Decorator that registers the wrapped function as the handler for `event_type`."""

        def handler_decorator(func: Callable) -> Callable:
            cls.event_handler_registry[event_type] = func
            return func

        return handler_decorator

    @classmethod
    def has_side_effect(
        cls, action: Callable, conditions: List[Callable] | None = None
    ) -> Callable:
        """Decorator attaching `action` (guarded by `conditions`) to a handler.

        Side effects accumulate on the handler function itself as a dict of
        ``action.__name__ -> (action, conditions)``, read by `_record_event`.
        """
        if conditions is None:
            conditions = []

        def handler_decorator(func: Callable) -> Callable:
            # Use SIDE_EFFECTS_KEY (same value as the previous "side_effects"
            # literal) for consistency with `_record_event`.
            side_effects = getattr(func, SIDE_EFFECTS_KEY, {})
            side_effects[action.__name__] = (action, conditions)
            setattr(func, SIDE_EFFECTS_KEY, side_effects)
            return func

        return handler_decorator

    def record_event(
        self,
        event_type: EventType,
        data: Optional[Dict] = None,
        occurred_at: Optional[dt.datetime] = None,
        **kwargs: object,
    ) -> BaseProcessEvent:
        """Record `event_type` against this process and return the event.

        Bug fix: extra keyword arguments (`user`, `unique_together`,
        `filter_on_data`, ...) were previously accepted but silently
        discarded; they are now forwarded to `_record_event`.
        """
        return _record_event(
            self.event_handler_registry,
            self,
            event_type,
            data=data,
            occurred_at=occurred_at,
            **kwargs,  # type: ignore[arg-type]
        )

    def build(self) -> BaseProcess:
        """
        TODO: Implement. Subclasses should not have to override this.
        """
        raise NotImplementedError