Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.

import operator

from sqlalchemy import (
    and_,
    asc,
    case,
    desc,
    func,
    not_,
    or_
)


class QueryTransformer(object):
    """Apply a dict-based filter / ordering specification to a query.

    A filter expression is a nested tree of single-key dicts.  The key is
    either a complex operator ("and", "or", "not") or a simple operator
    ("=", "<", "in", "=~", ...):

    * "and" / "or" map to an iterable of sub-expressions;
    * "not" maps to a single sub-expression;
    * a simple operator maps to a single-key ``{field_name: value}`` dict.

    Example::

        {"and": [{"=": {"severity": "low"}},
                 {"not": {"<": {"timestamp": some_value}}}]}

    Field names are resolved with ``getattr`` on ``table``.
    """

    # Simple (field, value) comparison operators shared by all dialects.
    operators = {"=": operator.eq,
                 "<": operator.lt,
                 ">": operator.gt,
                 "<=": operator.le,
                 "=<": operator.le,
                 ">=": operator.ge,
                 "=>": operator.ge,
                 "!=": operator.ne,
                 "in": lambda field_name, values: field_name.in_(values),
                 "=~": lambda field, value: field.op("regexp")(value)}

    # operators which are different for different dialects
    dialect_operators = {'postgresql': {'=~': (lambda field, value:
                                               field.op("~")(value))}}

    # Operators combining already-transformed sub-expressions.
    complex_operators = {"or": or_,
                         "and": and_,
                         "not": not_}

    ordering_functions = {"asc": asc,
                          "desc": desc}

    # Severity values listed from lowest to highest sort rank.
    severity_levels = ('low', 'moderate', 'critical')

    # Dialects whose backend provides the FIELD() SQL function.  Every
    # other dialect gets an equivalent, portable CASE expression.
    field_function_dialects = frozenset(['mysql', 'mariadb'])

    def __init__(self, table, query, dialect='mysql'):
        """Store the table, the base query and the dialect name.

        :param table: object whose attributes are the columns that can be
                      filtered / ordered on (looked up by name).
        :param query: query object providing ``filter``, ``order_by`` and
                      ``limit``; the result of each call replaces
                      ``self.query``.
        :param dialect: dialect name used to select dialect-specific
                        operators and the severity ordering expression.
        """
        self.table = table
        self.query = query
        self.dialect_name = dialect

    def _get_operator(self, op):
        """Return the callable implementing the simple operator ``op``.

        A dialect-specific implementation takes precedence over the
        generic one.

        :raises KeyError: if ``op`` is not a known simple operator.
        """
        dialect_specific = self.dialect_operators.get(self.dialect_name, {})
        return dialect_specific.get(op) or self.operators[op]

    def _handle_complex_op(self, complex_op, nodes):
        """Transform the operands of a complex operator and combine them.

        :param complex_op: one of the keys of ``complex_operators``.
        :param nodes: iterable of sub-expressions, or a single
                      sub-expression when ``complex_op`` is "not".
        """
        op = self.complex_operators[complex_op]
        if op is not_:
            # "not" carries one sub-expression rather than a list of them.
            nodes = [nodes]
        return op(*[self._transform(node) for node in nodes])

    def _handle_simple_op(self, simple_op, nodes):
        """Build a ``<column> <op> <value>`` condition.

        :param simple_op: simple operator name (see ``operators``).
        :param nodes: dict whose first item is ``(field_name, value)``.
        """
        op = self._get_operator(simple_op)
        field_name, value = list(nodes.items())[0]
        return op(getattr(self.table, field_name), value)

    def _transform(self, sub_tree):
        """Recursively convert an expression (sub-)tree into a condition.

        Only the first item of ``sub_tree`` is considered.
        """
        # Named ``op_name`` so the imported ``operator`` module is not
        # shadowed inside this method.
        op_name, nodes = list(sub_tree.items())[0]
        if op_name in self.complex_operators:
            return self._handle_complex_op(op_name, nodes)
        return self._handle_simple_op(op_name, nodes)

    def apply_filter(self, expression_tree):
        """Add the condition described by ``expression_tree`` to the query."""
        condition = self._transform(expression_tree)
        self.query = self.query.filter(condition)

    def apply_options(self, orderby, limit):
        """Apply ordering and, when ``limit`` is not None, a row limit.

        :param orderby: see ``_apply_order_by``.
        :param limit: value passed to ``query.limit`` or None for no limit.
        """
        self._apply_order_by(orderby)
        if limit is not None:
            self.query = self.query.limit(limit)

    def _severity_rank(self, column):
        """Return an expression ranking ``column`` by ``severity_levels``.

        Listed levels rank 1..N in declaration order; any other value
        ranks 0.  FIELD() is kept for the dialects that provide it so the
        emitted SQL is unchanged there; other dialects get a CASE
        expression producing the same ranks.
        """
        if self.dialect_name in self.field_function_dialects:
            return func.field(column, *self.severity_levels)
        ranks = dict((level, rank) for rank, level
                     in enumerate(self.severity_levels, 1))
        return case(ranks, value=column, else_=0)

    def _apply_order_by(self, orderby):
        """Order the query.

        :param orderby: None, or an iterable of single-key dicts
                        ``{field_name: "asc" | "desc"}`` applied in order.
                        None orders by ``table.timestamp`` descending; an
                        empty iterable applies no ordering at all.
        :raises KeyError: if a direction is not in ``ordering_functions``.
        """
        if orderby is None:
            self.query = self.query.order_by(desc(self.table.timestamp))
            return

        for field in orderby:
            attr, order = list(field.items())[0]
            ordering_function = self.ordering_functions[order]
            column = getattr(self.table, attr)
            if attr == 'severity':
                # Severity sorts by rank, not alphabetically.
                column = self._severity_rank(column)
            self.query = self.query.order_by(ordering_function(column))

    def get_query(self):
        """Return the query with all filters and options applied so far."""
        return self.query