Repository URL to install this package:
|
Version:
5.2.1.1.dev1 ▾
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 TrilioData, Inc.
# All Rights Reserved.
from workloadmgr.openstack.common import log as logging
from workloadmgr.api import common
LOG = logging.getLogger(__name__)
class ViewBuilder(common.ViewBuilder):
"""Model migration plan API responses as a python dictionary."""
_collection_name = "migration-plans"
def __init__(self):
"""Initialize view builder."""
super(ViewBuilder, self).__init__()
def summary_list(self, request, migration_plans, api=None, *args, **kwargs):
"""Show a list of migration_plans without many details."""
return self._list_view(self.summary, request, migration_plans, api, *args, **kwargs)
def detail_list(self, request, migration_plans, api=None, *args, **kwargs):
"""Detailed view of a list of migration_plans ."""
return self._list_view(self.detail, request, migration_plans, api, *args, **kwargs)
def summary(self, request, migration_plan, api=None, *args, **kwargs):
"""Generic, non-detailed view of a migration_plan."""
context = request.environ["workloadmgr.context"]
res = None
fetch_scheduler_trust = kwargs.get("scheduler_trust", False)
if fetch_scheduler_trust:
if not api:
LOG.exception(
"'scheduler_trust' and 'api' both should be provided to ViewBuilder"
)
else:
res = api.validate_scheduler_trust(context, migration_plan.id)
return {
"migration_plan": {
"project_id": migration_plan.get("project_id"),
"user_id": migration_plan.get("user_id"),
"id": migration_plan["id"],
"name": migration_plan["display_name"],
"migrations_info": "",
"description": migration_plan["display_description"],
"status": migration_plan["status"],
"created_at": migration_plan.get("created_at"),
"updated_at": migration_plan.get("updated_at"),
"scheduler_trust": res,
'vms': migration_plan.get('vms', []),
"links": self._get_links(request, migration_plan["id"]),
},
}
def restore_summary(self, request, migration, api=None):
"""Generic, non-detailed view of a migration."""
return {
"migration": {
"migration_plan_id": migration["migration_plan_id"],
"instance_id": migration["instance_id"],
},
}
def detail(self, request, migration_plan, api=None, *args, **kwargs):
"""Detailed view of a single migration_plan."""
context = request.environ["workloadmgr.context"]
if api is not None:
migration_plan = api.migration_plan_get(context, migration_plan_id=migration_plan['id'])
res = None
fetch_scheduler_trust = kwargs.get("scheduler_trust", False)
if fetch_scheduler_trust:
if not api:
LOG.exception(
"'scheduler_trust' and 'api' both should be provided to ViewBuilder"
)
else:
res = api.validate_scheduler_trust(context, migration_plan['id'])
policy_id = None
meta_data = migration_plan.get("metadata", {})
if meta_data:
if isinstance(meta_data, list):
for i in meta_data:
policy_id = i.get('value') if i.get('key') == 'policy_id' else None
if policy_id:
break
if isinstance(meta_data, dict):
policy_id = meta_data.get("policy_id", None)
return {
"migration_plan": {
"created_at": migration_plan.get("created_at"),
"updated_at": migration_plan.get("updated_at"),
"id": migration_plan.get("id"),
"user_id": migration_plan.get("user_id"),
"project_id": migration_plan.get("project_id"),
"availability_zone": migration_plan.get("availability_zone"),
"migration_type": migration_plan.get("migration_type"),
"name": migration_plan.get("display_name"),
"description": migration_plan.get("display_description"),
"vms": migration_plan.get("vms"),
"metadata": migration_plan.get("metadata"),
"status": migration_plan.get("status"),
"error_msg": migration_plan.get("error_msg"),
"links": self._get_links(request, migration_plan["id"]),
"scheduler_trust": res,
}
}
def _list_view(self, func, request, migration_plans, api=None, *args, **kwargs):
"""Provide a view for a list of migration_plans."""
migration_plans_list = [
func(request, migration_plan, api, *args, **kwargs)["migration_plan"]
for migration_plan in migration_plans
]
migration_plans_links = self._get_collection_links(
request, migration_plans, self._collection_name
)
migration_plans_dict = dict(migration_plans=migration_plans_list)
if migration_plans_links:
migration_plans_dict["migration_plans_links"] = migration_plans_links
return migration_plans_dict