# Package-index page residue (not module code): "Repository URL to install this package"; Version: 0.24.2
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from distutils.version import LooseVersion
import numpy as np
import pytest
from pandas import DataFrame, Series
from pandas.tests.frame.common import TestData
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal
class TestRank(TestData):
    """Tests for ``DataFrame.rank``."""

    # Ten values containing ties and two NaNs, mirrored into two identical
    # columns of a frame.
    s = Series([1, 3, 4, 2, np.nan, 2, 1, 5, np.nan, 3])
    df = DataFrame(dict(A=s, B=s))

    # Expected ranks of ``s``, keyed by tie-breaking method name.
    results = dict(
        average=np.array(
            [1.5, 5.5, 7.0, 3.5, np.nan, 3.5, 1.5, 8.0, np.nan, 5.5]),
        min=np.array([1, 5, 7, 3, np.nan, 3, 1, 8, np.nan, 5]),
        max=np.array([2, 6, 7, 4, np.nan, 4, 2, 8, np.nan, 6]),
        first=np.array([1, 5, 7, 3, np.nan, 4, 2, 8, np.nan, 6]),
        dense=np.array([1, 3, 4, 2, np.nan, 2, 1, 5, np.nan, 3]),
    )
@pytest.fixture(params=['average', 'min', 'max', 'first', 'dense'])
def method(self, request):
    """
    Parametrized fixture that hands each rank method name to the test.
    """
    rank_method = request.param
    return rank_method
def test_rank(self):
    """
    Compare ``DataFrame.rank`` on both axes with ``scipy.stats.rankdata``.

    NaNs are written into columns A-D of ``self.frame`` at different
    strides.  The reference ranks are computed on an inf-filled copy and
    the NaN positions are then masked back to NaN before comparing.
    Integer input is finally checked against the ranks of the same data
    cast to float.

    Skipped when ``scipy.stats`` cannot be imported.
    """
    # ``rankdata`` is a function, not a module, so it cannot itself be the
    # ``importorskip`` target: import the module and take the attribute.
    rankdata = pytest.importorskip('scipy.stats').rankdata

    # A single ``.loc`` write per column (rather than chained indexing) so
    # the NaNs land in ``self.frame`` itself.
    for col, step in zip('ABCD', (2, 3, 4, 5)):
        self.frame.loc[::step, col] = np.nan

    ranks0 = self.frame.rank()
    ranks1 = self.frame.rank(1)
    mask = np.isnan(self.frame.values)

    fvals = self.frame.fillna(np.inf).values

    exp0 = np.apply_along_axis(rankdata, 0, fvals)
    exp0[mask] = np.nan

    exp1 = np.apply_along_axis(rankdata, 1, fvals)
    exp1[mask] = np.nan

    tm.assert_almost_equal(ranks0.values, exp0)
    tm.assert_almost_equal(ranks1.values, exp1)

    # integers: ranking must agree with ranking the float-cast data
    df = DataFrame(np.random.randint(0, 5, size=40).reshape((10, 4)))

    result = df.rank()
    exp = df.astype(float).rank()
    tm.assert_frame_equal(result, exp)

    result = df.rank(1)
    exp = df.astype(float).rank(1)
    tm.assert_frame_equal(result, exp)
def test_rank2(self):
    """
    Exercise ``DataFrame.rank`` on percentile ranks, string data (with and
    without NaN), datetimes, a mixed-type frame and floats of very
    different magnitude.
    """
    # percentile ranks of integer data, along rows then along columns
    ints = DataFrame([[1, 3, 2], [1, 2, 3]])
    tm.assert_frame_equal(
        ints.rank(1, pct=True),
        DataFrame([[1.0, 3.0, 2.0], [1, 2, 3]]) / 3.0)
    tm.assert_frame_equal(ints.rank(0, pct=True), ints.rank(0) / 2.0)

    # string data
    strings = DataFrame([['b', 'c', 'a'], ['a', 'c', 'b']])
    for axis, rows in ((1, [[2.0, 3.0, 1.0], [1, 3, 2]]),
                       (0, [[2.0, 1.5, 1.0], [1, 1.5, 2]])):
        ranked = strings.rank(axis, numeric_only=False)
        tm.assert_frame_equal(ranked, DataFrame(rows))

    # string data with a missing value
    holed = DataFrame([['b', np.nan, 'a'], ['a', 'c', 'b']])
    for axis, rows in ((1, [[2.0, np.nan, 1.0], [1.0, 3.0, 2.0]]),
                       (0, [[2.0, np.nan, 1.0], [1.0, 1.0, 2.0]])):
        ranked = holed.rank(axis, numeric_only=False)
        tm.assert_frame_equal(ranked, DataFrame(rows))

    # datetimes with a missing value (historically needed special handling)
    stamps = DataFrame([
        [datetime(2001, 1, 5), np.nan, datetime(2001, 1, 2)],
        [datetime(2000, 1, 2), datetime(2000, 1, 3), datetime(2000, 1, 1)],
    ])
    for ascending, rows in ((True, [[2., np.nan, 1.], [2., 3., 1.]]),
                            (False, [[1., np.nan, 2.], [2., 1., 3.]])):
        ranked = stamps.rank(1, numeric_only=False, ascending=ascending)
        tm.assert_frame_equal(ranked, DataFrame(rows))

    # mixed-type frames: the default must match numeric_only=True
    self.mixed_frame['datetime'] = datetime.now()
    self.mixed_frame['timedelta'] = timedelta(days=1, seconds=1)
    tm.assert_frame_equal(
        self.mixed_frame.rank(1),
        self.mixed_frame.rank(1, numeric_only=True))

    # floats spanning many orders of magnitude; the two near-identical
    # 1e-20 entries are expected to share a rank
    spread = DataFrame({"a": [1e-20, -5, 1e-20 + 1e-40, 10,
                              1e60, 1e80, 1e-30]})
    tm.assert_frame_equal(
        spread.rank(),
        DataFrame({"a": [3.5, 1., 3.5, 5., 6., 7., 2.]}))
def test_rank_na_option(self):
    """
    Check the ``na_option`` argument of ``DataFrame.rank`` on both axes.

    NaNs are written into columns A-D of ``self.frame``; the ranks for
    ``na_option='bottom'`` / ``'top'``, ascending and descending, are then
    compared with ``scipy.stats.rankdata`` applied to copies in which the
    NaNs were replaced by a value sorting last (``+inf``) or first
    (``min - 1``).  Invalid ``na_option`` values must raise ``ValueError``.

    Skipped when ``scipy.stats`` cannot be imported.
    """
    # ``rankdata`` is a function, not a module, so it cannot itself be the
    # ``importorskip`` target: import the module and take the attribute.
    rankdata = pytest.importorskip('scipy.stats').rankdata

    # A single ``.loc`` write per column (rather than chained indexing) so
    # the NaNs land in ``self.frame`` itself.
    for col, step in zip('ABCD', (2, 3, 4, 5)):
        self.frame.loc[::step, col] = np.nan

    # Reference inputs; the frame is not modified below, so each is built
    # once and reused for the ascending and descending checks.
    # NaN -> +inf: missing entries sort after every real value.
    high_fill = self.frame.fillna(np.inf).values
    # NaN -> (column min - 1): missing entries sort before every real value.
    low_fill0 = self.frame.fillna((self.frame.min() - 1).to_dict()).values
    # Same idea per row, done on the transpose.
    flipped = self.frame.T
    low_fill1 = flipped.fillna((flipped.min() - 1).to_dict()).T
    # Anything still NaN (presumably rows with no valid value, whose
    # minimum is itself NaN) is filled with +inf.
    low_fill1 = low_fill1.fillna(np.inf).values

    # ascending, NaN ranked last
    ranks0 = self.frame.rank(na_option='bottom')
    ranks1 = self.frame.rank(1, na_option='bottom')

    exp0 = np.apply_along_axis(rankdata, 0, high_fill)
    exp1 = np.apply_along_axis(rankdata, 1, high_fill)

    tm.assert_almost_equal(ranks0.values, exp0)
    tm.assert_almost_equal(ranks1.values, exp1)

    # ascending, NaN ranked first
    ranks0 = self.frame.rank(na_option='top')
    ranks1 = self.frame.rank(1, na_option='top')

    exp0 = np.apply_along_axis(rankdata, 0, low_fill0)
    exp1 = np.apply_along_axis(rankdata, 1, low_fill1)

    tm.assert_almost_equal(ranks0.values, exp0)
    tm.assert_almost_equal(ranks1.values, exp1)

    # descending, NaN ranked first (negated +inf sorts lowest)
    ranks0 = self.frame.rank(na_option='top', ascending=False)
    ranks1 = self.frame.rank(1, na_option='top', ascending=False)

    exp0 = np.apply_along_axis(rankdata, 0, -high_fill)
    exp1 = np.apply_along_axis(rankdata, 1, -high_fill)

    tm.assert_almost_equal(ranks0.values, exp0)
    tm.assert_almost_equal(ranks1.values, exp1)

    # descending, NaN ranked last (negated min - 1 sorts highest)
    ranks0 = self.frame.rank(na_option='bottom', ascending=False)
    ranks1 = self.frame.rank(1, na_option='bottom', ascending=False)

    exp0 = np.apply_along_axis(rankdata, 0, -low_fill0)
    exp1 = np.apply_along_axis(rankdata, 1, -low_fill1)

    tm.assert_numpy_array_equal(ranks0.values, exp0)
    tm.assert_numpy_array_equal(ranks1.values, exp1)

    # bad values throw error
    msg = "na_option must be one of 'keep', 'top', or 'bottom'"

    with pytest.raises(ValueError, match=msg):
        self.frame.rank(na_option='bad', ascending=False)

    # invalid type
    with pytest.raises(ValueError, match=msg):
        self.frame.rank(na_option=True, ascending=False)
def test_rank_axis(self):
# check if using axes' names gives the same result
df = DataFrame([[2, 1], [4, 3]])
tm.assert_frame_equal(df.rank(axis=0), df.rank(axis='index'))
tm.assert_frame_equal(df.rank(axis=1), df.rank(axis='columns'))
def test_rank_methods_frame(self):
    """
    Check every tie method on both axes against ``scipy.stats.rankdata``.

    Random data on a 0.1 grid in [-1, 1] (so ties are frequent) is ranked
    as-is, shifted by 1e6 and scaled by 1e-6.  pandas' ``'first'`` method
    is compared with scipy's ``'ordinal'``.

    Skipped when scipy is not installed.
    """
    # ``importorskip`` needs importable module paths; ``rankdata`` is a
    # function, so import its module and take the attribute.
    # NOTE(review): ``scipy.special`` is required here in addition to
    # ``scipy.stats`` -- presumably for older scipy; confirm it is needed.
    pytest.importorskip('scipy.special')
    rankdata = pytest.importorskip('scipy.stats').rankdata

    xs = np.random.randint(0, 21, (100, 26))
    xs = (xs - 10.0) / 10.0
    cols = [chr(ord('z') - i) for i in range(xs.shape[1])]

    for vals in [xs, xs + 1e6, xs * 1e-6]:
        df = DataFrame(vals, columns=cols)

        for ax in [0, 1]:
            for m in ['average', 'min', 'max', 'first', 'dense']:
                result = df.rank(axis=ax, method=m)
                sprank = np.apply_along_axis(
                    rankdata, ax, vals,
                    m if m != 'first' else 'ordinal')
                # Cast once to float64 so the expected frame has the same
                # dtype as ``result``; no further per-version cast is
                # needed after this.
                sprank = sprank.astype(np.float64)
                expected = DataFrame(sprank, columns=cols)
                tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize('dtype', ['O', 'f8', 'i8'])
def test_rank_descending(self, method, dtype):
    """
    Descending ranks must equal ascending ranks of ``max - values``.

    NOTE(review): for the integer dtype code the frame only has its NaN
    rows dropped; no cast to an integer dtype happens here -- confirm
    that this is intended.
    """
    frame = self.df.dropna() if 'i' in dtype else self.df.astype(dtype)
    mirrored = frame.max() - frame

    # default tie method
    assert_frame_equal(frame.rank(ascending=False), mirrored.rank())

    # this method/dtype pairing is only checked with the default method
    if (method, dtype) == ('first', 'O'):
        return

    expected = mirrored.rank(method=method)
    descending = dict(method=method, ascending=False)

    if dtype != 'O':
        assert_frame_equal(
            frame.rank(numeric_only=True, **descending), expected)

    assert_frame_equal(
        frame.rank(numeric_only=False, **descending), expected)
@pytest.mark.parametrize('axis', [0, 1])
@pytest.mark.parametrize('dtype', [None, object])
def test_rank_2d_tie_methods(self, method, axis, dtype):
    """
    Rank the shared two-column frame with each tie method, along either
    axis, and compare with the precomputed ``self.results`` entry.
    """
    # the object-dtype / 'first' combination is deliberately not exercised
    if dtype is object and method == 'first':
        return

    frame = self.df if dtype is None else self.df.astype(dtype)
    ranks = self.results[method]
    expected = DataFrame({'A': ranks, 'B': ranks})

    # for axis=1 the data is transposed so the same values are ranked
    # across each row instead of down each column
    if axis == 1:
        frame, expected = frame.T, expected.T

    assert_frame_equal(frame.rank(method=method, axis=axis), expected)
@pytest.mark.parametrize(
    "method,exp",
    [
        ("dense", [[1., 1., 1.],
                   [1., 0.5, 2. / 3],
                   [1., 0.5, 1. / 3]]),
        ("min", [[1. / 3, 1., 1.],
                 [1. / 3, 1. / 3, 2. / 3],
                 [1. / 3, 1. / 3, 1. / 3]]),
        ("max", [[1., 1., 1.],
                 [1., 2. / 3, 2. / 3],
                 [1., 2. / 3, 1. / 3]]),
        ("average", [[2. / 3, 1., 1.],
                     [2. / 3, 0.5, 2. / 3],
                     [2. / 3, 0.5, 1. / 3]]),
        ("first", [[1. / 3, 1., 1.],
                   [2. / 3, 1. / 3, 2. / 3],
                   [3. / 3, 2. / 3, 1. / 3]]),
    ])
def test_rank_pct_true(self, method, exp):
    """Percentile ranks for each tie method on a small frame (gh-15630)."""
    frame = DataFrame([[2012, 66, 3], [2012, 65, 2], [2012, 65, 1]])
    tm.assert_frame_equal(frame.rank(method=method, pct=True),
                          DataFrame(exp))
@pytest.mark.single
def test_pct_max_many_rows(self):
    """
    The largest percentile rank in every column must be exactly 1, even
    for a frame of 2**24 + 1 rows (GH 18271).
    """
    # NOTE(review): the row count is presumably chosen to exceed the range
    # in which float32 represents integers exactly -- confirm in GH 18271.
    n_rows = 2**24 + 1
    frame = DataFrame({'A': np.arange(n_rows),
                       'B': np.arange(n_rows, 0, -1)})
    top_pct = frame.rank(pct=True).max()
    assert (top_pct == 1).all()