from datetime import datetime
import numpy as np
import pytest
import pandas as pd
from pandas import DataFrame, Index, MultiIndex, Series, bdate_range, compat
from pandas.util import testing as tm
def test_apply_issues():
    """Check ``groupby(...).apply`` against known results (GH 5788, GH 5789).

    The first half compares ``apply(idxmax)`` with the built-in ``idxmax``
    when grouping a datetime-indexed frame by calendar date. The second half
    groups by the raw ``date`` strings and expects them to show up unchanged
    (object dtype) in the resulting index.
    """
    raw = """2011.05.16,00:00,1.40893
2011.05.16,01:00,1.40760
2011.05.16,02:00,1.40750
2011.05.16,03:00,1.40649
2011.05.17,02:00,1.40893
2011.05.17,03:00,1.40760
2011.05.17,04:00,1.40750
2011.05.17,05:00,1.40649
2011.05.18,02:00,1.40893
2011.05.18,03:00,1.40760
2011.05.18,04:00,1.40750
2011.05.18,05:00,1.40649"""

    # GH 5788
    frame = pd.read_csv(compat.StringIO(raw),
                        header=None,
                        names=['date', 'time', 'value'],
                        parse_dates=[['date', 'time']])
    frame = frame.set_index('date_time')
    expected = frame.groupby(frame.index.date).idxmax()
    result = frame.groupby(frame.index.date).apply(
        lambda chunk: chunk.idxmax())
    tm.assert_frame_equal(result, expected)

    # GH 5789
    # don't auto coerce dates
    frame = pd.read_csv(compat.StringIO(raw),
                        header=None,
                        names=['date', 'time', 'value'])
    date_labels = ['2011.05.16', '2011.05.17', '2011.05.18']
    expected_index = pd.Index(date_labels, dtype=object, name='date')
    expected = Series(['00:00', '02:00', '02:00'], index=expected_index)
    result = frame.groupby('date').apply(
        lambda chunk: chunk['time'][chunk['value'].idxmax()])
    tm.assert_series_equal(result, expected)
def test_apply_trivial():
    """GH 20066: apply a function that ignores its input.

    The applied function always returns the same slice of the frame, so the
    result is that slice repeated once per dtype group along the columns.
    """
    frame = pd.DataFrame(
        {'key': ['a', 'a', 'b', 'b', 'a'],
         'data': [1.0, 2.0, 3.0, 4.0, 5.0]},
        columns=['key', 'data'])

    expected = pd.concat([frame.iloc[1:], frame.iloc[1:]],
                         axis=1, keys=['float64', 'object'])

    # Group the columns by the string name of their dtype.
    dtype_labels = [str(dtype) for dtype in frame.dtypes]
    by_dtype = frame.groupby(dtype_labels, axis=1)
    result = by_dtype.apply(lambda _: frame.iloc[1:])

    tm.assert_frame_equal(result, expected)
@pytest.mark.xfail(reason="GH#20066; function passed into apply "
                          "returns a DataFrame with the same index "
                          "as the one to create GroupBy object.")
def test_apply_trivial_fail():
    """GH 20066: constant apply returning the *whole* original frame.

    Marked ``xfail``: the returned frame has the same index as the frame
    used to create the GroupBy object.
    """
    frame = pd.DataFrame(
        {'key': ['a', 'a', 'b', 'b', 'a'],
         'data': [1.0, 2.0, 3.0, 4.0, 5.0]},
        columns=['key', 'data'])

    expected = pd.concat([frame, frame], axis=1, keys=['float64', 'object'])

    # Group the columns by the string name of their dtype.
    dtype_labels = [str(dtype) for dtype in frame.dtypes]
    by_dtype = frame.groupby(dtype_labels, axis=1)
    result = by_dtype.apply(lambda _: frame)

    tm.assert_frame_equal(result, expected)
def test_fast_apply():
    """Make sure ``fast_apply`` is called successfully.

    If it raised any kind of error, the Python path would be called
    instead, which slows things down.
    """
    n_rows = 1000
    key_values = np.random.randint(0, 2000, size=n_rows)
    key2_values = np.random.randint(0, 3, size=n_rows)
    frame = DataFrame({
        'key': key_values,
        'key2': key2_values,
        'value1': np.random.randn(n_rows),
        'value2': ['foo', 'bar', 'baz', 'qux'] * (n_rows // 4),
    })

    def f(g):
        return 1

    gb = frame.groupby(['key', 'key2'])
    grouper = gb.grouper

    # Drive the splitter directly instead of going through ``apply``.
    splitter = grouper._get_splitter(gb._selected_obj, axis=gb.axis)
    group_keys = grouper._get_group_keys()

    # Only the ``mutated`` flag is checked; the applied values are ignored.
    _, mutated = splitter.fast_apply(f, group_keys)
    assert not mutated
def test_apply_with_mixed_dtype():
    """Dtype regressions: row-wise ``apply`` (GH 3480) and ``as_index`` (GH 3610).

    * GH 3480: an identity ``apply`` over the rows (``axis=1``) of a frame
      with one float and one string column must keep the column dtypes.
    * GH 3610: grouping with ``as_index=False`` must yield the same key
      column as grouping normally and calling ``reset_index``.
    """
    # GH3480, apply with mixed dtype on axis=1 breaks in 0.11
    df = DataFrame({'foo1': np.random.randn(6),
                    'foo2': ['one', 'two', 'two', 'three', 'one', 'two']})
    result = df.apply(lambda x: x, axis=1)
    # Compare the per-column dtypes directly instead of going through
    # ``get_dtype_counts()``: that helper was deprecated and later removed
    # from pandas, and equal per-column dtypes imply equal dtype counts.
    tm.assert_series_equal(result.dtypes, df.dtypes)

    # GH 3610 incorrect dtype conversion with as_index=False
    df = DataFrame({"c1": [1, 2, 6, 6, 8]})
    df["c2"] = df.c1 / 2.0
    result1 = df.groupby("c2").mean().reset_index().c2
    result2 = df.groupby("c2", as_index=False).mean().c2
    tm.assert_series_equal(result1, result2)
def test_groupby_as_index_apply(df):
    """Indexes produced by ``head``/``apply`` with and without ``as_index``.

    NOTE: the ``df`` fixture argument is immediately replaced by a locally
    built frame; it is kept only so the test signature stays unchanged.
    """
    # GH #4648 and #3417
    df = DataFrame({'item_id': ['b', 'b', 'a', 'c', 'a', 'b'],
                    'user_id': [1, 2, 1, 1, 3, 1],
                    'time': range(6)})

    g_as = df.groupby('user_id', as_index=True)
    g_not_as = df.groupby('user_id', as_index=False)

    # ``head`` keeps the original row labels regardless of ``as_index``.
    res_as = g_as.head(2).index
    res_not_as = g_not_as.head(2).index
    exp = Index([0, 1, 2, 4])
    tm.assert_index_equal(res_as, exp)
    tm.assert_index_equal(res_not_as, exp)

    res_as_apply = g_as.apply(lambda x: x.head(2)).index
    res_not_as_apply = g_not_as.apply(lambda x: x.head(2)).index

    # apply doesn't maintain the original ordering
    # changed in GH5610 as the as_index=False returns a MI here
    exp_not_as_apply = MultiIndex.from_tuples(
        [(0, 0), (0, 2), (1, 1), (2, 4)])
    tp = [(1, 0), (1, 2), (2, 1), (3, 4)]
    exp_as_apply = MultiIndex.from_tuples(tp, names=['user_id', None])

    tm.assert_index_equal(res_as_apply, exp_as_apply)
    tm.assert_index_equal(res_not_as_apply, exp_not_as_apply)

    ind = Index(list('abcde'))
    df = DataFrame([[1, 2], [2, 3], [1, 4], [1, 5], [2, 6]], index=ind)
    # group_keys=False states explicitly that the group keys must not be
    # added to the result index, so the expectation does not rely on apply
    # recognising the identity function as like-indexed output.
    res = df.groupby(0, as_index=False,
                     group_keys=False).apply(lambda x: x).index
    tm.assert_index_equal(res, ind)
def test_apply_concat_preserve_names(three_group):
    """Index names of the applied pieces carry over to the combined result.

    When every piece uses the same index name it becomes the name of the
    last index level; when the names are built from the group length (and
    so may differ between pieces), that level is expected to be unnamed.
    """
    grouped = three_group.groupby(['A', 'B'])

    def desc(group):
        summary = group.describe()
        summary.index.name = 'stat'
        return summary

    def desc2(group):
        # Same as ``desc`` but truncated to ``len(group)`` rows ("weirdo").
        summary = group.describe()
        summary.index.name = 'stat'
        return summary[:len(group)]

    def desc3(group):
        # Index name is derived from the group length (names are different),
        # and the summary is truncated like in ``desc2``.
        summary = group.describe()
        summary.index.name = 'stat_%d' % len(group)
        return summary[:len(group)]

    same_names = grouped.apply(desc)
    assert same_names.index.names == ('A', 'B', 'stat')

    truncated = grouped.apply(desc2)
    assert truncated.index.names == ('A', 'B', 'stat')

    varying_names = grouped.apply(desc3)
    assert varying_names.index.names == ('A', 'B', None)
def test_apply_series_to_frame():
    """Applying a Series -> DataFrame function yields a like-indexed frame.

    Each monthly piece of a business-day series is expanded into a frame
    with the raw, demeaned and logged values; the combined result must be a
    DataFrame carrying the original index.
    """
    def f(piece):
        # The data are standard-normal, so negative inputs to log are
        # expected; silence the resulting "invalid value" warnings.
        with np.errstate(invalid='ignore'):
            logged = np.log(piece)
        return DataFrame({'value': piece,
                          'demeaned': piece - piece.mean(),
                          'logged': logged})

    dr = bdate_range('1/1/2000', periods=100)
    ts = Series(np.random.randn(100), index=dr)

    # group_keys=False states explicitly that the month keys must not be
    # added to the result index, so the index comparison below does not
    # depend on apply detecting that ``f`` returns like-indexed output.
    grouped = ts.groupby(lambda x: x.month, group_keys=False)
    result = grouped.apply(f)

    assert isinstance(result, DataFrame)
    tm.assert_index_equal(result.index, ts.index)
def test_apply_series_yield_constant(df):
    """``apply(len)`` on column ``C`` keeps ``A``/``B`` as leading level names."""
    counts = df.groupby(['A', 'B'])['C'].apply(len)
    leading_names = counts.index.names[:2]
    assert leading_names == ('A', 'B')
def test_apply_frame_yield_constant(df):
    """GH13568: ``apply(len)`` on grouped frames gives an unnamed Series.

    Checked both for the whole frame and for the ``['C', 'D']`` column
    selection.
    """
    whole_frame = df.groupby(['A', 'B']).apply(len)
    assert isinstance(whole_frame, Series)
    assert whole_frame.name is None

    column_subset = df.groupby(['A', 'B'])[['C', 'D']].apply(len)
    assert isinstance(column_subset, Series)
    assert column_subset.name is None
def test_apply_frame_to_series(df):
    """``apply(len)`` per group matches the ``count()`` of column ``C``.

    Index and raw values are compared separately.
    """
    by_ab = df.groupby(['A', 'B'])
    lengths = by_ab.apply(len)
    counts_of_c = by_ab.count()['C']

    tm.assert_index_equal(lengths.index, counts_of_c.index)
    tm.assert_numpy_array_equal(lengths.values, counts_of_c.values)
def test_apply_frame_concat_series():
    """Series returned from a frame-level apply are concatenated correctly.

    The same "two smallest per-``B`` sums of ``C``" computation is done once
    through a DataFrame apply and once through a Series apply; both results
    must match (names aside) and the frame-level result must be named 'C'.
    """
    def trans(group):
        sums = group.groupby('B')['C'].sum()
        return sums.sort_values()[:2]

    def trans2(group):
        # ``group`` is the 'C' Series here, so look the matching 'B' values
        # up in the outer frame.
        b_values = frame.reindex(group.index)['B']
        sums = group.groupby(b_values).sum()
        return sums.sort_values()[:2]

    frame = DataFrame({'A': np.random.randint(0, 5, 1000),
                       'B': np.random.randint(0, 5, 1000),
                       'C': np.random.randn(1000)})

    result = frame.groupby('A').apply(trans)
    exp = frame.groupby('A')['C'].apply(trans2)

    tm.assert_series_equal(result, exp, check_names=False)
    assert result.name == 'C'
def test_apply_transform(ts):
    """``apply`` with an element-wise function matches ``transform``.

    ``ts`` is a fixture supplied by the test suite; it is grouped here by
    the ``month`` attribute of its index labels.
    """
    # group_keys=False states explicitly that the month keys must not be
    # added to the apply result, so the comparison with ``transform`` does
    # not depend on apply detecting like-indexed output.
    grouped = ts.groupby(lambda x: x.month, group_keys=False)
    result = grouped.apply(lambda x: x * 2)
    expected = grouped.transform(lambda x: x * 2)
    tm.assert_series_equal(result, expected)
def test_apply_multikey_corner(tsframe):
    """Apply over two function keys; each result slice matches its group.

    The frame is grouped by the ``year`` and ``month`` of its index labels,
    and the applied function keeps the last five rows after sorting by 'A'.
    """
    def f(group):
        ordered = group.sort_values('A')
        return ordered[-5:]

    by_year_month = tsframe.groupby([lambda x: x.year, lambda x: x.month])
    result = by_year_month.apply(f)

    # Selecting a (year, month) key from the result must reproduce what
    # ``f`` returns for that group on its own.
    for key, group in by_year_month:
        tm.assert_frame_equal(result.loc[key], f(group))
def test_apply_chunk_view():
    """Slicing the first two rows of every group returns the expected rows."""
    # Low level tinkering could be unsafe, make sure not
    # ``list(range(...))`` replaces the ``pandas.compat.lrange`` shim: it
    # builds the same list on Python 2 and 3 without relying on a
    # compatibility helper.
    df = DataFrame({'key': [1, 1, 1, 2, 2, 2, 3, 3, 3],
                    'value': list(range(9))})

    result = df.groupby('key', group_keys=False).apply(lambda x: x[:2])
    expected = df.take([0, 1, 3, 4, 6, 7])
    tm.assert_frame_equal(result, expected)
def test_apply_no_name_column_conflict():
    """Smoke test (#2605): apply works on a frame with a column called 'name'.

    There is no assertion; the test passes as long as ``apply`` does not
    raise.
    """
    # ``list(range(...))`` replaces the ``pandas.compat.lrange`` shim: it
    # builds the same list on Python 2 and 3 without relying on a
    # compatibility helper.
    df = DataFrame({'name': [1, 1, 1, 1, 1, 1, 2, 2, 2, 2],
                    'name2': [0, 0, 0, 1, 1, 1, 0, 0, 1, 1],
                    'value': list(range(10))[::-1]})

    # it works! #2605
    grouped = df.groupby(['name', 'name2'])
    # The lambda sorts in place and therefore returns None for every group.
    grouped.apply(lambda x: x.sort_values('value', inplace=True))
def test_apply_typecast_fail():
    """Apply a function that adds a column to each group.

    Within each ``d`` group the ``v`` values are rescaled to ``[0, 1]`` and
    stored as ``v2``; the result must equal the original frame plus that
    column.
    """
    df = DataFrame({'d': [1., 1., 1., 2., 2., 2.],
                    'c': np.tile(['a', 'b', 'c'], 2),
                    'v': np.arange(1., 7.)})

    def f(group):
        # Adds the new column to the group object it was handed.
        v = group['v']
        group['v2'] = (v - v.min()) / (v.max() - v.min())
        return group

    # group_keys=False states explicitly that the ``d`` keys must not be
    # added to the result index, so the expected frame below does not
    # depend on apply detecting that ``f`` returns like-indexed output.
    result = df.groupby('d', group_keys=False).apply(f)

    expected = df.copy()
    expected['v2'] = np.tile([0., 0.5, 1], 2)

    tm.assert_frame_equal(result, expected)
def test_apply_multiindex_fail():
    """Apply a column-adding function to a frame with a MultiIndex.

    Within each ``d`` group the ``v`` values are rescaled to ``[0, 1]`` and
    stored as ``v2``; the result must equal the original frame plus that
    column, with the two-level row index intact.
    """
    index = MultiIndex.from_arrays([[0, 0, 0, 1, 1, 1],
                                    [1, 2, 3, 1, 2, 3]])
    df = DataFrame({'d': [1., 1., 1., 2., 2., 2.],
                    'c': np.tile(['a', 'b', 'c'], 2),
                    'v': np.arange(1., 7.)}, index=index)

    def f(group):
        # Adds the new column to the group object it was handed.
        v = group['v']
        group['v2'] = (v - v.min()) / (v.max() - v.min())
        return group

    # group_keys=False states explicitly that the ``d`` keys must not be
    # added to the result index, so the expected frame below does not
    # depend on apply detecting that ``f`` returns like-indexed output.
    result = df.groupby('d', group_keys=False).apply(f)

    expected = df.copy()
    expected['v2'] = np.tile([0., 0.5, 1], 2)

    tm.assert_frame_equal(result, expected)
def test_apply_corner(tsframe):
    """Doubling each yearly group via ``apply`` equals doubling the frame.

    ``tsframe`` is a fixture supplied by the test suite; it is grouped here
    by the ``year`` attribute of its index labels.
    """
    # group_keys=False states explicitly that the year keys must not be
    # added to the result index, so the comparison with ``tsframe * 2`` does
    # not depend on apply detecting like-indexed output.
    by_year = tsframe.groupby(lambda x: x.year, group_keys=False)
    result = by_year.apply(lambda x: x * 2)
    expected = tsframe * 2
    tm.assert_frame_equal(result, expected)
def test_apply_without_copy():
# GH 5545
# returning a non-copy in an applied function fails
data = DataFrame({'id_field': [100, 100, 200, 300],
'category': ['a', 'b', 'c', 'c'],
'value': [1, 2, 3, 4]})
def filt1(x):
if x.shape[0] == 1:
return x.copy()
Loading ...