Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
namara-python / query.py
Size: Mime:
from typing import Any, Dict, List, Union, TYPE_CHECKING

if TYPE_CHECKING:
    from namara_python.client import Client

from namara_python.utils import gen_request_decorator
from namara_python.rpc.query_twirp import QueryServiceClient
from namara_python.version import __version__
import namara_python.rpc.query_pb2 as pb

from twirp.context import Context

# Custom type for sql results
Value = Union[str, int, float, bool]
Results = List[Dict[str, Value]]

class Query:
    # JDBC SQL type to python types
    TYPE_MAPPING = {
        'VARCHAR': str,
        'BIGINT': int,
        'INT': int,
        'BIT': bool,
        'TIMESTAMP': str,
        'DOUBLE': float
    }

    def __init__(self, client: 'Client') -> None:
        self.client  = client

        self.rpc_client = QueryServiceClient(client.base_api_url)

    def query(self, statement: str, fill_null: Any=None) -> Results:
        ctx = Context()
        ctx.set_header('x-api-key', self.client.api_key)
        ctx.set_header('x-namara-client', 'python')
        ctx.set_header('x-namara-client-version', __version__)

        query_response = self.rpc_client.Query(ctx=ctx, request=pb.QueryRequest(statement=statement))

        return Query._parse_rows(query_response, fill_null)

    @staticmethod
    def _parse_rows(query_response: Any, fill_null: Any='') -> Results:
        ''' Takes in a `query_response` pb.QueryResponse object
        '''
        results = []
        for row in query_response.rows:
            result = {}
            for pos, cell in enumerate(row.cells):
                column_meta = query_response.column_metadata[pos]
                key = column_meta.key
                value = Query._cell_value(cell, column_meta, fill_null)
                result[key] = value

            results.append(result)

        return results

    @staticmethod
    def _convert_value(sql_value:str , sql_type:str) -> Value:
        type = Query.TYPE_MAPPING.get(sql_type, None)
        if type:
            return type(sql_value)
        else:
            return sql_value

    @staticmethod
    def _cell_value(cell, column_meta, fill_null=''):
        if cell.is_null:
            if isinstance(fill_null, dict):
                value = fill_null.get(column_meta.key, None)
            else:
                value = fill_null
        else:
            value = Query._convert_value(cell.value, column_meta.jdbc_type)

        return value