import copy
from collections import namedtuple
import itertools
import random
import math
import os
import sys
import time
import tempfile
import unittest
from contextlib import contextmanager, suppress
from datetime import timedelta
from functools import reduce
from typing import Union, NamedTuple
import torch
import torch.cuda
import torch.distributed as dist
import torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook as powerSGD
from torch.distributed.algorithms.ddp_comm_hooks import default_hooks as default
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel.distributed import _dump_DDP_relevant_env_vars
import torch.nn as nn
import torch.nn.functional as F
from torch.distributed.distributed_c10d import _get_default_group, AllreduceOptions, GroupMember
from torch.testing._internal.common_utils import FILE_SCHEMA
from torch.testing._internal.common_distributed import (
MultiProcessTestCase,
TEST_SKIPS,
initialize_temp_directories,
cleanup_temp_dir,
simple_sparse_reduce_tests,
skip_if_rocm,
skip_if_small_worldsize,
skip_if_lt_x_gpu,
skip_if_no_gpu,
require_n_gpus_for_nccl_backend,
requires_nccl_version,
captured_output,
)
from torch._utils_internal import TEST_MASTER_ADDR as MASTER_ADDR
from torch._utils_internal import TEST_MASTER_PORT as MASTER_PORT
try:
import torchvision
HAS_TORCHVISION = True
except ImportError:
HAS_TORCHVISION = False
if sys.platform == 'win32':
import msvcrt
else:
import fcntl
class Foo:
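    """Simple picklable object (optionally holding a tensor) used below in
    COLLECTIVES_OBJECT_TEST_LIST to exercise object-based collectives."""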
def __init__(self, x):
        # Can be a tensor or an int
self.x = x
def __eq__(self, other):
def eq(value, other):
if isinstance(value, torch.Tensor):
return torch.equal(value, other)
return value == other
for attr, value in self.__dict__.items():
other_value = other.__dict__[attr]
if not eq(value, other_value):
return False
return True
f = Foo(10)
f.bar = 1
foo_cpu_tensor = Foo(torch.randn(3, 3))
COLLECTIVES_OBJECT_TEST_LIST = [
{"key1": 3, "key2": 4, "key3": {"nested": True}},
f,
foo_cpu_tensor,
"foo",
[1, 2, True, "string", [4, 5, "nested"]],
]
# Allowlist of distributed backends where profiling collectives is supported.
PROFILING_SUPPORTED_BACKENDS = [
dist.Backend.NCCL,
dist.Backend.GLOO,
]
# Allowlist of distributed backends where profiling is supported with use_cuda=True
CUDA_PROFILING_SUPPORTED_BACKENDS = [
dist.Backend.GLOO
]
# Dummy NamedTuple data structures to test DDP support for NamedTuple types.
EXPECTED_FIELDS = ("a", "b")
TestNamedTupleInput_0 = namedtuple("NamedTuple", EXPECTED_FIELDS)
class TestNamedTupleInput_1(NamedTuple):
    a: torch.Tensor
    b: torch.Tensor
skipIfNoTorchVision = unittest.skipIf(not HAS_TORCHVISION, "no torchvision")
BACKEND = os.environ["BACKEND"]
INIT_METHOD = os.getenv("INIT_METHOD", "env://")
DEFAULT_TIMEOUT = 300
CUSTOMIZED_TIMEOUT = {"test_DistributedDataParallel": 500}
class _FC2(nn.Module):
def __init__(self):
super(_FC2, self).__init__()
self.fc = nn.Linear(10, 50, bias=True)
self.fc.bias.requires_grad = False
def forward(self, x):
x = self.fc(x)
return x
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(2, 10, bias=False)
self.fc2 = _FC2()
self.fc3 = nn.Linear(50, 4, bias=False)
self.relu = nn.ReLU()
self.no_grad_param = nn.Parameter(torch.tensor([2, 2]).long(),
requires_grad=False)
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.fc3(x)
return F.softmax(x, dim=1)
class Task(nn.Module):
def __init__(self):
super().__init__()
self.p = nn.Parameter(torch.ones(2, 2))
def forward(self, x):
return self.p + x
class BatchNormNet(nn.Module):
def __init__(self):
super(BatchNormNet, self).__init__()
self.fc1 = nn.Linear(2, 40, bias=False)
self.bn = nn.BatchNorm1d(4)
self.fc2 = nn.Linear(40, 4, bias=False)
def forward(self, x):
x = torch.reshape(self.fc1(x), (-1, 4, 10))
x = self.bn(x)
x = torch.reshape(x, (-1, 40))
x = self.fc2(x)
return F.softmax(x, dim=1)
DDP_NET = Net()
BN_NET = BatchNormNet()
ONLY_SBN_NET = nn.SyncBatchNorm(2, momentum=0.99)
def get_timeout(test_id):
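    """Return the per-test timeout in seconds, honoring CUSTOMIZED_TIMEOUT overrides."""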
test_name = test_id.split(".")[-1]
if test_name in CUSTOMIZED_TIMEOUT:
return CUSTOMIZED_TIMEOUT[test_name]
else:
return DEFAULT_TIMEOUT
def require_backend(backends):
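    """Skip the decorated test unless BACKEND is one of the given backends."""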
if BACKEND not in backends:
return unittest.skip("Test requires backend to be one of %s" % backends)
return lambda func: func
def require_backends_available(backends):
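    """Skip the decorated test unless every listed backend is available in this build."""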
def check(backend):
if backend == dist.Backend.GLOO:
return dist.is_gloo_available()
if backend == dist.Backend.NCCL:
return dist.is_nccl_available()
if backend == dist.Backend.MPI:
return dist.is_mpi_available()
return False
if not all(check(dist.Backend(backend)) for backend in backends):
return unittest.skip(
"Test requires backends to be available %s" % backends)
return lambda func: func
def require_world_size(world_size):
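    """Skip the decorated test unless WORLD_SIZE is at least the given world size."""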
if int(os.environ["WORLD_SIZE"]) < world_size:
return unittest.skip("Test requires world size of %d" % world_size)
return lambda func: func
def apply_hack_for_nccl():
    # This is a hack to work around a known NCCL issue: using multiprocessing
    # together with multiple threads to manage different GPUs may cause
    # ncclCommInitRank to fail.
    # http://docs.nvidia.com/deeplearning/sdk/nccl-release-notes/rel_2.1.4.html#rel_2.1.4
    # Limiting NCCL to a single ring slows down collective operations, but
    # without it NCCL might throw an unhandled error.
os.environ["NCCL_MAX_NRINGS"] = "1"
@contextmanager
def _lock():
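    """Context manager that serializes access to the barrier directory by taking
    an exclusive lock on a shared lockfile (msvcrt on Windows, fcntl elsewhere)."""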
TEMP_DIR = os.environ["TEMP_DIR"]
lockfile = os.path.join(TEMP_DIR, "lockfile")
with open(lockfile, "w") as lf:
try:
if sys.platform == 'win32':
msvcrt.locking(lf.fileno(), msvcrt.LK_RLCK, 1)
yield
else:
fcntl.flock(lf.fileno(), fcntl.LOCK_EX)
yield
finally:
if sys.platform == 'win32':
msvcrt.locking(lf.fileno(), msvcrt.LK_UNLCK, 1)
else:
fcntl.flock(lf.fileno(), fcntl.LOCK_UN)
lf.close()
def _build_tensor(size, value=None, dtype=torch.float, device_id=None):
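    """Build a size x size x size tensor filled with `value` (defaults to `size`),
    optionally placed on the given CUDA device."""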
if value is None:
value = size
if device_id is None:
return torch.empty(size, size, size, dtype=dtype).fill_(value)
else:
return torch.empty(size, size, size, dtype=dtype).fill_(value).cuda(device_id)
def _build_multidim_tensor(dim, dim_size, value=None, dtype=torch.float):
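    """Build a `dim`-dimensional tensor with each dimension of length `dim_size`,
    filled with `value` (defaults to `dim_size`)."""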
    if value is None:
        value = dim_size
return torch.empty(size=[dim_size for _ in range(dim)], dtype=dtype).fill_(value)
class Barrier(object):
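    """File-based barrier that synchronizes test processes through files in
    $TEMP_DIR/barrier, independent of any distributed backend."""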
barrier_id = 0
@classmethod
def init(cls):
cls.barrier_id = 0
barrier_dir = os.path.join(os.environ["TEMP_DIR"], "barrier")
for f_name in os.listdir(barrier_dir):
os.unlink(os.path.join(barrier_dir, f_name))
@classmethod
def sync(cls, wait_for=None, timeout=10):
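        """Block until `wait_for` processes (default: the world size) have written
        the current barrier id, or raise RuntimeError after `timeout` seconds."""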
if wait_for is None:
wait_for = dist.get_world_size()
cls.barrier_id += 1
barrier_dir = os.path.join(os.environ["TEMP_DIR"], "barrier")
pid = str(os.getpid())
barrier_file = os.path.join(barrier_dir, pid)
with _lock():
with open(barrier_file, "w") as f:
f.write(str(cls.barrier_id))
start_time = time.time()
while True:
arrived = 0
with _lock():
for f_name in os.listdir(barrier_dir):
with open(os.path.join(barrier_dir, f_name), "r") as f:
data = f.read()
if int(data) >= cls.barrier_id:
arrived += 1
if arrived == wait_for:
break
if time.time() - start_time > timeout:
raise RuntimeError("barrier timeout")
time.sleep(0.1)
class TestDistBackend(MultiProcessTestCase):
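    """Multi-process test case that sets up temp directories and the file-based
    Barrier, and initializes the BACKEND process group in each spawned process."""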
@classmethod
def setUpClass(cls):
os.environ["MASTER_ADDR"] = str(MASTER_ADDR)
os.environ["MASTER_PORT"] = str(MASTER_PORT)
# os.environ["WORLD_SIZE"] = str(WORLD_SIZE)
super().setUpClass()
def setUp(self):
super().setUp()
# initialize temp directories
initialize_temp_directories()
# initialize Barrier
Barrier.init()
def tearDown(self):
cleanup_temp_dir()
super().tearDown()
@property
def init_method(self):
return "{}{file_name}".format(FILE_SCHEMA, file_name=self.file_name)
@classmethod
def _run(cls, rank, test_name, file_name):
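        """Entry point for each spawned process: skip if the required GPUs are
        missing, initialize the process group for BACKEND, run the named test,
        and synchronize on the file-based barrier before and after."""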
if BACKEND == 'nccl' and not torch.cuda.is_available():
sys.exit(TEST_SKIPS['no_cuda'].exit_code)
self = cls(test_name)
self.rank = rank
self.file_name = file_name
if torch.cuda.is_available() and torch.cuda.device_count() < int(self.world_size):
sys.exit(TEST_SKIPS['multi-gpu'].exit_code)
try:
timeout = timedelta(seconds=60)
dist.init_process_group(
init_method=self.init_method,
backend=BACKEND,
world_size=int(self.world_size),
rank=self.rank,
timeout=timeout,
)
except RuntimeError as e:
if "recompile" in e.args[0]:
sys.exit(TEST_SKIPS["backend_unavailable"].exit_code)
raise
        # Execute a barrier before running the test to ensure that every
        # process has finished initialization; otherwise a test that
        # immediately exits due to a skip can cause flakiness.
self._barrier()
# self.id() == e.g. '__main__.TestDistributed.test_get_rank'
        # We're retrieving the corresponding test method and executing it.
getattr(self, test_name)()
self._barrier()