Repository URL to install this package:
|
Version:
5.2.1.1.dev1 ▾
|
import json
from webob import exc
from workloadmgr.volume import cinder
from workloadmgr.keymanager import barbican
from workloadmgr.compute import nova
from workloadmgr.api.validation_models.validators import BaseValidator
from workloadmgr import exception as wlm_exception
from workloadmgr import workloads as workloadAPI
from workloadmgr.vault import vault
class MigrationPlanValidator(BaseValidator):
"""
MigrationPlan Validator Class.
Does All Validation Operations related to migration_plans.
partial Implementation.
TODO: Need to implement going further.
"""
def __init__(self, context=None, body=None, secret_ref=None, refresh=False):
super(MigrationPlanValidator, self).__init__()
self.body = body
self.context = context
self.cinder_api = cinder.API()
self.nova_api = nova.API()
def _validate_req_body(self):
raise NotImplementedError()
def _ensure_trust(self):
workload_api = workloadAPI.API()
trust = workload_api.trust_list(self.context, get_hidden=True, is_cloud_admin=False)
if not trust:
trust = workload_api.trust_create(self.context, vault.CONF.trustee_role)
return trust
class MigrationValidator(BaseValidator):
"""
MigrationPlan Validator Class.
Does All Validation Operations related to migration_plans.
partial Implementation.
TODO: Need to implement going further.
"""
def __init__(self, context=None, body=None, secret_ref=None, refresh=False):
super(MigrationValidator, self).__init__()
self.body = body
self.context = context
self.cinder_api = cinder.API()
self.nova_api = nova.API()
def _validate_req_body(self):
raise NotImplementedError()
def _ensure_trust(self):
workload_api = workloadAPI.API()
trust = workload_api.trust_list(self.context, get_hidden=True, is_cloud_admin=False)
if not trust:
trust = workload_api.trust_create(self.context, vault.CONF.trustee_role)
return trust
def validate_migration_plan(func):
"""
Factory method for validating migration_plan.
* This Validation Decorator make sure All types of Validation are covered
* before hitting migration_plan API.
TODO: Need to implement for all.
"""
def inner(*args, **kwargs):
context = kwargs['req'].environ["workloadmgr.context"]
body = kwargs['body']['migration_plan']
# pass refresh=True as mostly request body won't have updated
# information in case of workload modify operation
if kwargs.get("id") and "id" not in body:
body.update({"id": kwargs['id']})
wv = MigrationPlanValidator(context=context, body=body, refresh=True)
# need to have trust to verify barbican secrets
wv._ensure_trust()
return func(*args, **kwargs)
return inner
def validate_migration(func):
"""
Factory method for validating migration_plan.
* This Validation Decorator make sure All types of Validation are covered
* before hitting migration_plan API.
TODO: Need to implement for all.
"""
def inner(*args, **kwargs):
context = kwargs['req'].environ["workloadmgr.context"]
body = kwargs['body']['migration']
wv = MigrationValidator(context=context, body=body, refresh=True)
# need to have trust to verify barbican secrets
wv._ensure_trust()
return func(*args, **kwargs)
return inner