# Package: namara-python
# Version: 1.0.15
# File: NamaraAccessor.py
import os
import pandas as pd
from namara_python.client import Client
import modin.pandas as mpd
from typing import Type, List
# Names of the environment variables that NamaraAccessor.set_auth() writes
# the e-mail address and password into.
EMAIL = 'NAMARA_EMAIL'
PASSWORD = 'NAMARA_PASSWORD'

# Every aggregate query has the same shape; only the SQL function and the
# alias of the result column differ. The doubled braces survive this first
# format() pass as plain "{}" placeholders, which are filled later with the
# column expression and the data set id (in that order).
_AGGREGATE_TEMPLATE = "SELECT {func}({{}}) as '{alias}' FROM {{}}"

mean_query = _AGGREGATE_TEMPLATE.format(func='AVG', alias='average')
count_query = _AGGREGATE_TEMPLATE.format(func='COUNT', alias='count')
count_numeric_query = _AGGREGATE_TEMPLATE.format(func='COUNTN', alias='count')
sum_query = _AGGREGATE_TEMPLATE.format(func='SUM', alias='sum')
max_query = _AGGREGATE_TEMPLATE.format(func='MAX', alias='max')
min_query = _AGGREGATE_TEMPLATE.format(func='MIN', alias='min')

# Preview query: the single placeholder is the data set id.
rows_50_query = "SELECT * FROM {} LIMIT 50"
class NamaraDataFrame(mpd.DataFrame):
    """Modin DataFrame subclass used to hold rows returned by Namara queries.

    The only customisation is ``_constructor``.
    NOTE(review): this mirrors the pandas subclassing hook through which
    operations build their result frames; confirm that modin honours it
    in the same way.
    """

    @property
    def _constructor(self):
        """Return the class to use when a new frame is constructed."""
        # Deliberately the concrete class rather than ``type(self)``.
        return NamaraDataFrame
@pd.api.extensions.register_dataframe_accessor("namara")
class NamaraAccessor:
    """DataFrame accessor (``df.namara``) that queries a Namara data set.

    The data set id is read from ``pandas_obj.id[0]``. Each aggregate method
    fills one of the module-level query templates, sends the resulting query
    string through ``self.client.query_client().query(...)`` and wraps the
    returned rows in a ``NamaraDataFrame``.
    """

    def __init__(self, pandas_obj):
        """Create a client, read the data set id and load a 50-row preview.

        Args:
            pandas_obj: the frame this accessor is attached to. Its ``id``
                attribute is indexed with ``[0]`` to obtain the data set id.
        """
        # NOTE(review): _validate() exists but is not invoked here; the
        # ``pandas_obj.id`` lookup below is the only check that is performed.
        self.client = Client()
        self.id = pandas_obj.id[0]
        self._obj = self._load()

    def mean(self, column: str, params: dict = None) -> pd.DataFrame:
        """Return the average of ``column``, optionally filtered by ``params``."""
        # Argument order matters: ``params`` is the second positional
        # parameter of _aggregate_template, followed by the template values.
        result = self._aggregate_template(mean_query, params, column, self.id)
        return self._decor_list_of_dict_to_pd(result)

    def count(self, numeric: bool = False, column: str = None,
              distinct: bool = False, params: dict = None) -> pd.DataFrame:
        """Count rows, or the values of ``column`` when one is given.

        Args:
            numeric: use the numeric count template instead of the plain one.
            column: column to count; when falsy, ``*`` is counted.
            distinct: prefix the column with ``DISTINCT``.
            params: optional equality filters, see ``_aggregate_template``.

        Raises:
            AttributeError: if ``numeric`` is set and ``column`` is ``None``.
        """
        if numeric and column is None:
            raise AttributeError(
                "column must be provided for numeric query")
        query = count_numeric_query if numeric else count_query
        target = self._column_expr(column, distinct) if column else '*'
        result = self._aggregate_template(query, params, target, self.id)
        return self._decor_list_of_dict_to_pd(result)

    def sum(self, column: str, distinct: bool = False, params: dict = None) -> pd.DataFrame:
        """Return the sum of ``column`` (of its distinct values if ``distinct``)."""
        target = self._column_expr(column, distinct)
        result = self._aggregate_template(sum_query, params, target, self.id)
        return self._decor_list_of_dict_to_pd(result)

    def max(self, column: str, params: dict = None) -> pd.DataFrame:
        """Return the maximum of ``column``, optionally filtered by ``params``."""
        result = self._aggregate_template(max_query, params, column, self.id)
        return self._decor_list_of_dict_to_pd(result)

    def min(self, column: str, params: dict = None) -> pd.DataFrame:
        """Return the minimum of ``column``, optionally filtered by ``params``."""
        result = self._aggregate_template(min_query, params, column, self.id)
        return self._decor_list_of_dict_to_pd(result)

    def set_client(self, client: "Client") -> None:
        """Replace the client instance used for all subsequent queries."""
        self.client = client

    def _load(self):
        """Fetch the first 50 rows of the data set and wrap them in a frame."""
        query = rows_50_query.format(self.id)
        data_set = self.client.query_client().query(query)
        return self._decor_list_of_dict_to_pd(data_set)

    @staticmethod
    def set_auth(email: str, pwd: str) -> None:
        """Store the credentials in the ``EMAIL`` / ``PASSWORD`` environment variables."""
        os.environ[EMAIL] = email
        os.environ[PASSWORD] = pwd

    @staticmethod
    def _column_expr(column: str, distinct: bool) -> str:
        """Return ``column``, prefixed with ``DISTINCT`` when requested."""
        return 'DISTINCT ' + column if distinct else column

    def _aggregate_template(self, aggr_query: str, params: dict = None, *args) -> List:
        """Fill ``aggr_query`` with ``args``, append filters and run the query.

        Args:
            aggr_query: template with positional ``{}`` placeholders.
            params: optional mapping of column -> value; every pair becomes a
                ``column = value`` condition and the conditions are joined
                with ``AND``. Values are inserted as-is (via ``str``), so
                string literals must already carry their own quotes.
            *args: values for the template placeholders, in order.

        Returns:
            Whatever the query client returns for the query.

        Raises:
            Exception: if the query client returns a falsy (e.g. empty) result.
        """
        query = aggr_query.format(*args)
        if params:
            # NOTE: keys and values are interpolated verbatim into the query
            # text; do not pass untrusted input through ``params``.
            conditions = ' AND '.join(
                '{} = {}'.format(key, value) for key, value in params.items())
            query = '{} WHERE {}'.format(query, conditions)
        result = self.client.query_client().query(query)
        if not result:
            raise Exception('Query returned no result: {}'.format(query))
        return result

    def _validate(self, obj):
        """Raise ``AttributeError`` unless ``obj`` has an ``id`` column."""
        if 'id' not in obj.columns:
            raise AttributeError("Cannot find id from namara pandas object")

    def _decor_list_of_dict_to_pd(self, list_of_dict: List) -> pd.DataFrame:
        """Wrap the given rows in a ``NamaraDataFrame``."""
        return NamaraDataFrame(list_of_dict)