Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
wbfsm / wbfsm / models.py
Size: Mime:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from django.dispatch import receiver

from django_fsm.signals import post_transition
from .exceptions import ChangeInstanceForbiddenException, DeleteInstanceForbiddenException

class ProtectDataMixin:
    """Guard ``save`` and ``delete`` behind the instance's current status.

    The mixin reads the following from the class it is mixed into:

    * ``id`` and ``status`` attributes on the instance,
    * an ``objects`` manager whose ``get(id=...)`` returns the stored row,
    * a ``DoesNotExist`` exception class raised by that lookup when the row
      is missing,
    * ``STATUS_CHANGE_INSTANCE``: statuses in which the data may be edited,
    * ``STATUS_DELETE_INSTANCE``: statuses in which deletion is allowed.

    Both methods delegate through ``super()``, so a later class in the MRO
    has to provide the real ``save`` and ``delete``.
    """

    def save(self, *args, **kwargs):
        """Persist the instance unless its status locks the data.

        The save is rejected when the stored status equals the in-memory
        status (no status change is happening) and that status is not listed
        in ``STATUS_CHANGE_INSTANCE``. Saves that change the status are
        always let through.

        Raises:
            ChangeInstanceForbiddenException: if the data is edited while
                the instance is in a status that does not permit changes.
        """
        if self.id:
            model = type(self)
            try:
                stored_status = model.objects.get(id=self.id).status
            except model.DoesNotExist:
                # An id was assigned before the first save, so there is no
                # stored row to protect yet: treat this as a plain insert.
                pass
            else:
                unchanged = stored_status == self.status
                if unchanged and stored_status not in self.STATUS_CHANGE_INSTANCE:
                    # str() keeps the message buildable for non-string states.
                    allowed = ', '.join(map(str, self.STATUS_CHANGE_INSTANCE))
                    raise ChangeInstanceForbiddenException(f"Can only change data when in one of the following states: {allowed}")

        super().save(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """Delete the instance if its status allows deletion.

        Raises:
            DeleteInstanceForbiddenException: if the current status is not
                listed in ``STATUS_DELETE_INSTANCE``.
        """
        if self.status not in self.STATUS_DELETE_INSTANCE:
            # str() keeps the message buildable for non-string states.
            allowed = ', '.join(map(str, self.STATUS_DELETE_INSTANCE))
            raise DeleteInstanceForbiddenException(f"Can only delete it when in one of the following states: {allowed}")
        super().delete(*args, **kwargs)

@receiver(post_transition)
def send_post_transition_websocket(sender, instance, name, source, target, **kwargs):
    """Broadcast a completed state transition to the ``wbfsm-broadcast`` group.

    Registered as a receiver of django-fsm's ``post_transition`` signal.
    Only senders that define ``DEFAULT_IDENTIFIER`` are broadcast; every
    other sender is ignored.

    Args:
        sender: Class that emitted the signal; its ``DEFAULT_IDENTIFIER`` is
            sent as ``X-Identifier``.
        instance: Object that changed state; its ``id`` is sent.
        name: Signal argument, unused here.
        source: Signal argument, unused here.
        target: Status after the transition; sent as ``status``.
        **kwargs: Remaining signal arguments, unused here.
    """
    if not hasattr(sender, 'DEFAULT_IDENTIFIER'):
        return

    channel_layer = get_channel_layer()
    if channel_layer is None:
        # No channel layer is configured, so there is nowhere to broadcast.
        # Skip instead of failing the transition with an AttributeError.
        return

    payload = {
        'type': 'broadcast_state_change',
        'content': {
            'X-Identifier': sender.DEFAULT_IDENTIFIER,
            'data': [{
                'id': instance.id,
                'status': target,
            }]
        }
    }
    # group_send is wrapped with async_to_sync so it can be called from this
    # synchronous receiver.
    group_send = async_to_sync(channel_layer.group_send)
    group_send('wbfsm-broadcast', payload)