# namara-python, version 1.0.14 — dataframe.py
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
from pandas import DataFrame as PandaDataFrame
from pandas import Series as PandaSeries
from pandas.core.groupby import GroupBy as PandaDataFrameGroupBy
import pandas as pd
from namara_python.client import Client
from namara_python.query import Query as QueryClient
from namara_python.utils import listify
from namara_pandas.query import Query
import time
from collections import namedtuple
# Default query used to fetch a 50-row preview of a dataset
rows_50_query = Query('*', limit=50)
# Query used to count the total rows of a dataset
simple_count_query = Query("count(*)")
# SQL DISTINCT clause templates; these may be combined with other clauses
distinct_on_clause = "DISTINCT ON ({}) {}"
simple_distinct_clause = "DISTINCT {}"
MAX_ROWS = 1000  # maximum rows returned per Namara API call
API_CALLS_MAX = 5  # cap on sequential API calls made while iterating
API_RATE_LIMIT = 0.5 # delay between API calls, in seconds
LEFT_SIDE, RIGHT_SIDE = 'DS1', 'DS2'  # table aliases used when joining two datasets
AAC = '__nm_id' # Namara's always available column
# A lazily-applied row transform queued by `apply`/`fillna`
Apply = namedtuple('Apply', 'function args kwargs result_type axis')
class DataFrame(PandaDataFrame):
    ''' A pandas DataFrame registered against a Namara dataset.

    Eagerly loads an in-memory preview (first 50 rows) of the dataset and
    keeps enough state (`client`, `data_frame_query`, `data_set_id`) to
    stream the full dataset lazily from the Namara API during iteration
    (see `itertuples`). Transforms queued by `apply`/`fillna` are replayed
    on every streamed row.
    '''

    # attributes pandas propagates onto frames derived from this one
    _metadata = ['full_count', 'data_frame_query', 'client', 'data_set_id', 'loaded']
    # attributes stored per instance but never propagated
    _internal_names = ['apply_queue']
    _internal_names_set = set(_internal_names)

    @property
    def _constructor(self):
        # make pandas operations return this subclass rather than a plain frame
        return DataFrame

    def __init__(self, *args, **kwargs):
        ''' Takes in a `data_set_id` keyword and will load the first 50 rows of
        that dataset. Essentially "registers" this dataframe to a Namara
        dataset.

        Provides a `full_count` attribute that can tell you how many
        rows are in the full dataset.

        Keyword Args:
            data_set_id: Namara dataset id to register against.
            loaded: pass True to skip fetching data from Namara.
            data_frame_query: pre-built Query to load instead of the default
                50-row preview.
            client: query client to use; a default one is built when omitted.
        All other arguments are forwarded to ``pandas.DataFrame``.
        '''
        self.loaded = False
        self.client = None
        self.full_count = None
        self.apply_queue = []
        data_set_id = kwargs.pop('data_set_id', None)
        loaded = kwargs.pop('loaded', False)
        data_frame_query = kwargs.pop('data_frame_query', None)
        load_frame = self._check_frame(data_set_id, loaded, data_frame_query)
        if load_frame:
            client = kwargs.pop('client', None)
            if not client:
                client = QueryClient(Client())
            if not data_frame_query:
                data_frame_query = rows_50_query.extend(from_id=data_set_id)
            kwargs['data'] = DataFrame._load(client, data_frame_query.compile())
            self.full_count = DataFrame._simple_count(client, data_set_id)
            self.loaded = True
            self.client = client
        super().__init__(*args, **kwargs)
        self.original_columns = set(self.columns)
        self.data_set_id = data_set_id
        self.data_set_ids = set([data_set_id])
        self.data_frame_query = data_frame_query

    def append(self, *args, **kwargs):
        ''' Append rows to this frame. When a `data_set_id` keyword is given,
        the first 50 rows of that Namara dataset are fetched and appended;
        otherwise behaves like the regular pandas `append`.
        '''
        data_set_id = kwargs.pop('data_set_id', None)
        if data_set_id is None:
            return super().append(*args, **kwargs)
        query = rows_50_query.compile(from_id=data_set_id)
        new_data_set = DataFrame._load(self.client, query)
        self.data_set_ids.add(data_set_id)
        return super().append(new_data_set, **kwargs)

    def drop(self, *args, **kwargs):
        ''' Drop columns (or rows) from the dataframe. When columns are
        dropped in place they are also removed from `original_columns`,
        which affects which columns get queried from Namara. Supports
        what default pandas supports.
        '''
        axis = kwargs.get('axis')
        columns = kwargs.get('columns')
        inplace = kwargs.get('inplace')
        if not inplace:
            return super().drop(*args, **kwargs)
        # `args[0]` is the `labels` arg, which can be a list of strings or
        # just a string. Same with `columns`.
        if axis in (1, 'columns'):
            columns_to_drop = args[0] if args else kwargs.get('labels')
        elif columns is not None:
            columns_to_drop = columns
        else:
            # dropping rows (axis=0): the tracked column set is unchanged
            columns_to_drop = None
        if columns_to_drop is not None:
            if type(columns_to_drop) is list:
                self.original_columns -= set(columns_to_drop)
            else:
                self.original_columns -= set([columns_to_drop])
        return super().drop(*args, **kwargs)

    def itertuples(self, index=True, name='Pandas'):
        # delegate to the Namara-aware streaming iterator
        return self._itertuples(index, name)

    def _itertuples(self, index=True, name='Pandas'):
        ''' Lets you iterate over all rows in the dataset; each row is
        represented as a tuple. Unlike regular pandas, each tuple is not a
        named tuple. Meaning you can't do something like `row.Title` but
        instead have to do `row[3]` to actually pull out the column values
        in each row.

        Since this is still in beta, there are limitations to how many rows
        you can pull from the api (currently limited to 5 api calls, you can
        override this however).
        '''
        for row_offset in range(0, API_CALLS_MAX * MAX_ROWS, MAX_ROWS):
            args = {'limit': MAX_ROWS, 'offset': row_offset}
            # stable paging requires a deterministic order; fall back to
            # Namara's always-available id column when none was requested
            if not self.data_frame_query.order_by:
                args['order_by'] = AAC
            query = self.data_frame_query.compile(**args)
            rows = self.client.query(query)
            for row in rows:
                # TODO: take into account `index` and `name` params for
                # creating tuples
                row_applied = pd.Series(row)
                # replay every queued transform on the streamed row
                for apply in self.apply_queue:
                    if 'inplace' in apply.kwargs:
                        apply.function.__call__(row_applied, *apply.args, **apply.kwargs)
                    else:
                        row_applied = apply.function.__call__(row_applied, *apply.args, **apply.kwargs)
                yield tuple(row_applied)
            time.sleep(API_RATE_LIMIT)

    def apply(self, func, axis=0, result_type=None, args=(), **kwargs):
        ''' Will return a dataframe with the applied `func` function. Will also
        lazily apply the function to the full dataset during iteration. The
        result_type and axis will always be `broadcast` and 1 respectively.
        This means that it can only work on a row by row basis and the
        function must work on a pandas.Series object.

        The passed in `func` gets added to a queue which gets passed along to
        pandas manipulations. This allows for chaining `apply`s together that
        will get evaluated lazily during iteration of the full dataset.
        '''
        # For now, fix this value to 'broadcast' (ie. same shape as original)
        result_type = 'broadcast'
        # For now, fix this value to apply row wise only
        axis = 1
        result_frame = super().apply(func, args=args, result_type=result_type, axis=axis, **kwargs)
        new_apply = Apply(func, args, kwargs, result_type, axis)
        result_frame._inherit_frame(self)
        result_frame.apply_queue.append(new_apply)
        return result_frame

    def drop_duplicates(self, subset=None, keep='first', inplace=False):
        ''' Doesn't work right now because of a limitation with Namara's NiQL.
        Builds a DISTINCT query so that iteration over the full dataset also
        skips duplicates, then defers to pandas for the in-memory rows.
        '''
        columns = ', '.join(list(self.columns))
        if subset:
            distinct_cols = ', '.join(listify(subset))
            distinct_clause = distinct_on_clause.format(distinct_cols, columns)
            order_by_cols = subset
        else:
            order_by_cols = AAC
            distinct_clause = simple_distinct_clause.format(columns)
        query = Query(distinct_clause, from_id=self.data_set_id, order_by=order_by_cols)
        if inplace:
            self.data_frame_query = query
            return super().drop_duplicates(subset=subset, keep=keep, inplace=inplace)
        else:
            return_frame = super().drop_duplicates(subset=subset, keep=keep, inplace=inplace)
            return_frame.data_frame_query = query
            return return_frame

    def fillna(self, *args, **kwargs):
        ''' Will return a dataframe with filled in null values. Will also
        lazily fill in values during iteration. Supports regular pandas args.
        '''
        new_apply = Apply(PandaSeries.fillna, args, kwargs, 'broadcast', None)
        if kwargs.get('inplace', False):
            self.apply_queue.append(new_apply)
            return super().fillna(*args, **kwargs)
        else:
            result_frame = super().fillna(*args, **kwargs)
            result_frame.apply_queue.append(new_apply)
            return result_frame

    def groupby(self, *args, **kwargs):
        ''' Does a group by on certain column values using NiQL. Must pass in
        the `by` keyword, otherwise will resort to standard pandas groupby.
        Read documentation for namara_pandas.Groupby for more details.

        Raises:
            TypeError: when a `by` column is not in this dataset.
        '''
        # default of None (rather than a bare pop) so that a missing `by`
        # falls through to pandas as the docstring promises
        by = kwargs.pop('by', None)
        if not by:
            return super().groupby(*args, **kwargs)
        if type(by) is not list:
            by = [by]
        if not set(by).issubset(set(self.columns)):
            raise TypeError('%s not in this dataset' % by)
        kwargs['data_set_id'] = self.data_set_id
        kwargs['client'] = self.client
        kwargs['full_count'] = self.full_count
        return DataFrameGroupBy(self, by, **kwargs)

    def merge(self, right=None, how='inner', on=None, left_on=None, right_on=None, left_index=False,
              right_index=False, sort=False, suffixes=('_x', '_y'), copy=True, indicator=False, validate=None, **kwargs):
        ''' Performs a sql join with the dataset of `right`, which must be a
        namara_pandas.DataFrame. Does not work with non-namara dataframes.
        Supports `right`, `how`, `on`, `left_on`, `right_on`, `suffixes`
        keywords; others are ignored.

        Raises:
            AttributeError: when `right` is not a Namara DataFrame.
        '''
        if (not isinstance(right, DataFrame)):
            raise AttributeError("a Namara DataFrame needs to be passed in")
        from_clause = "%s AS %s" % (self.data_set_id, LEFT_SIDE)
        join_type = how + ' join'
        on = self._join_columns(on, left_on, right_on)
        select_columns = self._join_select(self.columns, right.columns, suffixes)
        join_from = "%s AS %s" % (right.data_set_id, RIGHT_SIDE)
        frame_query = Query(', '.join(select_columns),
                            from_id=from_clause,
                            join_type=join_type,
                            join_from=join_from,
                            join_on=on)
        return DataFrame(
            data_set_id=self.data_set_id,
            data_frame_query=frame_query,
            client=self.client
        )

    def count(self):
        ''' Per-column COUNT aggregate over the full dataset. '''
        return self._agg_query('count')

    def sum(self):
        ''' Per-column SUM aggregate over the full dataset. '''
        return self._agg_query('sum')

    def mean(self):
        ''' Per-column AVG aggregate over the full dataset. '''
        return self._agg_query('avg')

    def max(self):
        ''' Per-column MAX aggregate over the full dataset. '''
        return self._agg_query('max')

    def min(self):
        ''' Per-column MIN aggregate over the full dataset. '''
        return self._agg_query('min')

    def _agg_query(self, agg):
        ''' Runs a SQL aggregate over every numeric column of the dataset.

        `agg` is the type of SQL aggregate being performed (count, sum, max,
        avg). Only does aggregation on a per-column basis. The pandas version
        of these methods supports several parameters, however this function
        only supports one use case: axis=0, numeric_only=True; other params
        are not allowed.

        Returns:
            pandas.Series keyed by column name.
        '''
        numeric_cols = [col for col, dtype in self.dtypes.items() if dtype in ['float', 'int']]
        select_clause = ["%s(%s) as %s" % (agg, col, col) for col in numeric_cols]
        select_clause = ', '.join(select_clause)
        q = Query(select=select_clause, from_id=self.data_set_id)
        agg_resp = self.client.query(q.compile())
        return PandaSeries(agg_resp[0])

    def _join_columns(self, on=None, left_on=None, right_on=None):
        ''' Builds the SQL ON clause for a join. Either `on` alone, or both
        `left_on` and `right_on`, must be provided (strings or lists).

        Raises:
            ValueError: when the combination of arguments is invalid.
        '''
        on, left_on, right_on = listify(on), listify(left_on), listify(right_on)
        if (left_on and not right_on) or (not left_on and right_on):
            raise ValueError("Either, both 'right_on' and 'left_on' must be provided, or 'on'")
        elif not(left_on and right_on) and (not on):
            raise ValueError("Either, both 'right_on' and 'left_on' must be provided, or 'on'")
        if (left_on and right_on) and (len(left_on) != len(right_on)):
            raise ValueError("Length of both 'left_on' and 'right_on' must be the same")
        join_on_str = []
        if on:
            for column in on:
                join_on_str.append("%s.%s = %s.%s" % (LEFT_SIDE, column, RIGHT_SIDE, column))
        else:
            for cols in zip(left_on, right_on):
                join_on_str.append("%s.%s = %s.%s" % (LEFT_SIDE, cols[0], RIGHT_SIDE, cols[1]))
        return ' AND '.join(join_on_str)

    def _join_select(self, ds1_cols, ds2_cols, suffixes):
        ''' Builds the SELECT column list for a join, aliasing every column
        from each side with its suffix to avoid name collisions.
        '''
        left_side = [LEFT_SIDE + '.' + col + ' AS ' + col + suffixes[0] for col in ds1_cols]
        right_side = [RIGHT_SIDE + '.' + col + ' AS ' + col + suffixes[1] for col in ds2_cols]
        return left_side + right_side

    @staticmethod
    def _simple_count(client, data_set_id):
        ''' Counts the total number of rows in a dataset; does not take into
        consideration null values in a column.
        '''
        query = simple_count_query.compile(from_id=data_set_id)
        resp = client.query(query)
        return resp[0]["count"]

    @staticmethod
    def _load(client, query):
        ''' Fetches rows for the given compiled `query` from Namara
        (by default the first 50 rows of the dataset).
        '''
        data_set = client.query(query)
        return data_set

    def _load_full(self, query, client, id):
        ''' Fetches all the rows for a dataset up to a maximum number of api
        calls (API_CALLS_MAX). Each call is limited to MAX_ROWS rows.
        `query` must have formatting params for limit and offset; these values
        are controlled by this function.
        '''
        api_calls_to_make = (self.full_count // MAX_ROWS) + 1
        if api_calls_to_make > API_CALLS_MAX:
            print("WARNING: Making too many API calls, limiting to %d" % API_CALLS_MAX)
        data_set = []
        for row_offset in range(0, API_CALLS_MAX * MAX_ROWS, MAX_ROWS):
            # format a fresh copy each page; formatting `query` in place would
            # strip the placeholders after the first iteration, so every later
            # page would silently refetch offset 0
            page_query = query.format(MAX_ROWS, row_offset)
            rows = client.query(page_query)
            data_set += rows
            time.sleep(API_RATE_LIMIT)
        return data_set

    def _check_frame(self, data_set_id, loaded, data_frame_query):
        # load only when a dataset/query is given and nothing is loaded yet
        return (data_set_id or data_frame_query) and not (self.loaded or loaded)

    def _validate(self, obj):
        # verify pandas obj exposes the Namara id column
        if 'id' not in obj.columns:
            raise AttributeError("Cannot find id from namara pandas object")

    def _decor_list_of_dict_to_pd(self, list_of_dict):
        # wrap raw API rows (list of dicts) in a Namara DataFrame
        return DataFrame(list_of_dict)

    def _inherit_frame(self, parent_frame):
        ''' Copies Namara bookkeeping from `parent_frame` onto this frame.
        The apply queue is copied (not shared) so appending to this frame's
        queue does not mutate the parent's.
        '''
        self.data_set_id = parent_frame.data_set_id
        self.data_frame_query = parent_frame.data_frame_query
        self.client = parent_frame.client
        self.apply_queue = list(parent_frame.apply_queue)
class DataFrameGroupBy(PandaDataFrameGroupBy):
    ''' GroupBy over a Namara-backed DataFrame. Streams the dataset from the
    Namara API in MAX_ROWS pages ordered by the grouping keys so that groups
    can be iterated without loading the whole dataset at once.
    '''

    def __init__(self, obj, by, **kwargs):
        ''' `obj` is the Namara DataFrame, `by` the grouping keys. The Namara
        bookkeeping keywords (`data_set_id`, `client`, `full_count`) are
        required and popped off before the rest is handed to pandas.
        '''
        self.data_set_id = kwargs.pop('data_set_id')
        self.client = kwargs.pop('client')
        self.full_count = kwargs.pop('full_count')
        super().__init__(obj, by, **kwargs)

    def __iter__(self):
        return self.get_iterator()

    def get_iterator(self):
        ''' Generator function for iterating over groupby objects. Based on
        how pandas does it internally, with added functionality for pulling
        in data from Namara.

        LIMITATION: if a group is very large, ie. larger than MAX_ROWS, then
        that group will be split up into multiple pandas groups. Which means
        if you're iterating over groups, you have to take that into account;
        you can't assume each iteration will be a different group.
        '''
        # never request more rows than exist, nor beyond the API-call budget
        max_results = min(self.full_count, API_CALLS_MAX * MAX_ROWS)
        for row_offset in range(0, max_results, MAX_ROWS):
            query = rows_50_query.compile(
                from_id=self.data_set_id,
                order_by=', '.join(self.keys),
                limit=MAX_ROWS,
                offset=row_offset)
            rows = self.client.query(query)
            frame = pd.DataFrame(rows)
            # keep the index continuous across pages
            frame.index = pd.RangeIndex(row_offset, row_offset + min(len(rows), MAX_ROWS))
            gp = PandaDataFrameGroupBy(frame, self.keys)
            for key, group in gp:
                yield key, group
            time.sleep(API_RATE_LIMIT)

    def count(self, *args, **kwargs):
        ''' Counts rows per group with a single aggregate query against
        Namara. Returns a pandas DataFrame with one `count` column, indexed
        by the grouping key values (a MultiIndex when grouping by several
        keys).
        '''
        columns = ', '.join(self.keys)
        query = "select count(*), %s from %s group by %s" %(columns, self.data_set_id, columns)
        count_resp = self.client.query(query)
        column_list = [[] for i in range(len(self.keys))]
        counts = []
        for item in count_resp:
            for i, col in enumerate(self.keys):
                column_list[i].append(item[col])
            counts.append(item['count'])
        if len(self.keys) == 1:
            # pd.Index takes a singular `name`; passing `names=` is not a
            # constructor argument and raises a TypeError on modern pandas
            index = pd.Index(column_list[0], name=self.keys[0])
        else:
            index = pd.MultiIndex.from_arrays(column_list, names=self.keys)
        return pd.DataFrame(index=index, columns=['count'], data=counts)