Repository URL to install this package:
|
Version:
4.0.0 ▾
|
mongomock
/
filtering.py
|
|---|
from datetime import datetime
import itertools
import uuid
from .helpers import ObjectId, RE_TYPE
from . import OperationFailure
import numbers
import operator
import re
from sentinels import NOTHING
from six import iteritems, string_types, PY3
try:
from types import NoneType
except ImportError:
NoneType = type(None)
try:
from bson import Regex, DBRef
_RE_TYPES = (RE_TYPE, Regex)
except ImportError:
DBRef = None
_RE_TYPES = (RE_TYPE,)
_TOP_LEVEL_OPERATORS = {'$expr', '$text', '$where', '$jsonSchema'}
_NOT_IMPLEMENTED_OPERATORS = {
'$bitsAllClear',
'$bitsAllSet',
'$bitsAnyClear',
'$bitsAnySet',
'$geoIntersects',
'$geoWithin',
'$maxDistance',
'$minDistance',
'$near',
'$nearSphere',
}
def filter_applies(search_filter, document):
"""Applies given filter
This function implements MongoDB's matching strategy over documents in the find() method
and other related scenarios (like $elemMatch)
"""
return _filterer_inst.apply(search_filter, document)
class _Filterer(object):
"""An object to help applying a filter, using the MongoDB query language."""
# This is populated using register_parse_expression further down.
parse_expression = []
def __init__(self):
self._operator_map = dict({
'$eq': _list_expand(operator_eq),
'$ne': _list_expand(lambda dv, sv: not operator_eq(dv, sv), negative=True),
'$all': self._all_op,
'$in': _in_op,
'$nin': lambda dv, sv: not _in_op(dv, sv),
'$exists': lambda dv, sv: bool(sv) == (dv is not NOTHING),
'$regex': _not_nothing_and(_regex),
'$elemMatch': self._elem_match_op,
'$size': _size_op,
'$type': _type_op
}, **{
key: _not_nothing_and(_list_expand(_compare_objects(op)))
for key, op in iteritems(SORTING_OPERATOR_MAP)
})
def apply(self, search_filter, document):
if not isinstance(search_filter, dict):
raise OperationFailure('the match filter must be an expression in an object')
for key, search in iteritems(search_filter):
# Top level operators.
if key == '$comment':
continue
if key in LOGICAL_OPERATOR_MAP:
if not search:
raise OperationFailure('BadValue $and/$or/$nor must be a nonempty array')
if not LOGICAL_OPERATOR_MAP[key](document, search, self.apply):
return False
continue
if key == '$expr':
parse_expression = self.parse_expression[0]
if not parse_expression(search, document, ignore_missing_keys=True):
return False
continue
if key in _TOP_LEVEL_OPERATORS:
raise NotImplementedError(
'The {} operator is not implemented in mongomock yet'.format(key))
if key.startswith('$'):
raise OperationFailure('unknown top level operator: ' + key)
is_match = False
is_checking_negative_match = \
isinstance(search, dict) and {'$ne', '$nin'} & set(search.keys())
is_checking_positive_match = \
not isinstance(search, dict) or (set(search.keys()) - {'$ne', '$nin'})
has_candidates = False
if search == {'$exists': False} and not iter_key_candidates(key, document):
continue
if isinstance(search, dict) and '$all' in search:
if not self._all_op(iter_key_candidates(key, document), search['$all']):
return False
# if there are no query operators then continue
if len(search) == 1:
continue
for doc_val in iter_key_candidates(key, document):
has_candidates |= doc_val is not NOTHING
is_ops_filter = search and isinstance(search, dict) and \
all(key.startswith('$') for key in search.keys())
if is_ops_filter:
if '$options' in search and '$regex' in search:
search = _combine_regex_options(search)
unknown_operators = set(search) - set(self._operator_map) - {'$not'}
if unknown_operators:
not_implemented_operators = unknown_operators & _NOT_IMPLEMENTED_OPERATORS
if not_implemented_operators:
raise NotImplementedError(
"'%s' is a valid operation but it is not supported by Mongomock "
'yet.' % list(not_implemented_operators)[0])
raise OperationFailure('unknown operator: ' + list(unknown_operators)[0])
is_match = all(
operator_string in self._operator_map
and self._operator_map[operator_string](doc_val, search_val)
or operator_string == '$not'
and self._not_op(document, key, search_val)
for operator_string, search_val in iteritems(search)
) and search
elif isinstance(search, _RE_TYPES) and isinstance(doc_val, (string_types, list)):
is_match = _regex(doc_val, search)
elif key in LOGICAL_OPERATOR_MAP:
if not search:
raise OperationFailure('BadValue $and/$or/$nor must be a nonempty array')
is_match = LOGICAL_OPERATOR_MAP[key](document, search, self.apply)
elif isinstance(doc_val, (list, tuple)):
is_match = (search in doc_val or search == doc_val)
if isinstance(search, ObjectId):
is_match |= (str(search) in doc_val)
else:
is_match = (doc_val == search) or (search is None and doc_val is NOTHING)
# When checking negative match, all the elements should match.
if is_checking_negative_match and not is_match:
return False
# If not checking negative matches, the first match is enouh for this criteria.
if is_match and not is_checking_negative_match:
break
if not is_match and (has_candidates or is_checking_positive_match):
return False
return True
def _not_op(self, d, k, s):
if isinstance(s, dict):
for key in s.keys():
if key not in self._operator_map and key not in LOGICAL_OPERATOR_MAP:
raise OperationFailure('unknown operator: %s' % key)
elif isinstance(s, _RE_TYPES):
pass
else:
raise OperationFailure('$not needs a regex or a document')
return not self.apply({k: s}, d)
def _elem_match_op(self, doc_val, query):
if not isinstance(doc_val, list):
return False
if not isinstance(query, dict):
raise OperationFailure('$elemMatch needs an Object')
for item in doc_val:
try:
if self.apply(query, item):
return True
except OperationFailure:
if self.apply({'field': query}, {'field': item}):
return True
return False
def _all_op(self, doc_val, search_val):
if isinstance(doc_val, list) and doc_val and isinstance(doc_val[0], list):
doc_val = list(itertools.chain.from_iterable(doc_val))
dv = _force_list(doc_val)
matches = []
for x in search_val:
if isinstance(x, dict) and '$elemMatch' in x:
matches.append(self._elem_match_op(doc_val, x['$elemMatch']))
else:
matches.append(x in dv)
return all(matches)
def iter_key_candidates(key, doc):
"""Get possible subdocuments or lists that are referred to by the key in question
Returns the appropriate nested value if the key includes dot notation.
"""
if doc is None:
return ()
if not key:
return [doc]
if isinstance(doc, list):
return _iter_key_candidates_sublist(key, doc)
if not isinstance(doc, dict):
return ()
key_parts = key.split('.')
if len(key_parts) == 1:
return [doc.get(key, NOTHING)]
sub_key = '.'.join(key_parts[1:])
sub_doc = doc.get(key_parts[0], {})
return iter_key_candidates(sub_key, sub_doc)
def _iter_key_candidates_sublist(key, doc):
"""Iterates of candidates
:param doc: a list to be searched for candidates for our key
:param key: the string key to be matched
"""
key_parts = key.split('.')
sub_key = key_parts.pop(0)
key_remainder = '.'.join(key_parts)
try:
sub_key_int = int(sub_key)
except ValueError:
sub_key_int = None
if sub_key_int is None:
# subkey is not an integer...
return [x
for sub_doc in doc
if isinstance(sub_doc, dict) and sub_key in sub_doc
for x in iter_key_candidates(key_remainder, sub_doc[sub_key])]
# subkey is an index
if sub_key_int >= len(doc):
return () # dead end
sub_doc = doc[sub_key_int]
if key_parts:
return iter_key_candidates('.'.join(key_parts), sub_doc)
return [sub_doc]
def _force_list(v):
return v if isinstance(v, (list, tuple)) else [v]
def _in_op(doc_val, search_val):
if not isinstance(search_val, (list, tuple)):
raise OperationFailure('$in needs an array')
if doc_val is NOTHING and None in search_val:
return True
doc_val = _force_list(doc_val)
is_regex_list = [isinstance(x, _RE_TYPES) for x in search_val]
if not any(is_regex_list):
return any(x in search_val for x in doc_val)
for x, is_regex in zip(search_val, is_regex_list):
if (is_regex and _regex(doc_val, x)) or (x in doc_val):
return True
return False
def _not_nothing_and(f):
"""wrap an operator to return False if the first arg is NOTHING"""
return lambda v, l: v is not NOTHING and f(v, l)
def _compare_objects(op):
"""Wrap an operator to also compare objects following BSON comparison.
See https://docs.mongodb.com/manual/reference/bson-type-comparison-order/#objects
"""
def _wrapped(a, b):
# Do not compare uncomparable types, see Type Bracketing:
# https://docs.mongodb.com/manual/reference/method/db.collection.find/#type-bracketing
return bson_compare(op, a, b, can_compare_types=False)
return _wrapped
def bson_compare(op, a, b, can_compare_types=True):
"""Compare two elements using BSON comparison.
Args:
op: the basic operation to compare (e.g. operator.lt, operator.ge).
a: the first operand
b: the second operand
can_compare_types: if True, according to BSON's definition order
between types is used, otherwise always return False when types are
different.
"""
a_type = _get_compare_type(a)
b_type = _get_compare_type(b)
if a_type != b_type:
return can_compare_types and op(a_type, b_type)
# Compare DBRefs as dicts
if type(a).__name__ == 'DBRef' and hasattr(a, 'as_doc'):
a = a.as_doc()
if type(b).__name__ == 'DBRef' and hasattr(b, 'as_doc'):
b = b.as_doc()
if isinstance(a, dict):
# MongoDb server compares the type before comparing the keys
# https://github.com/mongodb/mongo/blob/f10f214/src/mongo/bson/bsonelement.cpp#L516
# even though the documentation does not say anything about that.
a = [(_get_compare_type(v), k, v) for k, v in iteritems(a)]
b = [(_get_compare_type(v), k, v) for k, v in iteritems(b)]
if isinstance(a, (tuple, list)):
for item_a, item_b in zip(a, b):
if item_a != item_b:
return bson_compare(op, item_a, item_b)
return bson_compare(op, len(a), len(b))
if isinstance(a, NoneType):
return op(0, 0)
# bson handles bytes as binary in python3+:
# https://api.mongodb.com/python/current/api/bson/index.html
if PY3 and isinstance(a, bytes):
# Performs the same operation as described by:
# https://docs.mongodb.com/manual/reference/bson-type-comparison-order/#bindata
if len(a) != len(b):
return op(len(a), len(b))
# bytes is always treated as subtype 0 by the bson library
return op(a, b)
def _get_compare_type(val):
"""Get a number representing the base type of the value used for comparison.
See https://docs.mongodb.com/manual/reference/bson-type-comparison-order/
also https://github.com/mongodb/mongo/blob/46b28bb/src/mongo/bson/bsontypes.h#L175
for canonical values.
"""
if isinstance(val, NoneType):
return 5
if isinstance(val, bool):
return 40
if isinstance(val, numbers.Number):
return 10
if isinstance(val, string_types):
return 15
if isinstance(val, dict):
return 20
if isinstance(val, (tuple, list)):
return 25
if isinstance(val, uuid.UUID):
return 30
if isinstance(val, bytes):
assert PY3
return 30
if isinstance(val, ObjectId):
return 35
if isinstance(val, datetime):
return 45
if isinstance(val, _RE_TYPES):
return 50
if DBRef and isinstance(val, DBRef):
# According to the C++ code, this should be 55 but apparently sending a DBRef through
# pymongo is stored as a dict.
return 20
raise NotImplementedError(
"Mongomock does not know how to sort '%s' of type '%s'" %
(val, type(val)))
def _regex(doc_val, regex):
if not (isinstance(doc_val, (string_types, list)) or isinstance(doc_val, RE_TYPE)):
return False
if isinstance(regex, string_types):
regex = re.compile(regex)
if not isinstance(regex, RE_TYPE):
# bson.Regex
regex = regex.try_compile()
return any(
regex.search(item) for item in _force_list(doc_val)
if isinstance(item, string_types))
def _size_op(doc_val, search_val):
if isinstance(doc_val, (list, tuple, dict)):
return search_val == len(doc_val)
return search_val == 1 if doc_val and doc_val is not NOTHING else 0
def _list_expand(f, negative=False):
def func(doc_val, search_val):
if isinstance(doc_val, (list, tuple)) and not isinstance(search_val, (list, tuple)):
if negative:
return all(f(val, search_val) for val in doc_val)
return any(f(val, search_val) for val in doc_val)
return f(doc_val, search_val)
return func
def _type_op(doc_val, search_val):
if search_val not in TYPE_MAP:
raise OperationFailure('%r is not a valid $type' % search_val)
elif TYPE_MAP[search_val] is None:
raise NotImplementedError('%s is a valid $type but not implemented' % search_val)
return isinstance(doc_val, TYPE_MAP[search_val])
def _combine_regex_options(search):
if not isinstance(search['$options'], string_types):
raise OperationFailure('$options has to be a string')
options = None
for option in search['$options']:
if option not in 'imxs':
continue
re_option = getattr(re, option.upper())
if options is None:
options = re_option
else:
options |= re_option
search_copy = dict(search)
del search_copy['$options']
if options is None:
return search_copy
if isinstance(search['$regex'], _RE_TYPES):
if isinstance(search['$regex'], RE_TYPE):
search_copy['$regex'] = re.compile(
search['$regex'].pattern, search['$regex'].flags | options)
else:
# bson.Regex
regex = search['$regex']
search_copy['$regex'] = regex.__class__(regex.pattern, regex.flags | options)
else:
search_copy['$regex'] = re.compile(search['$regex'], options)
return search_copy
def operator_eq(doc_val, search_val):
if doc_val is NOTHING and search_val is None:
return True
return operator.eq(doc_val, search_val)
SORTING_OPERATOR_MAP = {
'$gt': operator.gt,
'$gte': operator.ge,
'$lt': operator.lt,
'$lte': operator.le,
}
LOGICAL_OPERATOR_MAP = {
'$or': lambda d, subq, filter_func: any(filter_func(q, d) for q in subq),
'$and': lambda d, subq, filter_func: all(filter_func(q, d) for q in subq),
'$nor': lambda d, subq, filter_func: all(not filter_func(q, d) for q in subq),
}
TYPE_MAP = {
'double': (float,),
'string': (str,),
'object': (dict,),
'array': (list,),
'binData': (bytes,),
'undefined': None,
'objectId': (ObjectId,),
'bool': (bool,),
'date': (datetime,),
'null': None,
'regex': None,
'dbPointer': None,
'javascript': None,
'symbol': None,
'javascriptWithScope': None,
'int': (int,),
'timestamp': None,
'long': (float,),
'decimal': (float,),
'minKey': None,
'maxKey': None,
}
def resolve_key(key, doc):
return next(iter(iter_key_candidates(key, doc)), NOTHING)
def resolve_sort_key(key, doc):
value = resolve_key(key, doc)
# see http://docs.mongodb.org/manual/reference/method/cursor.sort/#ascending-descending-sort
if value is NOTHING:
return 1, BsonComparable(None)
# List or tuples are sorted solely by their first value.
if isinstance(value, (tuple, list)):
if not value:
return 0, BsonComparable(None)
return 1, BsonComparable(value[0])
return 1, BsonComparable(value)
class BsonComparable(object):
"""Wraps a value in an BSON like object that can be compared one to another."""
def __init__(self, obj):
self.obj = obj
def __lt__(self, other):
return bson_compare(operator.lt, self.obj, other.obj)
_filterer_inst = _Filterer()
# Developer note: to avoid a cross-modules dependency (filtering requires aggregation, that requires
# filtering), the aggregation module needs to register its parse_expression function here.
def register_parse_expression(parse_expression):
"""Register the parse_expression function from the aggregate module."""
del _Filterer.parse_expression[:]
_Filterer.parse_expression.append(parse_expression)