# Repository URL to install this package: (not present in this copy)
# Version: 4.1.94.1.dev5
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2015 TrilioData, Inc.
# All Rights Reserved.
"""The trust api."""
import json
import webob
from webob import exc
try:
from cgi import parse_qs
except:
from six.moves.urllib.parse import parse_qs
try:
from cgi import escape
except:
from html import escape
from workloadmgr.api import common
from workloadmgr.api import wsgi
from workloadmgr.api import xmlutil
from workloadmgr import exception as wlm_exceptions
from workloadmgr import flags
from workloadmgr.openstack.common import log as logging
from workloadmgr.openstack.common import strutils
from workloadmgr import utils
from workloadmgr import workloads as workloadAPI
LOG = logging.getLogger(__name__)
FLAGS = flags.FLAGS
class TrustController(wsgi.Controller):
    """The trust API controller for the workload manager API.

    Every action reads the request context from
    ``req.environ["workloadmgr.context"]`` and delegates the real work to
    ``workloadAPI.API``.  ``HTTPNotFound``, ``HTTPBadRequest`` and
    ``HTTPServerError`` raised while handling a request are logged and
    re-raised unchanged; any other exception is logged and converted into
    an ``HTTPServerError`` whose explanation is the text of that exception.
    """

    # webob errors that already describe the HTTP response to send; they
    # are re-raised as-is instead of being wrapped in an HTTPServerError.
    _HTTP_PASSTHROUGH = (
        exc.HTTPNotFound,
        exc.HTTPBadRequest,
        exc.HTTPServerError,
    )

    def __init__(self, ext_mgr=None):
        """Create the controller.

        :param ext_mgr: extension manager, stored on the instance as-is.
        """
        self.workload_api = workloadAPI.API()
        self.ext_mgr = ext_mgr
        super(TrustController, self).__init__()

    @staticmethod
    def _is_true(raw_value):
        """Return True only when *raw_value* is "true" (case-insensitive).

        Query-string values arrive as text; anything other than "true"
        (including "false" and the empty string) maps to False, so a real
        boolean, never a truthy string, is handed to the API layer.
        """
        return raw_value.lower() == "true"

    def create(self, req, body):
        """Create a new trust.

        :param req: request carrying "workloadmgr.context" in its environ.
        :param body: parsed request body; must provide
            ``body["trusts"]["role_name"]`` and
            ``body["trusts"]["is_cloud_trust"]``.
        :returns: ``{"trust": <value returned by trust_create>}``.
        :raises HTTPBadRequest: the body lacks the required keys or is not
            subscriptable.
        :raises HTTPServerError: any other failure.
        """
        try:
            context = req.environ["workloadmgr.context"]
            try:
                trust_spec = body["trusts"]
                role_name = trust_spec["role_name"]
                is_cloud_trust = trust_spec["is_cloud_trust"]
            except (KeyError, TypeError) as error:
                # A malformed body is a client error, not a server fault.
                raise exc.HTTPBadRequest(
                    explanation="Invalid request body: %s" % error
                )
            created_trust = self.workload_api.trust_create(
                context, role_name, is_cloud_trust
            )
            return {"trust": created_trust}
        except self._HTTP_PASSTHROUGH as error:
            LOG.exception(error)
            raise
        except Exception as error:
            LOG.exception(error)
            raise exc.HTTPServerError(explanation=str(error))

    def delete(self, req, name):
        """Delete a trust.

        :param req: request carrying "workloadmgr.context" in its environ.
        :param name: identifier passed to ``trust_delete``.
        :raises HTTPNotFound: ``trust_delete`` raised ``NotFound``.
        :raises HTTPBadRequest: ``trust_delete`` raised ``InvalidState``.
        :raises HTTPServerError: any other failure.
        """
        try:
            context = req.environ["workloadmgr.context"]
            try:
                self.workload_api.trust_delete(context, name)
            except wlm_exceptions.NotFound:
                raise exc.HTTPNotFound()
            except wlm_exceptions.InvalidState as error:
                raise exc.HTTPBadRequest(explanation=str(error))
        except self._HTTP_PASSTHROUGH as error:
            LOG.exception(error)
            raise
        except Exception as error:
            LOG.exception(error)
            raise exc.HTTPServerError(explanation=str(error))

    def index(self, req):
        """Return a summary list of trusts.

        The optional query parameters ``get_hidden`` and ``is_cloud_admin``
        are treated as booleans: only the text "true" (case-insensitive)
        enables them; a missing parameter or any other value is False.

        :param req: request carrying "workloadmgr.context" and, optionally,
            "QUERY_STRING" in its environ.
        :returns: ``{"trust": <value returned by trust_list>}``.
        :raises HTTPServerError: any failure that is not already one of the
            pass-through HTTP errors.
        """
        query = parse_qs(req.environ.get("QUERY_STRING", ""))
        get_hidden = self._is_true(query.get("get_hidden", [""])[0])
        is_cloud_admin = self._is_true(query.get("is_cloud_admin", [""])[0])
        try:
            return self._get_trust(
                req, get_hidden=get_hidden, is_cloud_admin=is_cloud_admin
            )
        except self._HTTP_PASSTHROUGH as error:
            LOG.exception(error)
            raise
        except Exception as error:
            LOG.exception(error)
            raise exc.HTTPServerError(explanation=str(error))

    def show(self, req, name):
        """Return data about the given trust.

        :param req: request carrying "workloadmgr.context" in its environ.
        :param name: identifier passed to ``trust_get``.
        :returns: ``{"trust": <value returned by trust_get>}``.
        :raises HTTPNotFound: ``trust_get`` raised ``NotFound``.
        :raises HTTPServerError: any other failure.
        """
        try:
            context = req.environ["workloadmgr.context"]
            try:
                trust = self.workload_api.trust_get(context, name)
            except wlm_exceptions.NotFound:
                raise exc.HTTPNotFound()
            return {"trust": trust}
        except self._HTTP_PASSTHROUGH as error:
            LOG.exception(error)
            raise
        except Exception as error:
            LOG.exception(error)
            raise exc.HTTPServerError(explanation=str(error))

    def validate_scheduler_trust(self, req, workload_id):
        """Verify trust is present with given workload's scheduler
        and validate that the trust is valid by generating token.

        :param req: request carrying "workloadmgr.context" in its environ.
        :param workload_id: identifier passed to the API layer's
            ``validate_scheduler_trust``.
        :returns: dict starting from ``{"scheduler_enabled": False,
            "trust": None, "is_valid": False}`` and updated with whatever
            the API layer returns.
        :raises HTTPNotFound: the API layer raised ``NotFound``.
        :raises HTTPServerError: any other failure.
        """
        try:
            context = req.environ["workloadmgr.context"]
            # Defaults guarantee the three keys exist even if the API layer
            # returns only some of them.
            res = {"scheduler_enabled": False, "trust": None, "is_valid": False}
            try:
                validated_res = self.workload_api.validate_scheduler_trust(
                    context, workload_id
                )
            except wlm_exceptions.NotFound:
                raise exc.HTTPNotFound()
            res.update(validated_res)
            return res
        except self._HTTP_PASSTHROUGH as error:
            LOG.exception(error)
            raise
        except Exception as error:
            LOG.exception(error)
            # str() rather than unicode(): the latter does not exist on
            # Python 3 and would mask the real error with a NameError.
            raise exc.HTTPServerError(explanation=str(error))

    def _get_trust(self, req, **kwargs):
        """Return the list of trusts wrapped as ``{"trust": ...}``.

        :param req: request carrying "workloadmgr.context" in its environ.
        :param kwargs: optional ``get_hidden`` and ``is_cloud_admin`` flags,
            each defaulting to False, forwarded to ``trust_list``.
        """
        context = req.environ["workloadmgr.context"]
        trust = self.workload_api.trust_list(
            context,
            get_hidden=kwargs.get('get_hidden', False),
            is_cloud_admin=kwargs.get('is_cloud_admin', False)
        )
        return {"trust": trust}
def create_resource(ext_mgr):
    """Build the WSGI resource for the trust API.

    A ``TrustController`` is constructed with *ext_mgr* and wrapped in
    ``wsgi.Resource``.
    """
    controller = TrustController(ext_mgr)
    return wsgi.Resource(controller)