Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

/ contrib / nccl / nccl_ops_test.py






import unittest
import hypothesis.strategies as st
from hypothesis import given, assume
import numpy as np
import time
import os
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, muji, dyndep
import caffe2.python.hypothesis_test_util as hu

np.random.seed(1)

dyndep.InitOpsLibrary('@/caffe2/caffe2/contrib/nccl:nccl_ops')


def gpu_device(i):
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = workspace.GpuDeviceType
    device_option.device_id = i
    return device_option


def benchmark(ws, net, warmups=5, iters=100):
    for _ in range(warmups):
        ws.run(net)
    plan = core.Plan("plan")
    plan.AddStep(core.ExecutionStep("test-step", net, iters))
    before = time.time()
    ws.run(plan)
    after = time.time()
    print("Timing network, time taken per-iteration: {:.6f}ms".format((
        after - before) / float(iters) * 1000.0))
    return after - before


@unittest.skipIf(not workspace.has_cuda_support, "NCCL only on CUDA GPU")
class NCCLOpsTest(hu.HypothesisTestCase):
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           in_place=st.booleans())
    def test_nccl_allreduce(self, n, m, in_place):
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        prefix = "" if in_place else "o"
        outputs = [str("{}x_{}".format(prefix, i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllreduce", inputs, outputs)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def allreduce(*args):
            assert len(args) == n
            output = np.sum(args, axis=0)
            return [output for _ in range(n)]

        outputs = self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            allreduce, input_device_options)
        for output in outputs:
            np.testing.assert_array_equal(outputs[0], output)
            self.assertEqual(outputs[0].tobytes(), output.tobytes())

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           root=st.integers(min_value=0,
                            max_value=workspace.NumGpuDevices() - 1))
    def test_nccl_broadcast(self, n, m, root):
        assume(root < n)
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLBroadcast", inputs, inputs, root=root)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def broadcast(*args):
            assert len(args) == n
            return [args[root] for _ in range(n)]

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            broadcast, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           # NCCL Reduce seems to deadlock for non-zero roots.
           root=st.integers(min_value=0, max_value=0),
           in_place=st.booleans())
    def test_nccl_reduce(self, n, m, root, in_place):
        assume(in_place is False or root == 0)
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        op = core.CreateOperator(
            "NCCLReduce", inputs,
            inputs[root] if in_place else b"o", root=root)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def reduce(*args):
            assert len(args) == n
            return [np.sum(args, axis=0)]

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            reduce, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_allgather(self, n, m):
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllGather", inputs, outputs)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def allgather(*args):
            assert len(args) == n
            return [np.stack(args, axis=0) for _ in range(n)]

        outputs = self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            allgather, input_device_options)
        for output in outputs:
            np.testing.assert_array_equal(outputs[0], output)
            self.assertEqual(outputs[0].tobytes(), output.tobytes())

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_reduce_scatter(self, n, m):
        xs = [np.random.randn(n, m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLReduceScatter", inputs, outputs)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def reduce_scatter(*args):
            assert len(args) == n
            reduced = sum(args)
            assert len(reduced.shape) > 1
            ref = [reduced[i, :] for i in range(n)]
            return ref

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            reduce_scatter, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=100000, max_value=100000),
           iters=st.integers(min_value=1, max_value=100),
           net_type=st.sampled_from(["dag", "async_dag", "simple"]))
    def _test_nccl_sync(self, n, m, iters, net_type):
        inputs = [str("x_{}".format(i)) for i in range(n)]
        extra_inputs = [str("xe_{}".format(i)) for i in range(n)]
        net = core.Net("asdf")
        net.Proto().type = net_type
        net.Proto().num_workers = n
        for i in range(n):
            net.ConstantFill([], inputs[i], shape=[m], value=0.0,
                             device_option=gpu_device(i))
            net.ConstantFill([], extra_inputs[i], shape=[m], value=1.0,
                             device_option=gpu_device(i))
            for _ in range(iters):
                net.Sum([inputs[i], extra_inputs[i]], [inputs[i]],
                        device_option=gpu_device(i))
        net.NCCLReduce(inputs, [inputs[0]], device_option=gpu_device(0))
        self.ws.run(net)
        np.testing.assert_array_equal(
            self.ws.blobs[inputs[0]].fetch(),
            np.full(shape=(m,), fill_value=iters * n, dtype=np.float32))

    @unittest.skipIf(not os.environ.get("CAFFE2_BENCHMARK"), "Benchmark")
    def test_timings(self):
        for n in range(2, workspace.NumGpuDevices()):
            for in_place in [False, True]:
                xs = [np.random.randn(1e7).astype(np.float32)
                      for i in range(n)]
                inputs = [str("x_{}".format(i)) for i in range(n)]
                prefix = "" if in_place else "o"
                outputs = [str("{}x_{}".format(prefix, i)) for i in range(n)]

                net = core.Net("test")
                net.NCCLAllreduce(inputs, outputs)
                net.RunAllOnGPU()
                for i in range(n):
                    self.ws.create_blob(inputs[i]).feed(xs[i], gpu_device(i))
                self.ws.run(net)
                net_time = benchmark(self.ws, net)
                vanilla = core.Net("vanilla")
                muji.Allreduce(vanilla, inputs)
                vanilla_time = benchmark(self.ws, vanilla)
                print("Speedup for NCCL: {:.2f}".format(
                    vanilla_time / net_time))