from collections import OrderedDict
from datetime import datetime, timedelta
from itertools import product
import warnings
from warnings import catch_warnings
import numpy as np
from numpy.random import randn
import pytest
from pandas.compat import range, zip
from pandas.errors import UnsupportedFunctionCall
import pandas.util._test_decorators as td
import pandas as pd
from pandas import (
DataFrame, Index, Series, Timestamp, bdate_range, concat, isna, notna)
from pandas.core.base import SpecificationError
from pandas.core.sorting import safe_sort
import pandas.core.window as rwindow
import pandas.util.testing as tm
import pandas.tseries.offsets as offsets
# Dimensions of the shared random fixtures built by ``Base._create_data``:
# N is the number of observations (rows), K the number of frame columns.
N = 100
K = 10
def assert_equal(left, right):
    """Assert that two pandas objects are equal.

    The comparison routine is chosen from the type of ``left`` only:
    ``tm.assert_series_equal`` when it is a ``Series``, otherwise
    ``tm.assert_frame_equal``.
    """
    if isinstance(left, Series):
        compare = tm.assert_series_equal
    else:
        compare = tm.assert_frame_equal
    compare(left, right)
@pytest.fixture(params=[True, False])
def raw(request):
    """Run the requesting test once with ``True`` and once with ``False``.

    ``test_agg_apply`` forwards the value as the ``raw`` argument of
    ``apply``.
    """
    return request.param
@pytest.fixture(params=[
    'triang',
    'blackman',
    'hamming',
    'bartlett',
    'bohman',
    'blackmanharris',
    'nuttall',
    'barthann',
])
def win_types(request):
    """Run the requesting test once per listed name.

    Presumably these are values for the rolling ``win_type`` argument;
    the consuming tests are outside this part of the file.
    """
    return request.param
@pytest.fixture(params=[
    'kaiser',
    'gaussian',
    'general_gaussian',
])
def win_types_special(request):
    """Run the requesting test once per listed name.

    Presumably these are ``win_type`` values that need extra arguments
    (``test_window_with_args`` passes ``std`` with ``'gaussian'``); the
    consuming tests are outside this part of the file.
    """
    return request.param
class Base(object):
    """Mixin that builds the random data shared by the window tests.

    Calling ``_create_data`` sets four attributes on the instance:
    ``arr`` (1-D array of N draws), ``rng`` (business-day index of N
    periods), ``series`` (a copy of ``arr`` indexed by ``rng``) and
    ``frame`` (N x K random frame indexed by ``rng``).
    """

    # Positions of ``arr`` (and therefore ``series``) overwritten with NaN.
    _nan_locs = np.arange(20, 40)
    # Empty position array; ``_create_data`` does not read it.
    _inf_locs = np.array([])

    def _create_data(self):
        """Populate ``arr``, ``rng``, ``series`` and ``frame`` on ``self``."""
        arr = randn(N)
        # Use the canonical ``np.nan``: the ``np.NaN`` alias was removed in
        # NumPy 2.0, while ``np.nan`` exists in every NumPy release.
        arr[self._nan_locs] = np.nan
        self.arr = arr
        self.rng = bdate_range(datetime(2009, 1, 1), periods=N)
        # Copy so that later changes to ``series`` never leak into ``arr``.
        self.series = Series(arr.copy(), index=self.rng)
        self.frame = DataFrame(randn(N, K), index=self.rng,
                               columns=np.arange(K))
class TestApi(Base):
def setup_method(self, method):
    """Rebuild the shared random data; ``method`` is accepted but unused."""
    self._create_data()
def test_getitem(self):
r = self.frame.rolling(window=5)
tm.assert_index_equal(r._selected_obj.columns, self.frame.columns)
r = self.frame.rolling(window=5)[1]
assert r._selected_obj.name == self.frame.columns[1]
# technically this is allowed
r = self.frame.rolling(window=5)[1, 3]
tm.assert_index_equal(r._selected_obj.columns,
self.frame.columns[[1, 3]])
r = self.frame.rolling(window=5)[[1, 3]]
tm.assert_index_equal(r._selected_obj.columns,
self.frame.columns[[1, 3]])
def test_select_bad_cols(self):
    """Selecting a column list with a missing label raises ``KeyError``."""
    frame = DataFrame([[1, 2]], columns=['A', 'B'])
    roller = frame.rolling(window=5)

    # Both a fully missing and a partially missing selection must fail.
    for selection in (['C'], ['A', 'C']):
        with pytest.raises(KeyError):
            roller[selection]

    with pytest.raises(KeyError, match='^[^A]+$'):
        # A should not be referenced as a bad column...
        # will have to rethink regex if you change message!
        roller[['A', 'C']]
def test_attribute_access(self):
    """Columns are reachable as attributes; unknown names raise."""
    frame = DataFrame([[1, 2]], columns=['A', 'B'])
    roller = frame.rolling(window=5)

    via_attribute = roller.A.sum()
    via_item = roller['A'].sum()
    tm.assert_series_equal(via_attribute, via_item)

    # 'F' is not a column of the frame.
    with pytest.raises(AttributeError):
        roller.F
def tests_skip_nuisance(self):
df = DataFrame({'A': range(5), 'B': range(5, 10), 'C': 'foo'})
r = df.rolling(window=3)
result = r[['A', 'B']].sum()
expected = DataFrame({'A': [np.nan, np.nan, 3, 6, 9],
'B': [np.nan, np.nan, 18, 21, 24]},
columns=list('AB'))
tm.assert_frame_equal(result, expected)
def test_skip_sum_object_raises(self):
    """Summing a frame that still contains the string column raises."""
    columns = {'A': range(5), 'B': range(5, 10), 'C': 'foo'}
    roller = DataFrame(columns).rolling(window=3)
    with pytest.raises(TypeError, match='cannot handle this type'):
        roller.sum()
def test_agg(self):
# Checks Rolling.aggregate against results assembled by hand from the
# individual mean/std/sum calls, for each supported spec shape: a list of
# functions, a dict of functions, a dict of lists, a list on one selected
# column, and renaming dicts (evaluated with FutureWarning silenced).
df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})
r = df.rolling(window=3)
# Reference results, one column and one statistic at a time.
a_mean = r['A'].mean()
a_std = r['A'].std()
a_sum = r['A'].sum()
b_mean = r['B'].mean()
b_std = r['B'].std()
b_sum = r['B'].sum()
# List of functions: expected columns are a (column, function) MultiIndex.
result = r.aggregate([np.mean, np.std])
expected = concat([a_mean, a_std, b_mean, b_std], axis=1)
expected.columns = pd.MultiIndex.from_product([['A', 'B'], ['mean',
'std']])
tm.assert_frame_equal(result, expected)
# Dict mapping each column to a single function.
result = r.aggregate({'A': np.mean, 'B': np.std})
expected = concat([a_mean, b_std], axis=1)
tm.assert_frame_equal(result, expected, check_like=True)
# Dict mapping one column to a list of function names.
result = r.aggregate({'A': ['mean', 'std']})
expected = concat([a_mean, a_std], axis=1)
expected.columns = pd.MultiIndex.from_tuples([('A', 'mean'), ('A',
'std')])
tm.assert_frame_equal(result, expected)
# List of function names on a single selected column: flat column labels.
result = r['A'].aggregate(['mean', 'sum'])
expected = concat([a_mean, a_sum], axis=1)
expected.columns = ['mean', 'sum']
tm.assert_frame_equal(result, expected)
with catch_warnings(record=True):
# using a dict with renaming
warnings.simplefilter("ignore", FutureWarning)
result = r.aggregate({'A': {'mean': 'mean', 'sum': 'sum'}})
expected = concat([a_mean, a_sum], axis=1)
expected.columns = pd.MultiIndex.from_tuples([('A', 'mean'),
('A', 'sum')])
tm.assert_frame_equal(result, expected, check_like=True)
# Renaming dicts for both columns; the inner keys ('mean2', 'sum2') are
# expected as the second level of the result columns.
with catch_warnings(record=True):
warnings.simplefilter("ignore", FutureWarning)
result = r.aggregate({'A': {'mean': 'mean',
'sum': 'sum'},
'B': {'mean2': 'mean',
'sum2': 'sum'}})
expected = concat([a_mean, a_sum, b_mean, b_sum], axis=1)
exp_cols = [('A', 'mean'), ('A', 'sum'), ('B', 'mean2'), ('B', 'sum2')]
expected.columns = pd.MultiIndex.from_tuples(exp_cols)
tm.assert_frame_equal(result, expected, check_like=True)
# Dict of lists covering both columns.
result = r.aggregate({'A': ['mean', 'std'], 'B': ['mean', 'std']})
expected = concat([a_mean, a_std, b_mean, b_std], axis=1)
exp_cols = [('A', 'mean'), ('A', 'std'), ('B', 'mean'), ('B', 'std')]
expected.columns = pd.MultiIndex.from_tuples(exp_cols)
tm.assert_frame_equal(result, expected, check_like=True)
def test_agg_apply(self, raw):
# passed lambda
df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})
r = df.rolling(window=3)
a_sum = r['A'].sum()
result = r.agg({'A': np.sum, 'B': lambda x: np.std(x, ddof=1)})
rcustom = r['B'].apply(lambda x: np.std(x, ddof=1), raw=raw)
expected = concat([a_sum, rcustom], axis=1)
tm.assert_frame_equal(result, expected, check_like=True)
def test_agg_consistency(self):
df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})
r = df.rolling(window=3)
result = r.agg([np.sum, np.mean]).columns
expected = pd.MultiIndex.from_product([list('AB'), ['sum', 'mean']])
tm.assert_index_equal(result, expected)
result = r['A'].agg([np.sum, np.mean]).columns
expected = Index(['sum', 'mean'])
tm.assert_index_equal(result, expected)
result = r.agg({'A': [np.sum, np.mean]}).columns
expected = pd.MultiIndex.from_tuples([('A', 'sum'), ('A', 'mean')])
tm.assert_index_equal(result, expected)
def test_agg_nested_dicts(self):
# Nested-dict aggregation specs: a spec whose outer keys ('r1', 'r2') are
# not columns of the frame must raise SpecificationError, while
# column -> {name: [functions]} specs are evaluated with FutureWarning
# silenced and compared against the individual mean/std results.
# API change for disallowing these types of nested dicts
df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})
r = df.rolling(window=3)
def f():
r.aggregate({'r1': {'A': ['mean', 'sum']},
'r2': {'B': ['mean', 'sum']}})
pytest.raises(SpecificationError, f)
# Expected values are shared by both checks below; only the column labels
# are reassigned between them.
expected = concat([r['A'].mean(), r['A'].std(),
r['B'].mean(), r['B'].std()], axis=1)
expected.columns = pd.MultiIndex.from_tuples([('ra', 'mean'), (
'ra', 'std'), ('rb', 'mean'), ('rb', 'std')])
# With an explicit ['A', 'B'] selection the expected labels are
# (inner name, function).
with catch_warnings(record=True):
warnings.simplefilter("ignore", FutureWarning)
result = r[['A', 'B']].agg({'A': {'ra': ['mean', 'std']},
'B': {'rb': ['mean', 'std']}})
tm.assert_frame_equal(result, expected, check_like=True)
# Without a selection the expected labels gain the column as first level:
# (column, inner name, function).
with catch_warnings(record=True):
warnings.simplefilter("ignore", FutureWarning)
result = r.agg({'A': {'ra': ['mean', 'std']},
'B': {'rb': ['mean', 'std']}})
expected.columns = pd.MultiIndex.from_tuples([('A', 'ra', 'mean'), (
'A', 'ra', 'std'), ('B', 'rb', 'mean'), ('B', 'rb', 'std')])
tm.assert_frame_equal(result, expected, check_like=True)
def test_count_nonnumeric_types(self):
# GH12541
cols = ['int', 'float', 'string', 'datetime', 'timedelta', 'periods',
'fl_inf', 'fl_nan', 'str_nan', 'dt_nat', 'periods_nat']
df = DataFrame(
{'int': [1, 2, 3],
'float': [4., 5., 6.],
'string': list('abc'),
'datetime': pd.date_range('20170101', periods=3),
'timedelta': pd.timedelta_range('1 s', periods=3, freq='s'),
'periods': [pd.Period('2012-01'), pd.Period('2012-02'),
pd.Period('2012-03')],
'fl_inf': [1., 2., np.Inf],
'fl_nan': [1., 2., np.NaN],
'str_nan': ['aa', 'bb', np.NaN],
'dt_nat': [Timestamp('20170101'), Timestamp('20170203'),
Timestamp(None)],
'periods_nat': [pd.Period('2012-01'), pd.Period('2012-02'),
pd.Period(None)]},
columns=cols)
expected = DataFrame(
{'int': [1., 2., 2.],
'float': [1., 2., 2.],
'string': [1., 2., 2.],
'datetime': [1., 2., 2.],
'timedelta': [1., 2., 2.],
'periods': [1., 2., 2.],
'fl_inf': [1., 2., 2.],
'fl_nan': [1., 2., 1.],
'str_nan': [1., 2., 1.],
'dt_nat': [1., 2., 1.],
'periods_nat': [1., 2., 1.]},
columns=cols)
result = df.rolling(window=2).count()
tm.assert_frame_equal(result, expected)
result = df.rolling(1).count()
expected = df.notna().astype(float)
tm.assert_frame_equal(result, expected)
@td.skip_if_no_scipy
@pytest.mark.filterwarnings("ignore:can't resolve:ImportWarning")
def test_window_with_args(self):
    """Aggregating callables that pass ``std`` to ``mean`` works per callable.

    The result columns are expected to be labelled with each callable's
    name: ``'<lambda>'`` for lambdas, ``'a'`` / ``'b'`` for the named
    functions.
    """
    # make sure that we are aggregating window functions correctly with arg
    values = Series(np.random.randn(100))
    r = values.rolling(window=10, min_periods=1, win_type='gaussian')

    def build_expected(labels):
        # Direct calls with the two std values, relabelled as requested.
        frame = concat([r.mean(std=10), r.mean(std=.01)], axis=1)
        frame.columns = labels
        return frame

    result = r.aggregate([lambda x: x.mean(std=10),
                          lambda x: x.mean(std=.01)])
    tm.assert_frame_equal(result, build_expected(['<lambda>', '<lambda>']))

    def a(x):
        return x.mean(std=10)

    def b(x):
        return x.mean(std=0.01)

    result = r.aggregate([a, b])
    tm.assert_frame_equal(result, build_expected(['a', 'b']))
def test_preserve_metadata(self):
# GH 10565
s = Series(np.arange(100), name='foo')
s2 = s.rolling(30).sum()
s3 = s.rolling(20).sum()
assert s2.name == 'foo'
assert s3.name == 'foo'
@pytest.mark.parametrize("func,window_size,expected_vals", [
('rolling', 2, [[np.nan, np.nan, np.nan, np.nan],
[15., 20., 25., 20.],
[25., 30., 35., 30.],
[np.nan, np.nan, np.nan, np.nan],
[20., 30., 35., 30.],
[35., 40., 60., 40.],
[60., 80., 85., 80]]),
('expanding', None, [[10., 10., 20., 20.],
[15., 20., 25., 20.],
[20., 30., 30., 20.],
[10., 10., 30., 30.],
[20., 30., 35., 30.],
[26.666667, 40., 50., 30.],
[40., 80., 60., 30.]])])
def test_multiple_agg_funcs(self, func, window_size, expected_vals):
# GH 15072
df = pd.DataFrame([
['A', 10, 20],
['A', 20, 30],
['A', 30, 40],
['B', 10, 30],
['B', 30, 40],
['B', 40, 80],
['B', 80, 90]], columns=['stock', 'low', 'high'])
f = getattr(df.groupby('stock'), func)
Loading ...