Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import datetime
import functools
import json

from webob import exc

from workloadmgr.volume import cinder
from workloadmgr.keymanager import barbican
from workloadmgr.compute import nova
from workloadmgr.api.validation_models.validators import BaseValidator
from workloadmgr import exception as wlm_exception
from workloadmgr import workloads as workloadAPI
from workloadmgr.vault import vault
from workloadmgr import utils


class WorkloadValidator(BaseValidator):
    """
        Workload Validator Class.
        Does All Validation Operations related to workloads.
        partial Implementation.
        TODO: Need to implement going further.
    """
    def __init__(self, context=None, body=None, secret_ref=None, refresh=False):
        """
            Capture the request data and build the service API clients.

            :param context: request context handed to every service call.
            :param body: the workload dictionary from the request; ``None``
                         is treated as an empty dictionary.
            :param secret_ref: accepted for interface compatibility; not
                               read by this class.
            :param refresh: when True and the body carries an ``id`` but no
                            encryption flag, the encryption settings are
                            loaded from the stored workload.
        """
        super(WorkloadValidator, self).__init__()
        # ``body`` defaults to None; normalise it so the ``.get`` look-ups
        # below (and in the validation methods) cannot fail on None.
        self.body = body if body is not None else {}
        self.context = context
        self.encryption = self.body.get('encryption', False)
        self.secret_uuid = self.body.get('secret_uuid')
        # get workload encryption details in case of modify
        # operation as horizon does not send it.
        if refresh and not self.encryption and self.body.get("id"):
            workload_api = workloadAPI.API()
            workload_dict = workload_api.workload_get(self.context, self.body["id"])
            if workload_dict:
                if workload_dict.get("encryption"):
                    self.encryption = workload_dict["encryption"]
                if workload_dict.get("secret_uuid"):
                    self.secret_uuid = workload_dict["secret_uuid"]
        # The key-manager client is only created for encrypted workloads.
        if self.encryption:
            self.barbican_api = barbican.API()
        self.cinder_api = cinder.API()
        self.nova_api = nova.API()

    def _validate_req_body(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_name(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_jobschedule(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def _validate_workload_secrets(self, workload_id=None):
        """
            Validate the user secret from key-manager.

            Does nothing for unencrypted workloads. Otherwise the secret
            must be retrievable, must have a payload, and its metadata must
            either be empty or already name ``workload_id``.

            :param workload_id: id of the workload being modified, if any.
            :raises webob.exc.HTTPServerError: on any validation failure;
                    the explanation carries the underlying message.
        """
        if not self.encryption:
            return
        try:
            try:
                secret = self.barbican_api.get_secret(
                    self.context, secret_id=self.secret_uuid)
            except Exception:
                raise Exception("Secret could not be found, "
                                "either the secret is not present or "
                                "not accessible to the user")
            if not (secret and secret.payload):
                message = 'Either the secret UUID or '\
                    'the payload of the secret is invalid.'
                raise wlm_exception.ErrorOccurred(message)

            wl_metadata = json.loads(
                self.barbican_api.get_secret_metadata(
                    self.context, self.secret_uuid)).get('metadata')
            # A response without a usable 'metadata' mapping cannot tell us
            # whether the secret is free; fail with a clear message rather
            # than an AttributeError on None.
            if not isinstance(wl_metadata, dict):
                raise wlm_exception.ErrorOccurred(
                    'Unable to read the metadata of the secret.')

            bound_workload_id = wl_metadata.get('workload_id')
            secret_is_unused = not wl_metadata
            secret_is_ours = bool(workload_id) and \
                bound_workload_id == workload_id
            if not (secret_is_unused or secret_is_ours):
                message = 'the secret UUID is already in used by workload: {}'.format(bound_workload_id)
                raise wlm_exception.ErrorOccurred(message)
        except Exception as error:
            # Every failure above is reported to the API caller the same way.
            raise exc.HTTPServerError(explanation=str(error))

    def _validate_logical_path_for_workload_encryption(self):
        """
        #################################
        Not Supported-
        #################################
        Qcow2:-
        1. If workload_encryption is False and Image/s is Encrypted.
        2. If workload_encryption is True and Image/s is Encrypted.
        Cinder:-
        1. If workload_encryption is False and Image/s is Encrypted.

        * This method only runs for unencrypted workloads: every instance
          listed in the request body is looked up in nova and each of its
          attached volumes is checked through cinder.
        * It returns nothing; an unknown instance or an encrypted volume
          raises webob.exc.HTTPServerError.
        """
        if self.encryption:
            return
        try:
            for instance in self.body.get('instances', []):
                # Both key spellings are accepted for the instance id.
                instance_id = instance.get('instance-id') or instance.get('id')
                if not instance_id:
                    continue
                workload_instance = self.nova_api.get_server_by_id(self.context, instance_id)
                if not workload_instance:
                    raise wlm_exception.ErrorOccurred("Unable to find Virtual Machine '{}' in nova inventory".format(instance_id))
                attached_volumes = getattr(workload_instance, 'os-extended-volumes:volumes_attached')
                for volume in attached_volumes:
                    if self.cinder_api.is_volume_encrypted(self.context, volume['id']):
                        raise wlm_exception.ErrorOccurred(
                            "Unencrypted workload cannot have instance: {} with encrypted Volume: {}".format(
                                instance_id, volume.get('id')
                            )
                        )
        except Exception as error:
            raise exc.HTTPServerError(explanation=str(error))

    def _ensure_trust(self):
        """
            Return the existing trust list, creating a trust with the
            configured trustee role when none exists.
        """
        workload_api = workloadAPI.API()
        trust = workload_api.trust_list(self.context, get_hidden=True, is_cloud_admin=False)
        if not trust:
            trust = workload_api.trust_create(self.context, vault.CONF.trustee_role)
        return trust

    def _validate_jobschedule_date_time(self, jobschedule, workloadobj_jobschedule=None, flag_timezone_convert=True):
        """
            Validate the dates and time of a job schedule.

            First the raw ``start_date`` / ``end_date`` ('%m/%d/%Y') and
            ``start_time`` ('%I:%M %p') values are checked for format. Then,
            for an enabled schedule, they are checked against the current
            UTC date/time: the start may not be in the past and the end
            date may not be earlier than tomorrow.

            :param jobschedule: schedule dictionary from the request.
            :param workloadobj_jobschedule: the stored schedule when a
                   workload is being updated; only values that differ from
                   it are re-checked against the current time.
            :param flag_timezone_convert: on update, pass the schedule
                   through utils.convert_jobschedule_date_tz when True;
                   otherwise copy the stored timezone (default 'UTC') into
                   ``jobschedule``.
            :returns: the (possibly converted) jobschedule.
            :raises wlm_exception.Invalid: when a value is malformed or
                    lies in the past.
        """
        # TypeError is caught together with ValueError so that a non-string
        # value (e.g. null or a number in the JSON body) is reported as an
        # invalid value instead of escaping as an unhandled error.
        if 'start_date' in jobschedule:
            try:
                datetime.datetime.strptime(jobschedule['start_date'], '%m/%d/%Y')
            except (ValueError, TypeError):
                raise wlm_exception.Invalid(
                    reason=_("start_date: {} does not match format '%m/%d/%Y'. Examples: 06/05/2014, 07/15/2014".format(jobschedule['start_date'])))

        if 'end_date' in jobschedule and \
                str(jobschedule['end_date']).lower() != 'no end':
            try:
                datetime.datetime.strptime(jobschedule['end_date'], '%m/%d/%Y')
            except (ValueError, TypeError):
                raise wlm_exception.Invalid(
                    reason=_("end_date: {} does not match format '%m/%d/%Y'. Examples: 06/05/2014, 07/15/2014".format(jobschedule['end_date'])))

        if 'start_time' in jobschedule:
            try:
                datetime.datetime.strptime(jobschedule['start_time'], '%I:%M %p')
            except (ValueError, TypeError):
                raise wlm_exception.Invalid(
                    reason=_("start_time: {} does not match format '%I:%M %p'. Examples: 12:05 PM, 04:30 AM".format(jobschedule['start_time'])))

        # dictionary to hold JS params to verify
        jobschedule_param_dict = {'enabled': False}
        if workloadobj_jobschedule:
            if flag_timezone_convert:
                jobschedule = utils.convert_jobschedule_date_tz(jobschedule)
            else:
                jobschedule['timezone'] = workloadobj_jobschedule.get('timezone') or 'UTC'
            # An update that omits 'enabled' is treated as not enabled.
            if jobschedule.get('enabled'):
                jobschedule_param_dict['enabled'] = jobschedule['enabled']
                new_start_date = jobschedule.get('start_date')
                new_start_time = jobschedule.get('start_time')
                stored_start_date = workloadobj_jobschedule.get('start_date')
                stored_start_time = workloadobj_jobschedule.get('start_time')
                date_changed = new_start_date and new_start_date != stored_start_date
                time_changed = new_start_time and new_start_time != stored_start_time
                if date_changed or time_changed:
                    # The request may carry only one of the two values;
                    # use the stored one for whichever half is missing.
                    jobschedule_param_dict.update({
                        'start_date': new_start_date or stored_start_date,
                        'start_time': new_start_time or stored_start_time
                    })
                new_end_date = jobschedule.get('end_date')
                if new_end_date and new_end_date != workloadobj_jobschedule.get('end_date', 'No End'):
                    jobschedule_param_dict['end_date'] = new_end_date
        else:
            jobschedule = utils.convert_jobschedule_date_tz(jobschedule)
            jobschedule_param_dict = jobschedule

        if jobschedule_param_dict.get('enabled'):
            # Sample the clock once so all checks share the same "now".
            now_utc = datetime.datetime.utcnow()
            today_utc = now_utc.date()
            start_date = jobschedule_param_dict.get('start_date')
            start_time = jobschedule_param_dict.get('start_time')
            end_date = jobschedule_param_dict.get('end_date')

            start_day = None
            if start_date:
                start_day = datetime.datetime.strptime(start_date, '%m/%d/%Y').date()
                if start_day < today_utc:
                    raise wlm_exception.Invalid(
                        reason=_("start_date: {} cannot be lesser than today ({} UTC).".format(start_date, now_utc.strftime('%m/%d/%Y'))))

            end_date_tomm_utc = today_utc + datetime.timedelta(days=1)
            if end_date and str(end_date).lower() != 'no end':
                if datetime.datetime.strptime(end_date, '%m/%d/%Y').date() < end_date_tomm_utc:
                    raise wlm_exception.Invalid(
                        reason=_("end_date: {} cannot be lesser than tomorrow ({} UTC).".format(end_date, end_date_tomm_utc.strftime('%m/%d/%Y'))))

            # The time can only be judged together with its date.
            if start_day is not None and start_time:
                if start_day == today_utc and \
                        datetime.datetime.strptime(start_date + ' ' + start_time, '%m/%d/%Y %I:%M %p') < now_utc:
                    raise wlm_exception.Invalid(
                        reason=_("start_time: {} cannot be lesser than current time({} UTC).".format(start_time, now_utc.strftime('%I:%M %p'))))
        return jobschedule


def validate_workload(func):
    """
        Decorator that validates a workload request before calling ``func``.

        The wrapped callable must be invoked with keyword arguments ``req``
        (whose ``environ`` holds "workloadmgr.context") and ``body`` (a
        dictionary with a 'workload' dictionary); an optional ``id``
        keyword is copied into the workload body when the body has none.

        Checks run, in order: a trust is ensured, the workload secret is
        validated, and the encryption path of the listed instances is
        validated. ``func`` is only called when all of them pass.

        :raises webob.exc.HTTPBadRequest: when ``body`` has no 'workload'
                dictionary.
        TODO: Need to implement for all.
    """
    # Keep the wrapped handler's __name__/__doc__ visible on the wrapper.
    @functools.wraps(func)
    def inner(*args, **kwargs):
        context = kwargs['req'].environ["workloadmgr.context"]
        request_body = kwargs.get('body')
        body = request_body.get('workload') \
            if isinstance(request_body, dict) else None
        if not isinstance(body, dict):
            # Reject a malformed request explicitly instead of letting a
            # KeyError/TypeError escape from the look-up.
            raise exc.HTTPBadRequest(
                explanation="Request body must contain a 'workload' dictionary.")
        # pass refresh=True as mostly request body won't have updated
        # information in case of workload modify operation
        if kwargs.get("id") and "id" not in body:
            # Written into the request body itself, so ``func`` sees it too.
            body["id"] = kwargs['id']
        wv = WorkloadValidator(context=context, body=body, refresh=True)
        # need to have trust to verify barbican secrets
        wv._ensure_trust()
        wv._validate_workload_secrets(workload_id=body.get("id", None))
        wv._validate_logical_path_for_workload_encryption()

        return func(*args, **kwargs)
    return inner