# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages
#
# Repository URL to install this package:
#
# Details
# namara-python / dataframe.py
# Size: Mime:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from pandas import DataFrame as PandaDataFrame
from pandas import Series as PandaSeries
from pandas.core.groupby import GroupBy as PandaDataFrameGroupBy
import pandas as pd

from namara_python.client import Client
from namara_python.query import Query as QueryClient
from namara_python.utils import listify

from namara_pandas.query import Query

import time
from collections import namedtuple

rows_50_query = Query('*', limit=50)
simple_count_query = Query("count(*)")

# the may be combined with other clauses
distinct_on_clause = "DISTINCT ON ({}) {}"
simple_distinct_clause = "DISTINCT {}"

MAX_ROWS = 1000
API_CALLS_MAX = 5
API_RATE_LIMIT = 0.5 # in seconds

LEFT_SIDE, RIGHT_SIDE = 'DS1', 'DS2'

AAC = '__nm_id' # Namara's always available columnn

Apply = namedtuple('Apply', 'function args kwargs result_type axis')


class DataFrame(PandaDataFrame):
    _metadata = ['full_count', 'data_frame_query', 'client', 'data_set_id', 'loaded']
    _internal_names = ['apply_queue']
    _internal_names_set = set(_internal_names)

    @property
    def _constructor(self):
        return DataFrame

    def __init__(self, *args, **kwargs):
        ''' Takes in a `data_set_id` keyword and will load the first 50 rows of
        that dataset. Essentially "registers" this dataframe to a Namara
        dataset.

        Provides a `full_count` property that can tell you how many
        rows are in the full dataset.
        '''
        self.loaded = False
        self.client = None
        self.full_count = None
        self.apply_queue = []

        data_set_id = kwargs.pop('data_set_id', None)
        loaded = kwargs.pop('loaded', False)
        data_frame_query = kwargs.pop('data_frame_query', None)

        load_frame = self._check_frame(data_set_id, loaded, data_frame_query)
        if load_frame:
            client = kwargs.pop('client', None)
            if not client:
                client = QueryClient(Client())

            if not data_frame_query:
                data_frame_query = rows_50_query.extend(from_id=data_set_id)

            kwargs['data'] = DataFrame._load(client, data_frame_query.compile())

            self.full_count = DataFrame._simple_count(client, data_set_id)
            self.loaded = True
            self.client = client

        super().__init__(*args, **kwargs)

        self.original_columns = set(self.columns)
        self.data_set_id = data_set_id
        self.data_set_ids = set([data_set_id])

        self.data_frame_query = data_frame_query

    def append(self, *args, **kwargs):
        ''' Maybe take in a dataset id
        '''
        data_set_id = kwargs.pop('data_set_id', None)

        if data_set_id is None:
            return super().append(*args, **kwargs)

        query = rows_50_query.compile(from_id=data_set_id)
        new_data_set = DataFrame._load(self.client, query)

        self.data_set_ids.add(data_set_id)

        return super().append(new_data_set, **kwargs)

    def drop(self, *args, **kwargs):
        ''' Drop columns from dataframe. These changes affect what column can
        gets queried from namara. Support what default pandas supports
        '''
        axis = kwargs.get('axis')
        columns = kwargs.get('columns')
        inplace = kwargs.get('inplace')

        if not inplace:
            return super().drop(*args, **kwargs)

        if axis == 1:
            columns_to_drop = args[0]
        elif columns:
            columns_to_drop = columns

        # here `args[0]` is the `labels` arg, which can be a list of strings or
        # just a string. Same with `columns`
        if type(columns_to_drop) is list:
            self.original_columns -= set(columns_to_drop)
        else:
            self.original_columns -= set([columns_to_drop])

        return super().drop(*args, **kwargs)

    def itertuples(self, index=True, name='Pandas'):
        return self._itertuples(index, name)

    def _itertuples(self, index=True, name='Pandas'):
        ''' Lets you iterate over all rows in the dataset; each row is
        represented as tuple. Unlike regular pandas, each tuple is not a named
        tuple. Meaning you can't do something like `row.Title` but instead have
        to do `row[3]` to actually pull out the column values in each row.

        Since this is still in beta, there are limiations to how many rows you
        can pull from the api (currently limited to 5 api calls, you can override this however)
        '''
        for row_offset in range(0, API_CALLS_MAX * MAX_ROWS, MAX_ROWS):
           #  query = self.data_frame_query + " ORDER BY {} LIMIT {} OFFSET {}".format(AAC, MAX_ROWS, row_offset)
           args = {'limit': MAX_ROWS, 'offset': row_offset}
           if not self.data_frame_query.order_by: args['order_by'] = AAC

           query = self.data_frame_query.compile(**args)

           rows = self.client.query(query)

           for row in rows:
               # TODO: take into account `index` and `name` params for
               # creating tuples
               row_applied = pd.Series(row)
               for apply in self.apply_queue:
                   if 'inplace' in apply.kwargs:
                       apply.function.__call__(row_applied, *apply.args, **apply.kwargs)
                   else:
                       row_applied = apply.function.__call__(row_applied, *apply.args, **apply.kwargs)

               yield tuple(row_applied)

           time.sleep(API_RATE_LIMIT)

    def apply(self, func, axis=0, result_type=None, args=(), **kwargs):
        ''' Will return a dataframe with the applied `func` function. Will also
        lazily apply the function to the full dataset during iteration. The
        result_type and axis will always be `broadcast` and 1 respectively.
        This means that it can only work on row by row basis and the function
        must work on panda.Series object.

        The passed in `func` gets added to a queue which gets passed along to
        pandas manipulations. This allows for chaining `apply`s together that
        will get evaluated lazily during iteration of the full dataset
        '''
        # For now, fix this value to 'broadcast' (ie. same shape as original)
        result_type = 'broadcast'
        # For now, fix this value to apply row wise only
        axis = 1

        result_frame = super().apply(func, args=args, result_type=result_type, axis=axis, **kwargs)

        new_apply = Apply(func, args, kwargs, result_type, axis)
        result_frame._inherit_frame(self)
        result_frame.apply_queue.append(new_apply)

        return result_frame

    def drop_duplicates(self, subset=None, keep='first', inplace=False):
        ''' Doesn't work right now because of a limitation with Namara's NiQL
        '''
        columns = ', '.join(list(self.columns))
        if subset:
            distinct_cols = ', '.join(listify(subset))
            distinct_clause = distinct_on_clause.format(distinct_cols, columns)
            order_by_cols = subset
        else:
            order_by_cols = AAC
            distinct_clause = simple_distinct_clause.format(columns)

        query = Query(distinct_clause, from_id=self.data_set_id, order_by=order_by_cols)
        #  query = "SELECT {} FROM {} ORDER BY {}".format(distinct_clause, '{}', order_by_cols)

        if inplace:
            self.data_frame_query = query
            return super().drop_duplicates(subset=subset, keep=keep, inplace=inplace)
        else:
            return_frame = super().drop_duplicates(subset=subset, keep=keep, inplace=inplace)
            return_frame.data_frame_query = query
            return return_frame

    def fillna(self, *args, **kwargs):
        ''' Will return a dataframe with filled in null values. Will also
        lazily fill in values during iteration. Supports regular pandas args
        '''
        new_apply = Apply(PandaSeries.fillna, args, kwargs, 'broadcast', None)

        if kwargs.get('inplace', False):
            self.apply_queue.append(new_apply)
            return super().fillna(*args, **kwargs)
        else:
            result_frame = super().fillna(*args, **kwargs)

            result_frame.apply_queue.append(new_apply)
            return result_frame

    def groupby(self, *args, **kwargs):
        ''' Does a group by on certain column values using NiQL. Must pass in
        `by` keyword, otherwise will resort to standard pandas groupby.

        Read documentation for namara_pandas.Groupby for more details
        '''
        by = kwargs.pop('by')

        if not by:
            return super().groupby(*args, **kwargs)

        if type(by) is not list:
            by = [by]

        if not set(by).issubset(set(self.columns)):
            raise TypeError('%s not in this dataset' % by)

        kwargs['data_set_id'] = self.data_set_id
        kwargs['client'] = self.client
        kwargs['full_count'] = self.full_count

        return DataFrameGroupBy(self,  by, **kwargs)

    def merge(self, right=None, how='inner', on=None, left_on=None, right_on=None, left_index=False,
              right_index=False, sort=False, suffixes=('_x', '_y'), copy=True, indicator=False, validate=None, **kwargs):
        ''' Performs a sql join on with a specified `data_set_id` or `right` must
        be namara_pandas.DataFrame. Does not work with non-namara dataframes.
        Supports `right`, `how`, `on`, `left_on`, `right_on` keywords, others are ignored.
        '''
        if (not isinstance(right, DataFrame)):
            raise AttributeError("a Namara DataFrame needs to be passed in")

        from_clause = "%s AS %s" % (self.data_set_id, LEFT_SIDE)
        join_type = how + ' join'
        on = self._join_columns(on, left_on, right_on)
        select_columns = self._join_select(self.columns, right.columns, suffixes)

        join_from = "%s AS %s" % (right.data_set_id, RIGHT_SIDE)

        frame_query = Query(', '.join(select_columns),
                            from_id=from_clause,
                            join_type=join_type,
                            join_from=join_from,
                            join_on=on)

        return DataFrame(
            data_set_id=self.data_set_id,
            data_frame_query=frame_query,
            client=self.client
        )

    def count(self):
        return self._agg_query('count')

    def sum(self):
        return self._agg_query('sum')

    def mean(self):
        return self._agg_query('avg')

    def max(self):
        return self._agg_query('max')

    def min(self):
        return self._agg_query('min')

    def _agg_query(self, agg):
        '''
        `agg` is the type of SQL aggregate being performed (count, sum, max, avg)

        Only does aggregation per column bases. The pandas version of this
        methods supports several parameters, however this function only
        supports one use case. axis=0, numeric_only=True, other params are
        not allowed.
        '''
        numeric_cols = [col for col, dtype in self.dtypes.items() if dtype in ['float', 'int']]

        select_clause = ["%s(%s) as %s"%(agg, col, col) for col in numeric_cols]
        select_clause = ', '.join(select_clause)

        q = Query(select=select_clause, from_id=self.data_set_id)

        agg_resp = self.client.query(q.compile())

        return PandaSeries(agg_resp[0])

    def _join_columns(self, on=None, left_on=None, right_on=None):
        on, left_on, right_on = listify(on), listify(left_on), listify(right_on)

        if (left_on and not right_on) or (not left_on and right_on):
            raise ValueError("Either, both 'right_on' and 'left_on' must be provided, or 'on'")
        elif not(left_on and right_on) and (not on):
            raise ValueError("Either, both 'right_on' and 'left_on' must be provided, or 'on'")

        if (left_on and right_on) and (len(left_on) != len(right_on)):
            raise ValueError("Length of both 'left_on' and 'right_on' must be the same")

        join_on_str = []
        if on:
            for column in on:
                join_on_str.append("%s.%s = %s.%s" %(LEFT_SIDE, column, RIGHT_SIDE, column))
        else:
            for cols in zip(left_on, right_on):
                join_on_str.append("%s.%s = %s.%s" %(LEFT_SIDE, cols[0], RIGHT_SIDE, cols[1]))


        return ' AND '.join(join_on_str)

    def _join_select(self, ds1_cols, ds2_cols, suffixes):
        left_side = [LEFT_SIDE + '.' + col + ' AS ' + col+suffixes[0] for col in ds1_cols]
        right_side = [RIGHT_SIDE + '.' + col + ' AS ' + col+suffixes[1] for col in ds2_cols]

        return left_side + right_side


    @staticmethod
    def _simple_count(client, data_set_id):
        ''' Counts the total number of rows in a dataset, does not take into consideration null values in a column.
        '''
        query = simple_count_query.compile(from_id=data_set_id)
        resp = client.query(query)
        return resp[0]["count"]

    @staticmethod
    def _load(client, query):
        ''' Fetches the first 50 rows from the dataset `id` from Namara
        '''
        data_set = client.query(query)

        #  return self._decor_list_of_dict_to_pd(data_set)
        return data_set

    def _load_full(self, query, client, id):
        ''' Fetches all the rows for a dataset up to maximum number of api
        calls (API_CALLS_MAX). Each call is limited to MAX_ROWS rows.

        `query` must have formatting params for limit and offset, these values
        have to be controlled by this function.
        '''
        api_calls_to_make = (self.full_count // MAX_ROWS) + 1

        if api_calls_to_make > API_CALLS_MAX:
            print("WARNING: Making too many API calls, limiting to %d" % API_CALLS_MAX)

        data_set = []
        for row_offset in range(0, API_CALLS_MAX * MAX_ROWS, MAX_ROWS):
           query = query.format(MAX_ROWS, row_offset)

           rows = client.query(query)
           data_set += rows
           time.sleep(API_RATE_LIMIT)

        return data_set

    def _check_frame(self, data_set_id, loaded, data_frame_query):
        return (data_set_id or data_frame_query) and not (self.loaded or loaded)

    def _validate(self, obj):
        # verify pandas obj
        if 'id' not in obj.columns:
            raise AttributeError("Cannot find id from namara pandas object")

    def _decor_list_of_dict_to_pd(self, list_of_dict):
        return DataFrame(list_of_dict)

    def _inherit_frame(self, parent_frame):
        self.data_set_id = parent_frame.data_set_id
        self.data_frame_query = parent_frame.data_frame_query
        self.client = parent_frame.client
        self.apply_queue = parent_frame.apply_queue


class DataFrameGroupBy(PandaDataFrameGroupBy):
    def __init__(self, obj, by, **kwargs):
        self.data_set_id = kwargs.pop('data_set_id')
        self.client = kwargs.pop('client')
        self.full_count = kwargs.pop('full_count')
        super().__init__(obj, by, **kwargs)

    def __iter__(self):
        return self.get_iterator()

    def get_iterator(self):
        ''' Generator function for iterating over groupby objects. Based of how
        Pandas internally does, with added functionality for pulling in data
        from Namara.

        LIMITATION: if a group is very large, ie larger than MAX_ROWS, than
        that group will be split up into multiple Panda groups. Which means if
        you're iterating over groups, then you have to take into account; cant
        assume each iteration will be a different group.
        '''
        max_results = min(self.full_count, API_CALLS_MAX * MAX_ROWS)
        for row_offset in range(0, max_results, MAX_ROWS):
           query = rows_50_query.compile(
               from_id=self.data_set_id,
               order_by=', '.join(self.keys),
               limit=MAX_ROWS,
               offset=row_offset)

           rows = self.client.query(query)
           frame = pd.DataFrame(rows)

           frame.index = pd.RangeIndex(row_offset, row_offset + min(len(rows), MAX_ROWS))

           gp = PandaDataFrameGroupBy(frame, self.keys)

           for key, group in gp:
               yield key, group

           time.sleep(API_RATE_LIMIT)

    def count(self, *args, **kwargs):
        columns = ', '.join(self.keys)
        query = "select count(*), %s from %s group by %s" %(columns, self.data_set_id, columns)

        count_resp = self.client.query(query)
        column_list = [[] for i in range(len(self.keys))]
        counts = []

        for item in count_resp:
            for i, col in enumerate(self.keys):
                column_list[i].append(item[col])

            counts.append(item['count'])

        if len(self.keys) == 1:
            index = pd.Index(column_list[0], names=self.keys)
        else:
            index = pd.MultiIndex.from_arrays(column_list, names=self.keys)

        return pd.DataFrame(index=index, columns=['count'], data=counts)