Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
numba / tests / test_array_exprs.py
Size: Mime:
from __future__ import print_function, division, absolute_import

import gc

import numpy as np

from numba import njit, vectorize
from numba import unittest_support as unittest
from numba import compiler, typing, typeof, ir, utils
from numba.compiler import Pipeline, _PipelineManager, Flags
from numba.targets import cpu
from .support import MemoryLeakMixin, TestCase


class Namespace(dict):
    def __getattr__(s, k):
        return s[k] if k in s else super(Namespace, s).__getattr__(k)

def axy(a, x, y):
    return a * x + y

def ax2(a, x, y):
    return a * x + y

def pos_root(As, Bs, Cs):
    return (-Bs + (((Bs ** 2.) - (4. * As * Cs)) ** 0.5)) / (2. * As)

def neg_root_common_subexpr(As, Bs, Cs):
    _2As = 2. * As
    _4AsCs = 2. * _2As * Cs
    _Bs2_4AsCs = (Bs ** 2. - _4AsCs)
    return (-Bs - (_Bs2_4AsCs ** 0.5)) / _2As

def neg_root_complex_subexpr(As, Bs, Cs):
    _2As = 2. * As
    _4AsCs = 2. * _2As * Cs
    _Bs2_4AsCs = (Bs ** 2. - _4AsCs) + 0j # Force into the complex domain.
    return (-Bs - (_Bs2_4AsCs ** 0.5)) / _2As

vaxy = vectorize(axy)

def call_stuff(a0, a1):
    return np.cos(vaxy(a0, np.sin(a1) - 1., 1.))

def are_roots_imaginary(As, Bs, Cs):
    return (Bs ** 2 - 4 * As * Cs) < 0

def div_add(As, Bs, Cs):
    return As / Bs + Cs

def cube(As):
    return As ** 3

def explicit_output(a, b, out):
    np.cos(a, out)
    return np.add(out, b, out)

def variable_name_reuse(a, b, c, d):
    u = a + b
    u = u - a * b
    u = u * c + d
    return u


# From issue #1264
def distance_matrix(vectors):
    n_vectors = vectors.shape[0]
    result = np.empty((n_vectors, n_vectors), dtype=np.float64)

    for i in range(n_vectors):
        for j in range(i, n_vectors):
            result[i,j] = result[j,i] = np.sum(
                (vectors[i] - vectors[j]) ** 2) ** 0.5

    return result


class RewritesTester(Pipeline):
    @classmethod
    def mk_pipeline(cls, args, return_type=None, flags=None, locals={},
                    library=None, typing_context=None, target_context=None):
        if not flags:
            flags = Flags()
        flags.nrt = True
        if typing_context is None:
            typing_context = typing.Context()
        if target_context is None:
            target_context = cpu.CPUContext(typing_context)
        return cls(typing_context, target_context, library, args, return_type,
                   flags, locals)

    @classmethod
    def mk_no_rw_pipeline(cls, args, return_type=None, flags=None, locals={},
                          library=None, **kws):
        if not flags:
            flags = Flags()
        flags.no_rewrites = True
        return cls.mk_pipeline(args, return_type, flags, locals, library, **kws)


class TestArrayExpressions(MemoryLeakMixin, TestCase):

    def _compile_function(self, fn, arg_tys):
        """
        Compile the given function both without and with rewrites enabled.
        """
        control_pipeline = RewritesTester.mk_no_rw_pipeline(arg_tys)
        cres_0 = control_pipeline.compile_extra(fn)
        control_cfunc = cres_0.entry_point

        test_pipeline = RewritesTester.mk_pipeline(arg_tys)
        cres_1 = test_pipeline.compile_extra(fn)
        test_cfunc = cres_1.entry_point

        return control_pipeline, control_cfunc, test_pipeline, test_cfunc

    def test_simple_expr(self):
        '''
        Using a simple array expression, verify that rewriting is taking
        place, and is fusing loops.
        '''
        A = np.linspace(0,1,10)
        X = np.linspace(2,1,10)
        Y = np.linspace(1,2,10)
        arg_tys = [typeof(arg) for arg in (A, X, Y)]

        control_pipeline, nb_axy_0, test_pipeline, nb_axy_1 = \
            self._compile_function(axy, arg_tys)

        control_pipeline2 = RewritesTester.mk_no_rw_pipeline(arg_tys)
        cres_2 = control_pipeline2.compile_extra(ax2)
        nb_ctl = cres_2.entry_point

        expected = nb_axy_0(A, X, Y)
        actual = nb_axy_1(A, X, Y)
        control = nb_ctl(A, X, Y)
        np.testing.assert_array_equal(expected, actual)
        np.testing.assert_array_equal(control, actual)

        ir0 = control_pipeline.func_ir.blocks
        ir1 = test_pipeline.func_ir.blocks
        ir2 = control_pipeline2.func_ir.blocks
        self.assertEqual(len(ir0), len(ir1))
        self.assertEqual(len(ir0), len(ir2))
        # The rewritten IR should be smaller than the original.
        self.assertGreater(len(ir0[0].body), len(ir1[0].body))
        self.assertEqual(len(ir0[0].body), len(ir2[0].body))

    def _get_array_exprs(self, block):
        for instr in block:
            if isinstance(instr, ir.Assign):
                if isinstance(instr.value, ir.Expr):
                    if instr.value.op == 'arrayexpr':
                        yield instr

    def _array_expr_to_set(self, expr, out=None):
        '''
        Convert an array expression tree into a set of operators.
        '''
        if out is None:
            out = set()
        if not isinstance(expr, tuple):
            raise ValueError("{0} not a tuple".format(expr))
        operation, operands = expr
        processed_operands = []
        for operand in operands:
            if isinstance(operand, tuple):
                operand, _ = self._array_expr_to_set(operand, out)
            processed_operands.append(operand)
        processed_expr = operation, tuple(processed_operands)
        out.add(processed_expr)
        return processed_expr, out

    def _test_root_function(self, fn=pos_root):
        A = np.random.random(10)
        B = np.random.random(10) + 1. # Increase likelihood of real
                                      # root (could add 2 to force all
                                      # roots to be real).
        C = np.random.random(10)
        arg_tys = [typeof(arg) for arg in (A, B, C)]

        control_pipeline = RewritesTester.mk_no_rw_pipeline(arg_tys)
        control_cres = control_pipeline.compile_extra(fn)
        nb_fn_0 = control_cres.entry_point

        test_pipeline = RewritesTester.mk_pipeline(arg_tys)
        test_cres = test_pipeline.compile_extra(fn)
        nb_fn_1 = test_cres.entry_point

        np_result = fn(A, B, C)
        nb_result_0 = nb_fn_0(A, B, C)
        nb_result_1 = nb_fn_1(A, B, C)
        np.testing.assert_array_almost_equal(np_result, nb_result_0)
        np.testing.assert_array_almost_equal(nb_result_0, nb_result_1)

        return Namespace(locals())

    def _test_cube_function(self, fn=cube):
        A = np.arange(10, dtype=np.float64)
        arg_tys = (typeof(A),)

        control_pipeline = RewritesTester.mk_no_rw_pipeline(arg_tys)
        control_cres = control_pipeline.compile_extra(fn)
        nb_fn_0 = control_cres.entry_point

        test_pipeline = RewritesTester.mk_pipeline(arg_tys)
        test_cres = test_pipeline.compile_extra(fn)
        nb_fn_1 = test_cres.entry_point

        expected = A ** 3
        self.assertPreciseEqual(expected, nb_fn_0(A))
        self.assertPreciseEqual(expected, nb_fn_1(A))

        return Namespace(locals())

    def _test_explicit_output_function(self, fn):
        """
        Test function having a (a, b, out) signature where *out* is
        an output array the function writes into.
        """
        A = np.arange(10, dtype=np.float64)
        B = A + 1
        arg_tys = (typeof(A),) * 3

        control_pipeline, control_cfunc, test_pipeline, test_cfunc = \
            self._compile_function(fn, arg_tys)

        def run_func(fn):
            out = np.zeros_like(A)
            fn(A, B, out)
            return out

        expected = run_func(fn)
        self.assertPreciseEqual(expected, run_func(control_cfunc))
        self.assertPreciseEqual(expected, run_func(test_cfunc))

        return Namespace(locals())

    def _assert_array_exprs(self, block, expected_count):
        """
        Assert the *block* has the expected number of array expressions
        in it.
        """
        rewrite_count = len(list(self._get_array_exprs(block)))
        self.assertEqual(rewrite_count, expected_count)

    def _assert_total_rewrite(self, control_ir, test_ir, trivial=False):
        """
        Given two dictionaries of Numba IR blocks, check to make sure the
        control IR has no array expressions, while the test IR
        contains one and only one.
        """
        # Both IRs have the same number of blocks (presumably 1)
        self.assertEqual(len(control_ir), len(test_ir))
        control_block = control_ir[0].body
        test_block = test_ir[0].body
        self._assert_array_exprs(control_block, 0)
        self._assert_array_exprs(test_block, 1)
        if not trivial:
            # If the expression wasn't trivial, the block length should
            # have decreased (since a sequence of exprs was replaced
            # with a single nested array expr).
            self.assertGreater(len(control_block), len(test_block))

    def _assert_no_rewrite(self, control_ir, test_ir):
        """
        Given two dictionaries of Numba IR blocks, check to make sure
        the control IR and the test IR both have no array expressions.
        """
        self.assertEqual(len(control_ir), len(test_ir))
        # All blocks should be identical, and not rewritten
        for k, v in control_ir.items():
            control_block = v.body
            test_block = test_ir[k].body
            self.assertEqual(len(control_block), len(test_block))
            self._assert_array_exprs(control_block, 0)
            self._assert_array_exprs(test_block, 0)

    def test_trivial_expr(self):
        """
        Ensure even a non-nested expression is rewritten, as it can enable
        scalar optimizations such as rewriting `x ** 2`.
        """
        ns = self._test_cube_function()
        self._assert_total_rewrite(ns.control_pipeline.func_ir.blocks,
                                   ns.test_pipeline.func_ir.blocks,
                                   trivial=True)

    def test_complicated_expr(self):
        '''
        Using the polynomial root function, ensure the full expression is
        being put in the same kernel with no remnants of intermediate
        array expressions.
        '''
        ns = self._test_root_function()
        self._assert_total_rewrite(ns.control_pipeline.func_ir.blocks,
                                   ns.test_pipeline.func_ir.blocks)

    def test_common_subexpressions(self, fn=neg_root_common_subexpr):
        '''
        Attempt to verify that rewriting will incorporate user common
        subexpressions properly.
        '''
        ns = self._test_root_function(fn)
        ir0 = ns.control_pipeline.func_ir.blocks
        ir1 = ns.test_pipeline.func_ir.blocks
        self.assertEqual(len(ir0), len(ir1))
        self.assertGreater(len(ir0[0].body), len(ir1[0].body))
        self.assertEqual(len(list(self._get_array_exprs(ir0[0].body))), 0)
        # Verify that we didn't rewrite everything into a monolithic
        # array expression since we stored temporary values in
        # variables that might be used later (from the optimization's
        # point of view).
        array_expr_instrs = list(self._get_array_exprs(ir1[0].body))
        self.assertGreater(len(array_expr_instrs), 1)
        # Now check that we haven't duplicated any subexpressions in
        # the rewritten code.
        array_sets = list(self._array_expr_to_set(instr.value.expr)[1]
                          for instr in array_expr_instrs)
        for expr_set_0, expr_set_1 in zip(array_sets[:-1], array_sets[1:]):
            intersections = expr_set_0.intersection(expr_set_1)
            if intersections:
                self.fail("Common subexpressions detected in array "
                          "expressions ({0})".format(intersections))

    def test_complex_subexpression(self):
        return self.test_common_subexpressions(neg_root_complex_subexpr)

    def test_ufunc_and_dufunc_calls(self):
        '''
        Verify that ufunc and DUFunc calls are being properly included in
        array expressions.
        '''
        A = np.random.random(10)
        B = np.random.random(10)
        arg_tys = [typeof(arg) for arg in (A, B)]

        vaxy_descr = vaxy._dispatcher.targetdescr
        control_pipeline = RewritesTester.mk_no_rw_pipeline(
            arg_tys,
            typing_context=vaxy_descr.typing_context,
            target_context=vaxy_descr.target_context)
        cres_0 = control_pipeline.compile_extra(call_stuff)
        nb_call_stuff_0 = cres_0.entry_point

        test_pipeline = RewritesTester.mk_pipeline(
            arg_tys,
            typing_context=vaxy_descr.typing_context,
            target_context=vaxy_descr.target_context)
        cres_1 = test_pipeline.compile_extra(call_stuff)
        nb_call_stuff_1 = cres_1.entry_point

        expected = call_stuff(A, B)
        control = nb_call_stuff_0(A, B)
        actual = nb_call_stuff_1(A, B)
        np.testing.assert_array_almost_equal(expected, control)
        np.testing.assert_array_almost_equal(expected, actual)

        self._assert_total_rewrite(control_pipeline.func_ir.blocks,
                                   test_pipeline.func_ir.blocks)

    def test_cmp_op(self):
        '''
        Verify that comparison operators are supported by the rewriter.
        '''
        ns = self._test_root_function(are_roots_imaginary)
        self._assert_total_rewrite(ns.control_pipeline.func_ir.blocks,
                                   ns.test_pipeline.func_ir.blocks)

    def test_explicit_output(self):
        """
        Check that ufunc calls with explicit outputs are not rewritten.
        """
        ns = self._test_explicit_output_function(explicit_output)
        self._assert_no_rewrite(ns.control_pipeline.func_ir.blocks,
                                ns.test_pipeline.func_ir.blocks)


class TestRewriteIssues(MemoryLeakMixin, TestCase):

    def test_issue_1184(self):
        from numba import jit
        import numpy as np

        @jit(nopython=True)
        def foo(arr):
            return arr

        @jit(nopython=True)
        def bar(arr):
            c = foo(arr)
            d = foo(arr)   # two calls to trigger rewrite
            return c, d

        arr = np.arange(10)
        out_c, out_d = bar(arr)
        self.assertIs(out_c, out_d)
        self.assertIs(out_c, arr)

    def test_issue_1264(self):
        n = 100
        x = np.random.uniform(size=n*3).reshape((n,3))
        expected = distance_matrix(x)
        actual = njit(distance_matrix)(x)
        np.testing.assert_array_almost_equal(expected, actual)
        # Avoid sporadic failures in MemoryLeakMixin.tearDown()
        gc.collect()

    def test_issue_1372(self):
        """Test array expression with duplicated term"""
        from numba import njit

        @njit
        def foo(a, b):
            b = np.sin(b)
            return b + b + a

        a = np.random.uniform(10)
        b = np.random.uniform(10)
        expect = foo.py_func(a, b)
        got = foo(a, b)
        np.testing.assert_allclose(got, expect)

    def test_unary_arrayexpr(self):
        """
        Typing of unary array expression (np.negate) can be incorrect.
        """
        @njit
        def foo(a, b):
            return b - a + -a

        b = 1.5
        a = np.arange(10, dtype=np.int32)

        expect = foo.py_func(a, b)
        got = foo(a, b)
        self.assertPreciseEqual(got, expect)

    def test_bitwise_arrayexpr(self):
        """
        Typing of bitwise boolean array expression can be incorrect
        (issue #1813).
        """
        @njit
        def foo(a, b):
            return ~(a & (~b))

        a = np.array([True, True, False, False])
        b = np.array([False, True, False, True])

        expect = foo.py_func(a, b)
        got = foo(a, b)
        self.assertPreciseEqual(got, expect)

    def test_annotations(self):
        """
        Type annotation of array expressions with disambiguated
        variable names (issue #1466).
        """
        cfunc = njit(variable_name_reuse)

        a = np.linspace(0, 1, 10)
        cfunc(a, a, a, a)

        buf = utils.StringIO()
        cfunc.inspect_types(buf)
        res = buf.getvalue()
        self.assertIn("#   u.1 = ", res)
        self.assertIn("#   u.2 = ", res)


class TestSemantics(MemoryLeakMixin, unittest.TestCase):

    def test_division_by_zero(self):
        # Array expressions should follow the Numpy error model
        # i.e. 1./0. returns +inf instead of raising ZeroDivisionError
        pyfunc = div_add
        cfunc = njit(pyfunc)

        a = np.float64([0.0, 1.0, float('inf')])
        b = np.float64([0.0, 0.0, 1.0])
        c = np.ones_like(a)

        expect = pyfunc(a, b, c)
        got = cfunc(a, b, c)
        np.testing.assert_array_equal(expect, got)


if __name__ == "__main__":
    unittest.main()