## @package data_parallel_model
# Module caffe2.python.data_parallel_model
from collections import OrderedDict
from future.utils import viewitems, viewkeys, viewvalues
import logging
import copy
from multiprocessing import cpu_count
from caffe2.python import \
model_helper, dyndep, scope, workspace, core, memonger, utils
from caffe2.proto import caffe2_pb2
import numpy as np
import warnings
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops")
# We only import NCCL operators when the machine has GPUs.
# Otherwise the binary may have been compiled in CPU-only mode and
# would not be able to find those modules.
if workspace.NumGpuDevices() > 0:
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/nccl:nccl_ops")
dyndep.InitOpsLibrary("@/caffe2/caffe2/contrib/gloo:gloo_ops_gpu")
log = logging.getLogger("data_parallel_model")
log.setLevel(logging.INFO)
_DEFAULT_TIMEOUT_SEC = 30
_DEFAULT_BARRIER_NET_TIMEOUT_SEC = 300
def Parallelize_GPU(*args, **kwargs):
kwargs['cpu_device'] = False
Parallelize(*args, **kwargs)
def Parallelize_CPU(*args, **kwargs):
kwargs['cpu_device'] = True
Parallelize(*args, **kwargs)
def Parallelize_iDeep(*args, **kwargs):
kwargs['ideep'] = True
Parallelize(*args, **kwargs)
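# The three wrappers above only pin the corresponding device flag
# (cpu_device / ideep) and forward all remaining arguments to Parallelize().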
def Parallelize(
model_helper_obj,
input_builder_fun,
forward_pass_builder_fun,
param_update_builder_fun=None,
optimizer_builder_fun=None,
post_sync_builder_fun=None,
pre_grad_net_transformer_fun=None,
net_transformer_fun=None,
devices=None,
rendezvous=None,
net_type='dag',
broadcast_computed_params=True,
optimize_gradient_memory=False,
dynamic_memory_management=False,
blobs_to_keep=None,
use_nccl=False,
max_concurrent_distributed_ops=16,
cpu_device=False,
ideep=False,
num_threads_per_device=4,
shared_model=False,
combine_spatial_bn=False,
barrier_net_timeout_sec=_DEFAULT_BARRIER_NET_TIMEOUT_SEC,
):
'''
Function to create a model that can run on many GPUs or CPUs.
model_helper_obj: an object of ModelHelper
input_builder_fun:
Function that adds the input operators
                        Note: Remember to instantiate the reader outside of
                        this function so that all devices share the same
                        reader object.
Signature: input_builder_fun(model)
forward_pass_builder_fun:
Function to add the operators to the model.
                        Must return a list of loss-blob references that
                        are used to build the gradient. A loss_scale
                        parameter is passed in; scale your model's loss by
                        it, i.e. by 1.0 / (total number of devices).
Signature: forward_pass_builder_fun(model, loss_scale)
param_update_builder_fun:
                        Function that adds the operators applying the
                        parameter updates after the gradients are computed,
                        such as updating weights and applying weight decay.
                        This is called for each device separately.
Signature: param_update_builder_fun(model)
optimizer_builder_fun:
Alternative to param_update_builder_fun, allows one
to add an optimizer for the whole model. Called only
                        once, without name or device scope.
net_transformer_fun:
Optional function to transform the network after the
network is built. It will be called once (NOT once per
GPU.)
Signature:
net_transformer_fun(
model, num_devices, device_prefix, device_type)
pre_grad_net_transformer_fun:
                        Optional function to transform the network, similar
                        to net_transformer_fun, but run before the gradient
                        ops are added.
Signature: pre_grad_net_transformer_fun(model)
post_sync_builder_fun:
Function applied after initial parameter sync has been
completed, such as keeping multi-precision parameters
in sync.
Signature: post_sync_builder_fun(model)
      devices: List of device ids, such as [0, 1, 2, 3]. Defaults to all
                        available GPUs (or all CPU cores when cpu_device or
                        ideep is set).
      rendezvous: used for rendezvous in distributed computation; if None,
                        only one node is used. To create a rendezvous,
                        use <TBD>.
      net_type: Network executor type, e.g. 'dag' (the default) or 'simple'.
optimize_gradient_memory: whether to apply 'memonger' to share blobs
      shared_model: (only for CPU) use the same parameters on each device
                        in gradient computation to reduce memory footprint.
      dynamic_memory_management: Whether to apply dynamic memory optimization
                        by freeing unused blobs. The underlying (de)allocation
                        uses a cached allocator. For GPU training PLEASE MAKE
                        SURE caffe2_cuda_memory_pool is set.
      blobs_to_keep: A list of blob names to keep (i.e. not free) during
                        dynamic memory optimization (for example, the loss
                        blob).
      use_nccl: (GPU only) use NCCL for intra-node collective operations
                        such as all-reduce and broadcast.
      max_concurrent_distributed_ops: Upper bound on the number of
                        distributed (inter-node) operators allowed to run
                        concurrently.
      num_threads_per_device: Number of executor worker threads allocated
                        per device.
      cpu_device: Use CPU instead of GPU.
      ideep: Use IDEEP (Intel MKL-DNN) instead of GPU.
combine_spatial_bn:
When set to True, applies batch normalization across
all devices within the node. If False, batch
normalization will be done separately for each device.
                        Both CPU and GPU implementations are provided (see
                        _CPUInterDeviceBatchNormalization and
                        _GPUInterDeviceBatchNormalization below).
barrier_net_timeout_sec:
The timeout in seconds of the barrier net, which is run
to synchronize shards before a training epoch starts.
Defaults to 300 seconds.
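
      Example (a minimal usage sketch; add_input_ops, build_forward and
      add_optimizer are hypothetical user-supplied functions, not part of
      this module):

          from caffe2.python import data_parallel_model, model_helper, workspace

          model = model_helper.ModelHelper(name="train_net")
          data_parallel_model.Parallelize_GPU(
              model,
              input_builder_fun=add_input_ops,         # adds shared-reader inputs
              forward_pass_builder_fun=build_forward,  # returns a list of losses
              optimizer_builder_fun=add_optimizer,     # e.g. attaches SGD
              devices=[0, 1],
          )
          workspace.RunNetOnce(model.param_init_net)
          workspace.CreateNet(model.net)
          workspace.RunNet(model.net)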
'''
assert scope.CurrentDeviceScope() is None \
or scope.CurrentDeviceScope().device_type == caffe2_pb2.CPU, \
"Parallelize must be called without device-scope, \
device scope was: {}".format(scope.CurrentDeviceScope())
if devices is None:
if not (cpu_device or ideep):
devices = list(range(0, workspace.NumCudaDevices()))
else:
devices = list(range(0, cpu_count()))
if not (cpu_device or ideep):
for gpu in devices:
if gpu >= workspace.NumGpuDevices():
log.warning("** Only {} GPUs available, GPUs {} requested".format(
workspace.NumGpuDevices(), devices))
break
model_helper_obj._device_type = workspace.GpuDeviceType
model_helper_obj._device_prefix = "gpu"
model_helper_obj._shared_model = False
device_name = "GPU"
assert shared_model is False, "Shared model only supported on CPU"
elif ideep:
model_helper_obj._device_type = caffe2_pb2.IDEEP
model_helper_obj._device_prefix = "ideep"
device_name = "IDEEP"
model_helper_obj._shared_model = shared_model
        assert not (shared_model and rendezvous is not None), \
            "Shared model is only supported on a single node currently"
else:
model_helper_obj._device_type = caffe2_pb2.CPU
model_helper_obj._device_prefix = "cpu"
device_name = "CPU"
model_helper_obj._shared_model = shared_model
        assert not (shared_model and rendezvous is not None), \
            "Shared model is only supported on a single node currently"
log.info("Parallelizing model for devices: {}".format(devices))
extra_workers = 8 if rendezvous is not None else 0 # best-guess
num_workers = len(devices) * num_threads_per_device + extra_workers
max_concurrent_distributed_ops =\
min(max_concurrent_distributed_ops, num_workers - 1)
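    # Worked example: 4 devices with the default 4 threads per device plus a
    # rendezvous give num_workers = 4 * 4 + 8 = 24, so concurrent distributed
    # ops are capped at min(16, 24 - 1) = 16.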
model_helper_obj.net.Proto().num_workers = num_workers
model_helper_obj.net.Proto().type = net_type
# Store some information in the model -- a bit ugly
model_helper_obj._devices = devices
model_helper_obj._rendezvous = rendezvous
model_helper_obj._sync_barrier_net = None
model_helper_obj._broadcast_context = None
model_helper_obj._grad_names = []
assert isinstance(model_helper_obj, model_helper.ModelHelper)
# Keep track of params that were in the model before: they are not
# data parallel, so we need to handle them separately
non_datapar_params = copy.copy(model_helper_obj.params)
# Add input and model
log.info("Create input and model training operators")
losses_by_gpu = {}
num_shards = 1 if rendezvous is None else rendezvous['num_shards']
loss_scale = 1.0 / (len(devices) * num_shards)
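    # Worked example: 2 shards with 4 devices each give
    # loss_scale = 1.0 / (4 * 2) = 0.125, so gradients accumulated over all
    # 8 device copies average the loss rather than multiplying it by 8.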
has_parameter_updates = param_update_builder_fun is not None or \
optimizer_builder_fun is not None
assert not (
param_update_builder_fun is not None and
optimizer_builder_fun is not None
), 'Can only specify one of param_update_builder_fun, optimizer_builder_fun'
# Check that a model that is used for validation/testing has
# init_params False, otherwise running the param init net will overwrite
# synchronized values by the training net
if not has_parameter_updates and model_helper_obj.init_params:
log.warning('')
log.warning("############# WARNING #############")
log.warning("Model {}/{} is used for testing/validation but".format(
model_helper_obj.name, model_helper_obj))
log.warning("has init_params=True!")
log.warning("This can conflict with model training.")
log.warning("Please ensure model = ModelHelper(init_params=False)")
log.warning('####################################')
log.warning('')
# TODO: make into assert
for device in devices:
device_opt = core.DeviceOption(model_helper_obj._device_type, device)
with core.DeviceScope(device_opt):
with core.NameScope("{}_{}".format(model_helper_obj._device_prefix,
device)):
log.info("Model for {} : {}".format(device_name, device))
input_builder_fun(model_helper_obj)
losses = forward_pass_builder_fun(model_helper_obj, loss_scale)
# Losses are not needed for test net
if has_parameter_updates:
assert isinstance(losses, list), \
'Model builder function must return list of loss blobs'
for loss in losses:
assert isinstance(loss, core.BlobReference), \
'Model builder func must return list of loss blobs'
losses_by_gpu[device] = losses
_ValidateParams(model_helper_obj.params)
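    # Each device now holds a name-scoped copy of the model: a parameter
    # created as e.g. "fc_w" (hypothetical name) exists as "gpu_0/fc_w",
    # "gpu_1/fc_w", ... (or with the "cpu"/"ideep" prefix accordingly).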
# Create parameter map
model_helper_obj._device_grouped_blobs =\
_GroupByDevice(model_helper_obj, devices,
model_helper_obj.params, non_datapar_params)
# computed params
computed_params_grouped =\
_GroupByDevice(model_helper_obj, devices,
model_helper_obj.GetComputedParams(''), [])
model_helper_obj._device_grouped_blobs.update(computed_params_grouped)
model_helper_obj._param_names =\
list(viewkeys(model_helper_obj._device_grouped_blobs))
model_helper_obj._computed_param_names =\
list(viewkeys(computed_params_grouped))
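    # _device_grouped_blobs maps each base blob name to its per-device
    # copies, e.g. {"fc_w": {0: gpu_0/fc_w, 1: gpu_1/fc_w}} for a
    # hypothetical parameter "fc_w" on two GPUs.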
if pre_grad_net_transformer_fun:
pre_grad_net_transformer_fun(model_helper_obj)
if has_parameter_updates:
log.info("Adding gradient operators")
_AddGradientOperators(devices, model_helper_obj, losses_by_gpu)
if net_transformer_fun:
net_transformer_fun(
model_helper_obj,
len(devices),
model_helper_obj._device_prefix,
model_helper_obj._device_type)
if not has_parameter_updates:
log.info("Parameter update function not defined --> only forward")
_InferBlobDevice(model_helper_obj)
return
if combine_spatial_bn:
assert(has_parameter_updates), \
'combine_spatial_bn should only be used for train model'
_InterleaveOps(model_helper_obj)
if cpu_device:
_CPUInterDeviceBatchNormalization(model_helper_obj)
else:
_GPUInterDeviceBatchNormalization(model_helper_obj)
_ValidateParams(model_helper_obj.params)
# Group gradients by device and register to blob lookup
param_to_grad = model_helper_obj.param_to_grad
grads_ordered = [param_to_grad[p] for p in
model_helper_obj.params if p in param_to_grad]
non_datapar_grads = [param_to_grad[p] for p in non_datapar_params]
gradients_grouped = _GroupByDevice(
model_helper_obj,
devices,
grads_ordered,
non_datapar_grads
)
model_helper_obj._device_grouped_blobs.update(gradients_grouped)
model_helper_obj._grad_names = list(viewkeys(gradients_grouped))
model_helper_obj._losses_by_gpu = losses_by_gpu
_InferBlobDevice(model_helper_obj)
log.info("Add gradient all-reduces for SyncSGD")
if broadcast_computed_params:
_BroadcastComputedParams(devices, model_helper_obj, rendezvous, use_nccl)
if len(model_helper_obj._grad_names) > 0:
# Gradients in reverse order
reverse_ordered_grads = _GetReverseOrderedGrads(model_helper_obj)
assert(len(reverse_ordered_grads) > 0)
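        # The backward pass produces gradients in reverse order of the
        # forward ops, so reducing them in that order lets communication
        # start early and overlap with the remaining backward computation.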
_AllReduceBlobs(
reverse_ordered_grads,
devices,
model_helper_obj,
model_helper_obj.net,
rendezvous,
use_nccl,
max_concurrent_distributed_ops,
)
else:
log.info("NOTE: Param builder function did not create any parameters.")
log.info("Post-iteration operators for updating params")
num_shards = 1 if rendezvous is None else rendezvous['num_shards']
all_params = set(model_helper_obj.GetParams(''))
if shared_model:
_PruneParametersForSharing(model_helper_obj)
if param_update_builder_fun is not None:
for device in devices:
device_opt = core.DeviceOption(model_helper_obj._device_type, device)
with core.DeviceScope(device_opt):
with core.NameScope(
"{}_{}".format(model_helper_obj._device_prefix, device)
                ):
                    param_update_builder_fun(model_helper_obj)