Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / ideep / conv_op_test.py






import unittest
import hypothesis.strategies as st
from hypothesis import given, settings
import numpy as np
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace
from caffe2.python.transformations import optimizeForMKLDNN
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.ideep_test_util as mu


@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class ConvTest(hu.HypothesisTestCase):
    @given(stride=st.integers(1, 3),
           pad=st.integers(0, 3),
           kernel=st.integers(3, 5),
           size=st.integers(8, 10),
           input_channels=st.integers(1, 3),
           output_channels=st.integers(1, 5),
           batch_size=st.integers(1, 3),
           use_bias=st.booleans(),
           training_mode=st.booleans(),
           group=st.integers(1, 2),
           **mu.gcs)
    @settings(max_examples=10, deadline=None)
    def test_convolution(self, stride, pad, kernel, size,
                         input_channels, output_channels,
                         batch_size, use_bias, training_mode, group, gc, dc):
        training = 1 if training_mode else 0
        op = core.CreateOperator(
            "Conv",
            ["X", "w", "b"] if use_bias else ["X", "w"],
            ["Y"],
            stride=stride,
            pad=pad,
            kernel=kernel,
            group=group,
            training_mode=training,
        )
        X = np.random.rand(
            batch_size, input_channels * group, size, size).astype(np.float32) - 0.5
        w = np.random.rand(output_channels * group, input_channels, kernel, kernel) \
            .astype(np.float32) - 0.5
        b = np.random.rand(output_channels * group).astype(np.float32) - 0.5

        inputs = [X, w, b] if use_bias else [X, w]
        self.assertDeviceChecks(dc, op, inputs, [0])

        if training_mode:
            for i in range(len(inputs)):
                self.assertGradientChecks(gc, op, inputs, i, [0], threshold=0.01)

    @settings(max_examples=10, deadline=None)
    @given(stride=st.integers(1, 3),
           pad=st.integers(0, 3),
           size=st.integers(8, 10),
           input_channels=st.integers(16, 32),
           output_channels=st.integers(16, 32),
           batch_size=st.integers(1, 3),
           use_bias=st.booleans(),
           training_mode=st.booleans(),
           **mu.gcs)
    def test_winograd_convolution(self, stride, pad, size,
                             input_channels, output_channels,
                             batch_size, use_bias, training_mode, gc, dc):
        training = 1 if training_mode else 0
        conv3x3_winograd_algorithm = 1
        kernel = 3
        op = core.CreateOperator(
            "Conv",
            ["X", "w", "b"] if use_bias else ["X", "w"],
            ["Y"],
            stride=stride,
            pad=pad,
            kernel=kernel,
            training_mode=training,
            algorithm=conv3x3_winograd_algorithm
        )
        X = np.random.rand(
            batch_size, input_channels, size, size).astype(np.float32) - 0.5
        w = np.random.rand(
                output_channels, input_channels, kernel, kernel) \
            .astype(np.float32) - 0.5
        b = np.random.rand(output_channels).astype(np.float32) - 0.5

        inputs = [X, w, b] if use_bias else [X, w]
        self.assertDeviceChecks(dc, op, inputs, [0])

        if training_mode:
            for i in range(len(inputs)):
                self.assertGradientChecks(gc, op, inputs, i, [0], threshold=0.01)

    @given(batch_size=st.integers(1, 3), **mu.gcs)
    def test_depthwise_convolution(self, batch_size, gc, dc):
        op = core.CreateOperator(
            "Conv",
            ["X", "w", "b"],
            ["Y"],
            stride=1,
            pad=0,
            kernel=1,
            group=4,
            device_option=dc[0]
        )
        op1 = core.CreateOperator(
            "Conv",
            ["X", "w", "b"],
            ["Y"],
            stride=1,
            pad=0,
            kernel=1,
            group=4,
            device_option=dc[1]
        )
        X = np.random.rand(batch_size, 544, 14, 14).astype(np.float32)
        w = np.random.rand(544, 136, 1, 1).astype(np.float32)
        b = np.random.rand(544).astype(np.float32)

        workspace.SwitchWorkspace("_device_check_", True)
        workspace.FeedBlob('X', X, dc[0])
        workspace.FeedBlob('w', w, dc[0])
        workspace.FeedBlob('b', b, dc[0])
        workspace.RunOperatorOnce(op)
        Y0 = workspace.FetchBlob('Y')

        workspace.ResetWorkspace()
        workspace.FeedBlob('X', X, dc[1])
        workspace.FeedBlob('w', w, dc[1])
        workspace.FeedBlob('b', b, dc[1])
        net = core.Net("net")
        old_net = caffe2_pb2.NetDef()
        old_net.op.extend([op1])
        net.Proto().CopyFrom(old_net)
        optimizeForMKLDNN(net)
        workspace.RunOperatorOnce(net.Proto().op[0])
        Y1 = workspace.FetchBlob('Y')

        if not np.allclose(Y0, Y1, atol=0.01, rtol=0.01):
            print(Y1.flatten())
            print(Y0.flatten())
            print(np.max(np.abs(Y1 - Y0)))
            self.assertTrue(False)

        workspace.ResetWorkspace()
        workspace.FeedBlob('X', X, dc[1])
        workspace.FeedBlob('w', w, dc[1])
        workspace.FeedBlob('b', b, dc[1])
        workspace.RunOperatorOnce(op1)
        Y2 = workspace.FetchBlob('Y')

        if not np.allclose(Y0, Y2, atol=0.01, rtol=0.01):
            print(Y2.flatten())
            print(Y0.flatten())
            print(np.max(np.abs(Y2 - Y0)))
            self.assertTrue(False)



if __name__ == "__main__":
    unittest.main()