"""
Provide a generic structure to support window functions,
similar to how we have a Groupby object.
"""
from __future__ import division
from collections import defaultdict
from datetime import timedelta
from textwrap import dedent
import warnings
import numpy as np
import pandas._libs.window as libwindow
import pandas.compat as compat
from pandas.compat.numpy import function as nv
from pandas.util._decorators import Appender, Substitution, cache_readonly
from pandas.core.dtypes.common import (
ensure_float64, is_bool, is_float_dtype, is_integer, is_integer_dtype,
is_list_like, is_scalar, is_timedelta64_dtype, needs_i8_conversion)
from pandas.core.dtypes.generic import (
ABCDataFrame, ABCDateOffset, ABCDatetimeIndex, ABCPeriodIndex, ABCSeries,
ABCTimedeltaIndex)
from pandas.core.base import PandasObject, SelectionMixin
import pandas.core.common as com
from pandas.core.generic import _shared_docs
from pandas.core.groupby.base import GroupByMixin
# Work on a copy: this module adds its own entries (e.g. 'sum') and those
# additions should not show up in the dict imported from pandas.core.generic.
_shared_docs = dict(**_shared_docs)
# Shared "Returns" / "See Also" docstring tail. ``%(name)s`` is a %-style
# placeholder left for whoever consumes the template to fill in.
_doc_template = """
Returns
-------
Series or DataFrame
Return type is determined by the caller.
See Also
--------
Series.%(name)s : Series %(name)s.
DataFrame.%(name)s : DataFrame %(name)s.
"""
class _Window(PandasObject, SelectionMixin):
# Names of the settings that ``__unicode__`` prints; a setting is only
# shown when its current value is not None.
_attributes = ['window', 'min_periods', 'center', 'win_type',
'axis', 'on', 'closed']
# NOTE(review): not read anywhere in the visible code; presumably consumed
# by SelectionMixin -- confirm.
exclusions = set()
def __init__(self, obj, window=None, min_periods=None,
             center=False, win_type=None, axis=0, on=None, closed=None,
             **kwargs):
    """
    Store the window settings on the instance and validate them.

    Parameters
    ----------
    obj : object
        The data being windowed; must provide ``_get_axis_number`` when
        ``axis`` is not None.
    window, min_periods, center, win_type, on, closed
        Stored as attributes of the same name.
    axis : default 0
        Resolved through ``obj._get_axis_number``; None is kept as None.
    **kwargs
        Copied into the instance ``__dict__`` before anything else is set,
        so the named settings above win on a name clash.
    """
    # extras first: the explicit assignments below overwrite duplicates
    self.__dict__.update(kwargs)

    self.blocks = []
    self.obj = obj
    self.on = on
    self.closed = closed
    self.window = window
    self.min_periods = min_periods
    self.center = center
    self.win_type = win_type
    self.win_freq = None

    if axis is None:
        self.axis = None
    else:
        self.axis = obj._get_axis_number(axis)

    self.validate()
@property
def _constructor(self):
    """
    Return the ``Window`` class.
    """
    window_cls = Window
    return window_cls
@property
def is_datetimelike(self):
    """
    This implementation carries no datetime-like information: always None.
    """
    datetimelike = None
    return datetimelike
@property
def _on(self):
    """
    This implementation has no 'on' index: always None.
    """
    on_index = None
    return on_index
@property
def is_freq_type(self):
    """
    Whether ``win_type`` equals the string 'freq'.
    """
    kind = self.win_type
    return kind == 'freq'
def validate(self):
    """
    Check the ``center``, ``min_periods`` and ``closed`` settings.

    Raises
    ------
    ValueError
        If ``center`` is set and is not a boolean, if ``min_periods`` is
        set and is not an integer, or if ``closed`` is set and is not one
        of 'right', 'both', 'left', 'neither'.
    """
    center = self.center
    if not (center is None or is_bool(center)):
        raise ValueError("center must be a boolean")

    min_periods = self.min_periods
    if not (min_periods is None or is_integer(min_periods)):
        raise ValueError("min_periods must be an integer")

    closed = self.closed
    if closed is None:
        return
    if closed not in ('right', 'both', 'left', 'neither'):
        raise ValueError("closed must be 'right', 'left', 'both' or "
                         "'neither'")
def _convert_freq(self):
    """
    Return the selected object paired with a None index.

    Returns
    -------
    tuple of (obj, index)
        ``obj`` is ``self._selected_obj``; ``index`` is always None here.
    """
    return self._selected_obj, None
def _create_blocks(self):
    """
    Split the data into blocks and return them with the conformed data.

    Returns
    -------
    tuple of (blocks, obj, index)
        ``blocks`` are the values of ``obj._to_dict_of_blocks``, ``obj`` is
        the data with the 'on' column removed (2-D data only) and ``index``
        is ``self._on`` when ``_convert_freq`` produced an index, else None.
    """
    data, index = self._convert_freq()
    if index is not None:
        index = self._on

    # the 'on' column is not part of the values being windowed
    if self.on is not None and data.ndim == 2:
        kept = data.columns.difference([self.on])
        data = data.reindex(columns=kept, copy=False)

    block_dict = data._to_dict_of_blocks(copy=False)
    return block_dict.values(), data, index
def _gotitem(self, key, ndim, subset=None):
    """
    Return a sliced copy of this object.

    Parameters
    ----------
    key : str / list of selections
    ndim : 1,2
        requested ndim of result
    subset : object, default None
        subset to act on; falls back to ``self.obj``

    Returns
    -------
    A shallow copy built on ``subset`` with its cache reset. For a 2-D
    subset the copy's ``_selection`` is set to ``key`` when ``key`` is a
    scalar contained in ``subset`` or is list-like.
    """
    if subset is None:
        subset = self.obj

    # build a separate object so the selection never aliases this one
    sliced = self._shallow_copy(subset)
    sliced._reset_cache()

    if subset.ndim == 2:
        scalar_hit = is_scalar(key) and key in subset
        if scalar_hit or is_list_like(key):
            sliced._selection = key
    return sliced
def __getattr__(self, attr):
    """
    Resolve a missing attribute as an item of the wrapped object.

    Names listed in ``_internal_names_set`` go through the normal
    attribute lookup; names contained in ``self.obj`` are returned via
    ``self[attr]``.

    Raises
    ------
    AttributeError
        If ``attr`` is neither an internal name nor contained in
        ``self.obj``.
    """
    if attr in self._internal_names_set:
        return object.__getattribute__(self, attr)
    if attr not in self.obj:
        raise AttributeError("%r object has no attribute %r" %
                             (type(self).__name__, attr))
    return self[attr]
def _dir_additions(self):
    """
    Delegate to the ``_dir_additions`` of the wrapped object.
    """
    wrapped = self.obj
    return wrapped._dir_additions()
def _get_window(self, other=None):
    """
    Return the stored ``window`` setting; ``other`` is not used here.
    """
    window = self.window
    return window
@property
def _window_type(self):
    """
    Name of this object's class.
    """
    klass = self.__class__
    return klass.__name__
def __unicode__(self):
    """
    Build the text representation: the window type followed by every
    setting from ``_attributes`` whose value is not None.
    """
    shown = []
    for k in self._attributes:
        # missing attributes are treated like None and left out
        if getattr(self, k, None) is None:
            continue
        shown.append("{k}={v}".format(k=k, v=getattr(self, k)))

    joined = ','.join(shown)
    return "{klass} [{attrs}]".format(klass=self._window_type,
                                      attrs=joined)
def __iter__(self):
    """
    Iteration is not supported.

    Raises
    ------
    NotImplementedError
        Always; the message points at the tracking issue.
    """
    message = 'See issue #11704 {url}'.format(
        url='https://github.com/pandas-dev/pandas/issues/11704')
    raise NotImplementedError(message)
def _get_index(self, index=None):
    """
    Return the index together with the form handed to the window code.

    Parameters
    ----------
    index : optional
        Only defaulted (to ``self._on``) for freq-type windows.

    Returns
    -------
    tuple of (index, index_as_ndarray)
        For freq-type windows the second item is ``index.asi8``;
        otherwise ``index`` is returned twice, unchanged.
    """
    if not self.is_freq_type:
        return index, index

    if index is None:
        index = self._on
    return index, index.asi8
def _prep_values(self, values=None, kill_inf=True):
    """
    Coerce the data to float64 ahead of the window computation.

    Parameters
    ----------
    values : array-like with a ``dtype``, optional
        Data to convert. When omitted, the ``values`` attribute of the
        selected object is used (or the selected object itself when it
        has no such attribute).
    kill_inf : bool, default True
        Replace +/-inf by NaN. The replacement is done on a copy, so the
        data passed in is left untouched.

    Returns
    -------
    The data as converted by ``ensure_float64``.

    Raises
    ------
    NotImplementedError
        If the dtype needs i8 conversion (per ``needs_i8_conversion``).
    TypeError
        If the data cannot be converted to float64.
    """
    if values is None:
        values = getattr(self._selected_obj, 'values', self._selected_obj)

    dtype = values.dtype
    # GH #12373 : rolling functions error on float32 data
    # make sure the data is coerced to float64
    if is_float_dtype(dtype) or is_integer_dtype(dtype):
        values = ensure_float64(values)
    elif needs_i8_conversion(dtype):
        raise NotImplementedError("ops for {action} for this "
                                  "dtype {dtype} are not "
                                  "implemented".format(
                                      action=self._window_type,
                                      dtype=dtype))
    else:
        try:
            values = ensure_float64(values)
        except (ValueError, TypeError):
            raise TypeError("cannot handle this type -> {0}"
                            "".format(dtype))

    if kill_inf:
        # copy before masking so the caller's array is never mutated
        values = values.copy()
        # np.nan, not the np.NaN alias: the alias was removed in NumPy 2.0
        values[np.isinf(values)] = np.nan
    return values
def _wrap_result(self, result, block=None, obj=None):
    """
    Wrap a single result.

    Parameters
    ----------
    result : object
        Anything that is not an ndarray is returned unchanged.
    block : optional
        Supplies the columns for 2-D results; when its values are
        timedelta64 the result is converted back to timedeltas.
    obj : optional
        Supplies index, name and type of the output; defaults to
        ``self._selected_obj``.
    """
    if obj is None:
        obj = self._selected_obj
    index = obj.index

    if not isinstance(result, np.ndarray):
        return result

    # coerce if necessary
    if block is not None and is_timedelta64_dtype(block.values.dtype):
        from pandas import to_timedelta
        flat = to_timedelta(result.ravel(), unit='ns')
        result = flat.values.reshape(result.shape)

    if result.ndim == 1:
        from pandas import Series
        return Series(result, index, name=obj.name)

    return type(obj)(result, index=index, columns=block.columns)
def _wrap_results(self, results, blocks, obj):
    """
    Wrap the per-block results and combine them into one object.

    Parameters
    ----------
    results : list of ndarrays
    blocks : list of blocks
    obj : conformed data (may be resampled)

    Returns
    -------
    The first 1-D wrapped result if there is one; ``obj`` cast to float64
    when there is nothing to combine; otherwise the column-wise
    concatenation of the wrapped results, reindexed to the selected
    columns.
    """
    from pandas import Series, concat
    from pandas.core.index import ensure_index

    wrapped = []
    for raw, block in zip(results, blocks):
        piece = self._wrap_result(raw, block=block, obj=obj)
        if piece.ndim == 1:
            return piece
        wrapped.append(piece)

    columns = self._selected_obj.columns

    # an 'on' column that is not the index goes back into the output,
    # at the position it had originally
    if self.on is not None and not self._on.equals(obj.index):
        name = self._on.name
        wrapped.append(Series(self._on, index=obj.index, name=name))

        if self._selection is not None:
            selection = ensure_index(self._selection)

            # reorder so the 'on' column sits at its original location
            # (only needed if it is not already selected)
            if name not in selection:
                columns = self.obj.columns
                wanted = selection.tolist() + [name]
                indexer = columns.get_indexer(wanted)
                columns = columns.take(sorted(indexer))

    if len(wrapped) == 0:
        return obj.astype('float64')
    combined = concat(wrapped, axis=1)
    return combined.reindex(columns=columns, copy=False)
def _center_window(self, result, window):
    """
    Center the result in the window.

    Raises
    ------
    ValueError
        If ``self.axis`` is not a valid axis of ``result``.
    """
    if self.axis > result.ndim - 1:
        raise ValueError("Requested axis is larger then no. of argument "
                         "dimensions")

    offset = _offset(window, True)
    if not offset > 0:
        return result

    if isinstance(result, (ABCSeries, ABCDataFrame)):
        return result.slice_shift(-offset, axis=self.axis)

    # plain arrays: drop the first ``offset`` entries along the axis
    lead_indexer = [slice(None)] * result.ndim
    lead_indexer[self.axis] = slice(offset, None)
    trimmed = result[tuple(lead_indexer)]
    return np.copy(trimmed)
def aggregate(self, arg, *args, **kwargs):
    """
    Aggregate using ``arg``.

    The request is first handed to ``self._aggregate``; when that yields
    no result, ``arg`` is applied through ``self.apply`` with
    ``raw=False`` and the extra positional/keyword arguments.
    """
    result, _how = self._aggregate(arg, *args, **kwargs)
    if result is not None:
        return result
    return self.apply(arg, raw=False, args=args, kwargs=kwargs)
agg = aggregate
_shared_docs['sum'] = dedent("""
Calculate %(name)s sum of given DataFrame or Series.
Parameters
----------
*args, **kwargs
For compatibility with other %(name)s methods. Has no effect
on the computed value.
Returns
-------
Series or DataFrame
Same type as the input, with the same index, containing the
%(name)s sum.
See Also
--------
Series.sum : Reducing sum for Series.
DataFrame.sum : Reducing sum for DataFrame.
Examples
--------
>>> s = pd.Series([1, 2, 3, 4, 5])
>>> s
0 1
1 2
2 3
Loading ...