from caffe2.python import schema
from caffe2.python.layers.layers import ModelLayer
import numpy as np
class ArcCosineFeatureMap(ModelLayer):
"""
A general version of the arc-cosine kernel feature map (s = 1 restores
the original arc-cosine kernel feature map).
Applies H(x) * x^s, where H is the Heaviside step function and x is the
input after applying FC (such that x = w * x_orig + b).
For more information, see the original paper:
http://cseweb.ucsd.edu/~saul/papers/nips09_kernel.pdf
Inputs :
output_dims -- dimensions of the output vector
s -- degree to raise transformed features
scale -- amount to scale the standard deviation
weight_init -- initialization distribution for weight parameter
bias_init -- initialization distribution for bias pararmeter
weight_optim -- optimizer for weight params; None for random features
bias_optim -- optimizer for bias param; None for random features
set_weight_as_global_constant -- if True, initialized random parameters
will be constant across all distributed
instances of the layer
initialize_output_schema -- if True, initialize output schema as Scalar
from Arc Cosine; else output schema is None
"""
def __init__(
self,
model,
input_record,
output_dims,
s=1,
scale=1.0,
weight_init=None,
bias_init=None,
weight_optim=None,
bias_optim=None,
set_weight_as_global_constant=False,
initialize_output_schema=True,
name='arc_cosine_feature_map',
**kwargs):
super(ArcCosineFeatureMap, self).__init__(model, name, input_record,
**kwargs)
assert isinstance(input_record, schema.Scalar), "Incorrect input type"
self.params = []
self.model = model
self.set_weight_as_global_constant = set_weight_as_global_constant
self.input_dims = input_record.field_type().shape[0]
assert self.input_dims >= 1, "Expected input dimensions >= 1, got %s" \
% self.input_dims
if initialize_output_schema:
self.output_schema = schema.Scalar(
(np.float32, (output_dims, )),
model.net.NextScopedBlob(name + '_output')
)
self.output_dims = output_dims
assert self.output_dims >= 1, "Expected output dimensions >= 1, got %s" \
% self.output_dims
self.s = s
assert (self.s >= 0), "Expected s >= 0, got %s" % self.s
assert isinstance(self.s, int), "Expected s to be type int, got type %s" \
% type(self.s)
assert (scale > 0.0), "Expected scale > 0, got %s" % scale
self.stddev = scale * np.sqrt(1.0 / self.input_dims)
# Initialize train_init_net parameters
# Random Parameters
if set_weight_as_global_constant:
w_init = np.random.normal(scale=self.stddev,
size=(self.output_dims, self.input_dims))
b_init = np.random.uniform(low=-0.5 * self.stddev,
high=0.5 * self.stddev,
size=self.output_dims)
self.random_w = self.model.add_global_constant(
name=self.name + "_fixed_rand_W",
array=w_init
)
self.random_b = self.model.add_global_constant(
name=self.name + "_fixed_rand_b",
array=b_init
)
else:
(self.random_w, self.random_b) = self._initialize_params(
'random_w',
'random_b',
w_init=weight_init,
b_init=bias_init,
w_optim=weight_optim,
b_optim=bias_optim
)
def _initialize_params(self, w_name, b_name, w_init=None, b_init=None,
w_optim=None, b_optim=None):
"""
Initializes the Layer Parameters for weight and bias terms for features
Inputs :
w_blob -- blob to contain w values
b_blob -- blob to contain b values
w_init -- initialization distribution for weight parameter
b_init -- initialization distribution for bias parameter
w_optim -- optimizer to use for w; if None, then will use no optimizer
b_optim -- optimizer to user for b; if None, then will use no optimizer
"""
w_init = w_init if w_init else (
'GaussianFill', {'mean': 0.0, 'std': self.stddev}
)
w_optim = w_optim if w_optim else self.model.NoOptim
b_init = b_init if b_init else (
'UniformFill', {'min': -0.5 * self.stddev, 'max': 0.5 * self.stddev}
)
b_optim = b_optim if b_optim else self.model.NoOptim
w_param = self.create_param(param_name=w_name,
shape=(self.output_dims, self.input_dims),
initializer=w_init,
optimizer=w_optim)
b_param = self.create_param(param_name=b_name,
shape=[self.output_dims],
initializer=b_init,
optimizer=b_optim)
return [w_param, b_param]
def _heaviside_with_power(self, net, input_features, output_blob, s):
"""
Applies Heaviside step function and Relu / exponentiation to features
depending on the value of s.
Inputs:
net -- net with operators
input_features -- features to processes
output_blob -- output blob reference
s -- degree to raise the transformed features
"""
if s == 0:
softsign_features = net.Softsign([input_features],
net.NextScopedBlob('softsign'))
return net.Relu(softsign_features, output_blob)
elif s == 1:
return net.Relu([input_features],
output_blob)
else:
relu_features = net.Relu([input_features],
net.NextScopedBlob('relu_rand'))
pow_features = net.Pow([input_features],
net.NextScopedBlob('pow_rand'),
exponent=float(s - 1))
return net.Mul([relu_features, pow_features],
output_blob)
def add_ops(self, net):
input_blob = self.input_record.field_blobs()
# Random features: wx + b
random_features = net.FC(input_blob + [self.random_w, self.random_b],
net.NextScopedBlob('random_features'))
# Process random features
self._heaviside_with_power(net,
random_features,
self.output_schema.field_blobs(),
self.s)