# Module caffe2.python.examples.resnet50_trainer
import argparse
import logging
import numpy as np
import time
import os
from caffe2.python import core, workspace, experiment_util, data_parallel_model
from caffe2.python import dyndep, optimizer
from caffe2.python import timeout_guard, model_helper, brew
from caffe2.proto import caffe2_pb2
import caffe2.python.models.resnet as resnet
import caffe2.python.models.shufflenet as shufflenet
from caffe2.python.modeling.initializers import Initializer, PseudoFP16Initializer
import caffe2.python.predictor.predictor_exporter as pred_exp
import caffe2.python.predictor.predictor_py_utils as pred_utils
from caffe2.python.predictor_constants import predictor_constants as predictor_constants
'''
Parallelized multi-GPU distributed trainer for ResNe(X)t & ShuffleNet.
Can be used to train on ImageNet data, for example.
The default parameters train a standard ResNet-50 (1x64d), and other
parameters can be provided to train ResNe(X)t models (e.g., ResNeXt-101 32x4d).
To run the trainer in single-machine multi-GPU mode, set num_shards = 1.
To run the trainer in multi-machine multi-GPU mode with M machines,
run the same program on all machines, specifying num_shards = M and
shard_id = a unique integer in the set [0, M-1].
For rendezvous (the trainer processes have to know about each other),
you can either use a directory path that is visible to all processes
(e.g. an NFS directory) or a Redis instance. Use the former by passing
the `file_store_path` argument. Use the latter by passing the
`redis_host` and `redis_port` arguments.
'''
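
# Illustrative launch commands (a sketch, not from the original file; the flag
# names assume the argparse options match the args.* names used below, and the
# dataset flags are omitted):
#
#   Single machine, all local GPUs:
#     python resnet50_trainer.py --num_shards 1 --num_gpus 8 --batch_size 256
#
#   Two machines rendezvousing through a shared (e.g. NFS) directory:
#     python resnet50_trainer.py --num_shards 2 --shard_id 0 \
#         --file_store_path /mnt/shared/rendezvous
#     python resnet50_trainer.py --num_shards 2 --shard_id 1 \
#         --file_store_path /mnt/shared/rendezvous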
logging.basicConfig()
log = logging.getLogger("Imagenet_trainer")
log.setLevel(logging.DEBUG)
dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:file_store_handler_ops')
dyndep.InitOpsLibrary('@/caffe2/caffe2/distributed:redis_store_handler_ops')

def AddImageInput(
    model,
    reader,
    batch_size,
    img_size,
    dtype,
    is_test,
    mean_per_channel=None,
    std_per_channel=None,
):
    '''
    The image input operator loads image and label data from the reader and
    applies transformations to the images (random cropping, mirroring, ...).
    '''
    data, label = brew.image_input(
        model,
        reader, ["data", "label"],
        batch_size=batch_size,
        output_type=dtype,
        use_gpu_transform=True if core.IsGPUDeviceType(model._device_type) else False,
        use_caffe_datum=True,
        mean_per_channel=mean_per_channel,
        std_per_channel=std_per_channel,
        # mean_per_channel takes precedence over mean
        mean=128.,
        std=128.,
        scale=256,
        crop=img_size,
        mirror=1,
        is_test=is_test,
    )

    data = model.StopGradient(data, data)
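
# Usage sketch (not part of the original trainer): AddImageInput is typically
# wrapped and handed to data_parallel_model as the per-device input builder.
# The reader below comes from ModelHelper.CreateDB; the DB path is hypothetical.
#
#     reader = train_model.CreateDB(
#         "reader",
#         db="/path/to/imagenet_train_lmdb",  # hypothetical path
#         db_type="lmdb",
#     )
#     AddImageInput(
#         train_model, reader,
#         batch_size=32, img_size=224, dtype="float", is_test=False,
#     )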

def AddNullInput(model, reader, batch_size, img_size, dtype):
    '''
    The null input function uses a gaussian fill operator to emulate real image
    input. A label blob is hardcoded to a single value. This is useful if you
    want to test compute throughput or don't have a dataset available.
    '''
    suffix = "_fp16" if dtype == "float16" else ""
    model.param_init_net.GaussianFill(
        [],
        ["data" + suffix],
        shape=[batch_size, 3, img_size, img_size],
    )
    if dtype == "float16":
        model.param_init_net.FloatToHalf("data" + suffix, "data")

    model.param_init_net.ConstantFill(
        [],
        ["label"],
        shape=[batch_size],
        value=1,
        dtype=core.DataType.INT32,
    )
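
# For example (illustrative numbers, not defaults of this script):
# AddNullInput(model, None, batch_size=32, img_size=224, dtype="float") fills a
# float32 "data" blob of shape (32, 3, 224, 224) and an int32 "label" blob of
# length 32, which is enough to benchmark forward/backward throughput without
# a real dataset.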

def SaveModel(args, train_model, epoch, use_ideep):
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    predictor_export_meta = pred_exp.PredictorExportMeta(
        predict_net=train_model.net.Proto(),
        parameters=data_parallel_model.GetCheckpointParams(train_model),
        inputs=[prefix + "/data"],
        outputs=[prefix + "/softmax"],
        shapes={
            prefix + "/softmax": (1, args.num_labels),
            prefix + "/data": (args.num_channels, args.image_size, args.image_size)
        }
    )

    # save the train_model for the current epoch
    model_path = "%s/%s_%d.mdl" % (
        args.file_store_path,
        args.save_model_name,
        epoch,
    )

    # db_type is set to "minidb" instead of "log_file_db", which currently
    # breaks the serialization in save_to_db. Need to switch back to
    # log_file_db after migration.
    pred_exp.save_to_db(
        db_type="minidb",
        db_destination=model_path,
        predictor_export_meta=predictor_export_meta,
        use_ideep=use_ideep
    )

def LoadModel(path, model, use_ideep):
    '''
    Load a pretrained model from file.
    '''
    log.info("Loading path: {}".format(path))
    meta_net_def = pred_exp.load_from_db(path, 'minidb')
    init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.GLOBAL_INIT_NET_TYPE))
    predict_init_net = core.Net(pred_utils.GetNet(
        meta_net_def, predictor_constants.PREDICT_INIT_NET_TYPE))

    if use_ideep:
        predict_init_net.RunAllOnIDEEP()
    else:
        predict_init_net.RunAllOnGPU()
    if use_ideep:
        init_net.RunAllOnIDEEP()
    else:
        init_net.RunAllOnGPU()

    assert workspace.RunNetOnce(predict_init_net)
    assert workspace.RunNetOnce(init_net)

    # Hack: fix the iteration counter, which sits in a CUDA context after the
    # model is loaded.
    itercnt = workspace.FetchBlob("optimizer_iteration")
    workspace.FeedBlob(
        "optimizer_iteration",
        itercnt,
        device_option=core.DeviceOption(caffe2_pb2.CPU, 0)
    )
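
# Usage sketch (hypothetical path):
#
#     LoadModel("/mnt/shared/resnext50_3.mdl", train_model, use_ideep=False)
#
# The FeedBlob call above re-feeds "optimizer_iteration" with an explicit CPU
# device option: load_from_db restores the counter into the device context it
# was saved from, while the trainer expects it in a CPU context (hence the
# hack noted in the comment above).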

def RunEpoch(
    args,
    epoch,
    train_model,
    test_model,
    total_batch_size,
    num_shards,
    expname,
    explog,
):
    '''
    Run one epoch of the trainer.
    TODO: add checkpointing here.
    '''
    # TODO: add loading from checkpoint
    log.info("Starting epoch {}/{}".format(epoch, args.num_epochs))
    epoch_iters = int(args.epoch_size / total_batch_size / num_shards)
    test_epoch_iters = int(args.test_epoch_size / total_batch_size / num_shards)
    for i in range(epoch_iters):
        # This timeout is required (temporarily) since CUDA-NCCL
        # operators might deadlock when synchronizing between GPUs.
        timeout = args.first_iter_timeout if i == 0 else args.timeout
        with timeout_guard.CompleteInTimeOrDie(timeout):
            t1 = time.time()
            workspace.RunNet(train_model.net.Proto().name)
            t2 = time.time()
            dt = t2 - t1

        fmt = "Finished iteration {}/{} of epoch {} ({:.2f} images/sec)"
        log.info(fmt.format(i + 1, epoch_iters, epoch, total_batch_size / dt))
        prefix = "{}_{}".format(
            train_model._device_prefix,
            train_model._devices[0])
        accuracy = workspace.FetchBlob(prefix + '/accuracy')
        loss = workspace.FetchBlob(prefix + '/loss')
        train_fmt = "Training loss: {}, accuracy: {}"
        log.info(train_fmt.format(loss, accuracy))

    num_images = epoch * epoch_iters * total_batch_size
    prefix = "{}_{}".format(train_model._device_prefix, train_model._devices[0])
    accuracy = workspace.FetchBlob(prefix + '/accuracy')
    loss = workspace.FetchBlob(prefix + '/loss')
    learning_rate = workspace.FetchBlob(
        data_parallel_model.GetLearningRateBlobNames(train_model)[0]
    )
    test_accuracy = 0
    test_accuracy_top5 = 0
    if test_model is not None:
        # Run test_epoch_iters iterations of testing
        ntests = 0
        for _ in range(test_epoch_iters):
            workspace.RunNet(test_model.net.Proto().name)
            for g in test_model._devices:
                test_accuracy += np.asscalar(workspace.FetchBlob(
                    "{}_{}".format(test_model._device_prefix, g) + '/accuracy'
                ))
                test_accuracy_top5 += np.asscalar(workspace.FetchBlob(
                    "{}_{}".format(test_model._device_prefix, g) + '/accuracy_top5'
                ))
                ntests += 1
        test_accuracy /= ntests
        test_accuracy_top5 /= ntests
    else:
        test_accuracy = (-1)
        test_accuracy_top5 = (-1)

    explog.log(
        input_count=num_images,
        batch_count=(i + epoch * epoch_iters),
        additional_values={
            'accuracy': accuracy,
            'loss': loss,
            'learning_rate': learning_rate,
            'epoch': epoch,
            'top1_test_accuracy': test_accuracy,
            'top5_test_accuracy': test_accuracy_top5,
        }
    )
    assert loss < 40, "Exploded gradients :("

    # TODO: add checkpointing
    return epoch + 1
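
# A sketch of how RunEpoch is meant to be driven, given that it returns
# epoch + 1 (the actual loop lives further down in Train(), past the end of
# this excerpt):
#
#     epoch = 0
#     while epoch < args.num_epochs:
#         epoch = RunEpoch(
#             args, epoch, train_model, test_model,
#             total_batch_size, num_shards, expname, explog,
#         )
#         SaveModel(args, train_model, epoch, args.use_ideep)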

def Train(args):
    if args.model == "resnext":
        model_name = "resnext" + str(args.num_layers)
    elif args.model == "shufflenet":
        model_name = "shufflenet"

    # Either use specified device list or generate one
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(',')]
        num_gpus = len(gpus)
    else:
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus

    log.info("Running on GPUs: {}".format(gpus))

    # Verify valid batch size
    total_batch_size = args.batch_size
    batch_per_device = total_batch_size // num_gpus
    assert \
        total_batch_size % num_gpus == 0, \
        "Number of GPUs must divide batch size"

    # Verify valid image mean/std per channel
    if args.image_mean_per_channel:
        assert \
            len(args.image_mean_per_channel) == args.num_channels, \
            "The number of channels of image mean doesn't match input"

    if args.image_std_per_channel:
        assert \
            len(args.image_std_per_channel) == args.num_channels, \
            "The number of channels of image std doesn't match input"

    # Round down epoch size to closest multiple of batch size across machines
    global_batch_size = total_batch_size * args.num_shards
    epoch_iters = int(args.epoch_size / global_batch_size)

    assert \
        epoch_iters > 0, \
        "Epoch size must be larger than batch size times shard count"

    args.epoch_size = epoch_iters * global_batch_size
    log.info("Using epoch size: {}".format(args.epoch_size))
    # Create ModelHelper object
    if args.use_ideep:
        train_arg_scope = {
            'use_cudnn': False,
            'cudnn_exhaustive_search': False,
            'training_mode': 1
        }
    else:
        train_arg_scope = {
            'order': 'NCHW',
            'use_cudnn': True,
            'cudnn_exhaustive_search': True,
            'ws_nbytes_limit': (args.cudnn_workspace_limit_mb * 1024 * 1024),
        }
    train_model = model_helper.ModelHelper(
        name=model_name, arg_scope=train_arg_scope
    )

    num_shards = args.num_shards
    shard_id = args.shard_id

    # Expect interfaces to be comma separated.
    # Use of multiple network interfaces is not yet complete,
    # so simply use the first one in the list.
    interfaces = args.distributed_interfaces.split(",")

    # Rendezvous using MPI when run with mpirun
    if os.getenv("OMPI_COMM_WORLD_SIZE") is not None:
        num_shards = int(os.getenv("OMPI_COMM_WORLD_SIZE", 1))
        shard_id = int(os.getenv("OMPI_COMM_WORLD_RANK", 0))
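        # For example, "mpirun -np 8 python resnet50_trainer.py ..." exports
        # OMPI_COMM_WORLD_SIZE=8 and a distinct OMPI_COMM_WORLD_RANK in [0, 7]
        # to each process, so num_shards and shard_id are picked up here
        # automatically. (Illustrative command, not from the original file.)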
        if num_shards > 1:
            rendezvous = dict(
                kv_handler=None,
                num_shards=num_shards,
                shard_id=shard_id,
                engine="GLOO",
                transport=args.distributed_transport,
                interface=interfaces[0],
                mpi_rendezvous=True,
                exit_nets=None)

    elif num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"

        if args.redis_host is not None:
            # Use Redis for rendezvous if Redis host is specified
            workspace.RunOperatorOnce(
                core.CreateOperator(