""" manage PyTables query interface via Expressions """
import ast
from functools import partial
import numpy as np
from pandas.compat import DeepChainMap, string_types, u
from pandas.core.dtypes.common import is_list_like
import pandas as pd
from pandas.core.base import StringMixin
import pandas.core.common as com
from pandas.core.computation import expr, ops
from pandas.core.computation.common import _ensure_decoded
from pandas.core.computation.expr import BaseExprVisitor
from pandas.core.computation.ops import UndefinedVariableError, is_term
from pandas.io.formats.printing import pprint_thing, pprint_thing_encoded
class Scope(expr.Scope):
    """ expression scope that additionally stores the queryables

    Parameters
    ----------
    level : int
        forwarded to the parent scope as ``level + 1``
    global_dict : dict or None, optional
        forwarded unchanged to the parent scope
    local_dict : dict or None, optional
        forwarded unchanged to the parent scope
    queryables : dict or None, optional
        stored on the instance; any falsy value is replaced by a new,
        empty dict
    """
    __slots__ = ('queryables',)

    def __init__(self, level, global_dict=None, local_dict=None,
                 queryables=None):
        # NOTE: deliberately no extra local variables in here, the parent
        # constructor receives a frame level and may look at caller frames
        super(Scope, self).__init__(
            level + 1,
            global_dict=global_dict,
            local_dict=local_dict,
        )
        if queryables:
            self.queryables = queryables
        else:
            self.queryables = dict()
class Term(ops.Term):
    """ a named term of a query expression

    Instantiating with a ``name`` that is not a string yields a
    ``Constant`` instead of an instance of the requested class.
    """

    def __new__(cls, name, env, side=None, encoding=None):
        # string names stay terms, everything else is a literal value
        if isinstance(name, string_types):
            target = cls
        else:
            target = Constant
        return StringMixin.__new__(target)

    def __init__(self, name, env, side=None, encoding=None):
        super(Term, self).__init__(name, env, side=side, encoding=encoding)

    def _resolve_name(self):
        """ resolve my name depending on which side of the op i am on

        Raises
        ------
        NameError
            if i am on the left side and my name is not a queryable
        """
        if self.side != 'left':
            # right-hand side: look the name up in the scope and fall back
            # to the bare name when the scope does not know it
            try:
                return self.env.resolve(self.name, is_local=False)
            except UndefinedVariableError:
                return self.name

        # left-hand side: must be one of the queryables
        if self.name in self.env.queryables:
            return self.name
        raise NameError('name {name!r} is not defined'
                        .format(name=self.name))

    @property
    def value(self):
        """ the stored value of this term (getter only) """
        return self._value
class Constant(Term):
    """ a term that wraps a literal value rather than a name to look up """

    def __init__(self, value, env, side=None, encoding=None):
        # the literal travels through the parent's ``name`` parameter
        super(Constant, self).__init__(
            value,
            env,
            side=side,
            encoding=encoding,
        )

    def _resolve_name(self):
        """ return the stored ``_name`` as is, no lookup is performed """
        return self._name
class BinOp(ops.BinOp):
    """ binary operation of a query expression

    Parameters
    ----------
    op : the operator, handed to the parent class
    lhs : left operand, handed to the parent class
    rhs : right operand, handed to the parent class
    queryables : mapping that the left-hand side is looked up in
    encoding : encoding used when rendering values to strings (may be None)

    ``filter`` and ``condition`` both start out as None.
    """

    # threshold on the number of values; the subclasses in this file compare
    # ``len(values)`` against it when choosing between an expression and a
    # filter
    _max_selectors = 31

    def __init__(self, op, lhs, rhs, queryables, encoding):
        super(BinOp, self).__init__(op, lhs, rhs)
        self.queryables = queryables
        self.encoding = encoding
        self.filter = None
        self.condition = None

    def _disallow_scalar_only_bool_ops(self):
        """ intentionally a no-op here """
        pass

    def prune(self, klass):
        """ recursively collapse my operands into a specialized BinOp

        Parameters
        ----------
        klass : class used to build the combined node when no joint class
            applies

        Returns
        -------
        whatever the combination step yields: one of the operands, the
        evaluated combined node, or None
        """

        def pr(left, right):
            """ create and return a new specialized BinOp from myself """
            # a missing side simply yields the other one
            if left is None:
                return right
            elif right is None:
                return left

            k = klass
            if isinstance(left, ConditionBinOp):
                if isinstance(right, ConditionBinOp):
                    k = JointConditionBinOp
                elif isinstance(left, k):
                    return left
                elif isinstance(right, k):
                    return right

            elif isinstance(left, FilterBinOp):
                if isinstance(right, FilterBinOp):
                    k = JointFilterBinOp
                elif isinstance(left, k):
                    return left
                elif isinstance(right, k):
                    return right

            return k(self.op, left, right, queryables=self.queryables,
                     encoding=self.encoding).evaluate()

        left, right = self.lhs, self.rhs

        # terms contribute their value, nested operations are pruned first;
        # the left side is always processed before the right side
        pruned_left = left.value if is_term(left) else left.prune(klass)
        pruned_right = right.value if is_term(right) else right.prune(klass)
        return pr(pruned_left, pruned_right)

    def conform(self, rhs):
        """ return rhs as something list-like: scalars are wrapped in a list
        and numpy arrays are flattened """
        if not is_list_like(rhs):
            rhs = [rhs]
        if isinstance(rhs, np.ndarray):
            rhs = rhs.ravel()
        return rhs

    @property
    def is_valid(self):
        """ return True if this is a valid field """
        return self.lhs in self.queryables

    @property
    def is_in_table(self):
        """ return True if this is a valid column name for generation (e.g. an
        actual column in the table) """
        return self.queryables.get(self.lhs) is not None

    @property
    def kind(self):
        """ the kind of my field """
        return getattr(self.queryables.get(self.lhs), 'kind', None)

    @property
    def meta(self):
        """ the meta of my field """
        return getattr(self.queryables.get(self.lhs), 'meta', None)

    @property
    def metadata(self):
        """ the metadata of my field """
        return getattr(self.queryables.get(self.lhs), 'metadata', None)

    def generate(self, v):
        """ create and return the op string for this TermValue """
        val = v.tostring(self.encoding)
        return "({lhs} {op} {val})".format(lhs=self.lhs, op=self.op, val=val)

    def convert_value(self, v):
        """ convert the expression that is in the term to something that is
        accepted by pytables

        Parameters
        ----------
        v : the raw value to convert

        Returns
        -------
        TermValue

        Raises
        ------
        TypeError
            if the kind of my field is not handled and v is not a string
        """

        def stringify(value):
            if self.encoding is not None:
                encoder = partial(pprint_thing_encoded,
                                  encoding=self.encoding)
            else:
                encoder = pprint_thing
            return encoder(value)

        kind = _ensure_decoded(self.kind)
        meta = _ensure_decoded(self.meta)
        if kind == u('datetime64') or kind == u('datetime'):
            if isinstance(v, (int, float)):
                v = stringify(v)
            v = _ensure_decoded(v)
            v = pd.Timestamp(v)
            if v.tz is not None:
                v = v.tz_convert('UTC')
            return TermValue(v, v.value, kind)
        elif kind == u('timedelta64') or kind == u('timedelta'):
            v = pd.Timedelta(v, unit='s').value
            return TermValue(int(v), v, kind)
        elif meta == u('category'):
            metadata = com.values_from_object(self.metadata)
            # searchsorted yields an insertion position even when v is
            # absent, so membership has to be checked first; otherwise an
            # unknown value would be translated to the code of a different
            # category. -1 is the result for a value that is not present.
            if v not in metadata:
                result = -1
            else:
                result = metadata.searchsorted(v, side='left')
            return TermValue(result, result, u('integer'))
        elif kind == u('integer'):
            v = int(float(v))
            return TermValue(v, v, kind)
        elif kind == u('float'):
            v = float(v)
            return TermValue(v, v, kind)
        elif kind == u('bool'):
            if isinstance(v, string_types):
                # these spellings (after strip/lower) count as False
                v = not v.strip().lower() in [u('false'), u('f'), u('no'),
                                              u('n'), u('none'), u('0'),
                                              u('[]'), u('{}'), u('')]
            else:
                v = bool(v)
            return TermValue(v, v, kind)
        elif isinstance(v, string_types):
            # string quoting
            return TermValue(v, stringify(v), u('string'))
        else:
            raise TypeError("Cannot compare {v} of type {typ} to {kind} column"
                            .format(v=v, typ=type(v), kind=kind))

    def convert_values(self):
        """ intentionally a no-op here """
        pass
class FilterBinOp(BinOp):
    """ binary operation that is applied as a filter

    ``filter`` is either None or a tuple of
    ``(lhs, filter function, Index of values)``.
    """

    def __unicode__(self):
        # evaluate() formats ``self`` into its error message before any
        # filter has been built; subscripting None here would replace that
        # error with an unrelated TypeError
        if self.filter is None:
            return pprint_thing("Filter: Not Initialized")
        return pprint_thing("[Filter : [{lhs}] -> [{op}]"
                            .format(lhs=self.filter[0], op=self.filter[1]))

    def invert(self):
        """ invert the filter """
        if self.filter is not None:
            f = list(self.filter)
            f[1] = self.generate_filter_op(invert=True)
            self.filter = tuple(f)
        return self

    def format(self):
        """ return the actual filter format """
        return [self.filter]

    def evaluate(self):
        """ build my filter

        Returns
        -------
        self when a filter was built, None when the field is in the table
        and no filter is needed

        Raises
        ------
        ValueError
            if the left-hand side is not a valid field
        TypeError
            if the field is not in the table and the op is not ``==``/``!=``
        """
        if not self.is_valid:
            raise ValueError("query term is not valid [{slf}]"
                             .format(slf=self))

        rhs = self.conform(self.rhs)
        values = [TermValue(v, v, self.kind) for v in rhs]

        if self.is_in_table:

            # if too many values to create the expression, use a filter instead
            if self.op in ['==', '!='] and len(values) > self._max_selectors:

                filter_op = self.generate_filter_op()
                self.filter = (
                    self.lhs,
                    filter_op,
                    pd.Index([v.value for v in values]))

                return self
            return None

        # equality conditions
        if self.op in ['==', '!=']:

            filter_op = self.generate_filter_op()
            self.filter = (
                self.lhs,
                filter_op,
                pd.Index([v.value for v in values]))

        else:
            raise TypeError("passing a filterable condition to a non-table "
                            "indexer [{slf}]".format(slf=self))

        return self

    def generate_filter_op(self, invert=False):
        """ return a function ``(axis, vals)`` testing membership via
        ``axis.isin(vals)``; the test is negated for ``!=``, or for ``==``
        when ``invert`` is True """
        if (self.op == '!=' and not invert) or (self.op == '==' and invert):
            return lambda axis, vals: ~axis.isin(vals)
        else:
            return lambda axis, vals: axis.isin(vals)
class JointFilterBinOp(FilterBinOp):
    """ a filter operation whose operands are both filters """

    def format(self):
        """ joint filters have no single format; always raises

        Raises
        ------
        NotImplementedError
        """
        raise NotImplementedError("unable to collapse Joint Filters")

    def evaluate(self):
        """ nothing to compute here, return myself unchanged """
        return self
class ConditionBinOp(BinOp):
def __unicode__(self):
return pprint_thing("[Condition : [{cond}]]"
.format(cond=self.condition))
def invert(self):
    """ invert the condition; not supported, always raises

    Raises
    ------
    NotImplementedError
    """
    message = ("cannot use an invert condition when "
               "passing to numexpr")
    raise NotImplementedError(message)
def format(self):
""" return the actual ne format """
return self.condition
def evaluate(self):
if not self.is_valid:
raise ValueError("query term is not valid [{slf}]"
.format(slf=self))
# convert values if we are in the table
if not self.is_in_table:
return None
rhs = self.conform(self.rhs)
values = [self.convert_value(v) for v in rhs]
# equality conditions
if self.op in ['==', '!=']:
# too many values to create the expression?
if len(values) <= self._max_selectors:
vs = [self.generate(v) for v in values]
self.condition = "({cond})".format(cond=' | '.join(vs))
# use a filter after reading
else:
return None
else:
self.condition = self.generate(values[0])
Loading ...