from caffe2.python import brew, core, utils, workspace
import caffe2.python.hip_test_util as hiputl
import caffe2.python.hypothesis_test_util as hu
from caffe2.python.model_helper import ModelHelper
import caffe2.python.serialized_test.serialized_test_util as serial
from hypothesis import given, assume, settings
import hypothesis.strategies as st
import numpy as np
import unittest
class TestSpatialBN(serial.SerializedTestCase):
@serial.given(size=st.integers(7, 10),
input_channels=st.integers(1, 10),
batch_size=st.integers(0, 3),
seed=st.integers(0, 65535),
order=st.sampled_from(["NCHW", "NHWC"]),
epsilon=st.floats(min_value=1e-5, max_value=1e-2),
inplace=st.booleans(),
engine=st.sampled_from(["", "CUDNN"]),
**hu.gcs)
def test_spatialbn_test_mode_3d(
self, size, input_channels, batch_size, seed, order, epsilon,
inplace, engine, gc, dc):
op = core.CreateOperator(
"SpatialBN",
["X", "scale", "bias", "mean", "var"],
["X" if inplace else "Y"],
order=order,
is_test=True,
epsilon=epsilon,
engine=engine,
)
def reference_spatialbn_test(X, scale, bias, mean, var):
if order == "NCHW":
scale = scale[np.newaxis, :,
np.newaxis, np.newaxis, np.newaxis]
bias = bias[np.newaxis, :, np.newaxis, np.newaxis, np.newaxis]
mean = mean[np.newaxis, :, np.newaxis, np.newaxis, np.newaxis]
var = var[np.newaxis, :, np.newaxis, np.newaxis, np.newaxis]
return ((X - mean) / np.sqrt(var + epsilon) * scale + bias,)
np.random.seed(1701)
scale = np.random.rand(input_channels).astype(np.float32) + 0.5
bias = np.random.rand(input_channels).astype(np.float32) - 0.5
mean = np.random.randn(input_channels).astype(np.float32)
var = np.random.rand(input_channels).astype(np.float32) + 0.5
X = np.random.rand(batch_size, input_channels, size, size, size)\
.astype(np.float32) - 0.5
if order == "NHWC":
X = utils.NCHW2NHWC(X)
self.assertReferenceChecks(gc, op, [X, scale, bias, mean, var],
reference_spatialbn_test)
self.assertDeviceChecks(dc, op, [X, scale, bias, mean, var], [0])
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support")
@given(size=st.integers(7, 10),
input_channels=st.integers(1, 10),
batch_size=st.integers(0, 3),
seed=st.integers(0, 65535),
order=st.sampled_from(["NCHW", "NHWC"]),
epsilon=st.floats(min_value=1e-5, max_value=1e-2),
inplace=st.booleans(),
engine=st.sampled_from(["", "CUDNN"]),
**hu.gcs)
def test_spatialbn_test_mode_1d(
self, size, input_channels, batch_size, seed, order, epsilon,
inplace, engine, gc, dc):
# Currently MIOPEN SpatialBN only supports 2D
if hiputl.run_in_hip(gc, dc):
assume(engine != "CUDNN")
op = core.CreateOperator(
"SpatialBN",
["X", "scale", "bias", "mean", "var"],
["X" if inplace else "Y"],
order=order,
is_test=True,
epsilon=epsilon,
engine=engine,
)
def reference_spatialbn_test(X, scale, bias, mean, var):
if order == "NCHW":
scale = scale[np.newaxis, :, np.newaxis]
bias = bias[np.newaxis, :, np.newaxis]
mean = mean[np.newaxis, :, np.newaxis]
var = var[np.newaxis, :, np.newaxis]
return ((X - mean) / np.sqrt(var + epsilon) * scale + bias,)
np.random.seed(1701)
scale = np.random.rand(input_channels).astype(np.float32) + 0.5
bias = np.random.rand(input_channels).astype(np.float32) - 0.5
mean = np.random.randn(input_channels).astype(np.float32)
var = np.random.rand(input_channels).astype(np.float32) + 0.5
X = np.random.rand(
batch_size, input_channels, size).astype(np.float32) - 0.5
if order == "NHWC":
X = X.swapaxes(1, 2)
self.assertReferenceChecks(gc, op, [X, scale, bias, mean, var],
reference_spatialbn_test)
self.assertDeviceChecks(dc, op, [X, scale, bias, mean, var], [0])
@given(size=st.integers(7, 10),
input_channels=st.integers(1, 10),
batch_size=st.integers(0, 3),
seed=st.integers(0, 65535),
order=st.sampled_from(["NCHW", "NHWC"]),
epsilon=st.floats(min_value=1e-5, max_value=1e-2),
engine=st.sampled_from(["", "CUDNN"]),
inplace=st.booleans(),
**hu.gcs)
def test_spatialbn_test_mode(
self, size, input_channels, batch_size, seed, order, epsilon,
inplace, engine, gc, dc):
# Currently HIP SpatialBN only supports NCHW
if hiputl.run_in_hip(gc, dc):
assume(order == "NCHW")
op = core.CreateOperator(
"SpatialBN",
["X", "scale", "bias", "mean", "var"],
["X" if inplace else "Y"],
order=order,
is_test=True,
epsilon=epsilon,
engine=engine
)
def reference_spatialbn_test(X, scale, bias, mean, var):
if order == "NCHW":
scale = scale[np.newaxis, :, np.newaxis, np.newaxis]
bias = bias[np.newaxis, :, np.newaxis, np.newaxis]
mean = mean[np.newaxis, :, np.newaxis, np.newaxis]
var = var[np.newaxis, :, np.newaxis, np.newaxis]
return ((X - mean) / np.sqrt(var + epsilon) * scale + bias,)
np.random.seed(1701)
scale = np.random.rand(input_channels).astype(np.float32) + 0.5
bias = np.random.rand(input_channels).astype(np.float32) - 0.5
mean = np.random.randn(input_channels).astype(np.float32)
var = np.random.rand(input_channels).astype(np.float32) + 0.5
X = np.random.rand(
batch_size, input_channels, size, size).astype(np.float32) - 0.5
if order == "NHWC":
X = X.swapaxes(1, 2).swapaxes(2, 3)
self.assertReferenceChecks(gc, op, [X, scale, bias, mean, var],
reference_spatialbn_test)
self.assertDeviceChecks(dc, op, [X, scale, bias, mean, var], [0])
@given(size=st.integers(1, 10),
input_channels=st.integers(1, 10),
batch_size=st.integers(0, 3),
seed=st.integers(0, 65535),
order=st.sampled_from(["NCHW", "NHWC"]),
epsilon=st.floats(1e-5, 1e-2),
momentum=st.floats(0.5, 0.9),
engine=st.sampled_from(["", "CUDNN"]),
inplace=st.sampled_from([True, False]),
**hu.gcs)
def test_spatialbn_train_mode(
self, size, input_channels, batch_size, seed, order, epsilon,
momentum, inplace, engine, gc, dc):
# Currently HIP SpatialBN only supports NCHW
if hiputl.run_in_hip(gc, dc):
assume(order == "NCHW")
assume(batch_size == 0 or batch_size * size * size > 1)
op = core.CreateOperator(
"SpatialBN",
["X", "scale", "bias", "running_mean", "running_var"],
["X" if inplace else "Y",
"running_mean", "running_var", "saved_mean", "saved_var"],
order=order,
is_test=False,
epsilon=epsilon,
momentum=momentum,
engine=engine,
)
np.random.seed(1701)
scale = np.random.randn(input_channels).astype(np.float32)
bias = np.random.rand(input_channels).astype(np.float32) - 0.5
mean = np.random.randn(input_channels).astype(np.float32)
var = np.random.rand(input_channels).astype(np.float32) + 0.5
X = np.random.randn(
batch_size, input_channels, size, size).astype(np.float32)
if order == "NHWC":
X = np.transpose(X, (0, 2, 3, 1))
def batch_norm_ref(X, scale, bias, running_mean, running_var):
if batch_size == 0:
Y = np.zeros(X.shape)
saved_mean = np.zeros(running_mean.shape)
saved_var = np.zeros(running_var.shape)
return (Y, running_mean, running_var, saved_mean, saved_var)
if order == "NHWC":
X = np.transpose(X, (0, 3, 1, 2))
C = X.shape[1]
reduce_size = batch_size * size * size
saved_mean = np.mean(X, (0, 2, 3))
saved_var = np.var(X, (0, 2, 3))
if reduce_size == 1:
unbias_scale = float('inf')
else:
unbias_scale = reduce_size / (reduce_size - 1)
running_mean = momentum * running_mean + (
1.0 - momentum) * saved_mean
running_var = momentum * running_var + (
1.0 - momentum) * unbias_scale * saved_var
std = np.sqrt(saved_var + epsilon)
broadcast_shape = (1, C, 1, 1)
Y = (X - np.reshape(saved_mean, broadcast_shape)) / np.reshape(
std, broadcast_shape) * np.reshape(
scale, broadcast_shape) + np.reshape(bias, broadcast_shape)
if order == "NHWC":
Y = np.transpose(Y, (0, 2, 3, 1))
return (Y, running_mean, running_var, saved_mean, 1.0 / std)
self.assertReferenceChecks(gc, op, [X, scale, bias, mean, var],
batch_norm_ref)
self.assertDeviceChecks(dc, op, [X, scale, bias, mean, var],
[0, 1, 2, 3, 4])
@given(size=st.integers(7, 10),
input_channels=st.integers(1, 10),
batch_size=st.integers(0, 3),
seed=st.integers(0, 65535),
order=st.sampled_from(["NCHW", "NHWC"]),
epsilon=st.floats(min_value=1e-5, max_value=1e-2),
momentum=st.floats(0.5, 0.9),
engine=st.sampled_from(["", "CUDNN"]),
**hu.gcs)
@settings(deadline=None, max_examples=50)
def test_spatialbn_train_mode_gradient_check(
self, size, input_channels, batch_size, seed, order, epsilon,
momentum, engine, gc, dc):
# Currently HIP SpatialBN only supports NCHW
if hiputl.run_in_hip(gc, dc):
assume(order == "NCHW")
op = core.CreateOperator(
"SpatialBN",
["X", "scale", "bias", "mean", "var"],
["Y", "mean", "var", "saved_mean", "saved_var"],
order=order,
is_test=False,
epsilon=epsilon,
momentum=momentum,
engine=engine
)
np.random.seed(seed)
scale = np.random.rand(input_channels).astype(np.float32) + 0.5
bias = np.random.rand(input_channels).astype(np.float32) - 0.5
mean = np.random.randn(input_channels).astype(np.float32)
var = np.random.rand(input_channels).astype(np.float32) + 0.5
X = np.random.rand(
batch_size, input_channels, size, size).astype(np.float32) - 0.5
if order == "NHWC":
X = X.swapaxes(1, 2).swapaxes(2, 3)
for input_to_check in [0, 1, 2]: # dX, dScale, dBias
self.assertGradientChecks(gc, op, [X, scale, bias, mean, var],
input_to_check, [0])
@given(size=st.integers(7, 10),
input_channels=st.integers(1, 10),
batch_size=st.integers(0, 3),
seed=st.integers(0, 65535),
order=st.sampled_from(["NCHW", "NHWC"]),
epsilon=st.floats(min_value=1e-5, max_value=1e-2),
momentum=st.floats(min_value=0.5, max_value=0.9),
engine=st.sampled_from(["", "CUDNN"]),
**hu.gcs)
@settings(deadline=10000)
def test_spatialbn_train_mode_gradient_check_1d(
self, size, input_channels, batch_size, seed, order, epsilon,
momentum, engine, gc, dc):
# Currently MIOPEN SpatialBN only supports 2D
if hiputl.run_in_hip(gc, dc):
assume(engine != "CUDNN")
op = core.CreateOperator(
"SpatialBN",
["X", "scale", "bias", "mean", "var"],
["Y", "mean", "var", "saved_mean", "saved_var"],
order=order,
is_test=False,
epsilon=epsilon,
momentum=momentum,
engine=engine,
)
np.random.seed(seed)
scale = np.random.rand(input_channels).astype(np.float32) + 0.5
bias = np.random.rand(input_channels).astype(np.float32) - 0.5
mean = np.random.randn(input_channels).astype(np.float32)
var = np.random.rand(input_channels).astype(np.float32) + 0.5
X = np.random.rand(
batch_size, input_channels, size).astype(np.float32) - 0.5
if order == "NHWC":
X = X.swapaxes(1, 2)
for input_to_check in [0, 1, 2]: # dX, dScale, dBias
self.assertGradientChecks(gc, op, [X, scale, bias, mean, var],
input_to_check, [0], stepsize=0.01)
@given(N=st.integers(0, 5),
C=st.integers(1, 10),
H=st.integers(1, 5),
W=st.integers(1, 5),
epsilon=st.floats(1e-5, 1e-2),
momentum=st.floats(0.5, 0.9),
order=st.sampled_from(["NCHW", "NHWC"]),
num_batches=st.integers(2, 5),
in_place=st.booleans(),
engine=st.sampled_from(["", "CUDNN"]),
**hu.gcs)
def test_spatial_bn_multi_batch(
self, N, C, H, W, epsilon, momentum, order, num_batches, in_place,
engine, gc, dc):
if in_place:
outputs = ["Y", "mean", "var", "batch_mean", "batch_var"]
else:
outputs = ["Y", "mean", "var", "saved_mean", "saved_var"]
op = core.CreateOperator(
"SpatialBN",
["X", "scale", "bias", "mean", "var", "batch_mean", "batch_var"],
outputs,
Loading ...