Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
sarus_sql / sarus_sql / ast_utils / compose_queries.py
Size: Mime:
import copy
import typing as t

from snsql._ast.ast import Identifier, NamedSubquery, Query, Select
from snsql.sql.reader.base import NameCompare
from snsql.sql.reader.postgres import PostgresNameCompare

import sarus_sql.ast_utils as ast_utils


def compose_parsed_query(
    queries: t.Dict[str, Query],
    base_query: Query,
    name_compare: t.Optional[NameCompare] = None,
) -> Query:
    """Compose queries in a dict with a base_query generating WITH clause.
    if base_query has already CTEs, the incoming queries will the firsts
    in the CTEs of the composed query.

    Parameters:
        queries (Dict[str, Query]): The common table expressions
            with their names
        base_query (Query): The major query
        name_compare (NameCampare): engine NameCompare

    Returns:
        with_query (Query): the wuery with the WITH clause
    """
    if name_compare is None:
        name_compare = PostgresNameCompare()

    def clean_escape(string: str) -> str:
        return (
            str(name_compare.strip_escapes(string))
            if name_compare.is_escaped(string)
            else string.lower()
        )

    if queries == {}:
        return base_query

    ctes = {}
    base_query_ctes = {}
    query_ctes = base_query.select.ctes
    if query_ctes is not None:
        for subqry in query_ctes:
            base_query_ctes[subqry.name.text] = subqry.subquery
    ctes_names = [clean_escape(str(name)) for name in base_query_ctes]
    for name, qry in queries.items():
        if clean_escape(name) in ctes_names:
            raise ValueError(
                f'WITH query name "{name}" specified more than once'
            )
        ctes[name] = qry

    ctes = {**ctes, **base_query_ctes}

    if len(ctes) == 0:
        return base_query

    select = Select(
        copy.deepcopy(base_query.select.quantifier),
        copy.deepcopy(base_query.select.namedExpressions),
        # columns redefinition in the cte table name are not considered at the
        # moment
        [
            NamedSubquery(Identifier(name), [], qry)
            for name, qry in ctes.items()
        ],
    )
    with_query = Query(
        select,
        base_query.source,
        base_query.where,
        base_query.agg,
        base_query.having,
        base_query.order,
        base_query.limit,
    )
    with_query.row_privacy = base_query.row_privacy
    return with_query


def compose_query(
    select_sql_queries: t.Dict[str, str],
    query: str,
) -> str:
    """Return a query with a WITH clause.

    Parameters:
        select_sql_queries (Dict[str, str]): The common table expressions
            with their names
        query (str): The major query

    Returns:
        with_query (str): the query with the WITH clause
    """
    parsed_select_sql_queries = {
        name: ast_utils.parse_query(_qry)
        for (name, _qry) in select_sql_queries.items()
    }
    parsed_query = ast_utils.parse_query(query)
    with_query = str(
        compose_parsed_query(
            parsed_select_sql_queries,
            parsed_query,
        )
    )
    return with_query


def decompose_query(
    query: str,
) -> t.Tuple[t.Dict[str, str], str]:
    """Decompose a query into a dict with the common table
    expressions and a major query

    Parameters:
        query (str): A query

    Returns:
        with_queries (Dict[str, str]): dict with the common table expressions
            and their names
        major_query (str): The query without the WITH clause
    """
    parsed_query = ast_utils.parse_query(query)
    ctes, parsed_major_query = decompose_parsed_query(parsed_query)
    with_queries = {name: str(cte) for name, cte in ctes.items()}
    major_query = str(parsed_major_query)
    return with_queries, major_query


def decompose_parsed_query(
    parsed_query: Query,
) -> t.Tuple[t.Dict[str, Query], Query]:
    """Decompose a parsed query into a dict with the common table
    expressions and a major query

    Parameters:
        parsed_query (Query): A parsed query

    Returns:
        with_queries (Dict[str, Query]): dict with the common table expressions
            and their names
        major_query (Query): The query without the WITH clause
    """
    with_queries: t.Dict[str, Query]
    major_query: Query
    if parsed_query.select.ctes is not None:
        select = Select(
            parsed_query.select.quantifier,
            parsed_query.select.namedExpressions,
        )
        major_query = Query(
            select,
            parsed_query.source,
            parsed_query.where,
            parsed_query.agg,
            parsed_query.having,
            parsed_query.order,
            parsed_query.limit,
        )
        major_query.row_privacy = parsed_query.row_privacy
        with_queries = {
            subqry.name.text: subqry.subquery
            for subqry in parsed_query.select.ctes
        }
    else:
        with_queries = {}
        major_query = copy.deepcopy(parsed_query)
    return (with_queries, major_query)