from caffe2.python import brew, core, workspace
from caffe2.python.model_helper import ModelHelper
from functools import partial
from hypothesis import given, settings

import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np
import torch
import unittest


def _layer_norm_ref(axis, epsilon, X):
left = int(np.prod(X.shape[:axis]))
reshaped = np.reshape(X, [left, -1])
mean = np.mean(reshaped, axis=1).reshape([left, 1])
std = np.sqrt(np.mean(np.square(reshaped), axis=1).reshape(
[left, 1]) - np.square(mean) + epsilon)
    Y = (reshaped - mean) / std
Y = np.reshape(Y, X.shape)
mean = np.reshape(mean, X.shape[:axis] + (1,))
std = np.reshape(std, X.shape[:axis] + (1,))
return (Y, mean, std)
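
# In formula form, with statistics taken over the flattened trailing axes
# (everything from `axis` on), the reference above computes
#   mean = E[x]
#   std  = sqrt(E[x^2] - E[x]^2 + epsilon)
#   Y    = (x - mean) / std
# i.e. the biased (population) variance with epsilon added inside the sqrt.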


def _layer_norm_with_affine_ref(axis, epsilon, X, gamma, beta):
Y, mean, std = _layer_norm_ref(axis, epsilon, X)
Y = Y * gamma + beta
return (Y, mean, std)
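
# gamma and beta are expected to have shape X.shape[axis:], so once Y is
# reshaped back to X.shape the affine transform broadcasts across the
# leading axes, applying the same scale and shift to every normalized group.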


def _layer_norm_grad_ref(axis, gout_full, norm, mean_full, stdev_full, X_full):
left = int(np.prod(X_full.shape[:axis]))
right = int(np.prod(X_full.shape[axis:]))
X = np.reshape(X_full, [left, right])
stdev = np.reshape(stdev_full, [left, 1])
mean = np.reshape(mean_full, [left, 1])
gout = np.reshape(gout_full, [left, right])
dstdev_end = (-1.0) / np.power(stdev, 2.0) \
* np.sum((X - mean) * gout, axis=1).reshape([left, 1])
dmean_end = np.sum(-1.0 / stdev * gout, axis=1).reshape([left, 1])
dx_end = 1.0 / stdev * gout
# stdev block
dmean_stdev = -1.0 * mean / stdev * dstdev_end
dx_stdev = X / (right * stdev) * dstdev_end
# mean block
dmean = dmean_end + dmean_stdev
dxmean = (1.0 / right) * dmean
# final outputs
dx = dx_end + dx_stdev + dxmean
dx = dx.reshape(X_full.shape)
return [dx]
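
# Derivation sketch for the gradient above: with m = mean, s = std and
# r = right (the number of reduced elements), Y = (X - m) / s, so
#   dL/ds = -sum((X - m) * gout) / s^2            -> dstdev_end
#   dL/dm = -sum(gout) / s + (ds/dm) * dL/ds      -> dmean_end + dmean_stdev
#   dL/dX = gout / s                              (direct term, dx_end)
#         + (ds/dX) * dL/ds                       (through std, dx_stdev)
#         + (1 / r) * dL/dm                       (through mean, dxmean)
# where ds/dm = -m / s and ds/dX = X / (r * s), as used in the code.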


class TestLayerNormOp(serial.SerializedTestCase):
@given(X=hu.tensor(min_dim=2), **hu.gcs)
@settings(deadline=10000)
def test_layer_norm_grad_op(self, X, gc, dc):
axis = np.random.randint(0, len(X.shape))
epsilon = 1e-4
op = core.CreateOperator(
"LayerNormGradient",
["gout", "out", "mean", "stdev", "in"],
["gin"],
axis=axis,
epsilon=epsilon,
)
norm, mean, stdev = _layer_norm_ref(axis, epsilon, X)
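        # Reuse the forward output as the incoming gradient; the reference
        # check only needs some tensor of the right shape here.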
gout = norm
self.assertReferenceChecks(
device_option=gc,
op=op,
inputs=[gout, norm, mean, stdev, X],
reference=partial(_layer_norm_grad_ref, axis)
)
self.assertDeviceChecks(
device_options=dc,
op=op,
inputs=[gout, norm, mean, stdev, X],
outputs_to_check=[0],
)

    @given(X=hu.tensor(min_dim=2),
eps=st.floats(1e-5, 1e-3),
elementwise_affine=st.booleans(),
**hu.gcs)
def test_layer_norm_op(self, X, eps, elementwise_affine, gc, dc):
axis = np.random.randint(0, len(X.shape))
op = core.CreateOperator(
"LayerNorm",
["X", "gamma", "beta"] if elementwise_affine else ["X"],
["Y", "mean", "std"],
axis=axis,
epsilon=eps,
elementwise_affine=elementwise_affine,
)
        if elementwise_affine:
            ref = partial(_layer_norm_with_affine_ref, axis, eps)
            gamma = np.random.randn(*X.shape[axis:]).astype(np.float32)
            beta = np.random.randn(*X.shape[axis:]).astype(np.float32)
            inputs = [X, gamma, beta]
        else:
            ref = partial(_layer_norm_ref, axis, eps)
            inputs = [X]
self.assertReferenceChecks(
device_option=gc,
op=op,
inputs=inputs,
reference=ref,
)
self.assertDeviceChecks(
device_options=dc,
op=op,
inputs=inputs,
outputs_to_check=[0, 1, 2],
)

    @given(M=st.integers(1, 10),
N=st.integers(10, 20),
axis=st.integers(0, 1),
eps=st.floats(1e-5, 1e-3),
elementwise_affine=st.booleans(),
**hu.gcs)
@settings(deadline=10000)
def test_layer_norm_grad(
self, M, N, axis, eps, elementwise_affine, gc, dc):
op = core.CreateOperator(
"LayerNorm",
["X", "gamma", "beta"] if elementwise_affine else ["X"],
["Y", "mean", "std"],
axis=axis,
epsilon=eps,
elementwise_affine=elementwise_affine,
)
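        # A shuffled arange gives distinct, well-spread values, which keeps
        # the numeric gradient check below well-conditioned.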
X = np.arange(M * N).astype(np.float32)
np.random.shuffle(X)
X = X.reshape((M, N))
if elementwise_affine:
gamma = np.random.randn(*X.shape[axis:]).astype(np.float32)
beta = np.random.randn(*X.shape[axis:]).astype(np.float32)
inputs = [X, gamma, beta]
else:
inputs = [X]
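        # Numerically check the gradient of output 0 w.r.t. each input
        # (X, plus gamma and beta when elementwise_affine is set).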
for i in range(len(inputs)):
self.assertGradientChecks(gc, op, inputs, i, [0])

    @unittest.skipIf(workspace.has_hip_support,
                     "Operator cross-calling doesn't work with HIP yet")
@given(X=hu.tensor(min_dim=2),
eps=st.floats(1e-5, 1e-3),
elementwise_affine=st.booleans(),
**hu.gcs)
@settings(deadline=10000)
def test_layer_norm_op_c10(self, X, eps, elementwise_affine, gc, dc):
axis = np.random.randint(0, len(X.shape))
op = core.CreateOperator(
"C10LayerNorm_DontUseThisOpYet",
["X", "gamma", "beta"] if elementwise_affine else ["X"],
["Y", "mean", "std"],
axis=axis,
epsilon=eps,
elementwise_affine=elementwise_affine,
)
        if elementwise_affine:
            ref = partial(_layer_norm_with_affine_ref, axis, eps)
            gamma = np.random.randn(*X.shape[axis:]).astype(np.float32)
            beta = np.random.randn(*X.shape[axis:]).astype(np.float32)
            inputs = [X, gamma, beta]
        else:
            ref = partial(_layer_norm_ref, axis, eps)
            inputs = [X]
self.assertReferenceChecks(
device_option=gc,
op=op,
inputs=inputs,
reference=ref,
)
self.assertDeviceChecks(
device_options=dc,
op=op,
inputs=inputs,
outputs_to_check=[0, 1, 2],
)

    @unittest.skipIf(workspace.has_hip_support,
                     "Operator cross-calling doesn't work with HIP yet")
@given(X=hu.tensor(min_dim=2),
eps=st.floats(1e-5, 1e-3),
elementwise_affine=st.booleans(),
**hu.gcs)
def test_layer_norm_op_c10_preallocated_outputs(
self, X, eps, elementwise_affine, gc, dc):
        # This test ensures that the op works correctly when its output
        # tensors are preallocated.
axis = np.random.randint(0, len(X.shape))
self.ws.create_blob("X").feed(X)
if elementwise_affine:
gamma = np.random.randn(*X.shape[axis:]).astype(np.float32)
beta = np.random.randn(*X.shape[axis:]).astype(np.float32)
self.ws.create_blob("gamma").feed(gamma)
self.ws.create_blob("beta").feed(beta)
m = ModelHelper(name="test")
m.net.C10LayerNorm_DontUseThisOpYet(
["X", "gamma", "beta"] if elementwise_affine else ["X"],
["Y", "mean", "std"],
axis=axis,
epsilon=eps,
elementwise_affine=elementwise_affine,
)
self.ws.create_net(m.param_init_net).run()
net = self.ws.create_net(m.net)
        # Run the net twice: the second run reuses the outputs allocated by
        # the first, exercising the preallocated-output path.
net.run()
net.run()
if elementwise_affine:
expected_norm, expected_mean, expected_std = \
_layer_norm_with_affine_ref(axis, eps, X, gamma, beta)
else:
expected_norm, expected_mean, expected_std = _layer_norm_ref(
axis, eps, X)
actual_norm = self.ws.fetch_blob('Y')
actual_mean = self.ws.fetch_blob('mean')
actual_std = self.ws.fetch_blob('std')
torch.testing.assert_allclose(
expected_norm, actual_norm, rtol=1e-4, atol=1e-4)
torch.testing.assert_allclose(expected_mean, actual_mean)
torch.testing.assert_allclose(expected_std, actual_std)

    @given(X=hu.tensor(min_dim=2),
eps=st.floats(1e-5, 1e-3),
elementwise_affine=st.booleans(),
**hu.gcs)
def test_layer_norm_op_pytorch(self, X, eps, elementwise_affine, gc, dc):
axis = np.random.randint(0, len(X.shape))
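        # torch.ops._caffe2.LayerNorm exposes the Caffe2 operator through
        # PyTorch's op registry and returns the same (Y, mean, std) triple
        # as the Caffe2 net version tested above.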
if elementwise_affine:
gamma = np.random.randn(*X.shape[axis:]).astype(np.float32)
beta = np.random.randn(*X.shape[axis:]).astype(np.float32)
expected_norm, expected_mean, expected_std = \
_layer_norm_with_affine_ref(axis, eps, X, gamma, beta)
actual_norm, actual_mean, actual_std = torch.ops._caffe2.LayerNorm(
torch.tensor(X), torch.tensor(gamma), torch.tensor(beta),
axis, eps, True)
else:
expected_norm, expected_mean, expected_std = _layer_norm_ref(
axis, eps, X)
actual_norm, actual_mean, actual_std = torch.ops._caffe2.LayerNorm(
torch.tensor(X), None, None, axis, eps)
torch.testing.assert_allclose(
expected_norm, actual_norm, rtol=1e-4, atol=1e-4)
torch.testing.assert_allclose(expected_mean, actual_mean)
torch.testing.assert_allclose(expected_std, actual_std)

    # This test uses workspace.has_cuda_support rather than
    # workspace.has_gpu_support so it is skipped on HIP builds, where
    # tensor interop doesn't work yet.
@unittest.skipIf(not workspace.has_cuda_support, "No cuda support")
@given(X=hu.tensor(min_dim=2),
eps=st.floats(1e-5, 1e-3),
elementwise_affine=st.booleans())
def test_layer_norm_op_pytorch_cuda(self, X, eps, elementwise_affine):
axis = np.random.randint(0, len(X.shape))
if elementwise_affine:
gamma = np.random.randn(*X.shape[axis:]).astype(np.float32)
beta = np.random.randn(*X.shape[axis:]).astype(np.float32)
expected_norm, expected_mean, expected_std = \
_layer_norm_with_affine_ref(axis, eps, X, gamma, beta)
actual_norm, actual_mean, actual_std = torch.ops._caffe2.LayerNorm(
torch.tensor(X).cuda(),
torch.tensor(gamma).cuda(),
torch.tensor(beta).cuda(),
axis,
eps,
True)
else:
expected_norm, expected_mean, expected_std = _layer_norm_ref(
axis, eps, X)
actual_norm, actual_mean, actual_std = torch.ops._caffe2.LayerNorm(
torch.tensor(X).cuda(), None, None, axis, eps)
torch.testing.assert_allclose(
expected_norm, actual_norm.cpu(), rtol=1e-4, atol=1e-4)
torch.testing.assert_allclose(expected_mean, actual_mean.cpu())
torch.testing.assert_allclose(expected_std, actual_std.cpu())

    @given(X=hu.tensor(min_dim=2),
eps=st.floats(1e-5, 1e-3),
elementwise_affine=st.booleans(),
**hu.gcs)
@settings(deadline=1000)
def test_layer_norm_op_jit(self, X, eps, elementwise_affine, gc, dc):
@torch.jit.script
def jit_layer_norm(X, gamma=None, beta=None, axis=1, eps=1e-5,
elementwise_affine=False):
# type: (Tensor, Optional[Tensor], Optional[Tensor], int, float, bool) -> Tuple[Tensor, Tensor, Tensor]
return torch.ops._caffe2.LayerNorm(
X, gamma, beta, axis, eps, elementwise_affine)
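
        # The `# type:` comment supplies the TorchScript signature; the
        # Optional[Tensor] annotations are what allow gamma and beta to
        # default to None inside a scripted function.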
axis = np.random.randint(0, len(X.shape))
if elementwise_affine:
gamma = np.random.randn(*X.shape[axis:]).astype(np.float32)
beta = np.random.randn(*X.shape[axis:]).astype(np.float32)
expected_norm, expected_mean, expected_std = \
_layer_norm_with_affine_ref(axis, eps, X, gamma, beta)
actual_norm, actual_mean, actual_std = jit_layer_norm(
                torch.tensor(X), torch.tensor(gamma), torch.tensor(beta),
axis, eps, elementwise_affine)
else:
expected_norm, expected_mean, expected_std = _layer_norm_ref(
axis, eps, X)
            actual_norm, actual_mean, actual_std = jit_layer_norm(
                torch.tensor(X), None, None, axis, eps,
                elementwise_affine)
        torch.testing.assert_allclose(
            expected_norm, actual_norm, rtol=1e-4, atol=1e-4)
        torch.testing.assert_allclose(expected_mean, actual_mean)
        torch.testing.assert_allclose(expected_std, actual_std)