import numpy as np
import unittest
from caffe2.python import core, workspace, muji, test_util
@unittest.skipIf(not workspace.has_gpu_support, "no gpu")
class TestMuji(test_util.TestCase):
    """Exercise the muji allreduce variants on the locally available GPUs.

    The whole class is skipped when the workspace reports no GPU support.
    Tests that need a particular peer-access layout are reported as skipped
    (rather than silently passing) when the hardware does not provide it.
    """

    def RunningAllreduceWithGPUs(self, gpu_ids, allreduce_function):
        """A base function to test different scenarios.

        Fills one blob per GPU with the constant ``gpu_id + 1``, applies
        ``allreduce_function`` and checks that every ``*_reduced`` blob
        equals the sum of all the fill values.

        Args:
            gpu_ids: list of integer GPU ids taking part in the allreduce.
            allreduce_function: callable invoked as
                ``allreduce_function(net, blob_names, "_reduced", gpu_ids)``.
        """
        net = core.Net("mujitest")
        blob_names = ["testblob_gpu_" + str(gpu_id) for gpu_id in gpu_ids]
        for gpu_id, blob_name in zip(gpu_ids, blob_names):
            net.ConstantFill(
                [],
                blob_name,
                shape=[1, 2, 3, 4],
                value=float(gpu_id + 1),
                device_option=muji.OnGPU(gpu_id)
            )
        allreduce_function(net, blob_names, "_reduced", gpu_ids)
        workspace.RunNetOnce(net)

        # Each input holds (gpu_id + 1), so the expected reduction result is
        # sum(gpu_id + 1) == sum(gpu_ids) + len(gpu_ids).
        target_value = sum(gpu_ids) + len(gpu_ids)

        # Dump every blob in the workspace to ease debugging of failures.
        for name in sorted(workspace.Blobs()):
            print('{} {}'.format(name, workspace.FetchBlob(name)))

        for gpu_id, blob_name in zip(gpu_ids, blob_names):
            reduced = workspace.FetchBlob(blob_name + "_reduced")
            np.testing.assert_array_equal(
                reduced,
                target_value,
                err_msg="gpu id %d of %s" % (gpu_id, str(gpu_ids))
            )

    def _SkipUnlessPeerAccess(self, num_gpus, groups, message):
        """Skip the running test unless the peer access pattern is suitable.

        Args:
            num_gpus: minimum number of rows the peer access pattern must
                have.
            groups: iterable of ``(start, stop)`` index pairs; every entry of
                ``pattern[start:stop, start:stop]`` must be truthy.
            message: reason reported for the skipped test.
        """
        # NOTE(review): presumably pattern[i, j] says whether GPU i can access
        # GPU j directly -- confirm against workspace.GetGpuPeerAccessPattern.
        pattern = workspace.GetGpuPeerAccessPattern()
        ready = pattern.shape[0] >= num_gpus and all(
            np.all(pattern[start:stop, start:stop]) for start, stop in groups
        )
        if not ready:
            self.skipTest(message)

    def testAllreduceFallback(self):
        """Run the fallback allreduce over every GPU device."""
        self.RunningAllreduceWithGPUs(
            list(range(workspace.NumGpuDevices())), muji.AllreduceFallback
        )

    def testAllreduceSingleGPU(self):
        """Run the generic allreduce on each GPU individually."""
        for gpu_id in range(workspace.NumGpuDevices()):
            self.RunningAllreduceWithGPUs([gpu_id], muji.Allreduce)

    def testAllreduceWithTwoGPUs(self):
        """Run the 2-GPU allreduce on GPUs 0 and 1."""
        self._SkipUnlessPeerAccess(
            2, [(0, 2)],
            'Skipping allreduce with 2 gpus. Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs([0, 1], muji.Allreduce2)

    def testAllreduceWithFourGPUs(self):
        """Run the 4-GPU allreduce on GPUs 0-3 as one peer group."""
        self._SkipUnlessPeerAccess(
            4, [(0, 4)],
            'Skipping allreduce with 4 gpus. Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs([0, 1, 2, 3], muji.Allreduce4)

    def testAllreduceWithFourGPUsAndTwoGroups(self):
        """Run the 4-GPU allreduce with peer groups {0, 1} and {2, 3}."""
        self._SkipUnlessPeerAccess(
            4, [(0, 2), (2, 4)],
            'Skipping allreduce with 4 gpus and 2 groups. '
            'Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs([0, 1, 2, 3], muji.Allreduce4Group2)

    def testAllreduceWithEightGPUs(self):
        """Run the 8-GPU allreduce with peer groups {0..3} and {4..7}."""
        # The second group is bounded at 8 so that machines with more than
        # eight GPUs are judged only on the devices this test actually uses.
        self._SkipUnlessPeerAccess(
            8, [(0, 4), (4, 8)],
            'Skipping allreduce with 8 gpus. Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs(list(range(8)), muji.Allreduce8)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()