import collections
import inspect
import itertools
import re
import sys
from contextlib import contextmanager
from functools import wraps

try:
    # Python 3.3+. The ABC aliases on ``collections`` itself were removed
    # in Python 3.10, so prefer the ``collections.abc`` location.
    from collections.abc import Mapping
except ImportError:  # Python 2
    from collections import Mapping

from voluptuous import error as er
# Python 2/3 compatibility shims. After this block the names ``long``,
# ``unicode``, ``basestring``, ``ifilter`` and ``iteritems`` are usable on
# either major version.
if sys.version_info[0] < 3:
    # Python 2 already has long/unicode/basestring as builtins; only the
    # lazy filter needs importing.
    from itertools import ifilter

    def iteritems(d):
        """Iterate over the ``(key, value)`` pairs of mapping ``d``."""
        return d.iteritems()
else:
    # Python 3 dropped these builtins; alias them to their replacements.
    long = int
    unicode = str
    basestring = str
    ifilter = filter

    def iteritems(d):
        """Return the ``(key, value)`` pairs of mapping ``d``."""
        return d.items()
"""Schema validation for Python data structures.
Given, for example, a nested data structure like this:
{
'exclude': ['Users', 'Uptime'],
'include': [],
'set': {
'snmp_community': 'public',
'snmp_timeout': 15,
'snmp_version': '2c',
},
'targets': {
'localhost': {
'exclude': ['Uptime'],
'features': {
'Uptime': {
'retries': 3,
},
'Users': {
'snmp_community': 'monkey',
'snmp_port': 15,
},
},
'include': ['Users'],
'set': {
'snmp_community': 'monkeys',
},
},
},
}
A schema like this:
>>> settings = {
... 'snmp_community': str,
... 'retries': int,
... 'snmp_version': All(Coerce(str), Any('3', '2c', '1')),
... }
>>> features = ['Ping', 'Uptime', 'Http']
>>> schema = Schema({
... 'exclude': features,
... 'include': features,
... 'set': settings,
... 'targets': {
... 'exclude': features,
... 'include': features,
... 'features': {
... str: settings,
... },
... },
... })
Validate like so:
>>> schema({
... 'set': {
... 'snmp_community': 'public',
... 'snmp_version': '2c',
... },
... 'targets': {
... 'exclude': ['Ping'],
... 'features': {
... 'Uptime': {'retries': 3},
... 'Users': {'snmp_community': 'monkey'},
... },
... },
... }) == {
... 'set': {'snmp_version': '2c', 'snmp_community': 'public'},
... 'targets': {
... 'exclude': ['Ping'],
... 'features': {'Uptime': {'retries': 3},
... 'Users': {'snmp_community': 'monkey'}}}}
True
"""
# Policies for keys that appear in the validated data but not in the schema.
# One of these is passed as the ``extra`` argument of ``Schema``.
PREVENT_EXTRA = 0 # reject: any key not in the schema raises an error
ALLOW_EXTRA = 1 # keep: keys not in the schema are copied to the output
REMOVE_EXTRA = 2 # drop: keys not in the schema are left out of the output
def _isnamedtuple(obj):
    """Tell whether ``obj`` looks like a namedtuple instance.

    The check is structural: ``obj`` must be a ``tuple`` and expose a
    ``_fields`` attribute.
    """
    if not isinstance(obj, tuple):
        return False
    return hasattr(obj, '_fields')
# Exact types treated as "primitive" schema keys. When compiling a mapping,
# a key whose type() is in this tuple is indexed by its value so that it is
# only matched against the data key it can actually equal; every other key
# is treated as a wildcard candidate.
primitive_types = (str, unicode, bool, int, float)
class Undefined(object):
    """Type of the ``UNDEFINED`` sentinel, meaning "no value was supplied".

    Instances are falsy and render as ``'...'``.
    """

    def __nonzero__(self):
        # Python 2 truthiness hook.
        return False

    # Python 3 consults ``__bool__`` instead of ``__nonzero__``; without this
    # alias the sentinel would silently evaluate as truthy there.
    __bool__ = __nonzero__

    def __repr__(self):
        return '...'


# Module-wide sentinel instance; compare against it with ``is``.
UNDEFINED = Undefined()
def Self():
    """Placeholder that refers back to the schema being defined.

    ``Schema._compile`` recognises this function by identity
    (``schema is Self``) and substitutes the schema's own compiled
    validator. It exists only as a token, so calling it is an error.

    :raises SchemaError: always.
    """
    message = '"Self" should never be called'
    raise er.SchemaError(message)
def default_factory(value):
    """Normalise a default ``value`` into a zero-argument factory.

    :param value: a callable, the ``UNDEFINED`` sentinel, or a plain value.
    :returns: ``value`` unchanged when it is ``UNDEFINED`` or already
        callable; otherwise a callable that returns ``value``.
    """
    needs_wrapping = not (value is UNDEFINED or callable(value))
    if needs_wrapping:
        return lambda: value
    return value
@contextmanager
def raises(exc, msg=None, regex=None):
    """Assert that the managed block raises ``exc``.

    :param exc: exception class (or tuple of classes) the block must raise.
    :param msg: when given, ``str(exception)`` must equal this exactly.
    :param regex: when given, ``re.search(regex, str(exception))`` must match.
    :raises AssertionError: if the block raises nothing, or the raised
        exception's text fails the ``msg`` / ``regex`` check.

    Exceptions that are not instances of ``exc`` propagate unchanged.
    """
    try:
        yield
    except exc as e:
        text = str(e)
        # Explicit raises rather than ``assert`` statements so the checks
        # still run when Python is started with -O.
        if msg is not None and text != msg:
            raise AssertionError('%r != %r' % (text, msg))
        if regex is not None and not re.search(regex, text):
            raise AssertionError('%r does not match %r' % (text, regex))
    else:
        # The block completed normally: the expected exception never
        # happened, which must fail rather than pass silently.
        expected = getattr(exc, '__name__', None) or repr(exc)
        raise AssertionError('Did not raise exception %s' % (expected,))
def Extra(_):
    """Marker allowing data keys that the schema does not list.

    ``Schema._compile`` recognises this function by identity
    (``schema is Extra``); it exists only as a token, so calling it is an
    error.

    :raises SchemaError: always.
    """
    message = '"Extra" should never be called'
    raise er.SchemaError(message)


# ``extra`` is the deprecated spelling. Because the marker is never called,
# uses of the old name cannot be intercepted to emit a warning, so it is
# kept as a plain alias.
extra = Extra
class Schema(object):
"""A validation schema.
The schema is a Python tree-like structure where nodes are pattern
matched against corresponding trees of values.
Nodes can be values, in which case a direct comparison is used, types,
in which case an isinstance() check is performed, or callables, which will
validate and optionally convert the value.
Schemas can also be compared for equality.
For example:
>>> v = Schema({Required('a'): unicode})
>>> v1 = Schema({Required('a'): unicode})
>>> v2 = Schema({Required('b'): unicode})
>>> assert v == v1
>>> assert v != v2
"""
_extra_to_name = {
REMOVE_EXTRA: 'REMOVE_EXTRA',
ALLOW_EXTRA: 'ALLOW_EXTRA',
PREVENT_EXTRA: 'PREVENT_EXTRA',
}
def __init__(self, schema, required=False, extra=PREVENT_EXTRA):
    """Build a Schema and compile it immediately.

    :param schema: The validation schema. See the ``voluptuous`` module
        documentation for the accepted forms.
    :param required: When true, keys defined in the schema must be
        present in the data.
    :param extra: How keys absent from the schema are handled:

        - :const:`~voluptuous.PREVENT_EXTRA`: reject them (raise
          ``Invalid``).
        - :const:`~voluptuous.ALLOW_EXTRA`: copy them to the output.
        - :const:`~voluptuous.REMOVE_EXTRA`: leave them out of the
          output.
        - Any other value behaves like
          :const:`~voluptuous.PREVENT_EXTRA`.
    """
    self.schema = schema
    self.required = required
    # Coerce to int so the stored policy is always a plain integer.
    self.extra = int(extra)
    # Compile last: compilation reads the attributes assigned above.
    compiled = self._compile(schema)
    self._compiled = compiled
@classmethod
def infer(cls, data, **kwargs):
    """Derive a Schema from sample data (e.g. an API response).

    Every leaf value is replaced by its type, while dicts and lists keep
    their shape. For instance:

        {
            'foo': 1,
            'bar': {
                'a': True,
                'b': False
            },
            'baz': ['purple', 'monkey', 'dishwasher']
        }

    yields a Schema wrapping:

        {
            'foo': int,
            'bar': {
                'a': bool,
                'b': bool
            },
            'baz': [str, str, str]
        }

    Empty dicts and lists become the bare types ``dict`` and ``list``.
    Extra keyword arguments are forwarded to the Schema constructor.

    Note: only very basic inference is supported.
    """
    def schema_for(node):
        is_dict = isinstance(node, dict)
        # Anything that is not a container is described by its type alone.
        if not is_dict and not isinstance(node, list):
            return type(node)
        # An empty container carries no element information.
        if not node:
            return dict if is_dict else list
        if is_dict:
            return {key: schema_for(child)
                    for key, child in iteritems(node)}
        return [schema_for(child) for child in node]

    inferred = schema_for(data)
    return cls(inferred, **kwargs)
def __eq__(self, other):
if not isinstance(other, Schema):
return False
return other.schema == self.schema
def __ne__(self, other):
return not (self == other)
def __str__(self):
return str(self.schema)
def __repr__(self):
return "<Schema(%s, extra=%s, required=%s) object at 0x%x>" % (
self.schema, self._extra_to_name.get(self.extra, '??'),
self.required, id(self))
def __call__(self, data):
    """Validate ``data`` against this schema.

    Runs the compiled validator starting from an empty path and returns
    its result.

    :raises MultipleInvalid: on validation failure. A lone ``Invalid``
        is wrapped in a one-element ``MultipleInvalid``; an existing
        ``MultipleInvalid`` is re-raised as is.
    """
    try:
        return self._compiled([], data)
    except er.Invalid as exc:
        if isinstance(exc, er.MultipleInvalid):
            # Already in the aggregate form callers expect.
            raise
        raise er.MultipleInvalid([exc])
def _compile(self, schema):
    """Turn a schema node into a validator callable.

    The returned callable is invoked as ``validator(path, value)``.
    Dispatch is by identity and type, in this order: the ``Extra`` and
    ``Self`` markers, objects providing ``__voluptuous_compile__``,
    ``Object`` instances, mappings, lists, tuples, sets/frozensets, and
    finally scalar types or arbitrary callables.

    :param schema: the schema node to compile.
    :returns: the compiled validator for ``schema``.
    :raises SchemaError: if ``schema`` is of an unsupported type.
    """
    if schema is Extra:
        # Extra accepts any value unchanged.
        return lambda _, v: v
    if schema is Self:
        # Look up ``self._compiled`` lazily, at validation time: it is
        # not assigned until compilation of the whole schema finishes.
        return lambda p, v: self._compiled(p, v)
    if hasattr(schema, "__voluptuous_compile__"):
        return schema.__voluptuous_compile__(self)
    # NOTE(review): Object is tested before the generic mapping branch,
    # presumably because it is itself a mapping type; confirm.
    if isinstance(schema, Object):
        return self._compile_object(schema)
    # ``Mapping`` is the ABC imported at module level; the
    # ``collections.Mapping`` alias no longer exists on Python 3.10+.
    if isinstance(schema, Mapping):
        return self._compile_dict(schema)
    if isinstance(schema, list):
        return self._compile_list(schema)
    if isinstance(schema, tuple):
        return self._compile_tuple(schema)
    if isinstance(schema, (frozenset, set)):
        return self._compile_set(schema)
    # A class stands for itself; any other object is classified by type.
    type_ = schema if inspect.isclass(schema) else type(schema)
    if type_ in (bool, bytes, int, long, str, unicode, float, complex, object,
                 list, dict, type(None)) or callable(schema):
        return _compile_scalar(schema)
    raise er.SchemaError('unsupported schema data type %r' %
                         type(schema).__name__)
def _compile_mapping(self, schema, invalid_msg=None):
"""Create validator for given mapping."""
invalid_msg = invalid_msg or 'mapping value'
# Keys that may be required
all_required_keys = set(key for key in schema
if key is not Extra and
((self.required and not isinstance(key, (Optional, Remove))) or
isinstance(key, Required)))
# Keys that may have defaults
all_default_keys = set(key for key in schema
if isinstance(key, Required) or
isinstance(key, Optional))
_compiled_schema = {}
for skey, svalue in iteritems(schema):
new_key = self._compile(skey)
new_value = self._compile(svalue)
_compiled_schema[skey] = (new_key, new_value)
candidates = list(_iterate_mapping_candidates(_compiled_schema))
# After we have the list of candidates in the correct order, we want to apply some optimization so that each
# key in the data being validated will be matched against the relevant schema keys only.
# No point in matching against different keys
additional_candidates = []
candidates_by_key = {}
for skey, (ckey, cvalue) in candidates:
if type(skey) in primitive_types:
candidates_by_key.setdefault(skey, []).append((skey, (ckey, cvalue)))
elif isinstance(skey, Marker) and type(skey.schema) in primitive_types:
candidates_by_key.setdefault(skey.schema, []).append((skey, (ckey, cvalue)))
else:
# These are wildcards such as 'int', 'str', 'Remove' and others which should be applied to all keys
additional_candidates.append((skey, (ckey, cvalue)))
def validate_mapping(path, iterable, out):
required_keys = all_required_keys.copy()
# Build a map of all provided key-value pairs.
# The type(out) is used to retain ordering in case a ordered
# map type is provided as input.
key_value_map = type(out)()
for key, value in iterable:
key_value_map[key] = value
Loading ...