import unittest
import hypothesis.strategies as st
from hypothesis import given, assume
import numpy as np
import time
import os
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, muji, dyndep
import caffe2.python.hypothesis_test_util as hu
# Seed NumPy's global RNG with a fixed value so the np.random.randn inputs
# generated by the tests below are repeatable from run to run.
np.random.seed(1)
# Load the NCCL operator library through dyndep before any test builds an
# operator (presumably this is what registers the NCCL* ops -- confirm).
dyndep.InitOpsLibrary('@/caffe2/caffe2/contrib/nccl:nccl_ops')
def gpu_device(i):
    """Return a ``caffe2_pb2.DeviceOption`` addressing GPU number ``i``.

    The option's ``device_type`` is ``workspace.GpuDeviceType`` and its
    ``device_id`` is ``i``.
    """
    opt = caffe2_pb2.DeviceOption()
    opt.device_id = i
    opt.device_type = workspace.GpuDeviceType
    return opt
def benchmark(ws, net, warmups=5, iters=100):
    """Time ``iters`` runs of ``net`` on workspace ``ws``.

    The net is first run ``warmups`` times untimed. It is then wrapped in a
    single-step plan that repeats it ``iters`` times, and the wall-clock time
    of running that plan is measured with ``time.time()``.

    Prints the average per-iteration time in milliseconds and returns the
    total elapsed time of the timed plan in seconds.
    """
    # Untimed warm-up runs.
    for _warmup in range(warmups):
        ws.run(net)

    timed_step = core.ExecutionStep("test-step", net, iters)
    plan = core.Plan("plan")
    plan.AddStep(timed_step)

    started = time.time()
    ws.run(plan)
    finished = time.time()

    elapsed = finished - started
    per_iter_ms = elapsed / float(iters) * 1000.0
    print("Timing network, time taken per-iteration: {:.6f}ms".format(
        per_iter_ms))
    return elapsed
@unittest.skipIf(not workspace.has_cuda_support, "NCCL only on CUDA GPU")
# Every strategy below draws ``n`` from [2, NumGpuDevices()]; with fewer than
# two devices that range is empty, so skip instead of erroring out.
@unittest.skipIf(workspace.NumGpuDevices() < 2,
                 "NCCL collectives need at least 2 GPUs")
class NCCLOpsTest(hu.HypothesisTestCase):
    """Reference checks for the NCCL collective operators.

    Each property test places one float32 input blob per GPU (blob ``x_i`` on
    device ``i``), runs a single NCCL operator over them and compares the
    result against a NumPy reference through ``assertReferenceChecks``.
    """

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           in_place=st.booleans())
    def test_nccl_allreduce(self, n, m, in_place):
        """NCCLAllreduce must give every output the element-wise input sum.

        ``n`` is the number of devices, ``m`` the vector length; ``in_place``
        selects whether the outputs reuse the input blob names.
        """
        xs = [np.random.randn(m).astype(np.float32) for _ in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        prefix = "" if in_place else "o"
        outputs = [str("{}x_{}".format(prefix, i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllreduce", inputs, outputs)
        # Pin input blob i to GPU i.
        input_device_options = {
            name: gpu_device(i) for i, name in enumerate(inputs)}

        def allreduce(*args):
            assert len(args) == n
            total = np.sum(args, axis=0)
            return [total for _ in range(n)]

        results = self.assertReferenceChecks(
            hu.gpu_do, op, list(xs), allreduce, input_device_options)
        # All outputs must agree with the first one, down to the raw bytes.
        for output in results:
            np.testing.assert_array_equal(results[0], output)
            self.assertEqual(results[0].tobytes(), output.tobytes())

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           root=st.integers(min_value=0,
                            max_value=workspace.NumGpuDevices() - 1))
    def test_nccl_broadcast(self, n, m, root):
        """NCCLBroadcast (in place) must copy the root's input to every blob."""
        # ``root`` is drawn independently of ``n``; discard invalid pairs.
        assume(root < n)
        xs = [np.random.randn(m).astype(np.float32) for _ in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLBroadcast", inputs, inputs, root=root)
        input_device_options = {
            name: gpu_device(i) for i, name in enumerate(inputs)}

        def broadcast(*args):
            assert len(args) == n
            return [args[root] for _ in range(n)]

        self.assertReferenceChecks(
            hu.gpu_do, op, list(xs), broadcast, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           # NCCL Reduce seems to deadlock for non-zero roots.
           root=st.integers(min_value=0, max_value=0),
           in_place=st.booleans())
    def test_nccl_reduce(self, n, m, root, in_place):
        """NCCLReduce must produce the element-wise sum of all inputs.

        The single output is either the root's own input blob (``in_place``)
        or a separate blob named ``o``.
        """
        assume(in_place is False or root == 0)
        xs = [np.random.randn(m).astype(np.float32) for _ in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        op = core.CreateOperator(
            "NCCLReduce", inputs,
            inputs[root] if in_place else b"o", root=root)
        input_device_options = {
            name: gpu_device(i) for i, name in enumerate(inputs)}

        # Named ``reduce_ref`` so the builtin ``reduce`` is not shadowed.
        def reduce_ref(*args):
            assert len(args) == n
            return [np.sum(args, axis=0)]

        self.assertReferenceChecks(
            hu.gpu_do, op, list(xs), reduce_ref, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_allgather(self, n, m):
        """NCCLAllGather must give every output the stack of all inputs."""
        xs = [np.random.randn(m).astype(np.float32) for _ in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllGather", inputs, outputs)
        input_device_options = {
            name: gpu_device(i) for i, name in enumerate(inputs)}

        def allgather(*args):
            assert len(args) == n
            return [np.stack(args, axis=0) for _ in range(n)]

        results = self.assertReferenceChecks(
            hu.gpu_do, op, list(xs), allgather, input_device_options)
        # All outputs must agree with the first one, down to the raw bytes.
        for output in results:
            np.testing.assert_array_equal(results[0], output)
            self.assertEqual(results[0].tobytes(), output.tobytes())

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_reduce_scatter(self, n, m):
        """NCCLReduceScatter must hand output ``i`` row ``i`` of the sum.

        Every input has shape ``(n, m)``; the reference sums the inputs and
        returns one row per output.
        """
        xs = [np.random.randn(n, m).astype(np.float32) for _ in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLReduceScatter", inputs, outputs)
        input_device_options = {
            name: gpu_device(i) for i, name in enumerate(inputs)}

        def reduce_scatter(*args):
            assert len(args) == n
            reduced = sum(args)
            assert len(reduced.shape) > 1
            return [reduced[i, :] for i in range(n)]

        self.assertReferenceChecks(
            hu.gpu_do, op, list(xs), reduce_scatter, input_device_options)

    # NOTE: the leading underscore means the name does not start with "test",
    # so default unittest discovery will not collect this method.
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=100000, max_value=100000),
           iters=st.integers(min_value=1, max_value=100),
           net_type=st.sampled_from(["dag", "async_dag", "simple"]))
    def _test_nccl_sync(self, n, m, iters, net_type):
        """Run per-device Sum chains followed by an NCCLReduce onto GPU 0.

        On each device ``x_i`` starts at 0 and has the all-ones blob ``xe_i``
        added ``iters`` times, so the reduced blob is expected to equal
        ``iters * n`` everywhere.
        """
        inputs = [str("x_{}".format(i)) for i in range(n)]
        extra_inputs = [str("xe_{}".format(i)) for i in range(n)]
        net = core.Net("asdf")
        net.Proto().type = net_type
        net.Proto().num_workers = n
        for i in range(n):
            net.ConstantFill([], inputs[i], shape=[m], value=0.0,
                             device_option=gpu_device(i))
            net.ConstantFill([], extra_inputs[i], shape=[m], value=1.0,
                             device_option=gpu_device(i))
            for _ in range(iters):
                net.Sum([inputs[i], extra_inputs[i]], [inputs[i]],
                        device_option=gpu_device(i))
        net.NCCLReduce(inputs, [inputs[0]], device_option=gpu_device(0))
        self.ws.run(net)
        np.testing.assert_array_equal(
            self.ws.blobs[inputs[0]].fetch(),
            np.full(shape=(m,), fill_value=iters * n, dtype=np.float32))

    @unittest.skipIf(not os.environ.get("CAFFE2_BENCHMARK"), "Benchmark")
    def test_timings(self):
        """Print the speedup of NCCLAllreduce over ``muji.Allreduce``.

        Runs only when the ``CAFFE2_BENCHMARK`` environment variable is set.
        Every device count from 2 up to and including
        ``workspace.NumGpuDevices()`` is timed, both out of place and in
        place.
        """
        # ``+ 1`` so the full device count is benchmarked as well.
        for n in range(2, workspace.NumGpuDevices() + 1):
            for in_place in [False, True]:
                # randn() requires integer dimensions; a bare 1e7 is a float.
                xs = [np.random.randn(int(1e7)).astype(np.float32)
                      for _ in range(n)]
                inputs = [str("x_{}".format(i)) for i in range(n)]
                prefix = "" if in_place else "o"
                outputs = [str("{}x_{}".format(prefix, i)) for i in range(n)]

                net = core.Net("test")
                net.NCCLAllreduce(inputs, outputs)
                net.RunAllOnGPU()
                for i in range(n):
                    self.ws.create_blob(inputs[i]).feed(xs[i], gpu_device(i))
                self.ws.run(net)
                net_time = benchmark(self.ws, net)

                vanilla = core.Net("vanilla")
                muji.Allreduce(vanilla, inputs)
                vanilla_time = benchmark(self.ws, vanilla)
                print("Speedup for NCCL: {:.2f}".format(
                    vanilla_time / net_time))