# -*- coding: utf-8 -*-
# pylint: disable=W0612,E1101
from datetime import datetime
import operator
from warnings import catch_warnings, simplefilter
import numpy as np
import pytest
from pandas.compat import OrderedDict, StringIO, lrange, range, signature
import pandas.util._test_decorators as td
from pandas.core.dtypes.common import is_float_dtype
from pandas import (
DataFrame, Index, MultiIndex, Series, compat, date_range, isna, notna)
from pandas.core.nanops import nanall, nanany
import pandas.core.panel as panelm
from pandas.core.panel import Panel
import pandas.util.testing as tm
from pandas.util.testing import (
assert_almost_equal, assert_frame_equal, assert_panel_equal,
assert_series_equal, ensure_clean, makeCustomDataframe as mkdf,
makeMixedDataFrame)
from pandas.io.formats.printing import pprint_thing
from pandas.tseries.offsets import BDay, MonthEnd
def make_test_panel():
with catch_warnings(record=True):
simplefilter("ignore", FutureWarning)
_panel = tm.makePanel()
tm.add_nans(_panel)
_panel = _panel.copy()
return _panel
@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class PanelTests(object):
panel = None
def test_pickle(self):
unpickled = tm.round_trip_pickle(self.panel)
assert_frame_equal(unpickled['ItemA'], self.panel['ItemA'])
def test_rank(self):
pytest.raises(NotImplementedError, lambda: self.panel.rank())
def test_cumsum(self):
cumsum = self.panel.cumsum()
assert_frame_equal(cumsum['ItemA'], self.panel['ItemA'].cumsum())
def not_hashable(self):
c_empty = Panel()
c = Panel(Panel([[[1]]]))
pytest.raises(TypeError, hash, c_empty)
pytest.raises(TypeError, hash, c)
@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class SafeForLongAndSparse(object):
def test_repr(self):
repr(self.panel)
def test_copy_names(self):
for attr in ('major_axis', 'minor_axis'):
getattr(self.panel, attr).name = None
cp = self.panel.copy()
getattr(cp, attr).name = 'foo'
assert getattr(self.panel, attr).name is None
def test_iter(self):
tm.equalContents(list(self.panel), self.panel.items)
def test_count(self):
f = lambda s: notna(s).sum()
self._check_stat_op('count', f, obj=self.panel, has_skipna=False)
def test_sum(self):
self._check_stat_op('sum', np.sum, skipna_alternative=np.nansum)
def test_mean(self):
self._check_stat_op('mean', np.mean)
def test_prod(self):
self._check_stat_op('prod', np.prod, skipna_alternative=np.nanprod)
@pytest.mark.filterwarnings("ignore:Invalid value:RuntimeWarning")
@pytest.mark.filterwarnings("ignore:All-NaN:RuntimeWarning")
def test_median(self):
def wrapper(x):
if isna(x).any():
return np.nan
return np.median(x)
self._check_stat_op('median', wrapper)
@pytest.mark.filterwarnings("ignore:Invalid value:RuntimeWarning")
def test_min(self):
self._check_stat_op('min', np.min)
@pytest.mark.filterwarnings("ignore:Invalid value:RuntimeWarning")
def test_max(self):
self._check_stat_op('max', np.max)
@td.skip_if_no_scipy
def test_skew(self):
from scipy.stats import skew
def this_skew(x):
if len(x) < 3:
return np.nan
return skew(x, bias=False)
self._check_stat_op('skew', this_skew)
def test_var(self):
def alt(x):
if len(x) < 2:
return np.nan
return np.var(x, ddof=1)
self._check_stat_op('var', alt)
def test_std(self):
def alt(x):
if len(x) < 2:
return np.nan
return np.std(x, ddof=1)
self._check_stat_op('std', alt)
def test_sem(self):
def alt(x):
if len(x) < 2:
return np.nan
return np.std(x, ddof=1) / np.sqrt(len(x))
self._check_stat_op('sem', alt)
def _check_stat_op(self, name, alternative, obj=None, has_skipna=True,
skipna_alternative=None):
if obj is None:
obj = self.panel
# # set some NAs
# obj.loc[5:10] = np.nan
# obj.loc[15:20, -2:] = np.nan
f = getattr(obj, name)
if has_skipna:
skipna_wrapper = tm._make_skipna_wrapper(alternative,
skipna_alternative)
def wrapper(x):
return alternative(np.asarray(x))
for i in range(obj.ndim):
result = f(axis=i, skipna=False)
assert_frame_equal(result, obj.apply(wrapper, axis=i))
else:
skipna_wrapper = alternative
wrapper = alternative
for i in range(obj.ndim):
result = f(axis=i)
if name in ['sum', 'prod']:
assert_frame_equal(result, obj.apply(skipna_wrapper, axis=i))
pytest.raises(Exception, f, axis=obj.ndim)
# Unimplemented numeric_only parameter.
if 'numeric_only' in signature(f).args:
with pytest.raises(NotImplementedError, match=name):
f(numeric_only=True)
@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class SafeForSparse(object):
def test_get_axis(self):
assert (self.panel._get_axis(0) is self.panel.items)
assert (self.panel._get_axis(1) is self.panel.major_axis)
assert (self.panel._get_axis(2) is self.panel.minor_axis)
def test_set_axis(self):
new_items = Index(np.arange(len(self.panel.items)))
new_major = Index(np.arange(len(self.panel.major_axis)))
new_minor = Index(np.arange(len(self.panel.minor_axis)))
# ensure propagate to potentially prior-cached items too
item = self.panel['ItemA']
self.panel.items = new_items
if hasattr(self.panel, '_item_cache'):
assert 'ItemA' not in self.panel._item_cache
assert self.panel.items is new_items
# TODO: unused?
item = self.panel[0] # noqa
self.panel.major_axis = new_major
assert self.panel[0].index is new_major
assert self.panel.major_axis is new_major
# TODO: unused?
item = self.panel[0] # noqa
self.panel.minor_axis = new_minor
assert self.panel[0].columns is new_minor
assert self.panel.minor_axis is new_minor
def test_get_axis_number(self):
assert self.panel._get_axis_number('items') == 0
assert self.panel._get_axis_number('major') == 1
assert self.panel._get_axis_number('minor') == 2
with pytest.raises(ValueError, match="No axis named foo"):
self.panel._get_axis_number('foo')
with pytest.raises(ValueError, match="No axis named foo"):
self.panel.__ge__(self.panel, axis='foo')
def test_get_axis_name(self):
assert self.panel._get_axis_name(0) == 'items'
assert self.panel._get_axis_name(1) == 'major_axis'
assert self.panel._get_axis_name(2) == 'minor_axis'
def test_get_plane_axes(self):
# what to do here?
index, columns = self.panel._get_plane_axes('items')
index, columns = self.panel._get_plane_axes('major_axis')
index, columns = self.panel._get_plane_axes('minor_axis')
index, columns = self.panel._get_plane_axes(0)
def test_truncate(self):
dates = self.panel.major_axis
start, end = dates[1], dates[5]
trunced = self.panel.truncate(start, end, axis='major')
expected = self.panel['ItemA'].truncate(start, end)
assert_frame_equal(trunced['ItemA'], expected)
trunced = self.panel.truncate(before=start, axis='major')
expected = self.panel['ItemA'].truncate(before=start)
assert_frame_equal(trunced['ItemA'], expected)
trunced = self.panel.truncate(after=end, axis='major')
expected = self.panel['ItemA'].truncate(after=end)
assert_frame_equal(trunced['ItemA'], expected)
def test_arith(self):
self._test_op(self.panel, operator.add)
self._test_op(self.panel, operator.sub)
self._test_op(self.panel, operator.mul)
self._test_op(self.panel, operator.truediv)
self._test_op(self.panel, operator.floordiv)
self._test_op(self.panel, operator.pow)
self._test_op(self.panel, lambda x, y: y + x)
self._test_op(self.panel, lambda x, y: y - x)
self._test_op(self.panel, lambda x, y: y * x)
self._test_op(self.panel, lambda x, y: y / x)
self._test_op(self.panel, lambda x, y: y ** x)
self._test_op(self.panel, lambda x, y: x + y) # panel + 1
self._test_op(self.panel, lambda x, y: x - y) # panel - 1
self._test_op(self.panel, lambda x, y: x * y) # panel * 1
self._test_op(self.panel, lambda x, y: x / y) # panel / 1
self._test_op(self.panel, lambda x, y: x ** y) # panel ** 1
pytest.raises(Exception, self.panel.__add__,
self.panel['ItemA'])
@staticmethod
def _test_op(panel, op):
result = op(panel, 1)
assert_frame_equal(result['ItemA'], op(panel['ItemA'], 1))
def test_keys(self):
tm.equalContents(list(self.panel.keys()), self.panel.items)
def test_iteritems(self):
# Test panel.iteritems(), aka panel.iteritems()
# just test that it works
for k, v in self.panel.iteritems():
pass
assert len(list(self.panel.iteritems())) == len(self.panel.items)
def test_combineFrame(self):
def check_op(op, name):
# items
df = self.panel['ItemA']
func = getattr(self.panel, name)
result = func(df, axis='items')
assert_frame_equal(
result['ItemB'], op(self.panel['ItemB'], df))
# major
xs = self.panel.major_xs(self.panel.major_axis[0])
result = func(xs, axis='major')
idx = self.panel.major_axis[1]
assert_frame_equal(result.major_xs(idx),
op(self.panel.major_xs(idx), xs))
# minor
xs = self.panel.minor_xs(self.panel.minor_axis[0])
result = func(xs, axis='minor')
idx = self.panel.minor_axis[1]
assert_frame_equal(result.minor_xs(idx),
op(self.panel.minor_xs(idx), xs))
ops = ['add', 'sub', 'mul', 'truediv', 'floordiv', 'pow', 'mod']
if not compat.PY3:
ops.append('div')
for op in ops:
try:
check_op(getattr(operator, op), op)
except AttributeError:
pprint_thing("Failing operation: %r" % op)
raise
if compat.PY3:
try:
check_op(operator.truediv, 'div')
except AttributeError:
pprint_thing("Failing operation: %r" % 'div')
raise
Loading ...