Repository URL to install this package:
|
Version:
1.0.20 ▾
|
namara-python
/
query.py
|
|---|
from typing import Any, Dict, List, Union, TYPE_CHECKING
if TYPE_CHECKING:
from namara_python.client import Client
from namara_python.utils import gen_request_decorator
from namara_python.rpc.query_twirp import QueryServiceClient
from namara_python.version import __version__
import namara_python.rpc.query_pb2 as pb
from twirp.context import Context
# Custom type for sql results
Value = Union[str, int, float, bool]
Results = List[Dict[str, Value]]
class Query:
# JDBC SQL type to python types
TYPE_MAPPING = {
'VARCHAR': str,
'BIGINT': int,
'INT': int,
'BIT': bool,
'TIMESTAMP': str,
'DOUBLE': float
}
def __init__(self, client: 'Client') -> None:
self.client = client
self.rpc_client = QueryServiceClient(client.base_api_url)
def query(self, statement: str, fill_null: Any=None) -> Results:
ctx = Context()
ctx.set_header('x-api-key', self.client.api_key)
ctx.set_header('x-namara-client', 'python')
ctx.set_header('x-namara-client-version', __version__)
query_response = self.rpc_client.Query(ctx=ctx, request=pb.QueryRequest(statement=statement))
return Query._parse_rows(query_response, fill_null)
@staticmethod
def _parse_rows(query_response: Any, fill_null: Any='') -> Results:
''' Takes in a `query_response` pb.QueryResponse object
'''
results = []
for row in query_response.rows:
result = {}
for pos, cell in enumerate(row.cells):
column_meta = query_response.column_metadata[pos]
key = column_meta.key
value = Query._cell_value(cell, column_meta, fill_null)
result[key] = value
results.append(result)
return results
@staticmethod
def _convert_value(sql_value:str , sql_type:str) -> Value:
type = Query.TYPE_MAPPING.get(sql_type, None)
if type:
return type(sql_value)
else:
return sql_value
@staticmethod
def _cell_value(cell, column_meta, fill_null=''):
if cell.is_null:
if isinstance(fill_null, dict):
value = fill_null.get(column_meta.key, None)
else:
value = fill_null
else:
value = Query._convert_value(cell.value, column_meta.jdbc_type)
return value