Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import functools
import json

from webob import exc

from workloadmgr.common import context as wlm_context
from workloadmgr.volume import cinder
from workloadmgr.keymanager import barbican
from workloadmgr.compute import nova
from workloadmgr.api.validation_models.validators import BaseValidator
from workloadmgr import exception as wlm_exception
from workloadmgr import workloads as workloadAPI
from workloadmgr.vault import vault


class WorkloadValidator(BaseValidator):
    """
        Workload Validator Class.
        Does All Validation Operations related to workloads.
        Partial implementation: request body, name and job schedule
        validation are still stubs that raise NotImplementedError.
        TODO: Need to implement going further.
    """
    def __init__(self, context=None, body=None, secret_ref=None, ):
        """
            :param context: request context handed to every backend API call.
            :param body: the 'workload' dict of the request body. ``None``
                         (the default) is treated as an empty dict.
            :param secret_ref: accepted for backward compatibility; not used
                               by this class.
        """
        super(WorkloadValidator, self).__init__()
        # The signature advertises body=None, so do not crash on .get().
        if body is None:
            body = {}
        self.body = body
        self.context = context
        self.encryption = body.get('encryption', False)
        self.secret_uuid = body.get('secret_uuid')
        # The key-manager client is only needed for encrypted workloads.
        self.barbican_api = barbican.API() if self.encryption else None
        self.cinder_api = cinder.API()
        self.nova_api = nova.API()

    def _validate_req_body(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_name(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_jobschedule(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_secrets(self):
        """
            * Validate the user secret from key-manager.
            * Only runs when the workload requests encryption.
            * A secret is accepted when its secret_ref contains the requested
              secret_uuid, it has a payload, and the 'metadata' entry of its
              secret metadata is an empty dict.
            * The search stops at the first accepted secret.

            :raises webob.exc.HTTPServerError: when no secret is accepted or
                    when any key-manager call fails; the explanation carries
                    the underlying error text.
        """
        if not self.encryption:
            return

        # Note the trailing space: the two literals are concatenated.
        message = 'please provide valid secret uuid from secret key manager ' \
                  'or secret should have valid payload.'
        try:
            # A missing uuid would raise TypeError in the `in` test below and
            # an empty one would match every secret_ref, so reject both here.
            if not self.secret_uuid:
                raise wlm_exception.ErrorOccurred(message)

            secrets = self.barbican_api.list_all_secret(self.context)
            found = False
            for secret in secrets or []:
                # Metadata is only fetched once the cheaper checks passed.
                if self.secret_uuid in secret.secret_ref and \
                        secret.payload and \
                        json.loads(
                            self.barbican_api.get_secret_metadata(
                                self.context, self.secret_uuid)
                        ).get('metadata') == {}:
                    found = True
                    break
            if not found:
                raise wlm_exception.ErrorOccurred(message)
        except Exception as error:
            raise exc.HTTPServerError(explanation=str(error))

    def _validate_logical_path_for_workload_encryption(self):
        """
        #################################
        Not Supported-
        #################################
        Qcow2:-
        1. If workload_encryption is False and Image/s is Encrypted.
        2. If workload_encryption is True and Image/s is Encrypted.
        Cinder:-
        1. If workload_encryption is False and Image/s is Encrypted.

        * This method ensures only correct logical paths is hit for snapshots.
        * Only the unencrypted-workload case is checked here: every volume
          attached to every instance in the body is queried through cinder,
          and the first encrypted volume aborts the request.

        :raises webob.exc.HTTPServerError: when an encrypted volume is found
                on an unencrypted workload, or when the body has no
                'instances' key, or when a nova/cinder call fails.
        """
        if self.encryption:
            return

        try:
            for instance in self.body['instances']:
                workload_instance = self.nova_api.get_server_by_id(
                    self.context, instance.get('instance-id'))
                # The attribute name contains ':' and '-', hence getattr().
                attached_volumes = getattr(
                    workload_instance, 'os-extended-volumes:volumes_attached')
                for volume in attached_volumes:
                    if self.cinder_api.is_volume_encrypted(
                            self.context, volume['id']):
                        message = 'UnEncrypted workload cannot have Encrypted Volume.'
                        raise wlm_exception.ErrorOccurred(message)
        except Exception as error:
            raise exc.HTTPServerError(explanation=str(error))

    def _ensure_trust(self):
        """
            Return the trust list for the context, creating a trust with
            ``vault.CONF.trustee_role`` when the list comes back empty.
        """
        workload_api = workloadAPI.API()
        trust = workload_api.trust_list(self.context, get_hidden=True, is_cloud_admin=False)
        if not trust:
            trust = workload_api.trust_create(self.context, vault.CONF.trustee_role)
        return trust


def validate_workload(func):
    """
        Decorator that validates a workload request before calling ``func``.
        * The wrapped callable must be invoked with ``req`` and ``body`` as
          keyword arguments: the context is read from
          ``req.environ["workloadmgr.context"]`` and the workload from
          ``body['workload']``.
        * Runs, in order: trust check/creation, secret validation and the
          encryption logical-path validation. Any exception they raise
          propagates and ``func`` is not called.
        * ``functools.wraps`` keeps the wrapped callable's name and docstring
          on the returned wrapper.
        TODO: Need to implement for all.
    """
    @functools.wraps(func)
    def inner(*args, **kwargs):
        context = kwargs['req'].environ["workloadmgr.context"]
        body = kwargs['body']['workload']
        wv = WorkloadValidator(context=context, body=body)
        wv._ensure_trust()
        wv._validate_workload_secrets()
        wv._validate_logical_path_for_workload_encryption()

        return func(*args, **kwargs)
    return inner