import torch
import numpy as np
import argparse
from typing import Dict
# debug print
DEBUG_PRINT = False
################################################################################
# configuration for random tests setup
################################################################################
# maximum number of input tensors (used as an exclusive bound in the random draw)
MAX_TENSOR = 6
# maximum tensor rank
MAX_TENSOR_DIM = 5
# maximum total tensor size (number of elements)
MAX_TENSOR_SIZE = 2**20
# use a size 1 tensor for debug
DEBUG_TENSOR = False
# tensor device
DEVICE = "cuda"
# data type for tensors
DTYPE = torch.float
# factor that roughly controls the depth of the generated graph
GRAPH_FACTOR = 2
################################################################################
# helper functions
################################################################################
class WrongResultException(Exception):
pass
# randomly reduce tensor_shape while keeping it broadcast-compatible
# two things are done here:
# 1. trim leading dimensions;
# 2. randomly clamp remaining dimensions to size 1;
def get_broadcast_compatible_shape(tensor_shape):
max_dim = len(tensor_shape)
num_b_dims = np.random.randint(0, max_dim + 1)
trim_head = np.random.randint(0, min(num_b_dims + 1, max_dim))
shape = tensor_shape[trim_head:max_dim]
for i in np.random.choice(range(max_dim - trim_head),
num_b_dims - trim_head,
replace=False):
shape[i] = 1
return shape
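# Illustrative example (actual output depends on the RNG state): an input shape
# [4, 3, 5] may become [3, 5] (head trimmed) or [1, 5] (head trimmed and one
# remaining dimension clamped to 1); both broadcast against [4, 3, 5].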
# generate a random graph topology from the seed and the module-level flags
def random_topology_test(seed, *inp_tensor_list):
np.random.seed(int(seed.numpy().tolist()))
tensor_list = [*inp_tensor_list]
num_tensor = len(tensor_list)
    # randomly generate constant scalar operands
num_const = np.random.randint(0, num_tensor + 1)
const_list = np.random.random(num_const)
if DEBUG_PRINT:
for const_item in const_list:
print("----- real number {:.10f}", const_item)
    # we require all tensors to end up in a single dependency set
def get_root(x, dependency_map):
if x in dependency_map:
return get_root(dependency_map[x], dependency_map)
else:
return x
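    # illustrative example: with d_map = {0: 3, 3: 5}, get_root(0, d_map)
    # follows 0 -> 3 -> 5 and returns 5, the current root of the set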
d_map: Dict[int, int] = {}
num_sets = num_tensor
candidate = list(range(num_tensor))
unary_operations = [torch.sigmoid, torch.relu]
binary_operations = [torch.add, torch.sub, torch.mul]
u_op_size = len(unary_operations)
b_op_size = len(binary_operations)
num_operations = np.random.randint(num_sets - 1,
num_sets * GRAPH_FACTOR)
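    # at least (num_sets - 1) set-joining binary operations are needed to merge
    # all initial dependency sets into one; GRAPH_FACTOR scales the upper bound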
ret_list = []
while num_operations >= 0 or num_sets > 1:
        # start by randomly picking a candidate tensor and an operation
index = np.random.randint(0, len(candidate))
op_index = np.random.randint(0, u_op_size + b_op_size)
lh_index = candidate[index]
rh_index = None
out_tensor = None
if DEBUG_PRINT:
print("iteration {0}, num_sets{1}, candidates {2}, tensor_list {3}, lh_index {4}, op_index {5}".format(
num_operations, num_sets, candidate, len(tensor_list), lh_index, op_index))
if num_operations >= 0:
num_operations -= 1
if op_index < u_op_size:
                # unary operation: apply the chosen operation to the candidate
out_tensor = unary_operations[op_index](tensor_list[lh_index])
else:
                # binary operation: randomly choose the other operand, either
                # 1. a tensor-tensor operation -> record rh_index, or
                # 2. a tensor-const operation.
                # we are no longer restricted to candidate tensors here.
op_2_index = np.random.randint(0, len(tensor_list) + num_const)
if op_2_index < len(tensor_list):
if op_2_index == lh_index:
                        # if we happened to pick the candidate itself, just try
                        # the next tensor instead
op_2_index = (op_2_index + 1) % len(tensor_list)
                    # out_tensor is created later, once rh_index is recorded (see below)
rh_index = op_2_index
else:
left = tensor_list[lh_index]
right = const_list[op_2_index - len(tensor_list)]
# if np.random.randint(0, 2) > 0:
# left = const_list[op_2_index - len(tensor_list)]
# right = tensor_list[lh_index]
out_tensor = binary_operations[op_index - u_op_size](left, right)
if DEBUG_PRINT:
print("binary, op_2_index {0}, rh_index ?{1}".format(op_2_index, rh_index))
else:
# binary operation, we just randomly pick two candidates.
# this is not the most efficient way to close dependecy, as we could have
# two candidate that are actually connected
cand_index = np.random.randint(0, len(candidate))
if cand_index == index:
cand_index = (cand_index + 1) % len(candidate)
            # out_tensor is created later, once rh_index is recorded (see below)
rh_index = candidate[cand_index]
if DEBUG_PRINT:
print("binary rh_index ?{0}".format(rh_index))
        # updating candidate must happen before rh_index is removed from it
candidate[index] = len(tensor_list)
lh_root = get_root(lh_index, d_map)
        # create the binary operation's output tensor now that rh_index is known
if rh_index is not None:
out_tensor = binary_operations[op_index - u_op_size](
tensor_list[lh_index],
tensor_list[rh_index])
# remove rh_index from candidate if it is used
if rh_index in candidate:
                # list.remove() removes by value, not by index
candidate.remove(rh_index)
# check if we join dependency sets:
rh_root = get_root(rh_index, d_map)
if lh_root != rh_root:
num_sets -= 1
                # point rh_root at the new tensor; when both operands already
                # share a root there is nothing to update
d_map[rh_root] = len(tensor_list)
        # whether or not sets were joined, point lh_root at the new tensor
d_map[lh_root] = len(tensor_list)
        # append the new output; candidate was already redirected above, which
        # avoids re-applying an identical operation to the same tensor(s)
tensor_list.append(out_tensor)
    # TODO: we should mark
    # union(random_sample(tensor_list[num_tensor:]), candidate) as outputs,
    # which would ensure we have no dead branches and a connected computation
    # graph. However, it won't work easily once broadcast is involved, so
    # broadcast is disabled for now to focus on the topology test.
for ind in candidate:
ret_list.append(tensor_list[ind])
out_list = np.random.choice(
range(num_tensor, len(tensor_list)),
np.random.randint(0, len(tensor_list) - num_tensor),
False)
for ind in out_list:
if ind not in candidate:
ret_list.append(tensor_list[ind])
if DEBUG_PRINT:
print("ended with tensor_list: {0}".format(len(tensor_list)))
return tuple(ret_list)
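# Illustrative usage (tensor values and shapes are hypothetical):
#   seed_t = torch.tensor(42)
#   outs = random_topology_test(seed_t, torch.randn(4, 8), torch.randn(4, 8))
#   # outs is a tuple of output tensors of the randomly generated graph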
def prepareInputTensorsToRandomTopoTest(seed,
max_tensor_num,
max_tensor_dim,
max_tensor_size,
debug_tensor,
device,
dtype):
# set seed to numpy as well as torch
np.random.seed(seed)
torch.manual_seed(np.random.randint(0, seed))
# seed to pass to torch.jit.trace
seed_tensor = torch.tensor(np.random.randint(0, seed))
# random number of input tensors
num_tensor = np.random.randint(1, max_tensor_num)
# prepare randomized tensor shape
tensor_dim = np.random.randint(1, max_tensor_dim)
tensor_shape = []
numel = 1
if debug_tensor:
tensor_shape.append(1)
else:
        for i in range(tensor_dim):
            # bound each size so the running numel stays within max_tensor_size,
            # reserving a factor of 2**(tensor_dim - i) for the remaining dims
            size_i = np.random.randint(1, int(max_tensor_size / numel / (2**(tensor_dim - i))))
            # cap each dimension (to below 256) while keeping some randomness
            size_i = min(size_i, 128 + size_i % 128)
            tensor_shape.insert(0, size_i)
            numel *= size_i
if DEBUG_PRINT:
print("output tensor shape: ", tensor_shape)
# vvv BROADCASTING vvv
# select tensors to be broadcasted
# TODO: enable broadcasting when we fully support it.
# num_broadcasted_tensors = np.random.randint(0, num_tensor)
    num_broadcasted_tensors = np.random.randint(0, 1)  # always 0 while broadcasting is disabled
    # we leave at least one tensor not broadcasted
    broadcasted_tensors_indices = np.random.choice(np.arange(num_tensor),
                                                   num_broadcasted_tensors,
                                                   replace=False)
# vvv PREPARING TENSORS vvv
tensor_list = []
for i in range(num_tensor):
if i in broadcasted_tensors_indices:
# get broadcast-compatible shape:
# Note that we are not playing with stride here, as stride doesn't affect
# codegen meaningfully.
compatible_shape = get_broadcast_compatible_shape(tensor_shape)
tensor_list.append(torch.randn(compatible_shape, device=device, dtype=dtype) * 100)
else:
tensor_list.append(torch.randn(tensor_shape, device=device, dtype=dtype) * 100)
return seed_tensor, tensor_list
def reproString(current_seed, args):
repro_str = "python {0}".format(__file__)
if args.cuda_fuser:
repro_str += " --cuda_fuser"
if args.legacy_fuser:
repro_str += " --legacy_fuser"
if args.profiling_executor:
repro_str += " --profiling_executor"
if args.fp16:
repro_str += " --fp16"
if args.cpu:
repro_str += " --cpu"
repro_str += " --max_num_tensor {0} --max_tensor_dim {1} --max_tensor_size {2}"\
" --depth_factor {3} --seed {4} --repro_run".format(
args.max_num_tensor, args.max_tensor_dim, args.max_tensor_size,
args.depth_factor, current_seed)
return repro_str
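# Example of a produced repro command (flag values are illustrative):
#   python this_script.py --cuda_fuser --max_num_tensor 6 --max_tensor_dim 5 \
#       --max_tensor_size 1048576 --depth_factor 2 --seed 1234 --repro_run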
################################################################################
# global seed to repro the test
################################################################################
def runDefaultTestWithSeed(seed):
# prepare input tensors
seed_tensor, tensor_list = prepareInputTensorsToRandomTopoTest(seed,
MAX_TENSOR,
MAX_TENSOR_DIM,
MAX_TENSOR_SIZE,
DEBUG_TENSOR,
DEVICE,
DTYPE)
o = random_topology_test(seed_tensor, *tensor_list)
traced_model = torch.jit.trace(random_topology_test, (seed_tensor, *tensor_list))
jit_o = traced_model(seed_tensor, *tensor_list) # possible profiling run
jit_o = traced_model(seed_tensor, *tensor_list)
validate_o = zip(o, jit_o)
for oo, jit_oo in validate_o:
if not oo.allclose(jit_oo, atol=1e-5, equal_nan=True):
return False
return True
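# Illustrative check (seed value is arbitrary):
#   assert runDefaultTestWithSeed(1234), "eager and JIT outputs diverged"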
def runTest(seed, args):
# prepare input tensors
seed_tensor, tensor_list = prepareInputTensorsToRandomTopoTest(seed,
args.max_num_tensor,
args.max_tensor_dim,
args.max_tensor_size,
args.debug_tensor,
"cuda" if not args.cpu else "cpu",
torch.float32 if not args.fp16 else torch.float16)
# vvv run random generated topo test in eager vvv
try:
if DEBUG_PRINT:
print("seed tensor: ", seed_tensor)
o = random_topology_test(seed_tensor, *tensor_list)
if DEBUG_PRINT:
for out in o:
print("val size: ", out.size())
except Exception as err:
raise Exception("Testing script failure with error message, repro by running:\n"
f"\t{reproString(seed, args)}") from err
try:
traced_model = torch.jit.trace(random_topology_test, (seed_tensor, *tensor_list))
if DEBUG_PRINT:
print("original graph: ", traced_model.graph)
jit_o = traced_model(seed_tensor, *tensor_list) # possible profiling run
jit_o = traced_model(seed_tensor, *tensor_list)
if DEBUG_PRINT:
print("optimized graph: ", traced_model.graph_for(seed_tensor, *tensor_list))
validate_o = zip(o, jit_o)
for oo, jit_oo in validate_o:
if not oo.allclose(jit_oo, equal_nan=True):
print("eager output: ", oo)
print("jit output: ", jit_oo)
print("diff ", jit_oo - oo)
raise WrongResultException()
except WrongResultException as err:
raise Exception("cuda fuser gives wrong results, repro by running:\n"
f"\t{reproString(seed, args)}") from err
except Exception as err:
raise Exception("something in cuda fuser went wrong, repro by running:\n"
f"\t{reproString(seed, args)}") from err
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--cuda_fuser", action='store_true', default=True)
parser.add_argument("--legacy_fuser", action='store_true', default=False)
parser.add_argument("--profiling_executor", action='store_true', default=False)
parser.add_argument("--fp16", action='store_true', default=False)
parser.add_argument("--cpu", action='store_true', default=False)
parser.add_argument("--debug_print", action='store_true', default=False)