Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

/ python / operator_test / reduce_ops_test.py






from caffe2.python import core, workspace
from hypothesis import given, settings

import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
import itertools as it


class TestReduceOps(serial.SerializedTestCase):
    def run_reduce_op_test_impl(
            self, op_name, X, axes, keepdims, ref_func, gc, dc):
        if axes is None:
            op = core.CreateOperator(
                op_name,
                ["X"],
                ["Y"],
                keepdims=keepdims,
            )
        else:
            op = core.CreateOperator(
                op_name,
                ["X"],
                ["Y"],
                axes=axes,
                keepdims=keepdims,
            )

        def ref(X):
            return [ref_func(
                X, axis=None if axes is None else tuple(axes),
                keepdims=keepdims)]

        self.assertReferenceChecks(gc, op, [X], ref)
        self.assertDeviceChecks(dc, op, [X], [0])
        self.assertGradientChecks(gc, op, [X], 0, [0])

    def run_reduce_op_test(
            self, op_name, X, keepdims, num_axes, ref_func, gc, dc):
        self.run_reduce_op_test_impl(
            op_name, X, None, keepdims, ref_func, gc, dc)

        num_dims = len(X.shape)
        if num_dims < num_axes:
            self.run_reduce_op_test_impl(
                op_name, X, range(num_dims), keepdims, ref_func, gc, dc)
        else:
            for axes in it.combinations(range(num_dims), num_axes):
                self.run_reduce_op_test_impl(
                    op_name, X, axes, keepdims, ref_func, gc, dc)

    @serial.given(
        X=hu.tensor(max_dim=3, dtype=np.float32), keepdims=st.booleans(),
        num_axes=st.integers(1, 3), **hu.gcs)
    def test_reduce_min(self, X, keepdims, num_axes, gc, dc):
        X_dims = X.shape
        X_size = X.size
        X = np.arange(X_size, dtype=np.float32)
        np.random.shuffle(X)
        X = X.reshape(X_dims)
        self.run_reduce_op_test(
            "ReduceMin", X, keepdims, num_axes, np.min, gc, dc)

    @serial.given(
        X=hu.tensor(max_dim=3, dtype=np.float32), keepdims=st.booleans(),
        num_axes=st.integers(1, 3), **hu.gcs)
    def test_reduce_max(self, X, keepdims, num_axes, gc, dc):
        X_dims = X.shape
        X_size = X.size
        X = np.arange(X_size, dtype=np.float32)
        np.random.shuffle(X)
        X = X.reshape(X_dims)
        self.run_reduce_op_test(
            "ReduceMax", X, keepdims, num_axes, np.max, gc, dc)

    @given(n=st.integers(0, 5), m=st.integers(0, 5), k=st.integers(0, 5),
           t=st.integers(0, 5), keepdims=st.booleans(),
           num_axes=st.integers(1, 3), **hu.gcs)
    @settings(deadline=10000)
    def test_reduce_sum(self, n, m, k, t, keepdims, num_axes, gc, dc):
        X = np.random.randn(n, m, k, t).astype(np.float32)
        self.run_reduce_op_test(
            "ReduceSum", X, keepdims, num_axes, np.sum, gc, dc)

    @serial.given(X=hu.tensor(dtype=np.float32), keepdims=st.booleans(),
           num_axes=st.integers(1, 4), **hu.gcs)
    def test_reduce_mean(self, X, keepdims, num_axes, gc, dc):
        self.run_reduce_op_test(
            "ReduceMean", X, keepdims, num_axes, np.mean, gc, dc)

    @given(n=st.integers(1, 3), m=st.integers(1, 3), k=st.integers(1, 3),
           keepdims=st.booleans(), num_axes=st.integers(1, 3), **hu.gcs_cpu_only)
    @settings(deadline=1000)
    def test_reduce_l1(self, n, m, k, keepdims, num_axes, gc, dc):
        X = np.arange(n * m * k, dtype=np.float32) - 0.5
        np.random.shuffle(X)
        X = X.reshape((m, n, k))
        self.run_reduce_op_test(
            "ReduceL1", X, keepdims, num_axes, getNorm(1), gc, dc)

    @serial.given(n=st.integers(1, 5), m=st.integers(1, 5), k=st.integers(1, 5),
           keepdims=st.booleans(), num_axes=st.integers(1, 3), **hu.gcs_cpu_only)
    def test_reduce_l2(self, n, m, k, keepdims, num_axes, gc, dc):
        X = np.random.randn(n, m, k).astype(np.float32)
        self.run_reduce_op_test(
            "ReduceL2", X, keepdims, num_axes, getNorm(2), gc, dc)


def getNorm(p):
    if p == 1:
        def norm(X, axis, keepdims):
            return np.sum(np.abs(X), axis=axis, keepdims=keepdims)
    elif p == 2:
        def norm(X, axis, keepdims):
            return np.sqrt(np.sum(np.power(X, 2), axis=axis, keepdims=keepdims))
    else:
        raise RuntimeError("Only L1 and L2 norms supported")
    return norm


class TestReduceFrontReductions(serial.SerializedTestCase):
    def grad_variant_input_test(self, grad_op_name, X, ref, num_reduce_dim):
        workspace.ResetWorkspace()

        Y = np.array(ref(X)[0]).astype(np.float32)
        dY = np.array(np.random.rand(*Y.shape)).astype(np.float32)
        shape = np.array(X.shape).astype(np.int64)

        workspace.FeedBlob("X", X)
        workspace.FeedBlob("dY", dY)
        workspace.FeedBlob("shape", shape)

        grad_op = core.CreateOperator(
            grad_op_name, ["dY", "X"], ["dX"], num_reduce_dim=num_reduce_dim)

        grad_op1 = core.CreateOperator(
            grad_op_name, ["dY", "shape"], ["dX1"],
            num_reduce_dim=num_reduce_dim)

        workspace.RunOperatorOnce(grad_op)
        workspace.RunOperatorOnce(grad_op1)

        dX = workspace.FetchBlob("dX")
        dX1 = workspace.FetchBlob("dX1")
        np.testing.assert_array_equal(dX, dX1)

    def max_op_test(
            self, op_name, num_reduce_dim, gc, dc, in_data, in_names, ref_max):

        op = core.CreateOperator(
            op_name,
            in_names,
            ["outputs"],
            num_reduce_dim=num_reduce_dim
        )

        self.assertReferenceChecks(
            device_option=gc,
            op=op,
            inputs=in_data,
            reference=ref_max,
        )

        # Skip gradient check because it is too unreliable with max.
        # Just check CPU and CUDA have same results
        Y = np.array(ref_max(*in_data)[0]).astype(np.float32)
        dY = np.array(np.random.rand(*Y.shape)).astype(np.float32)
        if len(in_data) == 2:
            grad_in_names = ["dY", in_names[0], "Y", in_names[1]]
            grad_in_data = [dY, in_data[0], Y, in_data[1]]
        else:
            grad_in_names = ["dY", in_names[0], "Y"]
            grad_in_data = [dY, in_data[0], Y]

        grad_op = core.CreateOperator(
            op_name + "Gradient",
            grad_in_names,
            ["dX"],
            num_reduce_dim=num_reduce_dim
        )
        self.assertDeviceChecks(dc, grad_op, grad_in_data, [0])

    def reduce_op_test(self, op_name, op_ref, in_data, in_names,
                       num_reduce_dims, device):
        op = core.CreateOperator(
            op_name,
            in_names,
            ["outputs"],
            num_reduce_dim=num_reduce_dims
        )

        self.assertReferenceChecks(
            device_option=device,
            op=op,
            inputs=in_data,
            reference=op_ref
        )

        self.assertGradientChecks(
            device, op, in_data, 0, [0], stepsize=1e-2, threshold=1e-2)

    @given(num_reduce_dim=st.integers(0, 4), **hu.gcs)
    @settings(deadline=10000)
    def test_reduce_front_sum(self, num_reduce_dim, gc, dc):
        X = np.random.rand(7, 4, 3, 5).astype(np.float32)

        def ref_sum(X):
            return [np.sum(X, axis=(tuple(range(num_reduce_dim))))]

        self.reduce_op_test(
            "ReduceFrontSum", ref_sum, [X], ["input"], num_reduce_dim, gc)
        self.grad_variant_input_test(
            "ReduceFrontSumGradient", X, ref_sum, num_reduce_dim)

    @given(num_reduce_dim=st.integers(0, 4), seed=st.integers(0, 4), **hu.gcs)
    def test_reduce_front_sum_empty_batch(self, num_reduce_dim, seed, gc, dc):
        np.random.seed(seed)
        X = np.random.rand(0, 4, 3, 5).astype(np.float32)

        def ref_sum(X):
            return [np.sum(X, axis=(tuple(range(num_reduce_dim))))]

        self.reduce_op_test(
            "ReduceFrontSum", ref_sum, [X], ["input"], num_reduce_dim, gc)
        self.grad_variant_input_test(
            "ReduceFrontSumGradient", X, ref_sum, num_reduce_dim)

        # test the second iteration
        not_empty_X = np.random.rand(2, 4, 3, 5).astype(np.float32)
        net = core.Net('test')
        with core.DeviceScope(gc):
            net.ReduceFrontSum(
                ['X'], ['output'],
                num_reduce_dim=num_reduce_dim
            )
            workspace.CreateNet(net)

            workspace.FeedBlob('X', not_empty_X)
            workspace.RunNet(workspace.GetNetName(net))
            output = workspace.FetchBlob('output')
            np.testing.assert_allclose(
                output, ref_sum(not_empty_X)[0], atol=1e-3)

            workspace.FeedBlob('X', X)
            workspace.RunNet(workspace.GetNetName(net))
            output = workspace.FetchBlob('output')
            np.testing.assert_allclose(output, ref_sum(X)[0], atol=1e-3)

    @given(**hu.gcs)
    @settings(deadline=1000)
    def test_reduce_front_sum_with_length(self, dc, gc):
        num_reduce_dim = 1
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        batch_size = int(np.prod([2, 3, 4, 5][num_reduce_dim:]))
        d = 120 // batch_size
        lengths = np.random.randint(1, d, size=batch_size).astype(np.int32)

        def ref_sum(X, lengths):
            Y = X.reshape(d, lengths.size)
            rv = np.zeros((lengths.size, 1)).astype(np.float32)
            for ii in range(lengths.size):
                rv[ii] = np.sum(Y[:lengths[ii], ii])
            return [rv.reshape((2, 3, 4, 5)[num_reduce_dim:])]

        self.reduce_op_test(
            "ReduceFrontSum", ref_sum, [X, lengths], ["input", "lengths"],
            num_reduce_dim, gc)

    @given(num_reduce_dim=st.integers(0, 4), **hu.gcs)
    @settings(deadline=10000)
    def test_reduce_front_mean(self, num_reduce_dim, gc, dc):
        X = np.random.rand(6, 7, 8, 2).astype(np.float32)

        def ref_mean(X):
            return [np.mean(X, axis=(tuple(range(num_reduce_dim))))]

        self.reduce_op_test(
            "ReduceFrontMean", ref_mean, [X], ["input"], num_reduce_dim, gc)
        self.grad_variant_input_test(
            "ReduceFrontMeanGradient", X, ref_mean, num_reduce_dim)

    @given(**hu.gcs)
    @settings(deadline=1000)
    def test_reduce_front_mean_with_length(self, dc, gc):
        num_reduce_dim = 1
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        batch_size = int(np.prod([2, 3, 4, 5][num_reduce_dim:]))
        d = 120 // batch_size
        lengths = np.random.randint(1, d, size=batch_size).astype(np.int32)

        def ref_mean(X, lengths):
            Y = X.reshape(d, lengths.size)
            rv = np.zeros((lengths.size, 1)).astype(np.float32)
            for ii in range(lengths.size):
                rv[ii] = np.mean(Y[:lengths[ii], ii])
            return [rv.reshape((2, 3, 4, 5)[num_reduce_dim:])]

        self.reduce_op_test(
            "ReduceFrontMean", ref_mean, [X, lengths], ["input", "lengths"],
            num_reduce_dim, gc)

    @serial.given(num_reduce_dim=st.integers(0, 4), **hu.gcs)
    def test_reduce_front_max(self, num_reduce_dim, gc, dc):
        X = np.random.rand(6, 7, 8, 2).astype(np.float32)

        def ref_frontmax(X):
            return [np.max(X, axis=(tuple(range(num_reduce_dim))))]

        self.max_op_test(
            "ReduceFrontMax", num_reduce_dim, gc, dc, [X], ["X"], ref_frontmax)

    @given(**hu.gcs)
    def test_reduce_front_max_with_length(self, dc, gc):
        num_reduce_dim = 1
        X = np.random.rand(2, 3, 4, 5).astype(np.float32)
        batch_size = int(np.prod([2, 3, 4, 5][num_reduce_dim:]))
        d = 120 // batch_size
        lengths = np.random.randint(1, d, size=batch_size).astype(np.int32)

        def ref_max(X, lengths):
            Y = X.reshape(d, lengths.size)
            rv = np.zeros((lengths.size, 1)).astype(np.float32)
            for ii in range(lengths.size):
                rv[ii] = np.max(Y[:lengths[ii], ii])
            return [rv.reshape((2, 3, 4, 5)[num_reduce_dim:])]

        self.max_op_test(
            "ReduceFrontMax", num_reduce_dim, gc, dc, [X, lengths],
            ["X", "lengths"], ref_max)

    @serial.given(num_reduce_dim=st.integers(0, 4), **hu.gcs)
    def test_reduce_back_max(self, num_reduce_dim, gc, dc):
        X = np.random.rand(6, 7, 8, 2).astype(np.float32)

        def ref_backmax(X):
            return [np.max(X, axis=(0, 1, 2, 3)[4 - num_reduce_dim:])]

        self.max_op_test(
            "ReduceBackMax", num_reduce_dim, gc, dc, [X], ["X"], ref_backmax)
Loading ...