from caffe2.python.test_util import TestCase
from caffe2.proto import caffe2_pb2
import unittest
import numpy as np
from caffe2.python import core, workspace
@unittest.skipIf(not workspace.C.use_mkldnn, "No MKLDNN support.")
class TestReShapeOps(TestCase):
def test_reshape_ops(self):
device_opt = core.DeviceOption(caffe2_pb2.IDEEP, 0)
with core.DeviceScope(device_opt):
workspace.FeedBlob('res', np.array([[0, 0, 0, 0]], dtype=np.float32))
workspace.FeedBlob('shape', np.array([1, 4], dtype=np.int32), core.DeviceOption(caffe2_pb2.CPU, 0))
workspace.FeedBlob('input', np.zeros((2, 2), dtype=np.float32))
workspace.RunOperatorOnce(core.CreateOperator(
'Reshape', ['input', 'shape'], ['output', 'old_shape']))
assert ((workspace.FetchBlob('output') ==
workspace.FetchBlob('res')).all())
def test_basic_reshape(self):
_test_reshape(old_shape=(4, 2, 1), new_shape=(2, 4))
_test_reshape(old_shape=(4, 2, 1), new_shape=(2, 4), arg_shape=False)
def test_int64_reshape_input(self):
_test_reshape(old_shape=(4, 2, 1), new_shape=(2, 4), arg_shape=False, shape_dtype=np.int64)
def test_missing_dim(self):
_test_reshape(old_shape=(4, 2, 1), new_shape=(-1, 8))
_test_reshape(old_shape=(4, 2, 1), new_shape=(-1, 8), arg_shape=False)
def test_in_place(self):
_test_reshape(old_shape=(4, 2, 1), new_shape=(-1, 8), in_place=True)
_test_reshape(old_shape=(4, 2, 1), new_shape=(-1, 8),
in_place=True, arg_shape=False)
def test_zero_dim(self):
_test_reshape(old_shape=(4, 2, 1), new_shape=(0, 0, 0),
expected_shape=(4, 2, 1))
_test_reshape(old_shape=(4, 2, 1), new_shape=(0, 0, 0),
expected_shape=(4, 2, 1), arg_shape=False)
_test_reshape(old_shape=(4, 2, 1), new_shape=(0, 2, 1),
expected_shape=(4, 2, 1))
_test_reshape(old_shape=(4, 2, 1), new_shape=(0, 2, 1),
expected_shape=(4, 2, 1), arg_shape=False)
def test_zero_dim_and_missing_dim(self):
_test_reshape(old_shape=(4, 2, 1), new_shape=(0, -1, 0),
expected_shape=(4, 2, 1))
_test_reshape(old_shape=(4, 2, 1), new_shape=(0, -1, 0),
expected_shape=(4, 2, 1), arg_shape=False)
_test_reshape(old_shape=(4, 3, 2), new_shape=(-1, 0),
expected_shape=(8, 3))
_test_reshape(old_shape=(4, 3, 2), new_shape=(-1, 0),
expected_shape=(8, 3), arg_shape=False)
def test_backprop(self):
device_opt = core.DeviceOption(caffe2_pb2.IDEEP, 0)
with core.DeviceScope(device_opt):
old_shape = (4, 2, 1)
new_shape = (1, 8)
X = np.random.rand(*old_shape).astype(np.float32)
Y = np.random.rand(*new_shape).astype(np.float32)
net = core.Net('net')
net.GivenTensorFill([], 'X', shape=old_shape, values=X.flatten())
net.GivenTensorFill([], 'Y', shape=new_shape, values=Y.flatten())
net.Reshape(['X'], ['X_out', 'old_shape'], shape=new_shape)
net.Mul(['X_out', 'Y'], 'Z')
net.AddGradientOperators(['Z'])
workspace.RunNetOnce(net)
Z = workspace.FetchBlob('Z')
X_grad = workspace.FetchBlob('X_grad')
# Check forward computation
np.testing.assert_allclose(
Z.squeeze(), (X.reshape(new_shape) * Y).squeeze(), rtol=1e-5)
# Check the shape of the gradient
np.testing.assert_array_equal(X_grad.shape, X.shape)
# Check the gradient
np.testing.assert_allclose(X_grad, Y.reshape(old_shape), rtol=1e-5)
def test_input_shape_changes(self):
device_opt = core.DeviceOption(caffe2_pb2.IDEEP, 0)
with core.DeviceScope(device_opt):
workspace.FeedBlob(
'input_blob',
np.array(np.random.rand(10, 20, 10), dtype=np.float32))
net = core.Net('mynet')
z, _ = net.Reshape('input_blob',
['z_reshape', 'dummy_size'],
shape=(-1, 10))
workspace.CreateNet(net)
workspace.RunNet(net)
workspace.FeedBlob(
'input_blob',
np.array(np.random.rand(10, 40, 10), dtype=np.float32))
workspace.RunNet(net)
def _test_reshape(old_shape, new_shape, expected_shape=None, arg_shape=True,
in_place=False, shape_dtype=np.int32):
devices = [core.DeviceOption(caffe2_pb2.IDEEP, 0)]
for device_opt in devices:
with core.DeviceScope(device_opt):
if expected_shape is None:
expected_shape = new_shape
X = np.random.rand(*old_shape).astype(np.float32)
blob_in = 'X'
blob_out = blob_in if in_place else blob_in + '_out'
if arg_shape:
op = core.CreateOperator('Reshape',
[blob_in],
[blob_out, 'old_shape'],
shape=new_shape)
else:
op = core.CreateOperator('Reshape',
[blob_in, 'new_shape'],
[blob_out, 'old_shape'])
workspace.FeedBlob('new_shape', np.asarray(new_shape, dtype=shape_dtype),
core.DeviceOption(caffe2_pb2.CPU, 0))
workspace.FeedBlob(blob_in, X)
workspace.RunOperatorOnce(op)
Y = workspace.FetchBlob(blob_out)
np.testing.assert_allclose(Y, X.reshape(expected_shape))
if __name__ == "__main__":
unittest.main()