Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

Version: 0.24.2 

/ tests / test_window.py

from collections import OrderedDict
from datetime import datetime, timedelta
from itertools import product
import warnings
from warnings import catch_warnings

import numpy as np
from numpy.random import randn
import pytest

from pandas.compat import range, zip
from pandas.errors import UnsupportedFunctionCall
import pandas.util._test_decorators as td

import pandas as pd
from pandas import (
    DataFrame, Index, Series, Timestamp, bdate_range, concat, isna, notna)
from pandas.core.base import SpecificationError
from pandas.core.sorting import safe_sort
import pandas.core.window as rwindow
import pandas.util.testing as tm

import pandas.tseries.offsets as offsets

N, K = 100, 10


def assert_equal(left, right):
    if isinstance(left, Series):
        tm.assert_series_equal(left, right)
    else:
        tm.assert_frame_equal(left, right)


@pytest.fixture(params=[True, False])
def raw(request):
    return request.param


@pytest.fixture(params=['triang', 'blackman', 'hamming', 'bartlett', 'bohman',
                        'blackmanharris', 'nuttall', 'barthann'])
def win_types(request):
    return request.param


@pytest.fixture(params=['kaiser', 'gaussian', 'general_gaussian'])
def win_types_special(request):
    return request.param


class Base(object):

    _nan_locs = np.arange(20, 40)
    _inf_locs = np.array([])

    def _create_data(self):
        arr = randn(N)
        arr[self._nan_locs] = np.NaN

        self.arr = arr
        self.rng = bdate_range(datetime(2009, 1, 1), periods=N)
        self.series = Series(arr.copy(), index=self.rng)
        self.frame = DataFrame(randn(N, K), index=self.rng,
                               columns=np.arange(K))


class TestApi(Base):

    def setup_method(self, method):
        self._create_data()

    def test_getitem(self):

        r = self.frame.rolling(window=5)
        tm.assert_index_equal(r._selected_obj.columns, self.frame.columns)

        r = self.frame.rolling(window=5)[1]
        assert r._selected_obj.name == self.frame.columns[1]

        # technically this is allowed
        r = self.frame.rolling(window=5)[1, 3]
        tm.assert_index_equal(r._selected_obj.columns,
                              self.frame.columns[[1, 3]])

        r = self.frame.rolling(window=5)[[1, 3]]
        tm.assert_index_equal(r._selected_obj.columns,
                              self.frame.columns[[1, 3]])

    def test_select_bad_cols(self):
        df = DataFrame([[1, 2]], columns=['A', 'B'])
        g = df.rolling(window=5)
        pytest.raises(KeyError, g.__getitem__, ['C'])  # g[['C']]

        pytest.raises(KeyError, g.__getitem__, ['A', 'C'])  # g[['A', 'C']]
        with pytest.raises(KeyError, match='^[^A]+$'):
            # A should not be referenced as a bad column...
            # will have to rethink regex if you change message!
            g[['A', 'C']]

    def test_attribute_access(self):

        df = DataFrame([[1, 2]], columns=['A', 'B'])
        r = df.rolling(window=5)
        tm.assert_series_equal(r.A.sum(), r['A'].sum())
        pytest.raises(AttributeError, lambda: r.F)

    def tests_skip_nuisance(self):

        df = DataFrame({'A': range(5), 'B': range(5, 10), 'C': 'foo'})
        r = df.rolling(window=3)
        result = r[['A', 'B']].sum()
        expected = DataFrame({'A': [np.nan, np.nan, 3, 6, 9],
                              'B': [np.nan, np.nan, 18, 21, 24]},
                             columns=list('AB'))
        tm.assert_frame_equal(result, expected)

    def test_skip_sum_object_raises(self):
        df = DataFrame({'A': range(5), 'B': range(5, 10), 'C': 'foo'})
        r = df.rolling(window=3)

        with pytest.raises(TypeError, match='cannot handle this type'):
            r.sum()

    def test_agg(self):
        df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})

        r = df.rolling(window=3)
        a_mean = r['A'].mean()
        a_std = r['A'].std()
        a_sum = r['A'].sum()
        b_mean = r['B'].mean()
        b_std = r['B'].std()
        b_sum = r['B'].sum()

        result = r.aggregate([np.mean, np.std])
        expected = concat([a_mean, a_std, b_mean, b_std], axis=1)
        expected.columns = pd.MultiIndex.from_product([['A', 'B'], ['mean',
                                                                    'std']])
        tm.assert_frame_equal(result, expected)

        result = r.aggregate({'A': np.mean, 'B': np.std})

        expected = concat([a_mean, b_std], axis=1)
        tm.assert_frame_equal(result, expected, check_like=True)

        result = r.aggregate({'A': ['mean', 'std']})
        expected = concat([a_mean, a_std], axis=1)
        expected.columns = pd.MultiIndex.from_tuples([('A', 'mean'), ('A',
                                                                      'std')])
        tm.assert_frame_equal(result, expected)

        result = r['A'].aggregate(['mean', 'sum'])
        expected = concat([a_mean, a_sum], axis=1)
        expected.columns = ['mean', 'sum']
        tm.assert_frame_equal(result, expected)

        with catch_warnings(record=True):
            # using a dict with renaming
            warnings.simplefilter("ignore", FutureWarning)
            result = r.aggregate({'A': {'mean': 'mean', 'sum': 'sum'}})
        expected = concat([a_mean, a_sum], axis=1)
        expected.columns = pd.MultiIndex.from_tuples([('A', 'mean'),
                                                      ('A', 'sum')])
        tm.assert_frame_equal(result, expected, check_like=True)

        with catch_warnings(record=True):
            warnings.simplefilter("ignore", FutureWarning)
            result = r.aggregate({'A': {'mean': 'mean',
                                        'sum': 'sum'},
                                  'B': {'mean2': 'mean',
                                        'sum2': 'sum'}})
        expected = concat([a_mean, a_sum, b_mean, b_sum], axis=1)
        exp_cols = [('A', 'mean'), ('A', 'sum'), ('B', 'mean2'), ('B', 'sum2')]
        expected.columns = pd.MultiIndex.from_tuples(exp_cols)
        tm.assert_frame_equal(result, expected, check_like=True)

        result = r.aggregate({'A': ['mean', 'std'], 'B': ['mean', 'std']})
        expected = concat([a_mean, a_std, b_mean, b_std], axis=1)

        exp_cols = [('A', 'mean'), ('A', 'std'), ('B', 'mean'), ('B', 'std')]
        expected.columns = pd.MultiIndex.from_tuples(exp_cols)
        tm.assert_frame_equal(result, expected, check_like=True)

    def test_agg_apply(self, raw):

        # passed lambda
        df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})

        r = df.rolling(window=3)
        a_sum = r['A'].sum()

        result = r.agg({'A': np.sum, 'B': lambda x: np.std(x, ddof=1)})
        rcustom = r['B'].apply(lambda x: np.std(x, ddof=1), raw=raw)
        expected = concat([a_sum, rcustom], axis=1)
        tm.assert_frame_equal(result, expected, check_like=True)

    def test_agg_consistency(self):

        df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})
        r = df.rolling(window=3)

        result = r.agg([np.sum, np.mean]).columns
        expected = pd.MultiIndex.from_product([list('AB'), ['sum', 'mean']])
        tm.assert_index_equal(result, expected)

        result = r['A'].agg([np.sum, np.mean]).columns
        expected = Index(['sum', 'mean'])
        tm.assert_index_equal(result, expected)

        result = r.agg({'A': [np.sum, np.mean]}).columns
        expected = pd.MultiIndex.from_tuples([('A', 'sum'), ('A', 'mean')])
        tm.assert_index_equal(result, expected)

    def test_agg_nested_dicts(self):

        # API change for disallowing these types of nested dicts
        df = DataFrame({'A': range(5), 'B': range(0, 10, 2)})
        r = df.rolling(window=3)

        def f():
            r.aggregate({'r1': {'A': ['mean', 'sum']},
                         'r2': {'B': ['mean', 'sum']}})

        pytest.raises(SpecificationError, f)

        expected = concat([r['A'].mean(), r['A'].std(),
                           r['B'].mean(), r['B'].std()], axis=1)
        expected.columns = pd.MultiIndex.from_tuples([('ra', 'mean'), (
            'ra', 'std'), ('rb', 'mean'), ('rb', 'std')])
        with catch_warnings(record=True):
            warnings.simplefilter("ignore", FutureWarning)
            result = r[['A', 'B']].agg({'A': {'ra': ['mean', 'std']},
                                        'B': {'rb': ['mean', 'std']}})
        tm.assert_frame_equal(result, expected, check_like=True)

        with catch_warnings(record=True):
            warnings.simplefilter("ignore", FutureWarning)
            result = r.agg({'A': {'ra': ['mean', 'std']},
                            'B': {'rb': ['mean', 'std']}})
        expected.columns = pd.MultiIndex.from_tuples([('A', 'ra', 'mean'), (
            'A', 'ra', 'std'), ('B', 'rb', 'mean'), ('B', 'rb', 'std')])
        tm.assert_frame_equal(result, expected, check_like=True)

    def test_count_nonnumeric_types(self):
        # GH12541
        cols = ['int', 'float', 'string', 'datetime', 'timedelta', 'periods',
                'fl_inf', 'fl_nan', 'str_nan', 'dt_nat', 'periods_nat']

        df = DataFrame(
            {'int': [1, 2, 3],
             'float': [4., 5., 6.],
             'string': list('abc'),
             'datetime': pd.date_range('20170101', periods=3),
             'timedelta': pd.timedelta_range('1 s', periods=3, freq='s'),
             'periods': [pd.Period('2012-01'), pd.Period('2012-02'),
                         pd.Period('2012-03')],
             'fl_inf': [1., 2., np.Inf],
             'fl_nan': [1., 2., np.NaN],
             'str_nan': ['aa', 'bb', np.NaN],
             'dt_nat': [Timestamp('20170101'), Timestamp('20170203'),
                        Timestamp(None)],
             'periods_nat': [pd.Period('2012-01'), pd.Period('2012-02'),
                             pd.Period(None)]},
            columns=cols)

        expected = DataFrame(
            {'int': [1., 2., 2.],
             'float': [1., 2., 2.],
             'string': [1., 2., 2.],
             'datetime': [1., 2., 2.],
             'timedelta': [1., 2., 2.],
             'periods': [1., 2., 2.],
             'fl_inf': [1., 2., 2.],
             'fl_nan': [1., 2., 1.],
             'str_nan': [1., 2., 1.],
             'dt_nat': [1., 2., 1.],
             'periods_nat': [1., 2., 1.]},
            columns=cols)

        result = df.rolling(window=2).count()
        tm.assert_frame_equal(result, expected)

        result = df.rolling(1).count()
        expected = df.notna().astype(float)
        tm.assert_frame_equal(result, expected)

    @td.skip_if_no_scipy
    @pytest.mark.filterwarnings("ignore:can't resolve:ImportWarning")
    def test_window_with_args(self):
        # make sure that we are aggregating window functions correctly with arg
        r = Series(np.random.randn(100)).rolling(window=10, min_periods=1,
                                                 win_type='gaussian')
        expected = concat([r.mean(std=10), r.mean(std=.01)], axis=1)
        expected.columns = ['<lambda>', '<lambda>']
        result = r.aggregate([lambda x: x.mean(std=10),
                              lambda x: x.mean(std=.01)])
        tm.assert_frame_equal(result, expected)

        def a(x):
            return x.mean(std=10)

        def b(x):
            return x.mean(std=0.01)

        expected = concat([r.mean(std=10), r.mean(std=.01)], axis=1)
        expected.columns = ['a', 'b']
        result = r.aggregate([a, b])
        tm.assert_frame_equal(result, expected)

    def test_preserve_metadata(self):
        # GH 10565
        s = Series(np.arange(100), name='foo')

        s2 = s.rolling(30).sum()
        s3 = s.rolling(20).sum()
        assert s2.name == 'foo'
        assert s3.name == 'foo'

    @pytest.mark.parametrize("func,window_size,expected_vals", [
        ('rolling', 2, [[np.nan, np.nan, np.nan, np.nan],
                        [15., 20., 25., 20.],
                        [25., 30., 35., 30.],
                        [np.nan, np.nan, np.nan, np.nan],
                        [20., 30., 35., 30.],
                        [35., 40., 60., 40.],
                        [60., 80., 85., 80]]),
        ('expanding', None, [[10., 10., 20., 20.],
                             [15., 20., 25., 20.],
                             [20., 30., 30., 20.],
                             [10., 10., 30., 30.],
                             [20., 30., 35., 30.],
                             [26.666667, 40., 50., 30.],
                             [40., 80., 60., 30.]])])
    def test_multiple_agg_funcs(self, func, window_size, expected_vals):
        # GH 15072
        df = pd.DataFrame([
            ['A', 10, 20],
            ['A', 20, 30],
            ['A', 30, 40],
            ['B', 10, 30],
            ['B', 30, 40],
            ['B', 40, 80],
            ['B', 80, 90]], columns=['stock', 'low', 'high'])

        f = getattr(df.groupby('stock'), func)
Loading ...