Repository URL to install this package:
Version:
4.2.29-4.2 ▾
|
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.
"""
Common Auth Middleware.
"""
from oslo_log import log as logging
from oslo_log import versionutils
from oslo_serialization import jsonutils
import webob.dec
import webob.exc
import dmapi.conf
from dmapi import context
from dmapi.i18n import _
from dmapi import wsgi
CONF = dmapi.conf.CONF
LOG = logging.getLogger(__name__)
def _load_pipeline(loader, pipeline):
filters = [loader.get_filter(n) for n in pipeline[:-1]]
app = loader.get_app(pipeline[-1])
filters.reverse()
for filter in filters:
app = filter(app)
return app
def pipeline_factory_v2(loader, global_conf, **local_conf):
"""A paste pipeline replica that keys off of auth_strategy."""
return _load_pipeline(loader, local_conf[CONF.api.auth_strategy].split())
class InjectContext(wsgi.Middleware):
"""Add a 'dmapi.context' to WSGI environ."""
def __init__(self, context, *args, **kwargs):
self.context = context
super(InjectContext, self).__init__(*args, **kwargs)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
req.environ['dmapi.context'] = self.context
return self.application
class DmapiKeystoneContext(wsgi.Middleware):
"""Make a request context from keystone headers."""
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
# Build a context, including the auth_token...
remote_address = req.remote_addr
if CONF.api.use_forwarded_for:
remote_address = req.headers.get('X-Forwarded-For', remote_address)
service_catalog = None
if req.headers.get('X_SERVICE_CATALOG') is not None:
try:
catalog_header = req.headers.get('X_SERVICE_CATALOG')
service_catalog = jsonutils.loads(catalog_header)
except ValueError:
raise webob.exc.HTTPInternalServerError(
_('Invalid service catalog json.'))
# NOTE(jamielennox): This is a full auth plugin set by auth_token
# middleware in newer versions.
user_auth_plugin = req.environ.get('keystone.token_auth')
ctx = context.RequestContext.from_environ(
req.environ,
user_auth_plugin=user_auth_plugin,
remote_address=remote_address,
service_catalog=service_catalog)
if ctx.user_id is None:
LOG.debug("Neither X_USER_ID nor X_USER found in request")
return webob.exc.HTTPUnauthorized()
req.environ['dmapi.context'] = ctx
return self.application