Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
namara-python / tests / test_dataframe.py
Size: Mime:
import pytest
import re
import pandas as pd

import namara_pandas as npd

# Rows served by the mock client for plain (non-aggregate) queries.
simple_query_fixture = [
    {'level': 11, 'age': 1},
    {'level': 22, 'age': 2},
    {'level': 33, 'age': 3},
    {'level': 44, 'age': 4},
    {'level': 55, 'age': 5},
    {'level': 66, 'age': 6},
]

# Rows for the groupby test; the repeated 'us' rows end up split across
# separate "api calls" when MAX_ROWS is small.
groupby_query_fixture = [
    {'country': 'canada', 'city': 'ottawa'},
    {'country': 'canada', 'city': 'toronto'},
    {'country': 'mexico', 'city': 'mexico city'},
    {'country': 'us',     'city': 'nyc'},
    {'country': 'us',     'city': 'nyc'},
]

# Canned response for any "select count ..." aggregate statement.
count_query_fixture = [{'count': 50}]

data_set_id = 'some id'

class MockQueryClient:
    """Stand-in for the real query client.

    Serves fixture rows and mimics LIMIT/OFFSET paging so the DataFrame
    tests run without any network access.
    """

    def __init__(self, data=simple_query_fixture, limit=None, offset=0):
        # NOTE: the fixture default is shared, never mutated by this class.
        self.data = data
        self.limit = limit
        # BUG FIX: `offset` was previously ignored (hard-coded to 0), so a
        # client constructed with a non-zero offset started at row 0 anyway.
        self.offset = offset

    def query(self, stmt):
        """Return the next "page" of fixture rows for `stmt`.

        Any statement containing "select count" gets the canned count
        fixture; otherwise rows are served honoring limit/offset.
        """
        if re.search('select count', stmt.lower()):
            return count_query_fixture

        # No limit configured: every call returns the full data set.
        if self.limit is None:
            return self.data

        upper = self.offset + self.limit
        if upper > len(self.data):
            # Final partial page: clamp to the end, then exhaust the client
            # so any later call returns an empty slice.
            upper = len(self.data)
            segment = self.data[self.offset: upper]
            self.offset = len(self.data)
            self.limit = 0
        else:
            segment = self.data[self.offset: upper]

        # Advance the cursor only while rows remain past this page.
        if upper < len(self.data):
            self.offset = upper

        return segment

class TestNamaraDataFrame:
    """Exercises npd.DataFrame against MockQueryClient: construction,
    paged iteration, deferred apply, fillna, groupby, merge and aggregates.
    """

    def teardown_method(self):
        # Restore the module-level tuning knobs that individual tests tweak.
        npd.dataframe.MAX_ROWS = 1000
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0.5

    def test_init(self):
        client = MockQueryClient()

        df = npd.DataFrame(data_set_id=data_set_id, client=client)

        # The wrapper is still a pandas DataFrame underneath.
        assert isinstance(df, npd.DataFrame)
        assert isinstance(df, pd.DataFrame)
        assert df.data_set_id == data_set_id
        assert df.full_count == count_query_fixture[0]['count']

        # Construction builds a Query targeting the data set ...
        assert isinstance(df.data_frame_query, npd.query.Query)
        assert df.data_frame_query.from_id == data_set_id

        # ... and starts with an empty apply queue.
        assert len(df.apply_queue) == 0

    def test_itertuples(self):
        # One "api call" per fixture-length page, five pages in total.
        npd.dataframe.MAX_ROWS = len(simple_query_fixture)
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0

        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())

        rows = list(df.itertuples())

        assert len(rows) == len(simple_query_fixture) * npd.dataframe.API_CALLS_MAX
        assert all(isinstance(row, tuple) for row in rows)

    def test_apply(self):
        # pd.Series.eq is simple enough to verify the apply plumbing with
        func = pd.Series.eq
        func_args = [True]

        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        result_df = df.apply(func, args=func_args)

        # the function really ran against the current frame contents
        assert result_df.loc[0]['level'] == False
        assert result_df.loc[0]['age'] == True

        # ... and was recorded (with its defaults) for replay on later pages
        assert len(result_df.apply_queue) == 1
        queued = result_df.apply_queue[0]
        assert queued.function == func
        assert queued.args == func_args
        assert queued.kwargs == {}
        assert queued.axis == 1
        assert queued.result_type == 'broadcast'

    def test_apply_on_iteration(self):
        npd.dataframe.MAX_ROWS = len(simple_query_fixture)
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0

        func = pd.Series.eq
        func_args = [True]

        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        result_df = df.apply(func, args=func_args)

        num_rows = 0
        for row in result_df.itertuples():
            num_rows += 1
            # every cell of every fetched page went through pd.Series.eq
            assert all(isinstance(cell, bool) for cell in row)

        assert num_rows == len(simple_query_fixture) * npd.dataframe.API_CALLS_MAX

    def test_apply_multiple_functions(self):
        npd.dataframe.MAX_ROWS = len(simple_query_fixture)
        npd.dataframe.API_CALLS_MAX = 5
        npd.dataframe.API_RATE_LIMIT = 0

        first_func, first_args = pd.Series.add, [1]

        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        result_df = df.apply(first_func, args=first_args)

        second_func, second_args = pd.Series.pow, [2]
        # composition so far: (val + 1) ** 2
        result_df = result_df.apply(second_func, args=second_args)

        assert result_df.loc[0]['level'] == (simple_query_fixture[0]['level'] + 1) ** 2
        assert result_df.loc[0]['age'] == (simple_query_fixture[0]['age'] + 1) ** 2

        # both functions are queued, in application order
        assert len(result_df.apply_queue) == 2
        assert result_df.apply_queue[0].function == first_func
        assert result_df.apply_queue[0].args == first_args
        assert result_df.apply_queue[1].function == second_func
        assert result_df.apply_queue[1].args == second_args

        num_rows = 0
        for i, row in enumerate(result_df.itertuples()):
            # every mocked "api call" returns the same fixture, so values
            # repeat once i passes the fixture length (5 "api calls" here)
            idx = i % len(simple_query_fixture)
            assert row[0] == (simple_query_fixture[idx]['level'] + 1) ** 2
            assert row[1] == (simple_query_fixture[idx]['age'] + 1) ** 2
            num_rows = i + 1

        assert num_rows == len(simple_query_fixture) * npd.dataframe.API_CALLS_MAX

    def test_fillna(self):
        data = [
            {'level': 11, 'age': 1},
            {'level': None, 'age': 2},
            {'level': None, 'age': None}
        ]

        npd.dataframe.API_CALLS_MAX = 2
        npd.dataframe.API_RATE_LIMIT = 0

        fill_value = 0
        df = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient(data))
        df.fillna(value=fill_value, inplace=True)

        rows = list(df.itertuples())

        # two "api calls" each replay the 3-row fixture with Nones filled
        expected = [
            (data[0]['level'], data[0]['age']),
            (fill_value, data[1]['age']),
            (fill_value, fill_value),
        ] * 2

        for i, (level, age) in enumerate(expected):
            assert rows[i][0] == level
            assert rows[i][1] == age

    def test_groupby(self):
        npd.dataframe.API_CALLS_MAX = 3
        npd.dataframe.MAX_ROWS = 2
        npd.dataframe.API_RATE_LIMIT = 0
        df = npd.DataFrame(
            data_set_id=data_set_id,
            client=MockQueryClient(groupby_query_fixture, limit=npd.dataframe.MAX_ROWS)
        )
        # __init__ already consumed one page from the mock; rewind so the
        # groups below line up with the fixture for the assertions
        df.client.offset = 0

        groups = [group for _key, group in df.groupby(by=['country'])]

        # 4 groups rather than 3: the 'us' rows straddle two api calls, a
        # limitation of namara_pandas' groupby
        assert len(groups) == 4

        expected_countries = ['canada', 'mexico', 'us', 'us']
        for group, country in zip(groups, expected_countries):
            assert all(group.country.eq(country))

    def test_merge(self):
        df1 = npd.DataFrame(data_set_id=data_set_id, client=MockQueryClient())
        df2 = npd.DataFrame(data_set_id='other_id', client=MockQueryClient())

        other_data_set_id = 'other_id'
        join_on = 'level'
        merged = df1.merge(df2, on=join_on)

        query = merged.data_frame_query

        assert query.join_type == 'inner join'

        lhs, rhs = npd.dataframe.LEFT_SIDE, npd.dataframe.RIGHT_SIDE

        expected_join_from = "{} as {}".format(other_data_set_id, rhs)
        assert query.join_from.lower() == expected_join_from.lower()

        expected_join_on = "{}.{} = {}.{}".format(lhs, join_on, rhs, join_on)
        assert query.join_on.lower() == expected_join_on.lower()

        expected_select = "{}.level as level_x, {}.age as age_x, {}.level as level_y, {}.age as age_y".format(lhs, lhs, rhs, rhs)
        assert query.select.lower() == expected_select.lower()

    def test_aggregate(self, monkeypatch):
        ''' All aggregate functions are based on a single function, we just test `count` here
        '''
        client = MockQueryClient()

        def assert_count_query(q):
            assert q.lower().strip() == \
                    "select count(level) as level, count(age) as age from %s" % (data_set_id)

            # return this because aggregate funcs are expecting a result back
            return [{'level':1}]

        with monkeypatch.context() as m:
            # build the frame before patching: __init__ issues real queries
            df = npd.DataFrame(data_set_id=data_set_id, client=client)

            m.setattr(client, 'query', assert_count_query)
            df._agg_query('count')