Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

Version: 0.24.2 

/ tests / groupby / test_apply.py

from datetime import datetime

import numpy as np
import pytest

import pandas as pd
from pandas import DataFrame, Index, MultiIndex, Series, bdate_range, compat
from pandas.util import testing as tm


def test_apply_issues():
        # GH 5788

    s = """2011.05.16,00:00,1.40893
2011.05.16,01:00,1.40760
2011.05.16,02:00,1.40750
2011.05.16,03:00,1.40649
2011.05.17,02:00,1.40893
2011.05.17,03:00,1.40760
2011.05.17,04:00,1.40750
2011.05.17,05:00,1.40649
2011.05.18,02:00,1.40893
2011.05.18,03:00,1.40760
2011.05.18,04:00,1.40750
2011.05.18,05:00,1.40649"""

    df = pd.read_csv(
        compat.StringIO(s), header=None, names=['date', 'time', 'value'],
        parse_dates=[['date', 'time']])
    df = df.set_index('date_time')

    expected = df.groupby(df.index.date).idxmax()
    result = df.groupby(df.index.date).apply(lambda x: x.idxmax())
    tm.assert_frame_equal(result, expected)

    # GH 5789
    # don't auto coerce dates
    df = pd.read_csv(
        compat.StringIO(s), header=None, names=['date', 'time', 'value'])
    exp_idx = pd.Index(
        ['2011.05.16', '2011.05.17', '2011.05.18'
         ], dtype=object, name='date')
    expected = Series(['00:00', '02:00', '02:00'], index=exp_idx)
    result = df.groupby('date').apply(
        lambda x: x['time'][x['value'].idxmax()])
    tm.assert_series_equal(result, expected)


def test_apply_trivial():
    # GH 20066
    # Trivial apply: the applied function ignores the group it is given
    # and always returns the same slice of the original frame.
    frame = pd.DataFrame({'key': ['a', 'a', 'b', 'b', 'a'],
                          'data': [1.0, 2.0, 3.0, 4.0, 5.0]},
                         columns=['key', 'data'])

    # One copy of the slice per dtype group, keyed by the dtype name.
    pieces = [frame.iloc[1:] for _ in range(2)]
    expected = pd.concat(pieces, axis=1, keys=['float64', 'object'])

    # Group the columns (axis=1) by the string form of their dtype.
    dtype_labels = list(map(str, frame.dtypes))
    by_dtype = frame.groupby(dtype_labels, axis=1)
    result = by_dtype.apply(lambda _group: frame.iloc[1:])

    tm.assert_frame_equal(result, expected)


@pytest.mark.xfail(
    reason="GH#20066; function passed into apply "
           "returns a DataFrame with the same index "
           "as the one to create GroupBy object.")
def test_apply_trivial_fail():
    # GH 20066
    # Same setup as the trivial apply case, except the constant frame that
    # is returned shares its index with the frame the GroupBy object was
    # built from; the test is marked as an expected failure.
    frame = pd.DataFrame({'key': ['a', 'a', 'b', 'b', 'a'],
                          'data': [1.0, 2.0, 3.0, 4.0, 5.0]},
                         columns=['key', 'data'])

    expected = pd.concat([frame, frame],
                         axis=1, keys=['float64', 'object'])

    # Group the columns (axis=1) by the string form of their dtype.
    dtype_labels = list(map(str, frame.dtypes))
    by_dtype = frame.groupby(dtype_labels, axis=1)
    result = by_dtype.apply(lambda _group: frame)

    tm.assert_frame_equal(result, expected)


def test_fast_apply():
    # make sure that fast apply is correctly called
    # rather than raising any kind of error
    # otherwise the python path will be callsed
    # which slows things down
    N = 1000
    labels = np.random.randint(0, 2000, size=N)
    labels2 = np.random.randint(0, 3, size=N)
    df = DataFrame({'key': labels,
                    'key2': labels2,
                    'value1': np.random.randn(N),
                    'value2': ['foo', 'bar', 'baz', 'qux'] * (N // 4)})

    def f(g):
        return 1

    g = df.groupby(['key', 'key2'])

    grouper = g.grouper

    splitter = grouper._get_splitter(g._selected_obj, axis=g.axis)
    group_keys = grouper._get_group_keys()

    values, mutated = splitter.fast_apply(f, group_keys)
    assert not mutated


def test_apply_with_mixed_dtype():
    # GH3480, apply with mixed dtype on axis=1 breaks in 0.11
    df = DataFrame({'foo1': np.random.randn(6),
                    'foo2': ['one', 'two', 'two', 'three', 'one', 'two']})
    result = df.apply(lambda x: x, axis=1)
    tm.assert_series_equal(df.get_dtype_counts(), result.get_dtype_counts())

    # GH 3610 incorrect dtype conversion with as_index=False
    df = DataFrame({"c1": [1, 2, 6, 6, 8]})
    df["c2"] = df.c1 / 2.0
    result1 = df.groupby("c2").mean().reset_index().c2
    result2 = df.groupby("c2", as_index=False).mean().c2
    tm.assert_series_equal(result1, result2)


def test_groupby_as_index_apply(df):
    # GH #4648 and #3417
    # NOTE: the ``df`` argument is replaced right away by a local frame.
    df = DataFrame({'item_id': ['b', 'b', 'a', 'c', 'a', 'b'],
                    'user_id': [1, 2, 1, 1, 3, 1],
                    'time': range(6)})

    keyed = df.groupby('user_id', as_index=True)
    unkeyed = df.groupby('user_id', as_index=False)

    # head(2) keeps the original row labels for both as_index settings.
    head_index_keyed = keyed.head(2).index
    head_index_unkeyed = unkeyed.head(2).index
    head_expected = Index([0, 1, 2, 4])
    tm.assert_index_equal(head_index_keyed, head_expected)
    tm.assert_index_equal(head_index_unkeyed, head_expected)

    def first_two(chunk):
        return chunk.head(2)

    apply_index_keyed = keyed.apply(first_two).index
    apply_index_unkeyed = unkeyed.apply(first_two).index

    # apply doesn't maintain the original ordering
    # changed in GH5610 as the as_index=False returns a MI here
    expected_unkeyed = MultiIndex.from_tuples(
        [(0, 0), (0, 2), (1, 1), (2, 4)])
    expected_keyed = MultiIndex.from_tuples(
        [(1, 0), (1, 2), (2, 1), (3, 4)], names=['user_id', None])

    tm.assert_index_equal(apply_index_keyed, expected_keyed)
    tm.assert_index_equal(apply_index_unkeyed, expected_unkeyed)

    # An identity apply with as_index=False returns the original index.
    letters = Index(list('abcde'))
    df = DataFrame([[1, 2], [2, 3], [1, 4], [1, 5], [2, 6]], index=letters)
    identity_index = df.groupby(0, as_index=False).apply(
        lambda chunk: chunk).index
    tm.assert_index_equal(identity_index, letters)


def test_apply_concat_preserve_names(three_group):
    # Checks which index level names survive when the applied function
    # returns describe() output whose index has been given a name.
    by_ab = three_group.groupby(['A', 'B'])

    def described(group, index_name, truncate):
        # describe() the group, name its index, optionally cut it down to
        # at most len(group) rows.
        stats = group.describe()
        stats.index.name = index_name
        if truncate:
            # weirdo
            stats = stats[:len(group)]
        return stats

    def desc(group):
        return described(group, 'stat', truncate=False)

    def desc2(group):
        return described(group, 'stat', truncate=True)

    def desc3(group):
        # names are different
        return described(group, 'stat_%d' % len(group), truncate=True)

    same_name_full = by_ab.apply(desc)
    assert same_name_full.index.names == ('A', 'B', 'stat')

    same_name_truncated = by_ab.apply(desc2)
    assert same_name_truncated.index.names == ('A', 'B', 'stat')

    # With per-group names the third level is expected to be unnamed.
    varying_name = by_ab.apply(desc3)
    assert varying_name.index.names == ('A', 'B', None)


def test_apply_series_to_frame():
    def f(piece):
        with np.errstate(invalid='ignore'):
            logged = np.log(piece)
        return DataFrame({'value': piece,
                          'demeaned': piece - piece.mean(),
                          'logged': logged})

    dr = bdate_range('1/1/2000', periods=100)
    ts = Series(np.random.randn(100), index=dr)

    grouped = ts.groupby(lambda x: x.month)
    result = grouped.apply(f)

    assert isinstance(result, DataFrame)
    tm.assert_index_equal(result.index, ts.index)


def test_apply_series_yield_constant(df):
    # Applying len to column 'C' of every (A, B) group: the first two
    # index level names of the result must be the grouping keys.
    lengths = df.groupby(['A', 'B'])['C'].apply(len)
    leading_names = lengths.index.names[:2]
    assert leading_names == ('A', 'B')


def test_apply_frame_yield_constant(df):
    # GH13568
    # Applying len per (A, B) group yields an unnamed Series, both on the
    # whole frame and on a two-column selection.
    keys = ['A', 'B']

    whole_frame = df.groupby(keys).apply(len)
    assert isinstance(whole_frame, Series)
    assert whole_frame.name is None

    column_subset = df.groupby(keys)[['C', 'D']].apply(len)
    assert isinstance(column_subset, Series)
    assert column_subset.name is None


def test_apply_frame_to_series(df):
    # Group sizes from apply(len) must match the 'C' column of count(),
    # both in index and in values.
    by_ab = df.groupby(['A', 'B'])
    sizes = by_ab.apply(len)
    c_counts = by_ab.count()['C']

    tm.assert_index_equal(sizes.index, c_counts.index)
    tm.assert_numpy_array_equal(sizes.values, c_counts.values)


def test_apply_frame_concat_series():
    # The same per-group computation is done once on the frame and once on
    # the 'C' column alone; both routes must give equal series.
    frame = DataFrame({'A': np.random.randint(0, 5, 1000),
                       'B': np.random.randint(0, 5, 1000),
                       'C': np.random.randn(1000)})

    def smallest_two_from_frame(group):
        # Sum 'C' within each 'B' value, keep the two smallest sums.
        sums = group.groupby('B')['C'].sum()
        return sums.sort_values()[:2]

    def smallest_two_from_series(values):
        # Same computation on a bare series: look up the matching 'B'
        # labels in the outer frame through the group's index.
        b_labels = frame.reindex(values.index)['B']
        sums = values.groupby(b_labels).sum()
        return sums.sort_values()[:2]

    result = frame.groupby('A').apply(smallest_two_from_frame)
    expected = frame.groupby('A')['C'].apply(smallest_two_from_series)

    tm.assert_series_equal(result, expected, check_names=False)
    assert result.name == 'C'


def test_apply_transform(ts):
    # Doubling every value through apply must match doing it through
    # transform on the same month-based grouping.
    def double(values):
        return values * 2

    by_month = ts.groupby(lambda stamp: stamp.month)
    via_apply = by_month.apply(double)
    via_transform = by_month.transform(double)
    tm.assert_series_equal(via_apply, via_transform)


def test_apply_multikey_corner(tsframe):
    # Group by (year, month) of the index and check the applied result
    # one group at a time.
    def last_five_by_a(group):
        # Sort by column 'A' and keep the final five rows.
        return group.sort_values('A')[-5:]

    by_year_month = tsframe.groupby([lambda stamp: stamp.year,
                                     lambda stamp: stamp.month])
    combined = by_year_month.apply(last_five_by_a)

    # The slice of the combined result under each key must equal the
    # function applied to that group directly.
    for key, group in by_year_month:
        tm.assert_frame_equal(combined.loc[key], last_five_by_a(group))


def test_apply_chunk_view():
    # Low level tinkering could be unsafe, make sure not
    df = DataFrame({'key': [1, 1, 1, 2, 2, 2, 3, 3, 3],
                    'value': compat.lrange(9)})

    result = df.groupby('key', group_keys=False).apply(lambda x: x[:2])
    expected = df.take([0, 1, 3, 4, 6, 7])
    tm.assert_frame_equal(result, expected)


def test_apply_no_name_column_conflict():
    df = DataFrame({'name': [1, 1, 1, 1, 1, 1, 2, 2, 2, 2],
                    'name2': [0, 0, 0, 1, 1, 1, 0, 0, 1, 1],
                    'value': compat.lrange(10)[::-1]})

    # it works! #2605
    grouped = df.groupby(['name', 'name2'])
    grouped.apply(lambda x: x.sort_values('value', inplace=True))


def test_apply_typecast_fail():
    df = DataFrame({'d': [1., 1., 1., 2., 2., 2.],
                    'c': np.tile(
                        ['a', 'b', 'c'], 2),
                    'v': np.arange(1., 7.)})

    def f(group):
        v = group['v']
        group['v2'] = (v - v.min()) / (v.max() - v.min())
        return group

    result = df.groupby('d').apply(f)

    expected = df.copy()
    expected['v2'] = np.tile([0., 0.5, 1], 2)

    tm.assert_frame_equal(result, expected)


def test_apply_multiindex_fail():
    index = MultiIndex.from_arrays([[0, 0, 0, 1, 1, 1], [1, 2, 3, 1, 2, 3]
                                    ])
    df = DataFrame({'d': [1., 1., 1., 2., 2., 2.],
                    'c': np.tile(['a', 'b', 'c'], 2),
                    'v': np.arange(1., 7.)}, index=index)

    def f(group):
        v = group['v']
        group['v2'] = (v - v.min()) / (v.max() - v.min())
        return group

    result = df.groupby('d').apply(f)

    expected = df.copy()
    expected['v2'] = np.tile([0., 0.5, 1], 2)

    tm.assert_frame_equal(result, expected)


def test_apply_corner(tsframe):
    # Doubling each per-year group through apply must equal doubling the
    # whole frame in one go.
    by_year = tsframe.groupby(lambda stamp: stamp.year)
    doubled_per_group = by_year.apply(lambda chunk: chunk * 2)
    tm.assert_frame_equal(doubled_per_group, tsframe * 2)


def test_apply_without_copy():
    # GH 5545
    # returning a non-copy in an applied function fails

    data = DataFrame({'id_field': [100, 100, 200, 300],
                      'category': ['a', 'b', 'c', 'c'],
                      'value': [1, 2, 3, 4]})

    def filt1(x):
        if x.shape[0] == 1:
            return x.copy()
Loading ...