Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aaronreidsmith / pandas   python

Repository URL to install this package:

Version: 0.25.3 

/ tests / frame / test_rank.py

from datetime import datetime, timedelta

import numpy as np
import pytest

from pandas import DataFrame, Series
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal


class TestRank:
    s = Series([1, 3, 4, 2, np.nan, 2, 1, 5, np.nan, 3])
    df = DataFrame({"A": s, "B": s})

    results = {
        "average": np.array([1.5, 5.5, 7.0, 3.5, np.nan, 3.5, 1.5, 8.0, np.nan, 5.5]),
        "min": np.array([1, 5, 7, 3, np.nan, 3, 1, 8, np.nan, 5]),
        "max": np.array([2, 6, 7, 4, np.nan, 4, 2, 8, np.nan, 6]),
        "first": np.array([1, 5, 7, 3, np.nan, 4, 2, 8, np.nan, 6]),
        "dense": np.array([1, 3, 4, 2, np.nan, 2, 1, 5, np.nan, 3]),
    }

    @pytest.fixture(params=["average", "min", "max", "first", "dense"])
    def method(self, request):
        """
        Fixture for trying all rank methods
        """
        return request.param

    def test_rank(self, float_frame):
        rankdata = pytest.importorskip("scipy.stats.rankdata")

        float_frame["A"][::2] = np.nan
        float_frame["B"][::3] = np.nan
        float_frame["C"][::4] = np.nan
        float_frame["D"][::5] = np.nan

        ranks0 = float_frame.rank()
        ranks1 = float_frame.rank(1)
        mask = np.isnan(float_frame.values)

        fvals = float_frame.fillna(np.inf).values

        exp0 = np.apply_along_axis(rankdata, 0, fvals)
        exp0[mask] = np.nan

        exp1 = np.apply_along_axis(rankdata, 1, fvals)
        exp1[mask] = np.nan

        tm.assert_almost_equal(ranks0.values, exp0)
        tm.assert_almost_equal(ranks1.values, exp1)

        # integers
        df = DataFrame(np.random.randint(0, 5, size=40).reshape((10, 4)))

        result = df.rank()
        exp = df.astype(float).rank()
        tm.assert_frame_equal(result, exp)

        result = df.rank(1)
        exp = df.astype(float).rank(1)
        tm.assert_frame_equal(result, exp)

    def test_rank2(self):
        df = DataFrame([[1, 3, 2], [1, 2, 3]])
        expected = DataFrame([[1.0, 3.0, 2.0], [1, 2, 3]]) / 3.0
        result = df.rank(1, pct=True)
        tm.assert_frame_equal(result, expected)

        df = DataFrame([[1, 3, 2], [1, 2, 3]])
        expected = df.rank(0) / 2.0
        result = df.rank(0, pct=True)
        tm.assert_frame_equal(result, expected)

        df = DataFrame([["b", "c", "a"], ["a", "c", "b"]])
        expected = DataFrame([[2.0, 3.0, 1.0], [1, 3, 2]])
        result = df.rank(1, numeric_only=False)
        tm.assert_frame_equal(result, expected)

        expected = DataFrame([[2.0, 1.5, 1.0], [1, 1.5, 2]])
        result = df.rank(0, numeric_only=False)
        tm.assert_frame_equal(result, expected)

        df = DataFrame([["b", np.nan, "a"], ["a", "c", "b"]])
        expected = DataFrame([[2.0, np.nan, 1.0], [1.0, 3.0, 2.0]])
        result = df.rank(1, numeric_only=False)
        tm.assert_frame_equal(result, expected)

        expected = DataFrame([[2.0, np.nan, 1.0], [1.0, 1.0, 2.0]])
        result = df.rank(0, numeric_only=False)
        tm.assert_frame_equal(result, expected)

        # f7u12, this does not work without extensive workaround
        data = [
            [datetime(2001, 1, 5), np.nan, datetime(2001, 1, 2)],
            [datetime(2000, 1, 2), datetime(2000, 1, 3), datetime(2000, 1, 1)],
        ]
        df = DataFrame(data)

        # check the rank
        expected = DataFrame([[2.0, np.nan, 1.0], [2.0, 3.0, 1.0]])
        result = df.rank(1, numeric_only=False, ascending=True)
        tm.assert_frame_equal(result, expected)

        expected = DataFrame([[1.0, np.nan, 2.0], [2.0, 1.0, 3.0]])
        result = df.rank(1, numeric_only=False, ascending=False)
        tm.assert_frame_equal(result, expected)

        df = DataFrame({"a": [1e-20, -5, 1e-20 + 1e-40, 10, 1e60, 1e80, 1e-30]})
        exp = DataFrame({"a": [3.5, 1.0, 3.5, 5.0, 6.0, 7.0, 2.0]})
        tm.assert_frame_equal(df.rank(), exp)

    def test_rank_mixed_frame(self, float_string_frame):
        float_string_frame["datetime"] = datetime.now()
        float_string_frame["timedelta"] = timedelta(days=1, seconds=1)

        result = float_string_frame.rank(1)
        expected = float_string_frame.rank(1, numeric_only=True)
        tm.assert_frame_equal(result, expected)

    def test_rank_na_option(self, float_frame):
        rankdata = pytest.importorskip("scipy.stats.rankdata")

        float_frame["A"][::2] = np.nan
        float_frame["B"][::3] = np.nan
        float_frame["C"][::4] = np.nan
        float_frame["D"][::5] = np.nan

        # bottom
        ranks0 = float_frame.rank(na_option="bottom")
        ranks1 = float_frame.rank(1, na_option="bottom")

        fvals = float_frame.fillna(np.inf).values

        exp0 = np.apply_along_axis(rankdata, 0, fvals)
        exp1 = np.apply_along_axis(rankdata, 1, fvals)

        tm.assert_almost_equal(ranks0.values, exp0)
        tm.assert_almost_equal(ranks1.values, exp1)

        # top
        ranks0 = float_frame.rank(na_option="top")
        ranks1 = float_frame.rank(1, na_option="top")

        fval0 = float_frame.fillna((float_frame.min() - 1).to_dict()).values
        fval1 = float_frame.T
        fval1 = fval1.fillna((fval1.min() - 1).to_dict()).T
        fval1 = fval1.fillna(np.inf).values

        exp0 = np.apply_along_axis(rankdata, 0, fval0)
        exp1 = np.apply_along_axis(rankdata, 1, fval1)

        tm.assert_almost_equal(ranks0.values, exp0)
        tm.assert_almost_equal(ranks1.values, exp1)

        # descending

        # bottom
        ranks0 = float_frame.rank(na_option="top", ascending=False)
        ranks1 = float_frame.rank(1, na_option="top", ascending=False)

        fvals = float_frame.fillna(np.inf).values

        exp0 = np.apply_along_axis(rankdata, 0, -fvals)
        exp1 = np.apply_along_axis(rankdata, 1, -fvals)

        tm.assert_almost_equal(ranks0.values, exp0)
        tm.assert_almost_equal(ranks1.values, exp1)

        # descending

        # top
        ranks0 = float_frame.rank(na_option="bottom", ascending=False)
        ranks1 = float_frame.rank(1, na_option="bottom", ascending=False)

        fval0 = float_frame.fillna((float_frame.min() - 1).to_dict()).values
        fval1 = float_frame.T
        fval1 = fval1.fillna((fval1.min() - 1).to_dict()).T
        fval1 = fval1.fillna(np.inf).values

        exp0 = np.apply_along_axis(rankdata, 0, -fval0)
        exp1 = np.apply_along_axis(rankdata, 1, -fval1)

        tm.assert_numpy_array_equal(ranks0.values, exp0)
        tm.assert_numpy_array_equal(ranks1.values, exp1)

        # bad values throw error
        msg = "na_option must be one of 'keep', 'top', or 'bottom'"

        with pytest.raises(ValueError, match=msg):
            float_frame.rank(na_option="bad", ascending=False)

        # invalid type
        with pytest.raises(ValueError, match=msg):
            float_frame.rank(na_option=True, ascending=False)

    def test_rank_axis(self):
        # check if using axes' names gives the same result
        df = DataFrame([[2, 1], [4, 3]])
        tm.assert_frame_equal(df.rank(axis=0), df.rank(axis="index"))
        tm.assert_frame_equal(df.rank(axis=1), df.rank(axis="columns"))

    def test_rank_methods_frame(self):
        pytest.importorskip("scipy.stats.special")
        rankdata = pytest.importorskip("scipy.stats.rankdata")

        xs = np.random.randint(0, 21, (100, 26))
        xs = (xs - 10.0) / 10.0
        cols = [chr(ord("z") - i) for i in range(xs.shape[1])]

        for vals in [xs, xs + 1e6, xs * 1e-6]:
            df = DataFrame(vals, columns=cols)

            for ax in [0, 1]:
                for m in ["average", "min", "max", "first", "dense"]:
                    result = df.rank(axis=ax, method=m)
                    sprank = np.apply_along_axis(
                        rankdata, ax, vals, m if m != "first" else "ordinal"
                    )
                    sprank = sprank.astype(np.float64)
                    expected = DataFrame(sprank, columns=cols).astype("float64")
                    tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize("dtype", ["O", "f8", "i8"])
    def test_rank_descending(self, method, dtype):

        if "i" in dtype:
            df = self.df.dropna()
        else:
            df = self.df.astype(dtype)

        res = df.rank(ascending=False)
        expected = (df.max() - df).rank()
        assert_frame_equal(res, expected)

        if method == "first" and dtype == "O":
            return

        expected = (df.max() - df).rank(method=method)

        if dtype != "O":
            res2 = df.rank(method=method, ascending=False, numeric_only=True)
            assert_frame_equal(res2, expected)

        res3 = df.rank(method=method, ascending=False, numeric_only=False)
        assert_frame_equal(res3, expected)

    @pytest.mark.parametrize("axis", [0, 1])
    @pytest.mark.parametrize("dtype", [None, object])
    def test_rank_2d_tie_methods(self, method, axis, dtype):
        df = self.df

        def _check2d(df, expected, method="average", axis=0):
            exp_df = DataFrame({"A": expected, "B": expected})

            if axis == 1:
                df = df.T
                exp_df = exp_df.T

            result = df.rank(method=method, axis=axis)
            assert_frame_equal(result, exp_df)

        disabled = {(object, "first")}
        if (dtype, method) in disabled:
            return
        frame = df if dtype is None else df.astype(dtype)
        _check2d(frame, self.results[method], method=method, axis=axis)

    @pytest.mark.parametrize(
        "method,exp",
        [
            ("dense", [[1.0, 1.0, 1.0], [1.0, 0.5, 2.0 / 3], [1.0, 0.5, 1.0 / 3]]),
            (
                "min",
                [
                    [1.0 / 3, 1.0, 1.0],
                    [1.0 / 3, 1.0 / 3, 2.0 / 3],
                    [1.0 / 3, 1.0 / 3, 1.0 / 3],
                ],
            ),
            (
                "max",
                [[1.0, 1.0, 1.0], [1.0, 2.0 / 3, 2.0 / 3], [1.0, 2.0 / 3, 1.0 / 3]],
            ),
            (
                "average",
                [[2.0 / 3, 1.0, 1.0], [2.0 / 3, 0.5, 2.0 / 3], [2.0 / 3, 0.5, 1.0 / 3]],
            ),
            (
                "first",
                [
                    [1.0 / 3, 1.0, 1.0],
                    [2.0 / 3, 1.0 / 3, 2.0 / 3],
                    [3.0 / 3, 2.0 / 3, 1.0 / 3],
                ],
            ),
        ],
    )
    def test_rank_pct_true(self, method, exp):
        # see gh-15630.

        df = DataFrame([[2012, 66, 3], [2012, 65, 2], [2012, 65, 1]])
        result = df.rank(method=method, pct=True)

        expected = DataFrame(exp)
        tm.assert_frame_equal(result, expected)

    @pytest.mark.single
    @pytest.mark.high_memory
    def test_pct_max_many_rows(self):
        # GH 18271
        df = DataFrame(
            {"A": np.arange(2 ** 24 + 1), "B": np.arange(2 ** 24 + 1, 0, -1)}
        )
        result = df.rank(pct=True).max()
        assert (result == 1).all()