Repository URL to install this package:
|
Version:
4.2.35 ▾
|
import functools
import json

from webob import exc

from workloadmgr.common import context as wlm_context
from workloadmgr.volume import cinder
from workloadmgr.keymanager import barbican
from workloadmgr.compute import nova
from workloadmgr.api.validation_models.validators import BaseValidator
from workloadmgr import exception as wlm_exception
from workloadmgr import workloads as workloadAPI
from workloadmgr.vault import vault
class WorkloadValidator(BaseValidator):
    """
    Workload Validator Class.

    Does the validation operations related to workloads: the encryption
    secret check, the encrypted-volume check and trust creation.
    Partial implementation: the request-body, name and jobschedule
    validators are stubs that raise NotImplementedError.

    TODO: Need to implement going further.
    """

    def __init__(self, context=None, body=None, secret_ref=None, ):
        """
        Store the request data and create the service API clients.

        :param context: request context handed to every service API call.
        :param body: the 'workload' dict of the request; the keys read
            here are 'encryption' and 'secret_uuid'. None is treated as
            an empty dict.
        :param secret_ref: accepted for interface compatibility; it is
            not used by this class.
        """
        super(WorkloadValidator, self).__init__()
        # The signature advertises body=None, so do not crash on .get().
        if body is None:
            body = {}
        self.body = body
        self.context = context
        self.encryption = body.get('encryption', False)
        self.secret_uuid = body.get('secret_uuid')
        # The key-manager client is only created for encrypted workloads;
        # _validate_workload_secrets() is the only user and checks the
        # same flag before touching it.
        if self.encryption:
            self.barbican_api = barbican.API()
        self.cinder_api = cinder.API()
        self.nova_api = nova.API()

    def _validate_req_body(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_name(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_jobschedule(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_secrets(self):
        """
        Validate the user secret from key-manager.

        Does nothing when the workload is not encrypted. Otherwise the
        workload is accepted only if:

        * a secret_uuid was supplied,
        * some secret returned by list_all_secret() contains that uuid
          in its secret_ref and has a truthy payload (the scan stops at
          the first such secret), and
        * the 'metadata' entry of the JSON returned by
          get_secret_metadata() for that uuid is an empty dict
          (NOTE(review): presumably this means the secret is not yet
          bound to another workload - confirm).

        :raises webob.exc.HTTPServerError: when the secret is rejected
            or when any call made during the check fails; the
            explanation carries the underlying error text.
        """
        if not self.encryption:
            return
        try:
            found = False
            # A missing uuid can never match, and an empty string would
            # be "in" every secret_ref, so reject both before scanning.
            if self.secret_uuid:
                secrets = self.barbican_api.list_all_secret(self.context)
                found = any(
                    self.secret_uuid in secret.secret_ref and secret.payload
                    for secret in secrets or []
                )
            if found:
                # The metadata lookup depends only on the requested uuid,
                # so it is done once instead of once per matching secret.
                secret_info = json.loads(
                    self.barbican_api.get_secret_metadata(
                        self.context, self.secret_uuid))
                found = secret_info.get('metadata') == {}
            if not found:
                message = ('please provide valid secret uuid from secret '
                           'key manager or secret should have valid '
                           'payload.')
                raise wlm_exception.ErrorOccurred(message)
        except Exception as error:
            # Every failure, including the rejection raised just above,
            # is reported to the API caller as an HTTP server error.
            raise exc.HTTPServerError(explanation=str(error))

    def _validate_logical_path_for_workload_encryption(self):
        """
        #################################
        Not Supported-
        #################################
        Qcow2:-
        1. If workload_encryption is False and Image/s is Encrypted.
        2. If workload_encryption is True and Image/s is Encrypted.
        Cinder:-
        1. If workload_encryption is False and Image/s is Encrypted.

        * This method ensures only correct logical paths are hit for
          snapshots.
        * Only the unencrypted-workload case is checked here: every
          volume attached to every instance in body['instances'] is
          looked up, and the first encrypted volume aborts validation.
          Encrypted workloads are not inspected by this method.

        :raises webob.exc.HTTPServerError: when an encrypted volume is
            attached to an instance of an unencrypted workload, or when
            any lookup made during the check fails (including a body
            without 'instances').
        """
        if self.encryption:
            return
        try:
            for instance in self.body['instances']:
                server = self.nova_api.get_server_by_id(
                    self.context, instance.get('instance-id'))
                # The attribute name contains ':' and '-', so it cannot
                # be read with normal attribute syntax.
                attached_volumes = getattr(
                    server, 'os-extended-volumes:volumes_attached')
                for volume in attached_volumes:
                    if self.cinder_api.is_volume_encrypted(
                            self.context, volume['id']):
                        message = ('UnEncrypted workload cannot have '
                                   'Encrypted Volume.')
                        raise wlm_exception.ErrorOccurred(message)
        except Exception as error:
            # Every failure, including the rejection raised just above,
            # is reported to the API caller as an HTTP server error.
            raise exc.HTTPServerError(explanation=str(error))

    def _ensure_trust(self):
        """
        Return the trusts for the current context, creating one if needed.

        trust_list() is called with get_hidden=True and
        is_cloud_admin=False. When it returns nothing, a trust is
        created with the role configured in vault.CONF.trustee_role.

        :returns: the result of trust_list(), or of trust_create() when
            the list was empty.
        """
        workload_api = workloadAPI.API()
        trust = workload_api.trust_list(
            self.context, get_hidden=True, is_cloud_admin=False)
        if not trust:
            trust = workload_api.trust_create(
                self.context, vault.CONF.trustee_role)
        return trust
def validate_workload(func):
    """
    Decorator that validates a workload request before calling ``func``.

    * This Validation Decorator makes sure the implemented validations
      run before hitting the workload API: a trust is ensured, then the
      encryption secret and the encrypted-volume checks are executed.
    * The wrapped callable must be invoked with ``req`` and ``body`` as
      keyword arguments. The context is read from
      ``req.environ["workloadmgr.context"]`` and the workload from
      ``body['workload']``; a missing key raises KeyError.
    * Errors raised by the validators propagate to the caller and
      ``func`` is not called.

    TODO: Need to implement for all.

    :param func: the API callable to protect.
    :returns: a wrapper that keeps ``func``'s name, docstring and
        attributes and returns whatever ``func`` returns.
    """
    # functools.wraps keeps the metadata of the decorated callable so
    # code that inspects it still sees the original function.
    @functools.wraps(func)
    def inner(*args, **kwargs):
        context = kwargs['req'].environ["workloadmgr.context"]
        body = kwargs['body']['workload']
        wv = WorkloadValidator(context=context, body=body)
        wv._ensure_trust()
        wv._validate_workload_secrets()
        wv._validate_logical_path_for_workload_encryption()
        return func(*args, **kwargs)
    return inner