Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
dj-kaos / governance / roles.py
Size: Mime:
from functools import reduce
from operator import or_
from typing import Sequence

from django.apps import apps as global_apps
from django.core.exceptions import ImproperlyConfigured
from django.db.models import Q


class Role:
    group_model_name = 'auth.Group'
    permissions_model_name = 'auth.Permission'
    group_name_prefix = 'system::'

    def __init__(self, name: str, permission_descriptors: Sequence[str], mk_delete=False):
        self.name = name
        self.group_name = self.group_name_prefix + name
        self.permission_descriptors = permission_descriptors
        self.mk_delete = mk_delete

    @property
    def group(self):
        return global_apps.get_model(self.group_model_name).objects.get(name=self.group_name)

    def get_or_create_group(self, sender, apps=None, using='default'):
        apps = apps or global_apps
        sender_app_label = sender.label
        group_qs = apps.get_model(self.group_model_name).objects.using(using)
        permission_qs = apps.get_model(self.permissions_model_name).objects.using(using)

        if self.mk_delete:
            if self.permission_descriptors:
                raise ImproperlyConfigured(f"Permissions should be empty when marking a role to delete: {self.name}")
            count, _ = group_qs.filter(name=self.group_name).delete()
            if count:
                print(f"Deleted group {self.group_name}")
            return

        descriptors = [
            d.split('.', maxsplit=1)
            for d in self.permission_descriptors
            if d.startswith(sender_app_label + '.')
        ]
        group, created = group_qs.get_or_create(name=self.group_name)
        if created:
            print(f"Created group {self.group_name}")
        group.permissions.remove(
            *permission_qs.filter(content_type__app_label=sender_app_label)
        )
        if descriptors:
            q_permissions = reduce(
                or_,
                (Q(content_type__app_label=app_label, codename=codename) for app_label, codename in descriptors)
            )
            permissions = permission_qs.filter(q_permissions)
            group.permissions.add(*permissions)
        return group, created

    def is_member(self, user):
        return user in self.group.user_set.all()


class RolesRegistry:
    """
    A central registry to keep track of all different roles in the application.
    Register your roles with @register decorator.
    """

    def __init__(self):
        self.registered_roles: set[Role] = set()

    def register(self, role):
        """
        Register a role in the application

        :param role: The role instance
        :return: `role`
        """
        self.registered_roles.add(role)
        return role


role_registry = RolesRegistry()
register_role = role_registry.register


class RolesContainerMeta(type):
    def __new__(cls, name, bases, attrs, **kwargs):
        registry = attrs.get('registry', role_registry)

        for v in attrs.values():
            if isinstance(v, Role):
                registry.register(v)

        return super().__new__(cls, name, bases, attrs, **kwargs)


class RolesContainer(metaclass=RolesContainerMeta):
    registry: RolesRegistry = role_registry


__all__ = (
    'Role',
    'RolesRegistry',
    'role_registry',
    'register_role',
    'RolesContainerMeta',
    'RolesContainer',
)