Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cql / cql / query.py
Size: Mime:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from cql.apivalues import ProgrammingError
from cql.cqltypes import lookup_casstype

# Quoted string literals, either '...' or "...".  The capture group makes
# re.split() keep the literals in its result list.
stringlit_re = re.compile(r"""('[^']*'|"[^"]*")""")
# Comments: /* ... */ blocks plus // and -- comments running to end of line.
# The block-comment branch never steps over a "*/", so it ends at the first
# terminator even when that terminator is preceded by extra stars ("**/").
comments_re = re.compile(r'(/\*(?:[^*]|\*+[^*/])*\*+/|//.*$|--.*$)', re.MULTILINE)
param_re = re.compile(r'''
    ( \W )            # don't match : at the beginning of the text (meaning it
                      # immediately follows a comment or string literal) or
                      # right after an identifier character
    : ( \w+ )
    (?= \W )          # and don't match a param that is immediately followed by
                      # a comment or string literal either
''', re.IGNORECASE | re.VERBOSE)

def replace_param_substitutions(query, replacer):
    """
    Apply `replacer` to every ":name" placeholder in `query` that lies
    outside string literals and comments, and return the rewritten query.

    `replacer` is handed to param_re.sub(), so it receives a match object
    whose group(1) is the single non-word character in front of the colon
    and whose group(2) is the parameter name.  group(1) is consumed by the
    match, so the replacer has to put it back in the text it returns.
    """
    # Pad both ends with a space so the \W guards in param_re can also be
    # satisfied by a placeholder at the very start or end of the query.
    padded = ' ' + query + ' '

    # First pass: cut out string literals, then cut comments out of what
    # remains.  Literals are kept whole so comment markers inside them are
    # not misread.
    pieces = []
    for chunk in stringlit_re.split(padded):
        if chunk[:1] in ('"', "'"):
            pieces.append(chunk)
        else:
            pieces.extend(comments_re.split(chunk))

    # Second pass: substitute only in the pieces that are neither literals
    # nor comments.
    protected_prefixes = ("'", '"', '--', '//', '/*')
    output = []
    for chunk in pieces:
        if chunk.startswith(protected_prefixes):
            output.append(chunk)
        else:
            output.append(param_re.sub(replacer, chunk))

    result = ''.join(output)
    # Check the padding on the joined text, not on the first/last piece: a
    # trailing "--" or "//" comment swallows the closing pad, which leaves an
    # empty final piece and used to raise IndexError here.
    assert result[:1] == ' ' and result[-1:] == ' '
    return result[1:-1]

class PreparedQuery(object):
    """
    Holds a prepared statement: its text, the id it was given, and the
    type and name of each bind parameter, position by position.
    """

    def __init__(self, querytext, itemid, vartypes, paramnames):
        """
        Store the statement details and resolve every entry of `vartypes`
        through lookup_casstype().

        Raises ProgrammingError when `vartypes` and `paramnames` differ in
        length.
        """
        self.querytext = querytext
        self.itemid = itemid
        self.vartypes = [lookup_casstype(vartype) for vartype in vartypes]
        self.paramnames = paramnames
        if len(self.paramnames) == len(self.vartypes):
            return
        raise ProgrammingError("Length of variable types list is not the same"
                               " length as the list of parameter names")

    def encode_params(self, params):
        """
        Return the encoded bind values in parameter order.

        Each value is fetched as params[name], passed through the matching
        type's validate(), and then through that type's to_binary().
        """
        encoded = []
        for name, casstype in zip(self.paramnames, self.vartypes):
            encoded.append(casstype.to_binary(casstype.validate(params[name])))
        return encoded

def prepare_inline(query, params):
    """
    Return `query` with every ":param_name" placeholder replaced by
    cql_quote(params['param_name']).

    Placeholders inside string literals and comments are left alone (see
    replace_param_substitutions).  A name missing from `params` surfaces as
    whatever the params[...] lookup raises.
    """

    def substitute(match):
        # The guard character in front of the colon is part of the match,
        # so it has to be emitted again ahead of the quoted value.
        guard_char, param_name = match.groups()
        return guard_char + cql_quote(params[param_name])

    return replace_param_substitutions(query, substitute)

def prepare_query(querytext):
    """
    Turn each ":param_name" placeholder in `querytext` into a "?" marker.

    Returns a (transformed_query, paramnames) pair, where paramnames lists
    the placeholder names in the order they were replaced; a name used more
    than once appears once per use.
    """
    collected = []

    def mark_and_record(match):
        # Keep the guard character that the pattern consumed before the colon.
        guard_char, param_name = match.groups()
        collected.append(param_name)
        return guard_char + '?'

    rewritten = replace_param_substitutions(querytext, mark_and_record)
    return rewritten, collected

def cql_quote(term):
    """
    Render `term` as text for inlining into a query.

    unicode values are UTF-8 encoded; str and bool values are passed through
    str().  Both kinds are then wrapped in single quotes with embedded single
    quotes doubled.  Anything else is returned as str(term), unquoted.
    """
    if isinstance(term, unicode):
        text = term.encode('utf8')
    elif isinstance(term, (str, bool)):
        text = str(term)
    else:
        return str(term)
    return "'%s'" % __escape_quotes(text)

def __escape_quotes(term):
    """Return `term` (a str or unicode string) with each ' doubled to ''."""
    assert isinstance(term, basestring)
    escaped = term.replace("'", "''")
    return escaped

def cql_quote_name(name):
    """
    Return `name` wrapped in double quotes, with embedded double quotes
    doubled.  unicode input is UTF-8 encoded first.
    """
    raw = name.encode('utf8') if isinstance(name, unicode) else name
    doubled = raw.replace('"', '""')
    return '"%s"' % doubled