"""
Provide a generic structure to support window functions,
similar to how we have a Groupby object.
"""
from __future__ import division

from collections import defaultdict
from datetime import timedelta
from textwrap import dedent
import warnings

import numpy as np

import pandas._libs.window as libwindow
import pandas.compat as compat
from pandas.compat.numpy import function as nv
from pandas.util._decorators import Appender, Substitution, cache_readonly

from pandas.core.dtypes.common import (
    ensure_float64, is_bool, is_float_dtype, is_integer, is_integer_dtype,
    is_list_like, is_scalar, is_timedelta64_dtype, needs_i8_conversion)
from pandas.core.dtypes.generic import (
    ABCDataFrame, ABCDateOffset, ABCDatetimeIndex, ABCPeriodIndex, ABCSeries,
    ABCTimedeltaIndex)

from pandas.core.base import PandasObject, SelectionMixin
import pandas.core.common as com
from pandas.core.generic import _shared_docs
from pandas.core.groupby.base import GroupByMixin

_shared_docs = dict(**_shared_docs)
_doc_template = """
        Returns
        -------
        Series or DataFrame
            Return type is determined by the caller.

        See Also
        --------
        Series.%(name)s : Series %(name)s.
        DataFrame.%(name)s : DataFrame %(name)s.
"""


class _Window(PandasObject, SelectionMixin):
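    # Shared base class for the Window, Rolling, Expanding and EWM objects;
    # it holds the selection, validation and result-wrapping machinery that
    # is common to every window type.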
    _attributes = ['window', 'min_periods', 'center', 'win_type',
                   'axis', 'on', 'closed']
    exclusions = set()

    def __init__(self, obj, window=None, min_periods=None,
                 center=False, win_type=None, axis=0, on=None, closed=None,
                 **kwargs):
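        # Not usually constructed directly: the Series/DataFrame accessors
        # (e.g. obj.rolling(...), obj.expanding(...), obj.ewm(...)) build the
        # appropriate subclass and pass the calling object in as `obj`.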

        self.__dict__.update(kwargs)
        self.blocks = []
        self.obj = obj
        self.on = on
        self.closed = closed
        self.window = window
        self.min_periods = min_periods
        self.center = center
        self.win_type = win_type
        self.win_freq = None
        self.axis = obj._get_axis_number(axis) if axis is not None else None
        self.validate()

    @property
    def _constructor(self):
        return Window

    @property
    def is_datetimelike(self):
        return None

    @property
    def _on(self):
        return None

    @property
    def is_freq_type(self):
        return self.win_type == 'freq'

    def validate(self):
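        # Shared argument checks for every window type; for example,
        # obj.rolling(3, closed='bad') or obj.rolling(3, min_periods=1.5)
        # would raise ValueError here.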
        if self.center is not None and not is_bool(self.center):
            raise ValueError("center must be a boolean")
        if (self.min_periods is not None and
                not is_integer(self.min_periods)):
            raise ValueError("min_periods must be an integer")
        if (self.closed is not None and
                self.closed not in ['right', 'both', 'left', 'neither']):
            raise ValueError("closed must be 'right', 'left', 'both' or "
                             "'neither'")

    def _convert_freq(self):
        """
        Resample according to the how parameter and return a new object.
        """
        obj = self._selected_obj
        index = None
        return obj, index

    def _create_blocks(self):
        """
        Split data into blocks & return conformed data.
        """

        obj, index = self._convert_freq()
        if index is not None:
            index = self._on

        # filter out the on from the object
        if self.on is not None:
            if obj.ndim == 2:
                obj = obj.reindex(columns=obj.columns.difference([self.on]),
                                  copy=False)
        blocks = obj._to_dict_of_blocks(copy=False).values()

        return blocks, obj, index

    def _gotitem(self, key, ndim, subset=None):
        """
        Sub-classes should define this method and return a sliced object.

        Parameters
        ----------
        key : str / list of selections
        ndim : {1, 2}
            Requested ndim of the result.
        subset : object, default None
            Subset to act on.
        """

        # create a new object to prevent aliasing
        if subset is None:
            subset = self.obj
        self = self._shallow_copy(subset)
        self._reset_cache()
        if subset.ndim == 2:
            if (is_scalar(key) and key in subset) or is_list_like(key):
                self._selection = key
        return self

    def __getattr__(self, attr):
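        # Unknown attributes fall through to column selection, so r.A is
        # equivalent to r['A'] when 'A' is a column of the underlying object.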
        if attr in self._internal_names_set:
            return object.__getattribute__(self, attr)
        if attr in self.obj:
            return self[attr]

        raise AttributeError("%r object has no attribute %r" %
                             (type(self).__name__, attr))

    def _dir_additions(self):
        return self.obj._dir_additions()

    def _get_window(self, other=None):
        return self.window

    @property
    def _window_type(self):
        return self.__class__.__name__

    def __unicode__(self):
        """
        Provide a nice str repr of our rolling object.
        """

        attrs = ["{k}={v}".format(k=k, v=getattr(self, k))
                 for k in self._attributes
                 if getattr(self, k, None) is not None]
        return "{klass} [{attrs}]".format(klass=self._window_type,
                                          attrs=','.join(attrs))

    def __iter__(self):
        url = 'https://github.com/pandas-dev/pandas/issues/11704'
        raise NotImplementedError('See issue #11704 {url}'.format(url=url))

    def _get_index(self, index=None):
        """
        Return the index; for frequency-based windows, also its i8
        (nanosecond) representation.

        Returns
        -------
        tuple of (index, index_as_ndarray)
        """

        if self.is_freq_type:
            if index is None:
                index = self._on
            return index, index.asi8
        return index, index

    def _prep_values(self, values=None, kill_inf=True):
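        # Coerce the values into a float64 ndarray that the Cython window
        # routines (pandas._libs.window) can consume: float32 and integer
        # input is upcast, datetime-like input is rejected, and (with
        # kill_inf) +/- inf is replaced by NaN.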

        if values is None:
            values = getattr(self._selected_obj, 'values', self._selected_obj)

        # GH #12373 : rolling functions error on float32 data
        # make sure the data is coerced to float64
        if is_float_dtype(values.dtype):
            values = ensure_float64(values)
        elif is_integer_dtype(values.dtype):
            values = ensure_float64(values)
        elif needs_i8_conversion(values.dtype):
            raise NotImplementedError("ops for {action} for this "
                                      "dtype {dtype} are not "
                                      "implemented".format(
                                          action=self._window_type,
                                          dtype=values.dtype))
        else:
            try:
                values = ensure_float64(values)
            except (ValueError, TypeError):
                raise TypeError("cannot handle this type -> {0}"
                                "".format(values.dtype))

        if kill_inf:
            values = values.copy()
            values[np.isinf(values)] = np.NaN

        return values

    def _wrap_result(self, result, block=None, obj=None):
        """
        Wrap a single result.
        """

        if obj is None:
            obj = self._selected_obj
        index = obj.index

        if isinstance(result, np.ndarray):

            # coerce if necessary
            if block is not None:
                if is_timedelta64_dtype(block.values.dtype):
                    from pandas import to_timedelta
                    result = to_timedelta(
                        result.ravel(), unit='ns').values.reshape(result.shape)

            if result.ndim == 1:
                from pandas import Series
                return Series(result, index, name=obj.name)

            return type(obj)(result, index=index, columns=block.columns)
        return result

    def _wrap_results(self, results, blocks, obj):
        """
        Wrap the results.

        Parameters
        ----------
        results : list of ndarrays
        blocks : list of blocks
        obj : conformed data (may be resampled)
        """

        from pandas import Series, concat
        from pandas.core.index import ensure_index

        final = []
        for result, block in zip(results, blocks):

            result = self._wrap_result(result, block=block, obj=obj)
            if result.ndim == 1:
                return result
            final.append(result)

        # if we have an 'on' column
        # we want to put it back into the results
        # in the same location
        columns = self._selected_obj.columns
        if self.on is not None and not self._on.equals(obj.index):

            name = self._on.name
            final.append(Series(self._on, index=obj.index, name=name))

            if self._selection is not None:

                selection = ensure_index(self._selection)

                # need to reorder to include original location of
                # the on column (if its not already there)
                if name not in selection:
                    columns = self.obj.columns
                    indexer = columns.get_indexer(selection.tolist() + [name])
                    columns = columns.take(sorted(indexer))

        if not len(final):
            return obj.astype('float64')
        return concat(final, axis=1).reindex(columns=columns, copy=False)

    def _center_window(self, result, window):
        """
        Center the result in the window.
        """
        if self.axis > result.ndim - 1:
            raise ValueError("Requested axis is larger then no. of argument "
                             "dimensions")

        offset = _offset(window, True)
        if offset > 0:
            if isinstance(result, (ABCSeries, ABCDataFrame)):
                result = result.slice_shift(-offset, axis=self.axis)
            else:
                lead_indexer = [slice(None)] * result.ndim
                lead_indexer[self.axis] = slice(offset, None)
                result = np.copy(result[tuple(lead_indexer)])
        return result

    def aggregate(self, arg, *args, **kwargs):
        result, how = self._aggregate(arg, *args, **kwargs)
        if result is None:
            return self.apply(arg, raw=False, args=args, kwargs=kwargs)
        return result

    agg = aggregate
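
    # Illustrative usage, assuming a rolling object r = df.rolling(3):
    # r.agg('sum'), r.agg(['sum', 'mean']) and r.agg({'A': 'max'}) are
    # resolved by _aggregate above, while a plain callable such as
    # r.agg(lambda x: x.max() - x.min()) falls back to apply().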

    _shared_docs['sum'] = dedent("""
    Calculate the %(name)s sum of the given DataFrame or Series.

    Parameters
    ----------
    *args, **kwargs
        For compatibility with other %(name)s methods. Has no effect
        on the computed value.

    Returns
    -------
    Series or DataFrame
        Same type as the input, with the same index, containing the
        %(name)s sum.

    See Also
    --------
    Series.sum : Reducing sum for Series.
    DataFrame.sum : Reducing sum for DataFrame.

    Examples
    --------
    >>> s = pd.Series([1, 2, 3, 4, 5])
    >>> s
    0    1
    1    2
    2    3