# python/operator_test/elementwise_op_broadcast_test.py
import unittest

from hypothesis import given, assume, settings
import numpy as np
import operator

from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial


# TODO(jiayq): make them hypothesis tests for better coverage.
class TestElementwiseBroadcast(serial.SerializedTestCase):

    def __generate_test_cases(self):
        """
        generates a set of test cases

        For each iteration, generates X, Y, args, X_out, Y_out
        where
          X, Y         are test input tensors
          args         is a dictionary of arguments to be passed to
                       core.CreateOperator()
          X_out, Y_out are reshaped versions of X and Y
                       which can be used to calculate the expected
                       result with the operator to be tested
        """
        # Set broadcast and no axis, i.e. broadcasting last dimensions.
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(4, 5).astype(np.float32)
        args = {"broadcast": 1}
        yield X, Y, args, X, Y

        # broadcasting intermediate dimensions
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(3, 4).astype(np.float32)
        args = {"broadcast": 1, "axis": 1}
        yield X, Y, args, X, Y[:, :, np.newaxis]

        # broadcasting the first dimension
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(2).astype(np.float32)
        args = {"broadcast": 1, "axis": 0}
        yield X, Y, args, X, Y[:, np.newaxis, np.newaxis, np.newaxis]
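        # With axis=0 the length-2 Y lines up with X's *leading* dim, the
        # opposite of numpy's trailing-dim default, hence the three trailing
        # np.newaxis entries above.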

        # broadcasting with single elem dimensions at both ends
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(1, 4, 1).astype(np.float32)
        args = {"broadcast": 1, "axis": 1}
        yield X, Y, args, X, Y
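        # No reshape is needed here: (1, 4, 1) already broadcasts elementwise
        # against X's trailing (3, 4, 5) dims under numpy's rules.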

    def __test_binary_op(self, gc, dc, caffe2_op, op_function):
        """
        Args:
            caffe2_op: A string. Name of the caffe operator to test.
            op_function: an actual python operator (e.g. operator.add)
        path_prefix: A string. Optional param used to construct db name or path
            where checkpoint files are are stored.
        """

        for X, Y, op_args, X_out, Y_out in self.__generate_test_cases():
            op = core.CreateOperator(caffe2_op, ["X", "Y"], "out", **op_args)
            workspace.FeedBlob("X", X)
            workspace.FeedBlob("Y", Y)
            workspace.RunOperatorOnce(op)
            out = workspace.FetchBlob("out")
            np.testing.assert_array_almost_equal(out, op_function(X_out, Y_out))
            self.assertDeviceChecks(dc, op, [X, Y], [0])
            self.assertGradientChecks(gc, op, [X, Y], 1, [0])

    @given(**hu.gcs)
    @settings(deadline=1000)
    def test_broadcast_Add(self, gc, dc):
        self.__test_binary_op(gc, dc, "Add", operator.add)

    @given(**hu.gcs)
    @settings(deadline=1000)
    def test_broadcast_Mul(self, gc, dc):
        self.__test_binary_op(gc, dc, "Mul", operator.mul)

    @given(**hu.gcs)
    @settings(deadline=1000)
    def test_broadcast_Sub(self, gc, dc):
        self.__test_binary_op(gc, dc, "Sub", operator.sub)

    @given(**hu.gcs)
    @settings(deadline=1000)
    def test_broadcast_powt(self, gc, dc):
        np.random.seed(101)

        # forward operator: Z = X ** Y
        def powt_op(X, Y):
            return [np.power(X, Y)]

        # two gradients: dZ/dX = Y * X^(Y-1) and
        # dZ/dY = X^Y * ln(X) = Z * ln(X)
        def powt_grad(g_out, outputs, fwd_inputs):
            [X, Y] = fwd_inputs
            Z = outputs[0]
            return [Y * np.power(X, Y - 1) * g_out, Z * np.log(X) * g_out]
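        # Sanity check of the formulas at one point (not executed by the
        # test): for X = 2, Y = 3 we get Z = 8, dZ/dX = 3 * 2**2 = 12 and
        # dZ/dY = 8 * ln(2) ~= 5.545.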

        # 1. Set broadcast and no axis, i.e. broadcasting last dimensions.
        X = np.random.rand(2, 3, 4, 5).astype(np.float32) + 1.0
        Y = np.random.rand(4, 5).astype(np.float32) + 2.0

        # two gradients, Y * X^(Y-1) and X^Y * ln(X);
        # the latter is summed over dims 1 and 0 to account for the broadcast
        def powt_grad_broadcast(g_out, outputs, fwd_inputs):
            [GX, GY] = powt_grad(g_out, outputs, fwd_inputs)
            return ([GX, np.sum(np.sum(GY, 1), 0)])
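        # Shape walk-through: GY starts at X's shape (2, 3, 4, 5); summing
        # axis 1 gives (2, 4, 5), then axis 0 gives (4, 5) == Y.shape.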

        op = core.CreateOperator("Pow", ["X", "Y"], "Z", broadcast=1)
        self.assertReferenceChecks(device_option=gc,
                                   op=op,
                                   inputs=[X, Y],
                                   reference=powt_op,
                                   output_to_grad="Z",
                                   grad_reference=powt_grad_broadcast)

        # 2. broadcasting intermediate dimensions
        X = np.random.rand(2, 3, 4, 5).astype(np.float32) + 1.0
        Y = np.random.rand(3, 4).astype(np.float32) + 2.0

        # pow op with the latter array expanded by one dim
        def powt_op_axis1(X, Y):
            return powt_op(X, Y[:, :, np.newaxis])

        # two gradients, Y * X^(Y-1) and X^Y * ln(X);
        # the latter is summed over dims 3 and 0 to account for the broadcast
        def powt_grad_axis1(g_out, outputs, fwd_inputs):
            [X, Y] = fwd_inputs
            [GX, GY] = powt_grad(g_out, outputs, [X, Y[:, :, np.newaxis]])
            return ([GX, np.sum(np.sum(GY, 3), 0)])

        op = core.CreateOperator("Pow", ["X", "Y"], "Z", broadcast=1, axis=1)
        self.assertReferenceChecks(device_option=gc,
                                   op=op,
                                   inputs=[X, Y],
                                   reference=powt_op_axis1,
                                   output_to_grad="Z",
                                   grad_reference=powt_grad_axis1)

        # 3. broadcasting the first dimension
        X = np.random.rand(2, 3, 4, 5).astype(np.float32) + 1.0
        Y = np.random.rand(2).astype(np.float32) + 2.0

        # pow op with the latter array expanded by three dims
        def powt_op_axis0(X, Y):
            return powt_op(X, Y[:, np.newaxis, np.newaxis, np.newaxis])

        # two gradients, Y * X^(Y-1) and X^Y * ln(X);
        # the latter is summed over dims 3, 2 and 1 to account for the broadcast
        def powt_grad_axis0(g_out, outputs, fwd_inputs):
            [X, Y] = fwd_inputs
            [GX, GY] = powt_grad(g_out,
                                 outputs,
                                 [X, Y[:, np.newaxis, np.newaxis, np.newaxis]])
            return ([GX, np.sum(np.sum(np.sum(GY, 3), 2), 1)])

        op = core.CreateOperator("Pow", ["X", "Y"], "Z", broadcast=1, axis=0)
        self.assertReferenceChecks(device_option=gc,
                                   op=op,
                                   inputs=[X, Y],
                                   reference=powt_op_axis0,
                                   output_to_grad="Z",
                                   grad_reference=powt_grad_axis0)

        # 4. broadcasting with single elem dimensions at both ends
        X = np.random.rand(2, 3, 4, 5).astype(np.float32) + 1.0
        Y = np.random.rand(1, 4, 1).astype(np.float32) + 2.0

        # pow op with the latter array expanded by one dim
        def powt_op_mixed(X, Y):
            return powt_op(X, Y[np.newaxis, :, :, :])

        # two gradients, Y * X^(Y-1) and X^Y * ln(X);
        # the latter is summed over dims 3, 1 and 0, then reshaped back to
        # (1, 4, 1) to account for the broadcast
        def powt_grad_mixed(g_out, outputs, fwd_inputs):
            [X, Y] = fwd_inputs
            [GX, GY] = powt_grad(g_out, outputs, [X, Y[np.newaxis, :, :, :]])
            return ([GX, np.reshape(np.sum(np.sum(np.sum(GY, 3), 1), 0),
                                    (1, 4, 1))])

        op = core.CreateOperator("Pow", ["X", "Y"], "Z", broadcast=1, axis=1)
        self.assertReferenceChecks(device_option=gc,
                                   op=op,
                                   inputs=[X, Y],
                                   reference=powt_op_mixed,
                                   output_to_grad="Z",
                                   grad_reference=powt_grad_mixed)

    @given(**hu.gcs)
    def test_broadcast_scalar(self, gc, dc):
        # broadcasting a shape-(1,) constant
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(1).astype(np.float32)
        op = core.CreateOperator("Add", ["X", "Y"], "out", broadcast=1)
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        np.testing.assert_array_almost_equal(
            out, X + Y)
        self.assertDeviceChecks(dc, op, [X, Y], [0])

        # broadcasting a zero-dim scalar (via reshape([]))
        X = np.random.rand(1).astype(np.float32)
        Y = np.random.rand(1).astype(np.float32).reshape([])
        op = core.CreateOperator("Add", ["X", "Y"], "out", broadcast=1)
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        np.testing.assert_array_almost_equal(
            out, X + Y)
        self.assertDeviceChecks(dc, op, [X, Y], [0])

    @given(**hu.gcs)
    def test_semantic_broadcast(self, gc, dc):
        # NCHW as default
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(3).astype(np.float32)
        op = core.CreateOperator(
            "Add", ["X", "Y"], "out", broadcast=1, axis_str="C")
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        np.testing.assert_array_almost_equal(
            out, X + Y[:, np.newaxis, np.newaxis])
        self.assertDeviceChecks(dc, op, [X, Y], [0])
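        # In NCHW order, axis_str="C" resolves to axis 1, so the length-3 Y
        # is expanded like Y[:, np.newaxis, np.newaxis] in the reference
        # above.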

        # NHWC
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(5).astype(np.float32)
        op = core.CreateOperator(
            "Add", ["X", "Y"], "out", broadcast=1, axis_str="C", order="NHWC")
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        np.testing.assert_array_almost_equal(out, X + Y)
        self.assertDeviceChecks(dc, op, [X, Y], [0])
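        # In NHWC order, "C" is the last axis, so plain numpy broadcasting
        # (X + Y) already matches the operator's semantics.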

    @given(**hu.gcs)
    def test_sum_reduce_empty_blob(self, gc, dc):
        net = core.Net('test')

        with core.DeviceScope(gc):
            net.GivenTensorFill([], ["X"], values=[], shape=[2, 0, 5])
            net.GivenTensorFill([], ["Y"], values=[], shape=[2, 0])
            net.SumReduceLike(["X", "Y"], "out", axis=0)
            workspace.RunNetOnce(net)

    @given(**hu.gcs)
    def test_sum_reduce(self, gc, dc):
        # Set broadcast and no axis, i.e. broadcasting last dimensions.
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(4, 5).astype(np.float32)
        op = core.CreateOperator(
            "SumReduceLike", ["X", "Y"], "out", broadcast=1)
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        res = np.sum(X, axis=0)
        res = np.sum(res, axis=0)
        np.testing.assert_array_almost_equal(out, res)
        self.assertDeviceChecks(dc, op, [X, Y], [0])
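        # SumReduceLike reduces X down to Y's shape by summing over the
        # broadcast axes; with (4, 5) aligned against X's trailing dims,
        # those are axes 0 and 1, matching the reference above.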

        # Set broadcast with axis=0, i.e. broadcasting the first dimensions.
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(2, 3).astype(np.float32)
        op = core.CreateOperator(
            "SumReduceLike", ["X", "Y"], "out", broadcast=1, axis=0)
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        res = np.sum(X, axis=3)
        res = np.sum(res, axis=2)
        np.testing.assert_array_almost_equal(out, res, decimal=3)
        self.assertDeviceChecks(dc, op, [X, Y], [0])

        # broadcasting intermediate dimensions
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(3, 4).astype(np.float32)
        op = core.CreateOperator(
            "SumReduceLike", ["X", "Y"], "out", broadcast=1, axis=1)
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        res = np.sum(X, axis=0)
        res = np.sum(res, axis=2)
        np.testing.assert_array_almost_equal(out, res)
        self.assertDeviceChecks(dc, op, [X, Y], [0])

        # reducing all dimensions to a single element (fp64)
        X = np.random.rand(2, 3, 4, 500).astype(np.float64)
        Y = np.random.rand(1).astype(np.float64)
        op = core.CreateOperator(
            "SumReduceLike", ["X", "Y"], "out", broadcast=1)
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        res = np.array(np.sum(X))
        np.testing.assert_array_almost_equal(out, res, decimal=0)

        # broadcasting with single elem dimensions at both ends
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        Y = np.random.rand(1, 3, 4, 1).astype(np.float32)
        op = core.CreateOperator(
            "SumReduceLike", ["X", "Y"], "out", broadcast=1)
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("Y", Y)
        workspace.RunOperatorOnce(op)
        out = workspace.FetchBlob("out")
        res = np.sum(X, axis=0)
        res = np.sum(res, axis=2).reshape(Y.shape)
        np.testing.assert_array_almost_equal(out, res)
        self.assertDeviceChecks(dc, op, [X, Y], [0])

        # fp64 is not supported with the CUDA op
        dc_cpu_only = [d for d in dc if d.device_type != caffe2_pb2.CUDA]
        self.assertDeviceChecks(dc_cpu_only, op, [X, Y], [0])

    @unittest.skipIf(not workspace.has_gpu_support, "No gpu support")
    @given(**hu.gcs)
    def test_sum_reduce_fp16(self, gc, dc):
        assume(core.IsGPUDeviceType(gc.device_type))

        # Set broadcast and no axis, i.e. broadcasting last dimensions.
        X = np.random.rand(2, 3, 4, 5).astype(np.float16)
        Y = np.random.rand(4, 5).astype(np.float16)
        op = core.CreateOperator(
            "SumReduceLike", ["X", "Y"], "out", broadcast=1, device_option=gc)

        def ref_op(X, Y):
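            # The remainder of the file is cut off in this view; the body
            # below is a minimal sketch of the expected reference, assuming
            # the same trailing-dim reduction as the fp32 case above (sum X
            # over axes 0 and 1 down to Y's (4, 5) shape).
            res = np.sum(X, axis=0)
            res = np.sum(res, axis=0)
            return [res]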