Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / voluptuous   python

Repository URL to install this package:

Version: 0.11.5 

/ schema_builder.py

import collections
import inspect
import itertools
import re
import sys
from contextlib import contextmanager
from functools import wraps

try:
    from collections.abc import Mapping
except ImportError:  # Python 2 keeps the ABCs directly in ``collections``
    from collections import Mapping

from voluptuous import error as er

if sys.version_info >= (3,):
    long = int
    unicode = str
    basestring = str
    ifilter = filter

    def iteritems(d):
        return d.items()
else:
    from itertools import ifilter

    def iteritems(d):
        return d.iteritems()

"""Schema validation for Python data structures.

Given, e.g., a nested data structure like this:

    {
        'exclude': ['Users', 'Uptime'],
        'include': [],
        'set': {
            'snmp_community': 'public',
            'snmp_timeout': 15,
            'snmp_version': '2c',
        },
        'targets': {
            'localhost': {
                'exclude': ['Uptime'],
                'features': {
                    'Uptime': {
                        'retries': 3,
                    },
                    'Users': {
                        'snmp_community': 'monkey',
                        'snmp_port': 15,
                    },
                },
                'include': ['Users'],
                'set': {
                    'snmp_community': 'monkeys',
                },
            },
        },
    }

A schema like this:

    >>> settings = {
    ...   'snmp_community': str,
    ...   'retries': int,
    ...   'snmp_version': All(Coerce(str), Any('3', '2c', '1')),
    ... }
    >>> features = ['Ping', 'Uptime', 'Http']
    >>> schema = Schema({
    ...    'exclude': features,
    ...    'include': features,
    ...    'set': settings,
    ...    'targets': {
    ...      'exclude': features,
    ...      'include': features,
    ...      'features': {
    ...        str: settings,
    ...      },
    ...    },
    ... })

Validate like so:

    >>> schema({
    ...   'set': {
    ...     'snmp_community': 'public',
    ...     'snmp_version': '2c',
    ...   },
    ...   'targets': {
    ...     'exclude': ['Ping'],
    ...     'features': {
    ...       'Uptime': {'retries': 3},
    ...       'Users': {'snmp_community': 'monkey'},
    ...     },
    ...   },
    ... }) == {
    ...   'set': {'snmp_version': '2c', 'snmp_community': 'public'},
    ...   'targets': {
    ...     'exclude': ['Ping'],
    ...     'features': {'Uptime': {'retries': 3},
    ...                  'Users': {'snmp_community': 'monkey'}}}}
    True
"""

# options for extra keys
PREVENT_EXTRA = 0  # any extra key not in schema will raise an error
ALLOW_EXTRA = 1  # extra keys not in schema will be included in output
REMOVE_EXTRA = 2  # extra keys not in schema will be excluded from output


def _isnamedtuple(obj):
    return isinstance(obj, tuple) and hasattr(obj, '_fields')


primitive_types = (str, unicode, bool, int, float)


class Undefined(object):
    def __nonzero__(self):
        return False

    def __repr__(self):
        return '...'


UNDEFINED = Undefined()


def Self():
    raise er.SchemaError('"Self" should never be called')


def default_factory(value):
    if value is UNDEFINED or callable(value):
        return value
    return lambda: value


@contextmanager
def raises(exc, msg=None, regex=None):
    try:
        yield
    except exc as e:
        if msg is not None:
            assert str(e) == msg, '%r != %r' % (str(e), msg)
        if regex is not None:
            assert re.search(regex, str(e)), '%r does not match %r' % (str(e), regex)


def Extra(_):
    """Allow keys in the data that are not present in the schema."""
    raise er.SchemaError('"Extra" should never be called')


# As extra() is never called there's no way to catch references to the
# deprecated object, so we just leave an alias here instead.
extra = Extra


class Schema(object):
    """A validation schema.

    The schema is a Python tree-like structure where nodes are pattern
    matched against corresponding trees of values.

    Nodes can be values, in which case a direct comparison is used, types,
    in which case an isinstance() check is performed, or callables, which will
    validate and optionally convert the value.

    Schemas can also be compared for equality.

    For example:

            >>> v = Schema({Required('a'): unicode})
            >>> v1 = Schema({Required('a'): unicode})
            >>> v2 = Schema({Required('b'): unicode})
            >>> assert v == v1
            >>> assert v != v2

    """

    _extra_to_name = {
        REMOVE_EXTRA: 'REMOVE_EXTRA',
        ALLOW_EXTRA: 'ALLOW_EXTRA',
        PREVENT_EXTRA: 'PREVENT_EXTRA',
    }

    def __init__(self, schema, required=False, extra=PREVENT_EXTRA):
        """Create a new Schema.

        :param schema: Validation schema. See :module:`voluptuous` for details.
        :param required: Keys defined in the schema must be in the data.
        :param extra: Specify how extra keys in the data are treated:
            - :const:`~voluptuous.PREVENT_EXTRA`: to disallow any undefined
              extra keys (raise ``Invalid``).
            - :const:`~voluptuous.ALLOW_EXTRA`: to include undefined extra
              keys in the output.
            - :const:`~voluptuous.REMOVE_EXTRA`: to exclude undefined extra keys
              from the output.
            - Any value other than the above defaults to
              :const:`~voluptuous.PREVENT_EXTRA`
        """
        self.schema = schema
        self.required = required
        self.extra = int(extra)  # ensure the value is an integer
        self._compiled = self._compile(schema)

    @classmethod
    def infer(cls, data, **kwargs):
        """Create a Schema from concrete data (e.g. an API response).

        For example, this will take a dict like:

        {
            'foo': 1,
            'bar': {
                'a': True,
                'b': False
            },
            'baz': ['purple', 'monkey', 'dishwasher']
        }

        And return a Schema:

        {
            'foo': int,
            'bar': {
                'a': bool,
                'b': bool
            },
            'baz': [str]
        }

        Note: only very basic inference is supported.
        """
        def value_to_schema_type(value):
            if isinstance(value, dict):
                if len(value) == 0:
                    return dict
                return {k: value_to_schema_type(v)
                        for k, v in iteritems(value)}
            if isinstance(value, list):
                if len(value) == 0:
                    return list
                else:
                    return [value_to_schema_type(v)
                            for v in value]
            return type(value)

        return cls(value_to_schema_type(data), **kwargs)

    def __eq__(self, other):
        if not isinstance(other, Schema):
            return False
        return other.schema == self.schema

    def __ne__(self, other):
        return not (self == other)

    def __str__(self):
        return str(self.schema)

    def __repr__(self):
        return "<Schema(%s, extra=%s, required=%s) object at 0x%x>" % (
            self.schema, self._extra_to_name.get(self.extra, '??'),
            self.required, id(self))

    def __call__(self, data):
        """Validate data against this schema."""
        try:
            return self._compiled([], data)
        except er.MultipleInvalid:
            raise
        except er.Invalid as e:
            raise er.MultipleInvalid([e])
            # return self.validate([], self.schema, data)

    def _compile(self, schema):
        if schema is Extra:
            return lambda _, v: v
        if schema is Self:
            return lambda p, v: self._compiled(p, v)
        elif hasattr(schema, "__voluptuous_compile__"):
            return schema.__voluptuous_compile__(self)
        if isinstance(schema, Object):
            return self._compile_object(schema)
        if isinstance(schema, collections.Mapping):
            return self._compile_dict(schema)
        elif isinstance(schema, list):
            return self._compile_list(schema)
        elif isinstance(schema, tuple):
            return self._compile_tuple(schema)
        elif isinstance(schema, (frozenset, set)):
            return self._compile_set(schema)
        type_ = type(schema)
        if inspect.isclass(schema):
            type_ = schema
        if type_ in (bool, bytes, int, long, str, unicode, float, complex, object,
                     list, dict, type(None)) or callable(schema):
            return _compile_scalar(schema)
        raise er.SchemaError('unsupported schema data type %r' %
                             type(schema).__name__)

    def _compile_mapping(self, schema, invalid_msg=None):
        """Create validator for given mapping."""
        invalid_msg = invalid_msg or 'mapping value'

        # Keys that may be required
        all_required_keys = set(key for key in schema
                                if key is not Extra and
                                ((self.required and not isinstance(key, (Optional, Remove))) or
                                 isinstance(key, Required)))

        # Keys that may have defaults
        all_default_keys = set(key for key in schema
                               if isinstance(key, Required) or
                               isinstance(key, Optional))

        _compiled_schema = {}
        for skey, svalue in iteritems(schema):
            new_key = self._compile(skey)
            new_value = self._compile(svalue)
            _compiled_schema[skey] = (new_key, new_value)

        candidates = list(_iterate_mapping_candidates(_compiled_schema))

        # After we have the list of candidates in the correct order, we want to apply some optimization so that each
        # key in the data being validated will be matched against the relevant schema keys only.
        # No point in matching against different keys
        additional_candidates = []
        candidates_by_key = {}
        for skey, (ckey, cvalue) in candidates:
            if type(skey) in primitive_types:
                candidates_by_key.setdefault(skey, []).append((skey, (ckey, cvalue)))
            elif isinstance(skey, Marker) and type(skey.schema) in primitive_types:
                candidates_by_key.setdefault(skey.schema, []).append((skey, (ckey, cvalue)))
            else:
                # These are wildcards such as 'int', 'str', 'Remove' and others which should be applied to all keys
                additional_candidates.append((skey, (ckey, cvalue)))

        def validate_mapping(path, iterable, out):
            required_keys = all_required_keys.copy()

            # Build a map of all provided key-value pairs.
            # The type(out) is used to retain ordering in case a ordered
            # map type is provided as input.
            key_value_map = type(out)()
            for key, value in iterable:
                key_value_map[key] = value
Loading ...