# Package: lib-py-b2b (version 0.4.200)
# File: policy.py
from typing import Callable
from lib_b2b.errors import PolicyViolation
import logging
from os import environ
import abc
import re
logger = logging.getLogger(__name__)
class Operation:
    """
    Interface describing an operation that a policy can be evaluated against.

    Both methods raise NotImplementedError unless a subclass overrides them.
    """
    # NOTE: this class does not derive from abc.ABC, so the abstractmethod
    # markers below do not prevent direct instantiation.

    @abc.abstractmethod
    def operation_identifier(self) -> str:
        """
        :return: an identifier for this operation
        :rtype: str
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def subject(self) -> str:
        """
        :return: the subject of this operation
        :rtype: str
        """
        raise NotImplementedError()
class Predicate:
    """
    Interface for a single condition that can be evaluated for an operation.
    """

    @abc.abstractmethod
    def evaluate(self, obj, with_: Operation) -> bool:
        """
        Evaluate this condition.

        :param obj: the object the operation is being performed on
        :param with_: the operation being evaluated
        :type with_: Operation
        :return: the base implementation returns None; subclasses are
            expected to return a boolean
        """
        return None
class Policy:
    """
    A named set of predicates, optionally composed of nested policies, that
    governs whether an operation may be executed.
    """
    # Default subject pattern: a regular expression matching any subject.
    SUBJECT_ALL = '.*'

    def __init__(self, name: str, predicates: [Predicate] = None, subject: str = SUBJECT_ALL, policies: ['Policy'] = None):
        """
        Creates a Policy object for use in governing the execution of operations
        :param name: name of the policy - useful for persistence and logging
        :type name: str
        :param predicates: the predicates that make up the policy conditions
        :type predicates: list of Predicates
        :param subject: the subject to which the policy applies; used as a
            regular expression pattern against the operation's subject
        :type subject: str
        :param policies: a Policy object can be composed from other Policy objects
        :type policies: a list of Policy objects
        """
        # None defaults are replaced with fresh lists so instances never
        # share a mutable default.
        if policies is None:
            policies = []
        if predicates is None:
            predicates = []
        self.policies = policies
        self.name = name
        self.predicates = predicates
        self.subject = subject

    def is_applicable(self, operation: Operation):
        """
        Tell whether this policy, or any nested policy, applies to an operation.

        :param operation: the operation to check
        :type operation: Operation
        :return: True if this policy's subject pattern matches the start of
            the operation's subject (re.match), or if any nested policy is
            applicable
        :rtype: bool
        """
        if re.match(self.subject, operation.subject()):
            return True
        return any(p.is_applicable(operation) for p in self.policies)

    def is_permitted(self, operation: Operation, obj):
        """
        Evaluate this policy's predicates, then its applicable nested policies.

        :param operation: the operation being attempted
        :type operation: Operation
        :param obj: the object passed through to each predicate
        :return: True when no predicate of this policy fails and either no
            nested policy is applicable or a nested policy permits the operation
        :rtype: bool
        :raises PolicyViolation: as soon as one of this policy's predicates
            evaluates to a falsy value
        """
        for p in self.predicates:
            if not p.evaluate(obj, operation):
                # The Predicate base class does not define `name`; fall back to
                # the class name so a nameless predicate still produces a
                # PolicyViolation instead of an AttributeError.
                rule_name = getattr(p, 'name', type(p).__name__)
                logger.warning(f"Policy [{self.name}] failed for {operation.operation_identifier()} "
                               f"due to rule [{rule_name}].")
                raise PolicyViolation(message=rule_name)
        applicable_policies = [policy for policy in self.policies if policy.is_applicable(operation)]
        if not applicable_policies:
            return True
        # NOTE(review): as written in this class, is_permitted either returns
        # True or raises, so any() stops after the first applicable nested
        # policy and later ones are not evaluated. Confirm whether all() was
        # intended before changing it.
        return any(policy.is_permitted(operation, obj) for policy in applicable_policies)
class SimplePredicate(Predicate):
    """
    Predicate whose result is produced by a caller-supplied function.
    """

    def __init__(self, fn: Callable, name: str):
        """
        Wraps a function or lambda as a named predicate
        :param fn: the function or lambda; it is called with the operation
        :type fn: function/lambda
        :param name: the name of the predicate
        :type name: str
        """
        self.name = name
        self.fn = fn

    def evaluate(self, obj, with_: Operation) -> bool:
        """
        Call the wrapped function with the operation and return its result.

        :param obj: accepted for interface compatibility; not passed to the
            wrapped function
        :param with_: the operation handed to the wrapped function
        :type with_: Operation
        :return: whatever the wrapped function returns
        """
        outcome = self.fn(with_)
        return outcome
#
# class ComparisonPredicate(Predicate):
# """
# Allows for comparison
# """
#
# def applies(self, action: A) -> bool:
# raise NotImplementedError
#
# def evaluate(self, with_: A) -> bool:
# raise NotImplementedError
#
# @staticmethod
# def from_dict(data: dict):
# raise NotImplementedError
#
# def as_dict(self) -> dict:
# raise NotImplementedError
#