import unittest
import hypothesis.strategies as st
from hypothesis import assume, given, settings
import numpy as np
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.ideep_test_util as mu
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class PoolTest(hu.HypothesisTestCase):
@given(stride=st.integers(1, 3),
pad=st.integers(0, 3),
kernel=st.integers(3, 5),
size=st.integers(7, 9),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 3),
method=st.sampled_from(["MaxPool", "AveragePool"]),
**mu.gcs)
@settings(deadline=10000)
def test_pooling(self, stride, pad, kernel, size,
input_channels, batch_size,
method, gc, dc):
assume(pad < kernel)
op = core.CreateOperator(
method,
["X"],
["Y"],
stride=stride,
pad=pad,
kernel=kernel,
device_option=dc[0],
)
X = np.random.rand(
batch_size, input_channels, size, size
).astype(np.float32)
self.assertDeviceChecks(dc, op, [X], [0])
if 'MaxPool' not in method:
self.assertGradientChecks(gc, op, [X], 0, [0])
@given(stride=st.integers(1, 3),
pad=st.integers(0, 3),
kernel=st.integers(3, 5),
size=st.integers(7, 9),
input_channels=st.integers(1, 3),
batch_size=st.integers(1, 3),
method=st.sampled_from(["MaxPool", "AveragePool"]),
**mu.gcs_cpu_ideep)
def test_int8_pooling(self, stride, pad, kernel, size,
input_channels, batch_size,
method, gc, dc):
assume(pad < kernel)
pool_fp32 = core.CreateOperator(
method,
["X"],
["Y"],
stride=stride,
pad=pad,
kernel=kernel,
device_option=dc[0]
)
X = np.random.rand(
batch_size, input_channels, size, size).astype(np.float32)
if X.min() >=0:
scale = np.absolute(X).max() / 0xFF
zero_point = 0
else:
scale = np.absolute(X).max() / 0x7F
zero_point = 128
old_ws_name = workspace.CurrentWorkspace()
workspace.SwitchWorkspace("_device_check_", True)
workspace.FeedBlob("X", X, dc[0])
workspace.RunOperatorOnce(pool_fp32)
Y = workspace.FetchBlob("Y")
workspace.ResetWorkspace()
sw2nhwc = core.CreateOperator(
"NCHW2NHWC",
["Xi"],
["Xi_nhwc"],
device_option=dc[1]
)
quantize = core.CreateOperator(
"Int8Quantize",
["Xi_nhwc"],
["Xi_quantized"],
engine="DNNLOWP",
device_option=dc[1],
Y_zero_point=zero_point,
Y_scale=scale,
)
pool = core.CreateOperator(
"Int8{}".format(method),
["Xi_quantized"],
["Y_quantized"],
stride=stride,
pad=pad,
kernel=kernel,
engine="DNNLOWP",
device_option=dc[1],
)
dequantize = core.CreateOperator(
"Int8Dequantize",
["Y_quantized"],
["Y_nhwc"],
engine="DNNLOWP",
device_option=dc[1],
)
sw2nchw = core.CreateOperator(
"NHWC2NCHW",
["Y_nhwc"],
["Y_out"],
device_option=dc[1]
)
net = caffe2_pb2.NetDef()
net.op.extend([sw2nhwc, quantize, pool, dequantize, sw2nchw])
workspace.FeedBlob("Xi", X, dc[1])
workspace.RunNetOnce(net)
Y_out = workspace.FetchBlob("Y_out")
MSE = np.square(np.subtract(Y, Y_out)).mean()
if MSE > 0.005:
print(Y.flatten())
print(Y_out.flatten())
print(np.max(np.abs(Y_out - Y)))
print("MSE", MSE)
self.assertTrue(False)
workspace.SwitchWorkspace(old_ws_name)
if __name__ == "__main__":
unittest.main()