Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aaronreidsmith / pandas   python

Repository URL to install this package:

Version: 0.25.3 

/ tests / test_expressions.py

import operator
import re

import numpy as np
from numpy.random import randn
import pytest

from pandas.core.api import DataFrame
from pandas.core.computation import expressions as expr
import pandas.util.testing as tm
from pandas.util.testing import (
    assert_almost_equal,
    assert_frame_equal,
    assert_series_equal,
)

from pandas.io.formats.printing import pprint_thing

_frame = DataFrame(randn(10000, 4), columns=list("ABCD"), dtype="float64")
_frame2 = DataFrame(randn(100, 4), columns=list("ABCD"), dtype="float64")
_mixed = DataFrame(
    {
        "A": _frame["A"].copy(),
        "B": _frame["B"].astype("float32"),
        "C": _frame["C"].astype("int64"),
        "D": _frame["D"].astype("int32"),
    }
)
_mixed2 = DataFrame(
    {
        "A": _frame2["A"].copy(),
        "B": _frame2["B"].astype("float32"),
        "C": _frame2["C"].astype("int64"),
        "D": _frame2["D"].astype("int32"),
    }
)
_integer = DataFrame(
    np.random.randint(1, 100, size=(10001, 4)), columns=list("ABCD"), dtype="int64"
)
_integer2 = DataFrame(
    np.random.randint(1, 100, size=(101, 4)), columns=list("ABCD"), dtype="int64"
)


@pytest.mark.skipif(not expr._USE_NUMEXPR, reason="not using numexpr")
class TestExpressions:
    def setup_method(self, method):

        self.frame = _frame.copy()
        self.frame2 = _frame2.copy()
        self.mixed = _mixed.copy()
        self.mixed2 = _mixed2.copy()
        self.integer = _integer.copy()
        self._MIN_ELEMENTS = expr._MIN_ELEMENTS

    def teardown_method(self, method):
        expr._MIN_ELEMENTS = self._MIN_ELEMENTS

    def run_arithmetic(self, df, other, assert_func, check_dtype=False, test_flex=True):
        expr._MIN_ELEMENTS = 0
        operations = ["add", "sub", "mul", "mod", "truediv", "floordiv"]
        for arith in operations:

            operator_name = arith
            if arith == "div":
                operator_name = "truediv"

            if test_flex:
                op = lambda x, y: getattr(x, arith)(y)
                op.__name__ = arith
            else:
                op = getattr(operator, operator_name)
            expr.set_use_numexpr(False)
            expected = op(df, other)
            expr.set_use_numexpr(True)

            result = op(df, other)
            try:
                if check_dtype:
                    if arith == "truediv":
                        assert expected.dtype.kind == "f"
                assert_func(expected, result)
            except Exception:
                pprint_thing("Failed test with operator {op.__name__!r}".format(op=op))
                raise

    def test_integer_arithmetic(self):
        self.run_arithmetic(self.integer, self.integer, assert_frame_equal)
        self.run_arithmetic(
            self.integer.iloc[:, 0],
            self.integer.iloc[:, 0],
            assert_series_equal,
            check_dtype=True,
        )

    def run_binary(
        self,
        df,
        other,
        assert_func,
        test_flex=False,
        numexpr_ops={"gt", "lt", "ge", "le", "eq", "ne"},
    ):
        """
        tests solely that the result is the same whether or not numexpr is
        enabled.  Need to test whether the function does the correct thing
        elsewhere.
        """
        expr._MIN_ELEMENTS = 0
        expr.set_test_mode(True)
        operations = ["gt", "lt", "ge", "le", "eq", "ne"]

        for arith in operations:
            if test_flex:
                op = lambda x, y: getattr(df, arith)(y)
                op.__name__ = arith
            else:
                op = getattr(operator, arith)
            expr.set_use_numexpr(False)
            expected = op(df, other)
            expr.set_use_numexpr(True)
            expr.get_test_result()
            result = op(df, other)
            used_numexpr = expr.get_test_result()
            try:
                if arith in numexpr_ops:
                    assert used_numexpr, "Did not use numexpr as expected."
                else:
                    assert not used_numexpr, "Used numexpr unexpectedly."
                assert_func(expected, result)
            except Exception:
                pprint_thing("Failed test with operation {arith!r}".format(arith=arith))
                pprint_thing("test_flex was {test_flex!r}".format(test_flex=test_flex))
                raise

    def run_frame(self, df, other, binary_comp=None, run_binary=True, **kwargs):
        self.run_arithmetic(df, other, assert_frame_equal, test_flex=False, **kwargs)
        self.run_arithmetic(df, other, assert_frame_equal, test_flex=True, **kwargs)
        if run_binary:
            if binary_comp is None:
                expr.set_use_numexpr(False)
                binary_comp = other + 1
                expr.set_use_numexpr(True)
            self.run_binary(
                df, binary_comp, assert_frame_equal, test_flex=False, **kwargs
            )
            self.run_binary(
                df, binary_comp, assert_frame_equal, test_flex=True, **kwargs
            )

    def run_series(self, ser, other, binary_comp=None, **kwargs):
        self.run_arithmetic(ser, other, assert_series_equal, test_flex=False, **kwargs)
        self.run_arithmetic(ser, other, assert_almost_equal, test_flex=True, **kwargs)
        # series doesn't uses vec_compare instead of numexpr...
        # if binary_comp is None:
        #     binary_comp = other + 1
        # self.run_binary(ser, binary_comp, assert_frame_equal,
        # test_flex=False, **kwargs)
        # self.run_binary(ser, binary_comp, assert_frame_equal,
        # test_flex=True, **kwargs)

    def test_integer_arithmetic_frame(self):
        self.run_frame(self.integer, self.integer)

    def test_integer_arithmetic_series(self):
        self.run_series(self.integer.iloc[:, 0], self.integer.iloc[:, 0])

    def test_float_arithemtic_frame(self):
        self.run_frame(self.frame2, self.frame2)

    def test_float_arithmetic_series(self):
        self.run_series(self.frame2.iloc[:, 0], self.frame2.iloc[:, 0])

    def test_mixed_arithmetic_frame(self):
        # TODO: FIGURE OUT HOW TO GET IT TO WORK...
        # can't do arithmetic because comparison methods try to do *entire*
        # frame instead of by-column
        self.run_frame(self.mixed2, self.mixed2, run_binary=False)

    def test_mixed_arithmetic_series(self):
        for col in self.mixed2.columns:
            self.run_series(self.mixed2[col], self.mixed2[col], binary_comp=4)

    def test_float_arithemtic(self):
        self.run_arithmetic(self.frame, self.frame, assert_frame_equal)
        self.run_arithmetic(
            self.frame.iloc[:, 0],
            self.frame.iloc[:, 0],
            assert_series_equal,
            check_dtype=True,
        )

    def test_mixed_arithmetic(self):
        self.run_arithmetic(self.mixed, self.mixed, assert_frame_equal)
        for col in self.mixed.columns:
            self.run_arithmetic(self.mixed[col], self.mixed[col], assert_series_equal)

    def test_integer_with_zeros(self):
        self.integer *= np.random.randint(0, 2, size=np.shape(self.integer))
        self.run_arithmetic(self.integer, self.integer, assert_frame_equal)
        self.run_arithmetic(
            self.integer.iloc[:, 0], self.integer.iloc[:, 0], assert_series_equal
        )

    def test_invalid(self):

        # no op
        result = expr._can_use_numexpr(
            operator.add, None, self.frame, self.frame, "evaluate"
        )
        assert not result

        # mixed
        result = expr._can_use_numexpr(
            operator.add, "+", self.mixed, self.frame, "evaluate"
        )
        assert not result

        # min elements
        result = expr._can_use_numexpr(
            operator.add, "+", self.frame2, self.frame2, "evaluate"
        )
        assert not result

        # ok, we only check on first part of expression
        result = expr._can_use_numexpr(
            operator.add, "+", self.frame, self.frame2, "evaluate"
        )
        assert result

    def test_binary_ops(self):
        def testit():

            for f, f2 in [(self.frame, self.frame2), (self.mixed, self.mixed2)]:

                for op, op_str in [
                    ("add", "+"),
                    ("sub", "-"),
                    ("mul", "*"),
                    ("div", "/"),
                    ("pow", "**"),
                ]:

                    if op == "pow":
                        continue

                    if op == "div":
                        op = getattr(operator, "truediv", None)
                    else:
                        op = getattr(operator, op, None)
                    if op is not None:
                        result = expr._can_use_numexpr(op, op_str, f, f, "evaluate")
                        assert result != f._is_mixed_type

                        result = expr.evaluate(op, op_str, f, f, use_numexpr=True)
                        expected = expr.evaluate(op, op_str, f, f, use_numexpr=False)

                        if isinstance(result, DataFrame):
                            tm.assert_frame_equal(result, expected)
                        else:
                            tm.assert_numpy_array_equal(result, expected.values)

                        result = expr._can_use_numexpr(op, op_str, f2, f2, "evaluate")
                        assert not result

        expr.set_use_numexpr(False)
        testit()
        expr.set_use_numexpr(True)
        expr.set_numexpr_threads(1)
        testit()
        expr.set_numexpr_threads()
        testit()

    def test_boolean_ops(self):
        def testit():
            for f, f2 in [(self.frame, self.frame2), (self.mixed, self.mixed2)]:

                f11 = f
                f12 = f + 1

                f21 = f2
                f22 = f2 + 1

                for op, op_str in [
                    ("gt", ">"),
                    ("lt", "<"),
                    ("ge", ">="),
                    ("le", "<="),
                    ("eq", "=="),
                    ("ne", "!="),
                ]:

                    op = getattr(operator, op)

                    result = expr._can_use_numexpr(op, op_str, f11, f12, "evaluate")
                    assert result != f11._is_mixed_type

                    result = expr.evaluate(op, op_str, f11, f12, use_numexpr=True)
                    expected = expr.evaluate(op, op_str, f11, f12, use_numexpr=False)
                    if isinstance(result, DataFrame):
                        tm.assert_frame_equal(result, expected)
                    else:
                        tm.assert_numpy_array_equal(result, expected.values)

                    result = expr._can_use_numexpr(op, op_str, f21, f22, "evaluate")
                    assert not result

        expr.set_use_numexpr(False)
        testit()
        expr.set_use_numexpr(True)
        expr.set_numexpr_threads(1)
        testit()
        expr.set_numexpr_threads()
        testit()

    def test_where(self):
        def testit():
            for f in [self.frame, self.frame2, self.mixed, self.mixed2]:

                for cond in [True, False]:
                    c = np.empty(f.shape, dtype=np.bool_)
                    c.fill(cond)
                    result = expr.where(c, f.values, f.values + 1)
                    expected = np.where(c, f.values, f.values + 1)
                    tm.assert_numpy_array_equal(result, expected)

        expr.set_use_numexpr(False)
        testit()
        expr.set_use_numexpr(True)
        expr.set_numexpr_threads(1)
        testit()
        expr.set_numexpr_threads()
        testit()

    def test_bool_ops_raise_on_arithmetic(self):
        df = DataFrame({"a": np.random.rand(10) > 0.5, "b": np.random.rand(10) > 0.5})
        names = "truediv", "floordiv", "pow"
        ops = "/", "//", "**"
        msg = "operator %r not implemented for bool dtypes"
        for op, name in zip(ops, names):
            f = getattr(operator, name)
            err_msg = re.escape(msg % op)

            with pytest.raises(NotImplementedError, match=err_msg):
                f(df, df)

            with pytest.raises(NotImplementedError, match=err_msg):
                f(df.a, df.b)

            with pytest.raises(NotImplementedError, match=err_msg):
                f(df.a, True)

            with pytest.raises(NotImplementedError, match=err_msg):
                f(False, df.a)

            with pytest.raises(NotImplementedError, match=err_msg):
                f(False, df)

            with pytest.raises(NotImplementedError, match=err_msg):
                f(df, True)

    def test_bool_ops_warn_on_arithmetic(self):
        n = 10
        df = DataFrame({"a": np.random.rand(n) > 0.5, "b": np.random.rand(n) > 0.5})
        names = "add", "mul", "sub"
        ops = "+", "*", "-"
        subs = {"+": "|", "*": "&", "-": "^"}
        sub_funcs = {"|": "or_", "&": "and_", "^": "xor"}
        for op, name in zip(ops, names):
            f = getattr(operator, name)
            fe = getattr(operator, sub_funcs[subs[op]])

            if op == "-":
                # raises TypeError
                continue

            with tm.use_numexpr(True, min_elements=5):
                with tm.assert_produces_warning(check_stacklevel=False):
                    r = f(df, df)
                    e = fe(df, df)
                    tm.assert_frame_equal(r, e)

                with tm.assert_produces_warning(check_stacklevel=False):
                    r = f(df.a, df.b)
                    e = fe(df.a, df.b)
                    tm.assert_series_equal(r, e)

                with tm.assert_produces_warning(check_stacklevel=False):
                    r = f(df.a, True)
                    e = fe(df.a, True)
                    tm.assert_series_equal(r, e)

                with tm.assert_produces_warning(check_stacklevel=False):
                    r = f(False, df.a)
                    e = fe(False, df.a)
                    tm.assert_series_equal(r, e)

                with tm.assert_produces_warning(check_stacklevel=False):
                    r = f(False, df)
                    e = fe(False, df)
                    tm.assert_frame_equal(r, e)

                with tm.assert_produces_warning(check_stacklevel=False):
                    r = f(df, True)
                    e = fe(df, True)
                    tm.assert_frame_equal(r, e)

    @pytest.mark.parametrize(
        "test_input,expected",
        [
            (
                DataFrame(
                    [[0, 1, 2, "aa"], [0, 1, 2, "aa"]], columns=["a", "b", "c", "dtype"]
                ),
                DataFrame([[False, False], [False, False]], columns=["a", "dtype"]),
            ),
            (
                DataFrame(
                    [[0, 3, 2, "aa"], [0, 4, 2, "aa"], [0, 1, 1, "bb"]],
                    columns=["a", "b", "c", "dtype"],
                ),
                DataFrame(
                    [[False, False], [False, False], [False, False]],
                    columns=["a", "dtype"],
                ),
            ),
        ],
    )
    def test_bool_ops_column_name_dtype(self, test_input, expected):
        # GH 22383 - .ne fails if columns containing column name 'dtype'
        result = test_input.loc[:, ["a", "dtype"]].ne(test_input.loc[:, ["a", "dtype"]])
        assert_frame_equal(result, expected)

    @pytest.mark.parametrize(
        "arith", ("add", "sub", "mul", "mod", "truediv", "floordiv")
    )
    @pytest.mark.parametrize("axis", (0, 1))
    def test_frame_series_axis(self, axis, arith):
        # GH#26736 Dataframe.floordiv(Series, axis=1) fails
        if axis == 1 and arith == "floordiv":
            pytest.xfail("'floordiv' does not succeed with axis=1 #27636")

        df = self.frame
        if axis == 1:
            other = self.frame.iloc[0, :]
        else:
            other = self.frame.iloc[:, 0]

        expr._MIN_ELEMENTS = 0

        op_func = getattr(df, arith)

        expr.set_use_numexpr(False)
        expected = op_func(other, axis=axis)
        expr.set_use_numexpr(True)

        result = op_func(other, axis=axis)
        assert_frame_equal(expected, result)