# Tests for the namara_pandas DataFrame wrapper (package version 1.0.24).
import pytest
import re
import pandas as pd
import namara_pandas as npd
# Canned rows returned for ordinary (non-aggregate) queries.
simple_query_fixture = [
    {'level': 11, 'age': 1},
    {'level': 22, 'age': 2},
    {'level': 33, 'age': 3},
    {'level': 44, 'age': 4},
    {'level': 55, 'age': 5},
    {'level': 66, 'age': 6},
]
# Rows with a repeated group key ('us') so groupby splitting across
# "api calls" can be exercised.
groupby_query_fixture = [
    {'country': 'canada', 'city': 'ottawa'},
    {'country': 'canada', 'city': 'toronto'},
    {'country': 'mexico', 'city': 'mexico city'},
    {'country': 'us', 'city': 'nyc'},
    {'country': 'us', 'city': 'nyc'},
]
# Canned response for any "select count ..." statement.
count_query_fixture = [{'count': 50}]
data_set_id = 'some id'


class MockQueryClient:
    """In-memory stand-in for the Namara API client.

    Returns ``count_query_fixture`` for count statements and pages through
    ``self.data`` for everything else, emulating the real client's
    limit/offset paging.
    """

    def __init__(self, data=simple_query_fixture, limit=None, offset=0):
        self.data = data
        self.limit = limit
        # BUG FIX: was hard-coded to 0, silently ignoring the `offset`
        # argument. Honor it so clients can start mid-stream.
        self.offset = offset

    def query(self, stmt):
        """Return the next page of ``self.data`` for *stmt*.

        Count statements short-circuit to ``count_query_fixture``. With no
        limit configured, the full data set is returned on every call.
        """
        if re.search('select count', stmt.lower()):
            return count_query_fixture
        # a basic implementation of limit and offset
        if self.limit is None:
            return self.data
        upper = self.offset + self.limit
        if upper >= len(self.data):
            # Final (possibly short) page: clamp, then mark exhausted so
            # subsequent calls yield an empty segment.
            upper = len(self.data)
            segment = self.data[self.offset:upper]
            self.offset = upper
            self.limit = 0
        else:
            segment = self.data[self.offset:upper]
            # BUG FIX: always advance the cursor; previously, when
            # offset + limit landed exactly on len(data), the cursor was
            # never advanced and the last page repeated forever.
            self.offset = upper
        return segment
class TestNamaraDataFrame:
    """Tests for ``npd.DataFrame`` backed by :class:`MockQueryClient`.

    Several tests mutate module-level tuning constants on
    ``npd.dataframe``; ``teardown_method`` restores them after each test.
    NOTE(review): first parameters were previously named ``cls`` even
    though pytest binds these as instance methods — renamed to ``self``.
    """

    def teardown_method(self):
        # return these to their original values
        npd.dataframe.MAX_ROWS = 1000
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0.5

    def test_init(self):
        """Construction wires up id, full count, query object, and queue."""
        mock_client = MockQueryClient()
        df = npd.DataFrame(data_set_id=data_set_id, client=mock_client)
        assert isinstance(df, npd.DataFrame)
        assert isinstance(df, pd.DataFrame)
        assert df.data_set_id == data_set_id
        assert df.full_count == count_query_fixture[0]['count']
        assert isinstance(df.data_frame_query, npd.query.Query)
        assert df.data_frame_query.from_id == data_set_id
        assert len(df.apply_queue) == 0

    def test_itertuples(self):
        """Iteration yields tuples across every mocked api call."""
        npd.dataframe.MAX_ROWS = len(simple_query_fixture)
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0
        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        rows = []
        for row in df.itertuples():
            rows.append(row)
        # each mocked api call returns the whole fixture again
        assert len(rows) == len(simple_query_fixture) * npd.dataframe.API_CALLS_MAX
        for row in rows:
            assert isinstance(row, tuple)

    def test_apply(self):
        """apply() transforms values and records itself on apply_queue."""
        # a simple enough function to test Apply with
        function_to_apply = pd.Series.eq
        function_args = [True]
        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        result_df = df.apply(function_to_apply, args=function_args)
        # just make sure its actually applying it to the dataframe
        assert result_df.loc[0]['level'] == False
        assert result_df.loc[0]['age'] == True
        assert len(result_df.apply_queue) == 1
        assert result_df.apply_queue[0].function == function_to_apply
        assert result_df.apply_queue[0].args == function_args
        assert result_df.apply_queue[0].kwargs == {}
        assert result_df.apply_queue[0].axis == 1
        assert result_df.apply_queue[0].result_type == 'broadcast'

    def test_apply_on_iteration(self):
        """Queued apply functions are re-run on rows fetched during iteration."""
        npd.dataframe.MAX_ROWS = len(simple_query_fixture)
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0
        function_to_apply = pd.Series.eq
        function_args = [True]
        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        result_df = df.apply(function_to_apply, args=function_args)
        num_rows = 0
        for row in result_df.itertuples():
            num_rows += 1
            for cell in row:
                assert isinstance(cell, bool)
        assert num_rows == len(simple_query_fixture) * npd.dataframe.API_CALLS_MAX

    def test_apply_multiple_functions(self):
        """Chained applies compose in order, both eagerly and on iteration."""
        npd.dataframe.MAX_ROWS = len(simple_query_fixture)
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0
        function_to_apply = pd.Series.add
        function_args = [1]
        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        result_df = df.apply(function_to_apply, args=function_args)
        new_func = pd.Series.pow
        new_args = [2]
        # at this point (val + 1) ** 2
        result_df = result_df.apply(new_func, args=new_args)
        assert result_df.loc[0]['level'] == (simple_query_fixture[0]['level'] + 1) ** 2
        assert result_df.loc[0]['age'] == (simple_query_fixture[0]['age'] + 1) ** 2
        assert len(result_df.apply_queue) == 2
        assert result_df.apply_queue[0].function == function_to_apply
        assert result_df.apply_queue[0].args == function_args
        assert result_df.apply_queue[1].function == new_func
        assert result_df.apply_queue[1].args == new_args
        num_rows = 0
        for row in result_df.itertuples():
            # every mocked "api call" gives the same fixture back, so the
            # values repeat when number of rows exceeds the length of the
            # fixture (since we're making 5 "api calls" in this test)
            idx = num_rows % len(simple_query_fixture)
            assert row[0] == (simple_query_fixture[idx]['level'] + 1) ** 2
            assert row[1] == (simple_query_fixture[idx]['age'] + 1) ** 2
            num_rows += 1
        assert num_rows == len(simple_query_fixture) * npd.dataframe.API_CALLS_MAX

    def test_fillna(self):
        """fillna(inplace=True) persists across the mocked api calls."""
        data = [
            {'level': 11, 'age': 1},
            {'level': None, 'age': 2},
            {'level': None, 'age': None}
        ]
        npd.dataframe.API_CALLS_MAX = 2
        npd.dataframe.API_RATE_LIMIT = 0
        fill_value = 0
        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient(data))
        df.fillna(value=fill_value, inplace=True)
        rows = []
        for row in df.itertuples():
            rows.append(row)
        # rows 0-2 are the first "api call"; rows 3-5 repeat the fixture
        assert rows[0][0] == data[0]['level']
        assert rows[0][1] == data[0]['age']
        assert rows[1][0] == fill_value
        assert rows[1][1] == data[1]['age']
        assert rows[2][0] == fill_value
        assert rows[2][1] == fill_value
        assert rows[3][0] == data[0]['level']
        assert rows[3][1] == data[0]['age']
        assert rows[4][0] == fill_value
        assert rows[4][1] == data[1]['age']
        assert rows[5][0] == fill_value
        assert rows[5][1] == fill_value

    def test_groupby(self):
        """Groups form per api-call page; a key spanning pages splits."""
        npd.dataframe.API_CALLS_MAX = 3
        npd.dataframe.MAX_ROWS = 2
        npd.dataframe.API_RATE_LIMIT = 0
        df = npd.DataFrame(
            data_set_id=data_set_id,
            client=MockQueryClient(groupby_query_fixture, limit=npd.dataframe.MAX_ROWS)
        )
        # the init function calls the mock client once, so we need to reset it
        # so values align for assertions
        df.client.offset = 0
        gp = df.groupby(by=['country'])
        num_groups = 0
        groups = []
        for key, group in gp:
            num_groups += 1
            groups.append(group)
        # there's 4 groups because 'us' group is split across api calls, a
        # limitation of namara_pandas' groupby
        assert num_groups == 4
        assert all(groups[0].country.eq('canada'))
        assert all(groups[1].country.eq('mexico'))
        assert all(groups[2].country.eq('us'))
        assert all(groups[3].country.eq('us'))

    def test_merge(self):
        """merge() builds an inner-join query with aliased sides/columns."""
        df1 = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        df2 = npd.DataFrame(data_set_id='other_id', client=MockQueryClient())
        other_data_set_id = 'other_id'
        join_on = 'level'
        joined_df = df1.merge(df2, on=join_on)
        actual_frame_query = joined_df.data_frame_query
        assert actual_frame_query.join_type == 'inner join'
        lhs, rhs = npd.dataframe.LEFT_SIDE, npd.dataframe.RIGHT_SIDE
        assert actual_frame_query.join_from.lower() == \
            "{} as {}".format(other_data_set_id, rhs).lower()
        assert actual_frame_query.join_on.lower() == \
            "{}.{} = {}.{}".format(lhs, join_on, rhs, join_on).lower()
        assert actual_frame_query.select.lower() == \
            "{}.level as level_x, {}.age as age_x, {}.level as level_y, {}.age as age_y".format(lhs, lhs, rhs, rhs).lower()

    def test_aggregate(self, monkeypatch):
        ''' All aggregate functions are based on a single function, we just test `count` here
        '''
        client = MockQueryClient()

        def query_assertion_func(q):
            # the assertion runs when _agg_query issues its statement
            assert q.lower().strip() == \
                "select count(level) as level, count(age) as age from %s" % (data_set_id)
            # return this because aggregate funcs are expecting a result back
            return [{'level': 1}]

        with monkeypatch.context() as m:
            df = npd.DataFrame(data_set_id=data_set_id, client=client)
            m.setattr(client, 'query', query_assertion_func)
            df._agg_query('count')