# @package optimizer
# Module caffe2.python.optimizer
import copy
import logging
from collections import defaultdict, namedtuple
import numpy as np
from caffe2.proto import caffe2_pb2
from caffe2.python import core, scope, utils, workspace
from caffe2.python.modeling import parameter_info
from past.builtins import basestring
# NOTE(review): not referenced in the visible part of this file; presumably
# the name used for learning-rate injection -- confirm against its users.
_LEARNING_RATE_INJECTION = "lr_injection"

# Container returned by Optimizer.get_auxiliary_parameters(): `local` and
# `shared` each hold a list of auxiliary blobs.
AuxOptimizerParams = namedtuple(
    "AuxOptimizerParams",
    ("local", "shared"),
)

# Per-class-name counter of optimizer instances created so far; each new
# Optimizer reads its instance number from here and then increments it.
_optimizer_instance_count = defaultdict(int)

# NOTE(review): usage is outside the visible part of this file; presumably the
# operator engine names that select FP16 variants -- confirm.
FP16_ENGINES = [
    "SIMD_Q_FP16",
    "SIMD_Q_STOC_FP16",
    "SIMD_Q_STOC_MKL_FP16",
]

# Module-level logger.
logger = logging.getLogger(__name__)
class Optimizer(object):
    """Base class for optimizers that add parameter-update operators to a net.

    Subclasses implement ``_run``. Callers invoke the instance directly, i.e.
    ``optimizer(net, param_init_net, param, grad)``. Every instance gets a
    per-class sequence number that is embedded in the blob names produced by
    ``make_unique_blob_name``.
    """

    def __init__(self):
        # Auxiliary blobs collected while building update operators; see
        # get_auxiliary_parameters() for the meaning of local vs. shared.
        self._aux_params = AuxOptimizerParams(local=[], shared=[])
        # Number instances per concrete class so generated blob names differ
        # between optimizer instances of the same type.
        self._instance_num = _optimizer_instance_count[self.__class__.__name__]
        _optimizer_instance_count[self.__class__.__name__] += 1
        self._lr_multiplier = None
        self._local_lr_multiplier = None
        self._local_lr_multiplier_on_gpu = False

    def __call__(self, net, param_init_net, param, grad=None):
        """
        Adds optimization operators to the net for given parameter and its
        gradient.

        The parameter is specified in one of two ways:

        * 'param' is a ParameterInfo object and 'grad' is omitted. In this
          case param.grad has to be set.
        * 'param' is a BlobReference (or a blob name string, which is wrapped
          in a BlobReference) and 'grad' is a BlobReference for its gradient.
        """
        if grad is None:
            assert isinstance(
                param, parameter_info.ParameterInfo
            ), "Expected parameter to be of type ParameterInfo, got {}".format(param)
            assert param.grad is not None
        else:
            if isinstance(param, basestring):
                param = core.BlobReference(param)
            param = parameter_info.ParameterInfo(param_id=None, param=param, grad=grad)

        self._run(net, param_init_net, param)

    def _run(self, net, param_init_net, param_info):
        """Add the update operators for ``param_info``.

        Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # NotImplementedError (an Exception subclass) matches the style of
        # scale_learning_rate() and is still caught by `except Exception`.
        raise NotImplementedError("Not Implemented")

    def get_cpu_blob_name(self, base_str, node_name=""):
        """Return ``<Class>_<instance>_<base_str><node_name>_cpu``."""
        classname = self.__class__.__name__
        return "%s_%d_%s%s_cpu" % (classname, self._instance_num, base_str, node_name)

    def get_gpu_blob_name(self, base_str, gpu_id, node_name=""):
        """Return ``<Class>_<instance>_<base_str><node_name>_gpu<gpu_id>``.

        ``node_name`` defaults to the empty string, mirroring
        ``get_cpu_blob_name``.
        """
        classname = self.__class__.__name__
        return "%s_%d_%s%s_gpu%d" % (
            classname,
            self._instance_num,
            base_str,
            node_name,
            gpu_id,
        )

    @property
    def attributes(self):
        """Deep copy of the instance ``__dict__`` without ``_instance_num``."""
        # return a dict that contains attributes related to init args only
        attr = copy.deepcopy(self.__dict__)
        del attr["_instance_num"]
        return attr

    def make_unique_blob_name(self, base_str):
        """
        Returns a blob name that will be unique to the current device
        and optimizer instance.
        """
        current_scope = scope.CurrentDeviceScope()
        if current_scope is None:
            return self.get_cpu_blob_name(base_str)

        if core.IsGPUDeviceType(current_scope.device_type):
            return self.get_gpu_blob_name(
                base_str, current_scope.device_id, current_scope.node_name
            )
        else:
            return self.get_cpu_blob_name(base_str, current_scope.node_name)

    def build_lr(
        self,
        net,
        param_init_net,
        base_learning_rate,
        learning_rate_blob=None,
        policy="fixed",
        iter_val=0,
        **kwargs
    ):
        """Create (or reuse) the learning-rate blob and apply LR multipliers.

        Args:
            net: net that receives the LearningRate / Mul / copy operators.
            param_init_net: net passed to ``utils.BuildUniqueMutexIter``.
            base_learning_rate: base rate; it is negated before being handed
                to the LearningRate operator.
            learning_rate_blob: name of the LR blob; when None a name unique
                to this optimizer instance and device is generated.
            policy: forwarded to the LearningRate operator.
            iter_val: forwarded to ``utils.BuildUniqueMutexIter``.
            **kwargs: extra arguments forwarded to the LearningRate operator.

        Returns:
            A ``(lr, iteration)`` tuple: the (possibly scaled) learning-rate
            blob and the iteration blob.
        """
        if learning_rate_blob is None:
            learning_rate_blob = self.make_unique_blob_name("lr")

        iteration = utils.BuildUniqueMutexIter(param_init_net, net, iter_val=iter_val)

        if not net.BlobIsDefined(learning_rate_blob):
            # There is one interesting thing here: since we are minimizing, we are
            # doing "descent" so the learning rate is set to be negative.
            lr = net.LearningRate(
                [iteration],
                learning_rate_blob,
                base_lr=-base_learning_rate,
                policy=policy,
                **kwargs
            )
        else:
            # The blob already exists in this net: reuse it.
            lr = net.GetBlobRef(learning_rate_blob)

        if self._lr_multiplier is not None:
            # Global multiplier: copied via CopyFromCPUInput, then multiplied in.
            lr_multiplier = net.CopyFromCPUInput(
                self._lr_multiplier, self.make_unique_blob_name("lr_multiplier")
            )

            lr = net.Mul(
                [lr, lr_multiplier],
                self.make_unique_blob_name("scaled_lr"),
                broadcast=1,
            )

        if self._local_lr_multiplier is not None:
            current_scope = scope.CurrentDeviceScope()
            # Copy the local multiplier only when running under a GPU scope
            # and the multiplier was not flagged as already being a GPU blob.
            if (
                current_scope is not None
                and core.IsGPUDeviceType(current_scope.device_type)
                and not self._local_lr_multiplier_on_gpu
            ):
                local_lr_multiplier = net.CopyFromCPUInput(
                    self._local_lr_multiplier,
                    self.make_unique_blob_name("local_lr_multiplier"),
                )
            else:
                local_lr_multiplier = self._local_lr_multiplier

            lr = net.Mul(
                [lr, local_lr_multiplier],
                self.make_unique_blob_name("local_scaled_lr"),
                broadcast=1,
            )

        return lr, iteration

    def add_lr_multiplier(self, lr_multiplier):
        """
        Set the global learning rate multiplier. If a multiplier already
        existed, this will overwrite the existing multiplier. The multiplier is
        used for all future calls to _run(), unless it is overwritten.
        """
        self._lr_multiplier = lr_multiplier

    def _add_local_lr_multiplier(self, local_lr_multiplier, is_gpu_blob=False):
        """
        Set the local learning rate multiplier. This local multiplier is
        multiplied with the global learning rate multiplier if it exists. As
        with the global learning rate multiplier, this multiplier will be
        used for all future calls to _run(), so please call
        _clear_local_lr_multiplier() at the beginning of the optimizer's _run()
        before optionally calling this function.
        """
        self._local_lr_multiplier = local_lr_multiplier
        self._local_lr_multiplier_on_gpu = is_gpu_blob

    def _clear_local_lr_multiplier(self):
        """Reset the local learning rate multiplier and its GPU flag."""
        self._local_lr_multiplier = None
        self._local_lr_multiplier_on_gpu = False

    @staticmethod
    def dedup(net, sparse_dedup_aggregator, grad):
        """Deduplicate a sparse gradient when an aggregator is given.

        Args:
            net: net that receives the DeduplicateGradientSlices operator.
            sparse_dedup_aggregator: aggregator passed to the operator; when
                falsy, ``grad`` is returned unchanged.
            grad: must be a ``core.GradientSlice``.

        Returns:
            The deduplicated gradient slice, or ``grad`` itself.
        """
        assert isinstance(
            grad, core.GradientSlice
        ), "Dedup only works for sparse gradient, got {}".format(grad)
        if sparse_dedup_aggregator:
            return net.DeduplicateGradientSlices(
                grad, aggregator=sparse_dedup_aggregator
            )
        else:
            return grad

    def get_auxiliary_parameters(self):
        """Returns a list of auxiliary parameters.

        Returns:
            aux_params: A namedtuple, AuxOptimizerParams.

            aux_params.local stores a list of blobs. Each blob is a local
            auxiliary parameter. A local auxiliary parameter is a parameter in
            parallel to a learning rate parameter. Take adagrad as an example,
            the local auxiliary parameter is the squared sum parameter, because
            every learning rate has a squared sum associated with it.

            aux_params.shared also stores a list of blobs. Each blob is a shared
            auxiliary parameter. A shared auxiliary parameter is a parameter
            that is shared across all the learning rate parameters. Take adam as
            an example, the iteration parameter is a shared parameter, because
            all the learning rates share the same iteration parameter.
        """
        return self._aux_params

    # TODO(xlwang): In transfer learning, parameter initialized from pretrained
    # model might require a different learning rate than otherwise initialized.
    # To this end, here we implement a python solution where
    # `base_learning_rate` is scaled by `scale`, by calling
    # `scale_learning_rate`; Alternatively, we can achieve same effect by
    # rewriting the LearningRate operator in C++
    # Note that it is the responsibility of specific optimizer to decide what
    # logic should be used for `scale_learning_rate`
    def scale_learning_rate(self, *args, **kwargs):
        """Scale the learning rate; concrete optimizers must override this.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(
            "Optimizer Need to Implement `scale_learning_rate` method."
        )

    def create_lars_inputs(self, param_init_net, weight_decay, trust, lr_max):
        """Create the constant one-element blobs used as LARS inputs.

        Adds three ConstantFill operators to ``param_init_net`` with the fixed
        output names "weight_decay", "trust" and "lr_max".

        Returns:
            A ``(wd, trust, lr_max)`` tuple of the created blobs.
        """
        wd = param_init_net.ConstantFill(
            [], "weight_decay", shape=[1], value=weight_decay
        )
        trust = param_init_net.ConstantFill([], "trust", shape=[1], value=trust)
        lr_max = param_init_net.ConstantFill([], "lr_max", shape=[1], value=lr_max)
        return wd, trust, lr_max
class SgdOptimizer(Optimizer):
def __init__(
    self,
    base_learning_rate=0.01,
    policy="fixed",
    momentum=0.0,
    nesterov=True,
    sparse_dedup_aggregator=None,
    lars=None,
    **kwargs
):
    """Store the SGD configuration; operators are only built later in _run().

    Args:
        base_learning_rate: base step size. _run() adds nothing when it is 0
            and asserts that it is positive otherwise.
        policy: learning-rate policy forwarded to build_lr().
        momentum: momentum coefficient; a value > 0 makes _run() create a
            momentum blob and use the momentum update operators.
        nesterov: passed as the ``nesterov`` argument of those operators.
        sparse_dedup_aggregator: aggregator handed to dedup() for sparse
            (GradientSlice) gradients.
        lars: when not None, used as the ``offset`` of the Lars operator for
            dense gradients; _run() asserts that it is nonnegative.
        **kwargs: kept as ``init_kwargs`` and forwarded to build_lr().
    """
    super(SgdOptimizer, self).__init__()

    # Paired assignments bind left-to-right, so the attribute creation order
    # (and therefore the order of the instance __dict__) is unchanged.
    self.base_learning_rate, self.policy = base_learning_rate, policy
    self.momentum, self.nesterov = momentum, nesterov
    self.sparse_dedup_aggregator, self.lars = sparse_dedup_aggregator, lars
    self.init_kwargs = kwargs
def _run(self, net, param_init_net, param_info):
param = param_info.blob
grad = param_info.grad
if self.base_learning_rate == 0:
return
assert (
self.base_learning_rate > 0
), "Expect positive base learning rate, got {}".format(self.base_learning_rate)
self._clear_local_lr_multiplier()
# TODO(zqq): support LARS for sparse parameters
if self.lars is not None and not isinstance(grad, core.GradientSlice):
assert self.lars >= 0, "Lars offset must be nonnegative, got {}".format(
self.lars
)
wd, trust, lr_max = self.create_lars_inputs(
param_init_net, 0.0, 1.0, np.finfo(np.float32).max
)
lr_lars_multiplier = net.Lars(
[param, grad, wd, trust, lr_max],
self.make_unique_blob_name(str(param) + "_lars"),
offset=self.lars,
lr_min=0.0,
)
current_scope = scope.CurrentDeviceScope()
self._add_local_lr_multiplier(
lr_lars_multiplier,
is_gpu_blob=(
current_scope is not None
and core.IsGPUDeviceType(current_scope.device_type)
),
)
# We need negative sign for LR when used directly with WeightedSum
# below.
lr_sign = -1 if self.momentum else 1
lr, _ = self.build_lr(
net,
param_init_net,
base_learning_rate=self.base_learning_rate * lr_sign,
policy=self.policy,
**(self.init_kwargs)
)
dev = scope.CurrentDeviceScope()
if dev is None:
dev = core.DeviceOption(caffe2_pb2.CPU)
# Each GPU/CPU must have its own ONE blob, thus modify the name
# to include device information.
ONE = param_init_net.ConstantFill(
[],
"ONE_{}_{}{}".format(dev.device_type, dev.device_id, dev.node_name),
shape=[1],
value=1.0,
)
self._aux_params.shared.append(ONE)
if self.momentum > 0:
momentum_data = param_init_net.ConstantFill(
param, str(param) + "_momentum", value=0.0
)
self._aux_params.local.append(momentum_data)
if isinstance(grad, core.GradientSlice):
grad = self.dedup(net, self.sparse_dedup_aggregator, grad)
if self.momentum > 0.0:
net.SparseMomentumSGDUpdate(
[grad.values, momentum_data, lr, param, grad.indices],
[grad.values, momentum_data, param],
momentum=self.momentum,
nesterov=self.nesterov,
)
else:
net.ScatterWeightedSum(
[param, ONE, grad.indices, grad.values, lr], param
)
else:
if self.momentum > 0.0:
net.MomentumSGDUpdate(
[grad, momentum_data, lr, param],
[grad, momentum_data, param],
momentum=self.momentum,
nesterov=self.nesterov,
)
else:
coeff = lr
Loading ...