Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

Version: 0.24.2 

/ tests / test_panel.py

# -*- coding: utf-8 -*-
# pylint: disable=W0612,E1101

from datetime import datetime
import operator
from warnings import catch_warnings, simplefilter

import numpy as np
import pytest

from pandas.compat import OrderedDict, StringIO, lrange, range, signature
import pandas.util._test_decorators as td

from pandas.core.dtypes.common import is_float_dtype

from pandas import (
    DataFrame, Index, MultiIndex, Series, compat, date_range, isna, notna)
from pandas.core.nanops import nanall, nanany
import pandas.core.panel as panelm
from pandas.core.panel import Panel
import pandas.util.testing as tm
from pandas.util.testing import (
    assert_almost_equal, assert_frame_equal, assert_panel_equal,
    assert_series_equal, ensure_clean, makeCustomDataframe as mkdf,
    makeMixedDataFrame)

from pandas.io.formats.printing import pprint_thing
from pandas.tseries.offsets import BDay, MonthEnd


def make_test_panel():
    """Build the Panel fixture used by the tests in this module.

    A panel is created with ``tm.makePanel()``, passed through
    ``tm.add_nans`` and then copied; the copy is what gets returned.
    FutureWarnings raised while doing so are ignored.
    """
    with catch_warnings(record=True):
        simplefilter("ignore", FutureWarning)
        source = tm.makePanel()
        tm.add_nans(source)
        fixture = source.copy()
    return fixture


@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class PanelTests(object):
    """Basic Panel checks: pickling, rank, cumsum and hashability.

    The tests read ``self.panel``, which defaults to ``None`` here;
    presumably a concrete test class or setup hook assigns the real
    fixture (confirm against the users of this class).
    """
    # Panel under test; must be replaced before the tests run.
    panel = None

    def test_pickle(self):
        """A pickle round trip keeps the 'ItemA' frame intact."""
        unpickled = tm.round_trip_pickle(self.panel)
        assert_frame_equal(unpickled['ItemA'], self.panel['ItemA'])

    def test_rank(self):
        """Calling ``rank`` on the panel raises NotImplementedError."""
        with pytest.raises(NotImplementedError):
            self.panel.rank()

    def test_cumsum(self):
        """The panel's cumsum matches the cumsum of the 'ItemA' frame."""
        cumsum = self.panel.cumsum()
        assert_frame_equal(cumsum['ItemA'], self.panel['ItemA'].cumsum())

    def test_not_hashable(self):
        """Hashing an empty or a populated Panel raises TypeError."""
        c_empty = Panel()
        c = Panel(Panel([[[1]]]))
        with pytest.raises(TypeError):
            hash(c_empty)
        with pytest.raises(TypeError):
            hash(c)

    # The check used to be named ``not_hashable``; without the ``test_``
    # prefix pytest never collected it.  The old name is kept as an alias
    # for any direct caller.
    not_hashable = test_not_hashable


@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class SafeForLongAndSparse(object):
    """Repr, copy, iteration and reduction checks run against ``self.panel``.

    ``self.panel`` is not defined in this class; presumably the concrete
    test class provides it (confirm against the users of this class).
    """

    def test_repr(self):
        """Smoke test: ``repr`` of the panel must not raise."""
        repr(self.panel)

    def test_copy_names(self):
        """Renaming an axis on a copy leaves the original axis name alone."""
        for attr in ('major_axis', 'minor_axis'):
            getattr(self.panel, attr).name = None
            cp = self.panel.copy()
            getattr(cp, attr).name = 'foo'
            assert getattr(self.panel, attr).name is None

    def test_iter(self):
        """Iterating the panel yields the same contents as ``items``."""
        # The comparison result used to be discarded, so nothing was
        # actually verified; assert it.
        assert tm.equalContents(list(self.panel), self.panel.items)

    def test_count(self):
        """``count`` agrees with summing the not-NA mask."""
        f = lambda s: notna(s).sum()
        self._check_stat_op('count', f, obj=self.panel, has_skipna=False)

    def test_sum(self):
        """``sum`` against np.sum / np.nansum."""
        self._check_stat_op('sum', np.sum, skipna_alternative=np.nansum)

    def test_mean(self):
        """``mean`` against np.mean."""
        self._check_stat_op('mean', np.mean)

    def test_prod(self):
        """``prod`` against np.prod / np.nanprod."""
        self._check_stat_op('prod', np.prod, skipna_alternative=np.nanprod)

    @pytest.mark.filterwarnings("ignore:Invalid value:RuntimeWarning")
    @pytest.mark.filterwarnings("ignore:All-NaN:RuntimeWarning")
    def test_median(self):
        """``median`` against np.median, with NaN for any input holding NA."""
        def wrapper(x):
            if isna(x).any():
                return np.nan
            return np.median(x)

        self._check_stat_op('median', wrapper)

    @pytest.mark.filterwarnings("ignore:Invalid value:RuntimeWarning")
    def test_min(self):
        """``min`` against np.min."""
        self._check_stat_op('min', np.min)

    @pytest.mark.filterwarnings("ignore:Invalid value:RuntimeWarning")
    def test_max(self):
        """``max`` against np.max."""
        self._check_stat_op('max', np.max)

    @td.skip_if_no_scipy
    def test_skew(self):
        """``skew`` against scipy's unbiased skew (NaN below 3 values)."""
        from scipy.stats import skew

        def this_skew(x):
            if len(x) < 3:
                return np.nan
            return skew(x, bias=False)

        self._check_stat_op('skew', this_skew)

    def test_var(self):
        """``var`` against np.var with ddof=1 (NaN below 2 values)."""
        def alt(x):
            if len(x) < 2:
                return np.nan
            return np.var(x, ddof=1)

        self._check_stat_op('var', alt)

    def test_std(self):
        """``std`` against np.std with ddof=1 (NaN below 2 values)."""
        def alt(x):
            if len(x) < 2:
                return np.nan
            return np.std(x, ddof=1)

        self._check_stat_op('std', alt)

    def test_sem(self):
        """``sem`` against std(ddof=1) / sqrt(n) (NaN below 2 values)."""
        def alt(x):
            if len(x) < 2:
                return np.nan
            return np.std(x, ddof=1) / np.sqrt(len(x))

        self._check_stat_op('sem', alt)

    def _check_stat_op(self, name, alternative, obj=None, has_skipna=True,
                       skipna_alternative=None):
        """Compare the reduction ``name`` on ``obj`` with a reference callable.

        Parameters
        ----------
        name : str
            Name of the reduction method looked up on ``obj``.
        alternative : callable
            Reference implementation, applied through ``obj.apply``.
        obj : object, optional
            Object to test; ``self.panel`` when omitted.
        has_skipna : bool, default True
            When true, the method is also called with ``skipna=False`` on
            every axis and compared with ``alternative``.
        skipna_alternative : callable, optional
            Forwarded to ``tm._make_skipna_wrapper`` together with
            ``alternative`` to build the default-skipna reference.

        Notes
        -----
        With default arguments the method is called on every axis, but its
        result is only compared with the reference for 'sum' and 'prod'.
        """
        if obj is None:
            obj = self.panel

        f = getattr(obj, name)

        if has_skipna:

            skipna_wrapper = tm._make_skipna_wrapper(alternative,
                                                     skipna_alternative)

            def wrapper(x):
                return alternative(np.asarray(x))

            for i in range(obj.ndim):
                result = f(axis=i, skipna=False)
                assert_frame_equal(result, obj.apply(wrapper, axis=i))
        else:
            skipna_wrapper = alternative

        for i in range(obj.ndim):
            result = f(axis=i)
            if name in ['sum', 'prod']:
                assert_frame_equal(result, obj.apply(skipna_wrapper, axis=i))

        # An axis number one past the last valid axis must be rejected.
        pytest.raises(Exception, f, axis=obj.ndim)

        # Unimplemented numeric_only parameter.
        if 'numeric_only' in signature(f).args:
            with pytest.raises(NotImplementedError, match=name):
                f(numeric_only=True)


@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class SafeForSparse(object):

    def test_get_axis(self):
        """``_get_axis(n)`` returns the very object exposed as the axis attribute."""
        for number, attr in enumerate(('items', 'major_axis', 'minor_axis')):
            assert self.panel._get_axis(number) is getattr(self.panel, attr)

    def test_set_axis(self):
        """Assigning new axes is visible on the panel and on frames taken from it."""
        def fresh(axis):
            # Integer index with one entry per existing label.
            return Index(np.arange(len(axis)))

        new_items = fresh(self.panel.items)
        new_major = fresh(self.panel.major_axis)
        new_minor = fresh(self.panel.minor_axis)

        # Look 'ItemA' up before replacing the items so that a possibly
        # cached entry exists and the assignment has to account for it.
        cached = self.panel['ItemA']
        self.panel.items = new_items

        if hasattr(self.panel, '_item_cache'):
            assert 'ItemA' not in self.panel._item_cache
        assert self.panel.items is new_items

        # Same idea for the major axis: fetch item 0 first, then reassign.
        cached = self.panel[0]  # noqa

        self.panel.major_axis = new_major
        assert self.panel[0].index is new_major
        assert self.panel.major_axis is new_major

        # ... and for the minor axis.
        cached = self.panel[0]  # noqa

        self.panel.minor_axis = new_minor
        assert self.panel[0].columns is new_minor
        assert self.panel.minor_axis is new_minor

    def test_get_axis_number(self):
        """Axis aliases map to 0/1/2; an unknown axis name raises ValueError."""
        for expected, alias in enumerate(('items', 'major', 'minor')):
            assert self.panel._get_axis_number(alias) == expected

        unknown_axis = "No axis named foo"

        with pytest.raises(ValueError, match=unknown_axis):
            self.panel._get_axis_number('foo')

        # The same error is expected when a comparison is given a bad axis.
        with pytest.raises(ValueError, match=unknown_axis):
            self.panel.__ge__(self.panel, axis='foo')

    def test_get_axis_name(self):
        """Axis numbers 0/1/2 map to 'items', 'major_axis' and 'minor_axis'."""
        for number, expected in enumerate(('items', 'major_axis',
                                           'minor_axis')):
            assert self.panel._get_axis_name(number) == expected

    def test_get_plane_axes(self):
        """Smoke test: ``_get_plane_axes`` returns a pair for each axis spec."""
        # what to do here?  Only the two-element unpacking is checked; the
        # returned values themselves are not compared with anything.
        for axis in ('items', 'major_axis', 'minor_axis', 0):
            index, columns = self.panel._get_plane_axes(axis)

    def test_truncate(self):
        """Truncating along the major axis matches truncating the 'ItemA' frame."""
        dates = self.panel.major_axis
        start = dates[1]
        end = dates[5]

        # Both bounds, lower bound only, upper bound only.
        bounds = [
            {'before': start, 'after': end},
            {'before': start},
            {'after': end},
        ]

        for kwargs in bounds:
            trunced = self.panel.truncate(axis='major', **kwargs)
            expected = self.panel['ItemA'].truncate(**kwargs)

            assert_frame_equal(trunced['ItemA'], expected)

    def test_arith(self):
        """Scalar arithmetic on the panel matches the same op on 'ItemA'.

        Each operation is handed to ``_test_op``, which calls it as
        ``op(panel, 1)``.  Adding a bare frame to the panel must raise.
        """
        scalar_ops = [
            # operator-module forms
            operator.add,
            operator.sub,
            operator.mul,
            operator.truediv,
            operator.floordiv,
            operator.pow,
            # scalar on the left-hand side
            lambda x, y: y + x,
            lambda x, y: y - x,
            lambda x, y: y * x,
            lambda x, y: y / x,
            lambda x, y: y ** x,
            # scalar on the right-hand side
            lambda x, y: x + y,  # panel + 1
            lambda x, y: x - y,  # panel - 1
            lambda x, y: x * y,  # panel * 1
            lambda x, y: x / y,  # panel / 1
            lambda x, y: x ** y,  # panel ** 1
        ]
        for op in scalar_ops:
            self._test_op(self.panel, op)

        # Resolve both operands first so that only the call itself is
        # expected to raise.
        add = self.panel.__add__
        frame = self.panel['ItemA']
        with pytest.raises(Exception):
            add(frame)

    @staticmethod
    def _test_op(panel, op):
        result = op(panel, 1)
        assert_frame_equal(result['ItemA'], op(panel['ItemA'], 1))

    def test_keys(self):
        """``keys()`` has the same contents as the panel's ``items``."""
        # The comparison result used to be discarded, so the test could
        # never fail; assert it.
        assert tm.equalContents(list(self.panel.keys()), self.panel.items)

    def test_iteritems(self):
        """``iteritems`` yields one (key, value) pair per item."""
        # First just make sure the iterator can be consumed and that every
        # element unpacks into a pair.
        for _key, _value in self.panel.iteritems():
            pass

        pairs = list(self.panel.iteritems())
        assert len(pairs) == len(self.panel.items)

    def test_combineFrame(self):
        """Panel-with-frame arithmetic methods agree with the plain operators.

        For every method name the panel method is called with a frame along
        the items, major and minor axes, and one slice of the result is
        compared with the operator applied directly to that slice.
        """
        def check_op(op, name):
            # items: combine with a whole item frame, inspect 'ItemB'
            item_frame = self.panel['ItemA']

            method = getattr(self.panel, name)

            combined = method(item_frame, axis='items')

            assert_frame_equal(
                combined['ItemB'], op(self.panel['ItemB'], item_frame))

            # major, then minor: combine with the cross-section at the
            # first label and inspect the cross-section at the second one
            for axis, labels_attr, xs_attr in (
                    ('major', 'major_axis', 'major_xs'),
                    ('minor', 'minor_axis', 'minor_xs')):
                cross = getattr(self.panel, xs_attr)(
                    getattr(self.panel, labels_attr)[0])
                combined = method(cross, axis=axis)

                label = getattr(self.panel, labels_attr)[1]

                assert_frame_equal(
                    getattr(combined, xs_attr)(label),
                    op(getattr(self.panel, xs_attr)(label), cross))

        names = ['add', 'sub', 'mul', 'truediv', 'floordiv', 'pow', 'mod',
                 'div']

        for name in names:
            # On Python 3 the 'div' method is compared with
            # operator.truediv; everywhere else the operator of the same
            # name is used.
            if compat.PY3 and name == 'div':
                operator_name = 'truediv'
            else:
                operator_name = name
            try:
                check_op(getattr(operator, operator_name), name)
            except AttributeError:
                pprint_thing("Failing operation: %r" % name)
                raise
Loading ...