Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.

"""
Request Body validating middleware.

"""

import functools
import re

from dmapi.api.openstack import api_version_request as api_version
from dmapi.api.validation import validators


def _schema_validation_helper(schema, target, min_version, max_version,
                              args, kwargs, is_body=True):
    """A helper method to execute JSON-Schema Validation.

    This method checks the request version whether matches the specified max
    version and min_version. It also takes a care of legacy v2 request.

    If the version range matches the request, we validate the schema against
    the target and a failure will result in a ValidationError being raised.

    :param schema: A dict, the JSON-Schema is used to validate the target.
    :param target: A dict, the target is validated by the JSON-Schema.
    :param min_version: A string of two numerals. X.Y indicating the minimum
                        version of the JSON-Schema to validate against.
    :param max_version: A string of two numerals. X.Y indicating the maximum
                        version of the JSON-Schema to validate against.
    :param args: Positional arguments which passed into original method.
    :param kwargs: Keyword arguments which passed into original method.
    :param is_body: A boolean. Indicating whether the target is HTTP request
                    body or not.
    :returns: A boolean. `True` if and only if the version range matches the
              request AND the schema is successfully validated. `False` if the
              version range does not match the request and no validation is
              performed.
    :raises: ValidationError, when the validation fails.
    """
    min_ver = api_version.APIVersionRequest(min_version)
    max_ver = api_version.APIVersionRequest(max_version)

    # The request object is always the second argument.
    # However numerous unittests pass in the request object
    # via kwargs instead so we handle that as well.
    # TODO(cyeoh): cleanup unittests so we don't have to
    # to do this
    if 'req' in kwargs:
        ver = kwargs['req'].api_version_request
        legacy_v2 = kwargs['req'].is_legacy_v2()
    else:
        ver = args[1].api_version_request
        legacy_v2 = args[1].is_legacy_v2()

    if legacy_v2:
        # NOTE: For v2.0 compatible API, here should work like
        #    client  | schema min_version | schema
        # -----------+--------------------+--------
        #  legacy_v2 | None               | work
        #  legacy_v2 | 2.0                | work
        #  legacy_v2 | 2.1+               | don't
        if min_version is None or min_version == '2.0':
            schema_validator = validators._SchemaValidator(
                schema, legacy_v2, is_body)
            schema_validator.validate(target)
            return True
    elif ver.matches(min_ver, max_ver):
        # Only validate against the schema if it lies within
        # the version range specified. Note that if both min
        # and max are not specified the validator will always
        # be run.
        schema_validator = validators._SchemaValidator(
            schema, legacy_v2, is_body)
        schema_validator.validate(target)
        return True

    return False


def schema(request_body_schema, min_version=None, max_version=None):
    """Register a schema to validate request body.

    Registered schema will be used for validating request body just before
    API method executing.

    :argument dict request_body_schema: a schema to validate request body

    """

    def add_validator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            _schema_validation_helper(request_body_schema, kwargs['body'],
                                      min_version, max_version,
                                      args, kwargs)
            return func(*args, **kwargs)
        return wrapper

    return add_validator


def _strip_additional_query_parameters(schema, req):
    """Strip the additional properties from the req.GET.

    This helper method assumes the JSON-Schema is only described as a dict
    without nesting. This method should be called after query parameters pass
    the JSON-Schema validation. It also means this method only can be called
    after _schema_validation_helper return `True`.
    """
    additional_properties = schema.get('additionalProperties', True)
    pattern_regexes = []

    patterns = schema.get('patternProperties', None)
    if patterns:
        for regex in patterns:
            pattern_regexes.append(re.compile(regex))

    if additional_properties:
        # `req.GET.keys` will return duplicated keys for multiple values
        # parameters. To get rid of duplicated keys, convert it to set.
        for param in set(req.GET.keys()):
            if param not in list(schema['properties'].keys()):
                # keys that can match the patternProperties will be
                # retained and handle latter.
                if not (list(regex for regex in pattern_regexes if
                             regex.match(param))):
                    del req.GET[param]


def query_schema(query_params_schema, min_version=None,
                 max_version=None):
    """Register a schema to validate request query parameters.

    Registered schema will be used for validating request query params just
    before API method executing.

    :param query_params_schema: A dict, the JSON-Schema for validating the
                                query parameters.
    :param min_version: A string of two numerals. X.Y indicating the minimum
                        version of the JSON-Schema to validate against.
    :param max_version: A string of two numerals. X.Y indicating the maximum
                        version of the JSON-Schema against to.
    """

    def add_validator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The request object is always the second argument.
            # However numerous unittests pass in the request object
            # via kwargs instead so we handle that as well.
            # TODO(cyeoh): cleanup unittests so we don't have to
            # to do this
            if 'req' in kwargs:
                req = kwargs['req']
            else:
                req = args[1]

            if _schema_validation_helper(query_params_schema,
                                         req.GET.dict_of_lists(),
                                         min_version, max_version,
                                         args, kwargs, is_body=False):
                # NOTE(alex_xu): The additional query parameters were stripped
                # out when `additionalProperties=True`. This is for backward
                # compatible with v2.1 API and legacy v2 API. But it makes the
                # system more safe for no more unexpected parameters pass down
                # to the system. In the future, we may block all of those
                # additional parameters by Microversion.
                _strip_additional_query_parameters(query_params_schema, req)
            return func(*args, **kwargs)
        return wrapper

    return add_validator