# -*- coding: utf-8 -*-
from __future__ import division, print_function
from functools import partial
import warnings
import numpy as np
import pytest
from pandas.compat.numpy import _np_version_under1p13
import pandas.util._test_decorators as td
from pandas.core.dtypes.common import is_integer_dtype
import pandas as pd
from pandas import Series, isna
from pandas.core.arrays import DatetimeArray
import pandas.core.nanops as nanops
import pandas.util.testing as tm
# Snapshot of the bottleneck flag taken at import time; setup_method
# switches the flag off and teardown_method writes this value back.
use_bn = nanops._USE_BOTTLENECK
class TestnanopsDataFrame(object):
def setup_method(self, method):
np.random.seed(11235)
nanops._USE_BOTTLENECK = False
arr_shape = (11, 7, 5)
self.arr_float = np.random.randn(*arr_shape)
self.arr_float1 = np.random.randn(*arr_shape)
self.arr_complex = self.arr_float + self.arr_float1 * 1j
self.arr_int = np.random.randint(-10, 10, arr_shape)
self.arr_bool = np.random.randint(0, 2, arr_shape) == 0
self.arr_str = np.abs(self.arr_float).astype('S')
self.arr_utf = np.abs(self.arr_float).astype('U')
self.arr_date = np.random.randint(0, 20000,
arr_shape).astype('M8[ns]')
self.arr_tdelta = np.random.randint(0, 20000,
arr_shape).astype('m8[ns]')
self.arr_nan = np.tile(np.nan, arr_shape)
self.arr_float_nan = np.vstack([self.arr_float, self.arr_nan])
self.arr_float1_nan = np.vstack([self.arr_float1, self.arr_nan])
self.arr_nan_float1 = np.vstack([self.arr_nan, self.arr_float1])
self.arr_nan_nan = np.vstack([self.arr_nan, self.arr_nan])
self.arr_inf = self.arr_float * np.inf
self.arr_float_inf = np.vstack([self.arr_float, self.arr_inf])
self.arr_nan_inf = np.vstack([self.arr_nan, self.arr_inf])
self.arr_float_nan_inf = np.vstack([self.arr_float, self.arr_nan,
self.arr_inf])
self.arr_nan_nan_inf = np.vstack([self.arr_nan, self.arr_nan,
self.arr_inf])
self.arr_obj = np.vstack([
self.arr_float.astype('O'),
self.arr_int.astype('O'),
self.arr_bool.astype('O'),
self.arr_complex.astype('O'),
self.arr_str.astype('O'),
self.arr_utf.astype('O'),
self.arr_date.astype('O'),
self.arr_tdelta.astype('O')
])
with np.errstate(invalid='ignore'):
self.arr_nan_nanj = self.arr_nan + self.arr_nan * 1j
self.arr_complex_nan = np.vstack([self.arr_complex,
self.arr_nan_nanj])
self.arr_nan_infj = self.arr_inf * 1j
self.arr_complex_nan_infj = np.vstack([self.arr_complex,
self.arr_nan_infj])
self.arr_float_2d = self.arr_float[:, :, 0]
self.arr_float1_2d = self.arr_float1[:, :, 0]
self.arr_nan_2d = self.arr_nan[:, :, 0]
self.arr_float_nan_2d = self.arr_float_nan[:, :, 0]
self.arr_float1_nan_2d = self.arr_float1_nan[:, :, 0]
self.arr_nan_float1_2d = self.arr_nan_float1[:, :, 0]
self.arr_float_1d = self.arr_float[:, 0, 0]
self.arr_float1_1d = self.arr_float1[:, 0, 0]
self.arr_nan_1d = self.arr_nan[:, 0, 0]
self.arr_float_nan_1d = self.arr_float_nan[:, 0, 0]
self.arr_float1_nan_1d = self.arr_float1_nan[:, 0, 0]
self.arr_nan_float1_1d = self.arr_nan_float1[:, 0, 0]
def teardown_method(self, method):
    """Put the bottleneck flag back to the value saved in ``use_bn``."""
    nanops._USE_BOTTLENECK = use_bn
def check_results(self, targ, res, axis, check_dtype=True):
    """Assert that ``res`` is (almost) equal to the expected ``targ``.

    ``res`` is first unwrapped through its ``asm8`` and ``values``
    attributes when it has them.  The comparison is done with
    ``tm.assert_almost_equal``; if that raises ``AssertionError`` there
    are fallbacks for timedelta targets and for complex/object results,
    otherwise the original error is re-raised.

    Parameters
    ----------
    targ : expected value
    res : value produced by the function under test
    axis : axis the reduction was performed over (may be None)
    check_dtype : forwarded to ``tm.assert_almost_equal``
    """
    res = getattr(res, 'asm8', res)
    res = getattr(res, 'values', res)

    # timedeltas are a beast here
    def _coerce_tds(targ, res):
        # Only touches m8[ns] targets: a length-1 target is reduced to
        # plain scalars on both sides, anything longer has the target
        # viewed as int64 (res is returned unchanged in that case).
        if hasattr(targ, 'dtype') and targ.dtype == 'm8[ns]':
            if len(targ) == 1:
                targ = targ[0].item()
                res = res.item()
            else:
                targ = targ.view('i8')
        return targ, res

    try:
        # When the shapes disagree for a non-zero axis, keep only the
        # first ``targ.shape[0]`` entries of ``res`` along axis 0.
        # NOTE(review): presumably this drops the extra rows of fixtures
        # that were stacked along axis 0 -- confirm against the callers.
        if axis != 0 and hasattr(
                targ, 'shape') and targ.ndim and targ.shape != res.shape:
            res = np.split(res, [targ.shape[0]], axis=0)[0]
    except (ValueError, IndexError):
        targ, res = _coerce_tds(targ, res)

    try:
        tm.assert_almost_equal(targ, res, check_dtype=check_dtype)
    except AssertionError:

        # handle timedelta dtypes
        if hasattr(targ, 'dtype') and targ.dtype == 'm8[ns]':
            targ, res = _coerce_tds(targ, res)
            tm.assert_almost_equal(targ, res, check_dtype=check_dtype)
            return

        # There are sometimes rounding errors with
        # complex and object dtypes.
        # If it isn't one of those, re-raise the error.
        if not hasattr(res, 'dtype') or res.dtype.kind not in ['c', 'O']:
            raise
        # convert object dtypes to something that can be split into
        # real and imaginary parts
        if res.dtype.kind == 'O':
            if targ.dtype.kind != 'O':
                res = res.astype(targ.dtype)
            else:
                # Both sides are object dtype: try complex first and
                # fall back to float if the conversion raises
                # RuntimeError.
                try:
                    res = res.astype('c16')
                except RuntimeError:
                    res = res.astype('f8')
                try:
                    targ = targ.astype('c16')
                except RuntimeError:
                    targ = targ.astype('f8')
        # there should never be a case where numpy returns an object
        # but nanops doesn't, so make that an exception
        elif targ.dtype.kind == 'O':
            raise
        # Compare the real and imaginary parts separately.
        tm.assert_almost_equal(targ.real, res.real,
                               check_dtype=check_dtype)
        tm.assert_almost_equal(targ.imag, res.imag,
                               check_dtype=check_dtype)
def check_fun_data(self, testfunc, targfunc, testarval, targarval,
targarnanval, check_dtype=True, empty_targfunc=None,
**kwargs):
for axis in list(range(targarval.ndim)) + [None]:
for skipna in [False, True]:
targartempval = targarval if skipna else targarnanval
if skipna and empty_targfunc and isna(targartempval).all():
targ = empty_targfunc(targartempval, axis=axis, **kwargs)
else:
targ = targfunc(targartempval, axis=axis, **kwargs)
try:
res = testfunc(testarval, axis=axis, skipna=skipna,
**kwargs)
self.check_results(targ, res, axis,
check_dtype=check_dtype)
if skipna:
res = testfunc(testarval, axis=axis, **kwargs)
self.check_results(targ, res, axis,
check_dtype=check_dtype)
if axis is None:
res = testfunc(testarval, skipna=skipna, **kwargs)
self.check_results(targ, res, axis,
check_dtype=check_dtype)
if skipna and axis is None:
res = testfunc(testarval, **kwargs)
self.check_results(targ, res, axis,
check_dtype=check_dtype)
except BaseException as exc:
exc.args += ('axis: %s of %s' % (axis, testarval.ndim - 1),
'skipna: %s' % skipna, 'kwargs: %s' % kwargs)
raise
if testarval.ndim <= 1:
return
try:
testarval2 = np.take(testarval, 0, axis=-1)
targarval2 = np.take(targarval, 0, axis=-1)
targarnanval2 = np.take(targarnanval, 0, axis=-1)
except ValueError:
return
self.check_fun_data(testfunc, targfunc, testarval2, targarval2,
targarnanval2, check_dtype=check_dtype,
empty_targfunc=empty_targfunc, **kwargs)
def check_fun(self, testfunc, targfunc, testar, targar=None,
targarnan=None, empty_targfunc=None, **kwargs):
if targar is None:
targar = testar
if targarnan is None:
targarnan = testar
testarval = getattr(self, testar)
targarval = getattr(self, targar)
targarnanval = getattr(self, targarnan)
try:
self.check_fun_data(testfunc, targfunc, testarval, targarval,
targarnanval, empty_targfunc=empty_targfunc,
**kwargs)
except BaseException as exc:
exc.args += ('testar: %s' % testar, 'targar: %s' % targar,
'targarnan: %s' % targarnan)
raise
def check_funs(self, testfunc, targfunc, allow_complex=True,
allow_all_nan=True, allow_str=True, allow_date=True,
allow_tdelta=True, allow_obj=True, **kwargs):
self.check_fun(testfunc, targfunc, 'arr_float', **kwargs)
self.check_fun(testfunc, targfunc, 'arr_float_nan', 'arr_float',
**kwargs)
self.check_fun(testfunc, targfunc, 'arr_int', **kwargs)
self.check_fun(testfunc, targfunc, 'arr_bool', **kwargs)
objs = [self.arr_float.astype('O'), self.arr_int.astype('O'),
self.arr_bool.astype('O')]
if allow_all_nan:
self.check_fun(testfunc, targfunc, 'arr_nan', **kwargs)
if allow_complex:
self.check_fun(testfunc, targfunc, 'arr_complex', **kwargs)
self.check_fun(testfunc, targfunc, 'arr_complex_nan',
'arr_complex', **kwargs)
if allow_all_nan:
self.check_fun(testfunc, targfunc, 'arr_nan_nanj', **kwargs)
objs += [self.arr_complex.astype('O')]
if allow_str:
self.check_fun(testfunc, targfunc, 'arr_str', **kwargs)
self.check_fun(testfunc, targfunc, 'arr_utf', **kwargs)
objs += [self.arr_str.astype('O'), self.arr_utf.astype('O')]
if allow_date:
try:
targfunc(self.arr_date)
except TypeError:
pass
else:
self.check_fun(testfunc, targfunc, 'arr_date', **kwargs)
objs += [self.arr_date.astype('O')]
if allow_tdelta:
try:
targfunc(self.arr_tdelta)
except TypeError:
pass
else:
self.check_fun(testfunc, targfunc, 'arr_tdelta', **kwargs)
objs += [self.arr_tdelta.astype('O')]
if allow_obj:
self.arr_obj = np.vstack(objs)
# some nanops handle object dtypes better than their numpy
# counterparts, so the numpy functions need to be given something
# else
if allow_obj == 'convert':
targfunc = partial(self._badobj_wrap, func=targfunc,
allow_complex=allow_complex)
self.check_fun(testfunc, targfunc, 'arr_obj', **kwargs)
def _badobj_wrap(self, value, func, allow_complex=True, **kwargs):
if value.dtype.kind == 'O':
if allow_complex:
value = value.astype('c16')
else:
value = value.astype('f8')
return func(value, **kwargs)
def test_nanany(self):
self.check_funs(nanops.nanany, np.any, allow_all_nan=False,
allow_str=False, allow_date=False, allow_tdelta=False)
def test_nanall(self):
self.check_funs(nanops.nanall, np.all, allow_all_nan=False,
allow_str=False, allow_date=False, allow_tdelta=False)
def test_nansum(self):
self.check_funs(nanops.nansum, np.sum, allow_str=False,
allow_date=False, allow_tdelta=True, check_dtype=False,
empty_targfunc=np.nansum)
def test_nanmean(self):
self.check_funs(nanops.nanmean, np.mean, allow_complex=False,
allow_obj=False, allow_str=False, allow_date=False,
allow_tdelta=True)
def test_nanmean_overflow(self):
# GH 10155
# In the previous implementation mean can overflow for int dtypes, it
# is now consistent with numpy
for a in [2 ** 55, -2 ** 55, 20150515061816532]:
s = Series(a, index=range(500), dtype=np.int64)
result = s.mean()
np_result = s.values.mean()
assert result == a
assert result == np_result
assert result.dtype == np.float64
def test_returned_dtype(self):
dtypes = [np.int16, np.int32, np.int64, np.float32, np.float64]
if hasattr(np, 'float128'):
dtypes.append(np.float128)
for dtype in dtypes:
s = Series(range(10), dtype=dtype)
group_a = ['mean', 'std', 'var', 'skew', 'kurt']
group_b = ['min', 'max']
for method in group_a + group_b:
result = getattr(s, method)()
if is_integer_dtype(dtype) and method in group_a:
assert result.dtype == np.float64
else:
assert result.dtype == dtype
def test_nanmedian(self):
with warnings.catch_warnings(record=True):
warnings.simplefilter("ignore", RuntimeWarning)
self.check_funs(nanops.nanmedian, np.median, allow_complex=False,
allow_str=False, allow_date=False,
allow_tdelta=True, allow_obj='convert')
@pytest.mark.parametrize('ddof', range(3))
def test_nanvar(self, ddof):
    """Check ``nanops.nanvar`` against ``np.var`` for the given ``ddof``.

    Complex, string and datetime fixtures are left out and object
    fixtures are converted before being handed to numpy.
    """
    options = dict(allow_complex=False, allow_str=False, allow_date=False,
                   allow_tdelta=True, allow_obj='convert')
    self.check_funs(nanops.nanvar, np.var, ddof=ddof, **options)
@pytest.mark.parametrize('ddof', range(3))
def test_nanstd(self, ddof):
Loading ...