Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pyramid / pyramid / viewderivers.py
Size: Mime:
import inspect

from zope.interface import (
    implementer,
    provider,
    )

from pyramid.security import NO_PERMISSION_REQUIRED
from pyramid.csrf import (
    check_csrf_origin,
    check_csrf_token,
)
from pyramid.response import Response

from pyramid.interfaces import (
    IAuthenticationPolicy,
    IAuthorizationPolicy,
    IDefaultCSRFOptions,
    IDefaultPermission,
    IDebugLogger,
    IResponse,
    IViewMapper,
    IViewMapperFactory,
    )

from pyramid.compat import (
    is_bound_method,
    is_unbound_method,
    )

from pyramid.config.util import (
    DEFAULT_PHASH,
    MAX_ORDER,
    takes_one_arg,
    )

from pyramid.exceptions import (
    ConfigurationError,
    PredicateMismatch,
    )
from pyramid.httpexceptions import HTTPForbidden
from pyramid.util import object_description
from pyramid.view import render_view_to_response
from pyramid import renderers


def view_description(view):
    try:
        return view.__text__
    except AttributeError:
        # custom view mappers might not add __text__
        return object_description(view)

def requestonly(view, attr=None):
    return takes_one_arg(view, attr=attr, argname='request')

@implementer(IViewMapper)
@provider(IViewMapperFactory)
class DefaultViewMapper(object):
    def __init__(self, **kw):
        self.attr = kw.get('attr')

    def __call__(self, view):
        if is_unbound_method(view) and self.attr is None:
            raise ConfigurationError((
                'Unbound method calls are not supported, please set the class '
                'as your `view` and the method as your `attr`'
            ))

        if inspect.isclass(view):
            view = self.map_class(view)
        else:
            view = self.map_nonclass(view)
        return view

    def map_class(self, view):
        ronly = requestonly(view, self.attr)
        if ronly:
            mapped_view = self.map_class_requestonly(view)
        else:
            mapped_view = self.map_class_native(view)
        mapped_view.__text__ = 'method %s of %s' % (
            self.attr or '__call__', object_description(view))
        return mapped_view

    def map_nonclass(self, view):
        # We do more work here than appears necessary to avoid wrapping the
        # view unless it actually requires wrapping (to avoid function call
        # overhead).
        mapped_view = view
        ronly = requestonly(view, self.attr)
        if ronly:
            mapped_view = self.map_nonclass_requestonly(view)
        elif self.attr:
            mapped_view = self.map_nonclass_attr(view)
        if inspect.isroutine(mapped_view):
            # This branch will be true if the view is a function or a method.
            # We potentially mutate an unwrapped object here if it's a
            # function.  We do this to avoid function call overhead of
            # injecting another wrapper.  However, we must wrap if the
            # function is a bound method because we can't set attributes on a
            # bound method.
            if is_bound_method(view):
                _mapped_view = mapped_view
                def mapped_view(context, request):
                    return _mapped_view(context, request)
            if self.attr is not None:
                mapped_view.__text__ = 'attr %s of %s' % (
                    self.attr, object_description(view))
            else:
                mapped_view.__text__ = object_description(view)
        return mapped_view

    def map_class_requestonly(self, view):
        # its a class that has an __init__ which only accepts request
        attr = self.attr
        def _class_requestonly_view(context, request):
            inst = view(request)
            request.__view__ = inst
            if attr is None:
                response = inst()
            else:
                response = getattr(inst, attr)()
            return response
        return _class_requestonly_view

    def map_class_native(self, view):
        # its a class that has an __init__ which accepts both context and
        # request
        attr = self.attr
        def _class_view(context, request):
            inst = view(context, request)
            request.__view__ = inst
            if attr is None:
                response = inst()
            else:
                response = getattr(inst, attr)()
            return response
        return _class_view

    def map_nonclass_requestonly(self, view):
        # its a function that has a __call__ which accepts only a single
        # request argument
        attr = self.attr
        def _requestonly_view(context, request):
            if attr is None:
                response = view(request)
            else:
                response = getattr(view, attr)(request)
            return response
        return _requestonly_view

    def map_nonclass_attr(self, view):
        # its a function that has a __call__ which accepts both context and
        # request, but still has an attr
        def _attr_view(context, request):
            response = getattr(view, self.attr)(context, request)
            return response
        return _attr_view


def wraps_view(wrapper):
    def inner(view, info):
        wrapper_view = wrapper(view, info)
        return preserve_view_attrs(view, wrapper_view)
    return inner

def preserve_view_attrs(view, wrapper):
    if view is None:
        return wrapper

    if wrapper is view:
        return view

    original_view = getattr(view, '__original_view__', None)

    if original_view is None:
        original_view = view

    wrapper.__wraps__ = view
    wrapper.__original_view__ = original_view
    wrapper.__module__ = view.__module__
    wrapper.__doc__ = view.__doc__

    try:
        wrapper.__name__ = view.__name__
    except AttributeError:
        wrapper.__name__ = repr(view)

    # attrs that may not exist on "view", but, if so, must be attached to
    # "wrapped view"
    for attr in ('__permitted__', '__call_permissive__', '__permission__',
                 '__predicated__', '__predicates__', '__accept__',
                 '__order__', '__text__'):
        try:
            setattr(wrapper, attr, getattr(view, attr))
        except AttributeError:
            pass

    return wrapper

def mapped_view(view, info):
    mapper = info.options.get('mapper')
    if mapper is None:
        mapper = getattr(view, '__view_mapper__', None)
        if mapper is None:
            mapper = info.registry.queryUtility(IViewMapperFactory)
            if mapper is None:
                mapper = DefaultViewMapper

    mapped_view = mapper(**info.options)(view)
    return mapped_view

mapped_view.options = ('mapper', 'attr')

def owrapped_view(view, info):
    wrapper_viewname = info.options.get('wrapper')
    viewname = info.options.get('name')
    if not wrapper_viewname:
        return view
    def _owrapped_view(context, request):
        response = view(context, request)
        request.wrapped_response = response
        request.wrapped_body = response.body
        request.wrapped_view = view
        wrapped_response = render_view_to_response(context, request,
                                                   wrapper_viewname)
        if wrapped_response is None:
            raise ValueError(
                'No wrapper view named %r found when executing view '
                'named %r' % (wrapper_viewname, viewname))
        return wrapped_response
    return _owrapped_view

owrapped_view.options = ('name', 'wrapper')

def http_cached_view(view, info):
    if info.settings.get('prevent_http_cache', False):
        return view

    seconds = info.options.get('http_cache')

    if seconds is None:
        return view

    options = {}

    if isinstance(seconds, (tuple, list)):
        try:
            seconds, options = seconds
        except ValueError:
            raise ConfigurationError(
                'If http_cache parameter is a tuple or list, it must be '
                'in the form (seconds, options); not %s' % (seconds,))

    def wrapper(context, request):
        response = view(context, request)
        prevent_caching = getattr(response.cache_control, 'prevent_auto',
                                  False)
        if not prevent_caching:
            response.cache_expires(seconds, **options)
        return response

    return wrapper

http_cached_view.options = ('http_cache',)

def secured_view(view, info):
    for wrapper in (_secured_view, _authdebug_view):
        view = wraps_view(wrapper)(view, info)
    return view

secured_view.options = ('permission',)

def _secured_view(view, info):
    permission = explicit_val = info.options.get('permission')
    if permission is None:
        permission = info.registry.queryUtility(IDefaultPermission)
    if permission == NO_PERMISSION_REQUIRED:
        # allow views registered within configurations that have a
        # default permission to explicitly override the default
        # permission, replacing it with no permission at all
        permission = None

    wrapped_view = view
    authn_policy = info.registry.queryUtility(IAuthenticationPolicy)
    authz_policy = info.registry.queryUtility(IAuthorizationPolicy)

    # no-op on exception-only views without an explicit permission
    if explicit_val is None and info.exception_only:
        return view

    if authn_policy and authz_policy and (permission is not None):
        def permitted(context, request):
            principals = authn_policy.effective_principals(request)
            return authz_policy.permits(context, principals, permission)
        def secured_view(context, request):
            result = permitted(context, request)
            if result:
                return view(context, request)
            view_name = getattr(view, '__name__', view)
            msg = getattr(
                request, 'authdebug_message',
                'Unauthorized: %s failed permission check' % view_name)
            raise HTTPForbidden(msg, result=result)
        wrapped_view = secured_view
        wrapped_view.__call_permissive__ = view
        wrapped_view.__permitted__ = permitted
        wrapped_view.__permission__ = permission

    return wrapped_view

def _authdebug_view(view, info):
    wrapped_view = view
    settings = info.settings
    permission = explicit_val = info.options.get('permission')
    if permission is None:
        permission = info.registry.queryUtility(IDefaultPermission)
    authn_policy = info.registry.queryUtility(IAuthenticationPolicy)
    authz_policy = info.registry.queryUtility(IAuthorizationPolicy)
    logger = info.registry.queryUtility(IDebugLogger)

    # no-op on exception-only views without an explicit permission
    if explicit_val is None and info.exception_only:
        return view

    if settings and settings.get('debug_authorization', False):
        def authdebug_view(context, request):
            view_name = getattr(request, 'view_name', None)

            if authn_policy and authz_policy:
                if permission is NO_PERMISSION_REQUIRED:
                    msg = 'Allowed (NO_PERMISSION_REQUIRED)'
                elif permission is None:
                    msg = 'Allowed (no permission registered)'
                else:
                    principals = authn_policy.effective_principals(request)
                    msg = str(authz_policy.permits(
                        context, principals, permission))
            else:
                msg = 'Allowed (no authorization policy in use)'

            view_name = getattr(request, 'view_name', None)
            url = getattr(request, 'url', None)
            msg = ('debug_authorization of url %s (view name %r against '
                   'context %r): %s' % (url, view_name, context, msg))
            if logger:
                logger.debug(msg)
            if request is not None:
                request.authdebug_message = msg
            return view(context, request)
        wrapped_view = authdebug_view

    return wrapped_view

def predicated_view(view, info):
    preds = info.predicates
    if not preds:
        return view
    def predicate_wrapper(context, request):
        for predicate in preds:
            if not predicate(context, request):
                view_name = getattr(view, '__name__', view)
                raise PredicateMismatch(
                    'predicate mismatch for view %s (%s)' % (
                        view_name, predicate.text()))
        return view(context, request)
    def checker(context, request):
        return all((predicate(context, request) for predicate in
                    preds))
    predicate_wrapper.__predicated__ = checker
    predicate_wrapper.__predicates__ = preds
    return predicate_wrapper

def attr_wrapped_view(view, info):
    accept, order, phash = (info.options.get('accept', None),
                            getattr(info, 'order', MAX_ORDER),
                            getattr(info, 'phash', DEFAULT_PHASH))
    # this is a little silly but we don't want to decorate the original
    # function with attributes that indicate accept, order, and phash,
    # so we use a wrapper
    if (
        (accept is None) and
        (order == MAX_ORDER) and
        (phash == DEFAULT_PHASH)
    ):
        return view # defaults
    def attr_view(context, request):
        return view(context, request)
    attr_view.__accept__ = accept
    attr_view.__order__ = order
    attr_view.__phash__ = phash
    attr_view.__view_attr__ = info.options.get('attr')
    attr_view.__permission__ = info.options.get('permission')
    return attr_view

attr_wrapped_view.options = ('accept', 'attr', 'permission')

def rendered_view(view, info):
    # one way or another this wrapper must produce a Response (unless
    # the renderer is a NullRendererHelper)
    renderer = info.options.get('renderer')
    if renderer is None:
        # register a default renderer if you want super-dynamic
        # rendering.  registering a default renderer will also allow
        # override_renderer to work if a renderer is left unspecified for
        # a view registration.
        def viewresult_to_response(context, request):
            result = view(context, request)
            if result.__class__ is Response: # common case
                response = result
            else:
                response = info.registry.queryAdapterOrSelf(result, IResponse)
                if response is None:
                    if result is None:
                        append = (' You may have forgotten to return a value '
                                  'from the view callable.')
                    elif isinstance(result, dict):
                        append = (' You may have forgotten to define a '
                                  'renderer in the view configuration.')
                    else:
                        append = ''

                    msg = ('Could not convert return value of the view '
                           'callable %s into a response object. '
                           'The value returned was %r.' + append)

                    raise ValueError(msg % (view_description(view), result))

            return response

        return viewresult_to_response

    if renderer is renderers.null_renderer:
        return view

    def rendered_view(context, request):
        result = view(context, request)
        if result.__class__ is Response: # potential common case
            response = result
        else:
            # this must adapt, it can't do a simple interface check
            # (avoid trying to render webob responses)
            response = info.registry.queryAdapterOrSelf(result, IResponse)
            if response is None:
                attrs = getattr(request, '__dict__', {})
                if 'override_renderer' in attrs:
                    # renderer overridden by newrequest event or other
                    renderer_name = attrs.pop('override_renderer')
                    view_renderer = renderers.RendererHelper(
                        name=renderer_name,
                        package=info.package,
                        registry=info.registry)
                else:
                    view_renderer = renderer.clone()
                if '__view__' in attrs:
                    view_inst = attrs.pop('__view__')
                else:
                    view_inst = getattr(view, '__original_view__', view)
                response = view_renderer.render_view(
                    request, result, view_inst, context)
        return response

    return rendered_view

rendered_view.options = ('renderer',)

def decorated_view(view, info):
    decorator = info.options.get('decorator')
    if decorator is None:
        return view
    return decorator(view)

decorated_view.options = ('decorator',)

def csrf_view(view, info):
    explicit_val = info.options.get('require_csrf')
    defaults = info.registry.queryUtility(IDefaultCSRFOptions)
    if defaults is None:
        default_val = False
        token = 'csrf_token'
        header = 'X-CSRF-Token'
        safe_methods = frozenset(["GET", "HEAD", "OPTIONS", "TRACE"])
        callback = None
    else:
        default_val = defaults.require_csrf
        token = defaults.token
        header = defaults.header
        safe_methods = defaults.safe_methods
        callback = defaults.callback

    enabled = (
        explicit_val is True or
        # fallback to the default val if not explicitly enabled
        # but only if the view is not an exception view
        (
            explicit_val is not False and default_val and
            not info.exception_only
        )
    )
    # disable if both header and token are disabled
    enabled = enabled and (token or header)
    wrapped_view = view
    if enabled:
        def csrf_view(context, request):
            if (
                request.method not in safe_methods and
                (callback is None or callback(request))
            ):
                check_csrf_origin(request, raises=True)
                check_csrf_token(request, token, header, raises=True)
            return view(context, request)
        wrapped_view = csrf_view
    return wrapped_view

csrf_view.options = ('require_csrf',)

VIEW = 'VIEW'
INGRESS = 'INGRESS'