Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / ideep / fc_op_test.py






import unittest
from functools import reduce
import hypothesis.strategies as st
from hypothesis import given, settings
import numpy as np
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.ideep_test_util as mu


@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class FcTest(hu.HypothesisTestCase):
    @given(n=st.integers(1, 5), m=st.integers(1, 5),
           k=st.integers(1, 5), **mu.gcs)
    @settings(deadline=1000)
    def test_fc_2_dims(self, n, m, k, gc, dc):
        """Exercise the FC operator with a 2-D input.

        Random float32 arrays are generated for ``X`` with shape ``(m, k)``,
        ``W`` with shape ``(n, k)`` and ``b`` with shape ``(n,)``.  The
        operator is then handed to ``assertDeviceChecks`` (output 0, device
        options ``dc``) and to ``assertGradientChecks`` (device option
        ``gc``) once for each of its three inputs.
        """
        def centred(*shape):
            # np.random.rand samples from [0, 1); shift so values straddle 0.
            return np.random.rand(*shape).astype(np.float32) - 0.5

        # Sampling order (X, then W, then b) is kept deliberately.
        X, W, b = centred(m, k), centred(n, k), centred(n)

        fc_op = core.CreateOperator('FC', ['X', 'W', 'b'], ["Y"])

        self.assertDeviceChecks(dc, fc_op, [X, W, b], [0])

        # One gradient check per input blob: X, W and b.
        for input_idx in (0, 1, 2):
            self.assertGradientChecks(gc, fc_op, [X, W, b], input_idx, [0])

    @given(n=st.integers(1, 5),
           m=st.integers(1, 5),
           c=st.integers(1, 5),
           h=st.integers(1, 5),
           w=st.integers(1, 5),
           axis=st.integers(1, 3),
           **mu.gcs)
    def test_fc_with_axis(self, n, m, c, h, w, axis, gc, dc):
        """Compare FC / FCGradient results between ``dc[0]`` and ``dc[1]``.

        ``X`` has shape ``(n, c, h, w)`` and the operators receive the
        ``axis`` argument.  The product of the dimensions from ``axis``
        onward sizes the second dimension of ``W``; the product of the
        leading dimensions sizes the first dimension of ``dY``.  The forward
        output ``Y`` and the gradient outputs ``dW`` and ``db`` must agree
        between the two device options within ``atol=0.01`` / ``rtol=0.01``.

        The ``dc[0]`` run happens in the current (freshly reset) workspace,
        the ``dc[1]`` run in the ``"_device_check_"`` workspace.  The
        previously current workspace is restored afterwards so later tests
        do not inherit the switch.

        ``gc`` is part of the signature but is not used by this test.
        """
        dims = [n, c, h, w]
        X = np.random.rand(*dims).astype(np.float32) - 0.5
        # Flattened sizes on either side of ``axis``.
        k = int(np.prod(dims[axis:]))
        nn = int(np.prod(dims[:axis]))
        W = np.random.rand(m, k).astype(np.float32) - 0.5
        b = np.random.rand(m).astype(np.float32) - 0.5
        dY = np.random.rand(nn, m).astype(np.float32) - 0.5

        def run_fc(device_option):
            # Run FC and then FCGradient in the current workspace with the
            # given device option; return the fetched (Y, dW, db).
            fc = core.CreateOperator(
                'FC',
                ['X', 'W', 'b'],
                ["Y"],
                axis=axis,
                device_option=device_option
            )
            fc_grad = core.CreateOperator(
                'FCGradient',
                ['X', 'W', 'dY'],
                ["dW", "db"],
                axis=axis,
                device_option=device_option
            )
            workspace.FeedBlob('X', X, device_option)
            workspace.FeedBlob('W', W, device_option)
            workspace.FeedBlob('b', b, device_option)
            workspace.RunOperatorOnce(fc)
            fwd = workspace.FetchBlob('Y')

            workspace.FeedBlob('dY', dY, device_option)
            workspace.RunOperatorOnce(fc_grad)
            return fwd, workspace.FetchBlob('dW'), workspace.FetchBlob('db')

        workspace.ResetWorkspace()
        Y0, dW0, db0 = run_fc(dc[0])

        # Remember where we came from so the switch below cannot leak into
        # subsequent examples or other tests.
        old_ws_name = workspace.CurrentWorkspace()
        workspace.SwitchWorkspace("_device_check_", True)
        try:
            Y1, dW1, db1 = run_fc(dc[1])
        finally:
            # Drop the scratch blobs, then return to the original workspace
            # even if the second run raised.
            workspace.ResetWorkspace()
            workspace.SwitchWorkspace(old_ws_name)

        # Same tolerance rule as np.allclose(first, second): the relative
        # term is scaled by the second (dc[1]) array.  equal_nan=False keeps
        # NaNs a failure, as they were with np.allclose.
        for blob_name, first, second in (("Y", Y0, Y1),
                                         ("dW", dW0, dW1),
                                         ("db", db0, db1)):
            np.testing.assert_allclose(
                first.flatten(), second.flatten(),
                atol=0.01, rtol=0.01, equal_nan=False,
                err_msg="{} differs between dc[0] and dc[1]".format(blob_name)
            )

    @given(n=st.integers(1, 5),
           o=st.integers(1, 5),
           i=st.integers(1, 5),
           h=st.integers(1, 5),
           w=st.integers(1, 5),
           axis_w=st.integers(1, 3),
           **mu.gcs)
    @settings(deadline=1000)
    def test_fc_with_axis_w(self, n, o, i, h, w, axis_w, gc, dc):
        """Compare FC / FCGradient results between ``dc[0]`` and ``dc[1]``.

        ``W`` has shape ``(o, i, h, w)`` and the operators receive the
        ``axis_w`` argument.  The product of the dimensions from ``axis_w``
        onward sizes the second dimension of ``X``; the product of the
        leading dimensions sizes ``b`` and the second dimension of ``dY``.
        The forward output ``Y`` and the gradient outputs ``dW`` and ``db``
        must agree between the two device options within ``atol=0.01`` /
        ``rtol=0.01``.

        The ``dc[0]`` run happens in the current (freshly reset) workspace,
        the ``dc[1]`` run in the ``"_device_check_"`` workspace.  The
        previously current workspace is restored afterwards so later tests
        do not inherit the switch.

        ``gc`` is part of the signature but is not used by this test.
        """
        w_dims = [o, i, h, w]
        W = np.random.rand(*w_dims).astype(np.float32) - 0.5
        # Flattened sizes on either side of ``axis_w``.
        k = int(np.prod(w_dims[axis_w:]))
        m = int(np.prod(w_dims[:axis_w]))
        X = np.random.rand(n, k).astype(np.float32) - 0.5
        b = np.random.rand(m).astype(np.float32) - 0.5
        dY = np.random.rand(n, m).astype(np.float32) - 0.5

        def run_fc(device_option):
            # Run FC and then FCGradient in the current workspace with the
            # given device option; return the fetched (Y, dW, db).
            fc = core.CreateOperator(
                'FC',
                ['X', 'W', 'b'],
                ["Y"],
                axis_w=axis_w,
                device_option=device_option
            )
            fc_grad = core.CreateOperator(
                'FCGradient',
                ['X', 'W', 'dY'],
                ["dW", "db"],
                axis_w=axis_w,
                device_option=device_option
            )
            workspace.FeedBlob('X', X, device_option)
            workspace.FeedBlob('W', W, device_option)
            workspace.FeedBlob('b', b, device_option)
            workspace.RunOperatorOnce(fc)
            fwd = workspace.FetchBlob('Y')

            workspace.FeedBlob('dY', dY, device_option)
            workspace.RunOperatorOnce(fc_grad)
            return fwd, workspace.FetchBlob('dW'), workspace.FetchBlob('db')

        workspace.ResetWorkspace()
        Y0, dW0, db0 = run_fc(dc[0])

        # Remember where we came from so the switch below cannot leak into
        # subsequent examples or other tests.
        old_ws_name = workspace.CurrentWorkspace()
        workspace.SwitchWorkspace("_device_check_", True)
        try:
            Y1, dW1, db1 = run_fc(dc[1])
        finally:
            # Drop the scratch blobs, then return to the original workspace
            # even if the second run raised.
            workspace.ResetWorkspace()
            workspace.SwitchWorkspace(old_ws_name)

        # Same tolerance rule as np.allclose(first, second): the relative
        # term is scaled by the second (dc[1]) array.  equal_nan=False keeps
        # NaNs a failure, as they were with np.allclose.
        for blob_name, first, second in (("Y", Y0, Y1),
                                         ("dW", dW0, dW1),
                                         ("db", db0, db1)):
            np.testing.assert_allclose(
                first.flatten(), second.flatten(),
                atol=0.01, rtol=0.01, equal_nan=False,
                err_msg="{} differs between dc[0] and dc[1]".format(blob_name)
            )

    @given(n=st.integers(1, 5), m=st.integers(1, 5),
           k=st.integers(1, 5), **mu.gcs)
    @settings(deadline=10000)
    def test_fc_4_dims_src(self, n, m, k, gc, dc):
        """Exercise FC with a 4-D ``X`` and a 2-D ``W``.

        ``X`` is random float32 data of shape ``(m, k, m, m)``; ``W`` has
        shape ``(n, k * m * m)`` and ``b`` has shape ``(n,)``.  The operator
        goes through ``assertDeviceChecks`` for output 0 and through
        ``assertGradientChecks`` for each of the three inputs.
        """
        x_shape = (m, k, m, m)
        w_shape = (n, k * m * m)

        # Inputs are drawn in the order X, W, b and shifted to [-0.5, 0.5).
        X = np.random.rand(*x_shape).astype(np.float32) - 0.5
        W = np.random.rand(*w_shape).astype(np.float32) - 0.5
        b = np.random.rand(n).astype(np.float32) - 0.5

        fc_op = core.CreateOperator('FC', ['X', 'W', 'b'], ["Y"])

        self.assertDeviceChecks(dc, fc_op, [X, W, b], [0])

        input_idx = 0
        while input_idx < 3:
            self.assertGradientChecks(gc, fc_op, [X, W, b], input_idx, [0])
            input_idx += 1

    @given(n=st.integers(1, 5), m=st.integers(1, 5),
           k=st.integers(1, 5), **mu.gcs)
    @settings(deadline=10000)
    def test_fc_4_dims(self, n, m, k, gc, dc):
        """Exercise FC when both ``X`` and ``W`` are 4-D.

        ``X`` has shape ``(m, k, m, m)``, ``W`` has shape ``(n, k, m, m)``
        and ``b`` has shape ``(n,)``, all random float32 values.  The
        operator is submitted to ``assertDeviceChecks`` for output 0 and to
        ``assertGradientChecks`` once per input.
        """
        # Shapes listed in blob order X, W, b; sampling follows that order.
        blob_shapes = [(m, k, m, m), (n, k, m, m), (n,)]
        samples = [
            np.random.rand(*shape).astype(np.float32) - 0.5
            for shape in blob_shapes
        ]
        X, W, b = samples

        fc_op = core.CreateOperator(
            'FC',
            ['X', 'W', 'b'],
            ["Y"]
        )

        self.assertDeviceChecks(dc, fc_op, [X, W, b], [0])

        for input_idx, _ in enumerate(blob_shapes):
            self.assertGradientChecks(gc, fc_op, [X, W, b], input_idx, [0])

    @given(n=st.integers(2, 5), m=st.integers(2, 5),
           k=st.integers(2, 5), **mu.gcs_cpu_ideep)
    def test_int8_fc_4_dims(self, n, m, k, gc, dc):
        X = np.random.rand(m, k, m, m).astype(np.float32) - 0.5
        w = np.random.rand(n, k, m, m).astype(np.float32) - 0.5
        b = np.random.rand(n).astype(np.float32) - 0.5

        fc_fp32 = core.CreateOperator(
            'FC',
            ['X', 'w', 'b'],
            ["Y"]
        )

        old_ws_name = workspace.CurrentWorkspace()
        workspace.SwitchWorkspace("_device_check_", True)

        workspace.FeedBlob('X', X, dc[0])
        workspace.FeedBlob('w', w, dc[0])
        workspace.FeedBlob('b', b, dc[0])
        workspace.RunOperatorOnce(fc_fp32)
        Y = workspace.FetchBlob('Y')

        workspace.ResetWorkspace()

        Y_absmax = np.array([np.absolute(Y).max()]).astype(np.float32)
        if Y.min() >= 0:
            Y_scale = Y_absmax / 0xFF
            Y_zero_point = 0
        else:
            Y_scale = Y_absmax / 0x7F
            Y_zero_point = 128

        X_absmax = np.array([np.absolute(X).max()]).astype(np.float32)
        if X.min() >= 0:
            X_scale = X_absmax / 0xFF
            X_zero_point = 0
        else:
            X_scale = X_absmax / 0x7F
            X_zero_point = 128

        w_absmax = np.array([np.absolute(w[i, ...]).max() for i in range(w.shape[0])]).astype(np.float32)
        w_scale = w_absmax / 0x7F
        w_zero_point = 128
        w = np.transpose(w, (0, 2, 3, 1)).astype(np.float32)
        w_bytes = np.rint([w[i, ...] / w_scale[i] for i in range(w.shape[0])]).astype(np.int8) + w_zero_point

        w_filler = core.CreateOperator(
            "Int8GivenTensorFill",
            [], ["wi"],
            shape=w.shape,
            values=w_bytes.astype(np.uint8).tobytes(),
            Y_zero_point=w_zero_point,
            Y_scales=w_scale,
            device_option=dc[1],
        )

        b_scale = w_scale * X_scale
        b_zero_point = 0
        b_bytes = np.rint([b[i] / b_scale[i] for i in range(b.shape[0])]).astype(np.int32)
        b_filler = core.CreateOperator(
            "Int8GivenIntTensorFill",
            [], ["bi"],
            shape=b.shape,
            values=b_bytes,
            Y_zero_point=b_zero_point,
            Y_scales=b_scale,
            device_option=dc[1],
        )

        sw2nhwc = core.CreateOperator(
            "NCHW2NHWC",
            ["Xi"],
            ["Xi_nhwc"],
            device_option=dc[1]
        )

        quantize_X = core.CreateOperator(
            "Int8Quantize",
            ["Xi_nhwc"],
Loading ...