Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lib-py-b2b / policy.py
Size: Mime:
from typing import Callable

from lib_b2b.errors import PolicyViolation
import logging
from os import environ
import abc
import re

logger = logging.getLogger(__name__)


class Operation:
    @abc.abstractmethod
    def operation_identifier(self) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    def subject(self) -> str:
        raise NotImplementedError


class Predicate:
    @abc.abstractmethod
    def evaluate(self, obj, with_: Operation) -> bool:
        pass


class Policy:
    SUBJECT_ALL = '.*'

    def __init__(self, name: str, predicates: [Predicate] = None, subject: str = SUBJECT_ALL, policies: ['Policy'] = None):
        """
        Creates a Policy object for use in governing the execution of operations
        :param name: name of the policy - useful for persistence and logging
        :type name: str
        :param predicates: the predicates that make up the policy conditions
        :type predicates: list of Predicates
        :param subject: the subject to which the policy applies
        :type subject:
        :param policies: a Policy object can be composed from other Policy objects
        :type policies: a list of Policy objects
        """
        if policies is None:
            policies = []
        if predicates is None:
            predicates = []
        self.policies = policies
        self.name = name
        self.predicates = predicates
        self.subject = subject

    def is_applicable(self, operation: Operation):
        if re.match(self.subject, operation.subject()):
            return True
        else:
            return any(p.is_applicable(operation) for p in self.policies)

    def is_permitted(self, operation: Operation, obj):
        for p in self.predicates:
            if not p.evaluate(obj, operation):
                logger.warning(f"Policy [{self.name}] failed for {operation.operation_identifier()} "
                               f"due to rule [{p.name}].")
                raise PolicyViolation(message=p.name)
        applicable_policies = [policy for policy in self.policies if policy.is_applicable(operation)]
        if not applicable_policies:
            return True
        else:
            return any(policy.is_permitted(operation, obj) for policy in applicable_policies)


class SimplePredicate(Predicate):
    """
    Boolean value from customer account configuration
    """
    def __init__(self, fn: Callable, name: str):
        """
        Evaluates a simple function for true or false
        :param fn: the function or lambda
        :type fn: function/lambda
        :param name: the name of the predicate
        :type name: str
        """
        self.fn = fn
        self.name = name

    def evaluate(self, obj, with_: Operation) -> bool:
        return self.fn(with_)


#
# class ComparisonPredicate(Predicate):
#     """
#     Allows for comparison
#     """
#
#     def applies(self, action: A) -> bool:
#         raise NotImplementedError
#
#     def evaluate(self, with_: A) -> bool:
#         raise NotImplementedError
#
#     @staticmethod
#     def from_dict(data: dict):
#         raise NotImplementedError
#
#     def as_dict(self) -> dict:
#         raise NotImplementedError
#