## @package seq2seq_util
# Module caffe2.python.examples.seq2seq_util
""" A bunch of util functions to build Seq2Seq models with Caffe2."""
import collections
from future.utils import viewitems
import caffe2.proto.caffe2_pb2 as caffe2_pb2
from caffe2.python import attention, core, rnn_cell, brew
PAD_ID = 0
PAD = '<PAD>'
GO_ID = 1
GO = '<GO>'
EOS_ID = 2
EOS = '<EOS>'
UNK_ID = 3
UNK = '<UNK>'
def gen_vocab(corpus, unk_threshold):
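    """Build a token -> id vocabulary from a whitespace-tokenized corpus file.

    The special tokens PAD, GO, EOS and UNK always occupy ids 0-3. A corpus
    token is added only if its frequency exceeds `unk_threshold`; everything
    else maps to UNK during numberization.
    """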
vocab = collections.defaultdict(lambda: len(vocab))
freqs = collections.defaultdict(lambda: 0)
    # Register the special tokens first, so that their ids line up with
    # PAD_ID, GO_ID, EOS_ID and UNK_ID defined above.
vocab[PAD]
vocab[GO]
vocab[EOS]
vocab[UNK]
with open(corpus) as f:
for sentence in f:
tokens = sentence.strip().split()
for token in tokens:
freqs[token] += 1
for token, freq in viewitems(freqs):
if freq > unk_threshold:
vocab[token]
return vocab
def get_numberized_sentence(sentence, vocab):
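    """Map a sentence to a list of token ids, falling back to UNK for
    out-of-vocabulary tokens.

    Hypothetical example, assuming 'hello' is in `vocab` and 'foo' is not:
        get_numberized_sentence('hello foo', vocab) == [vocab['hello'], vocab[UNK]]
    """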
numerized_sentence = []
for token in sentence.strip().split():
if token in vocab:
numerized_sentence.append(vocab[token])
else:
numerized_sentence.append(vocab[UNK])
return numerized_sentence
def rnn_unidirectional_layer(
model,
inputs,
input_lengths,
input_size,
num_units,
dropout_keep_prob,
forward_only,
return_sequence_output,
return_final_state,
scope=None,
):
""" Unidirectional LSTM encoder."""
with core.NameScope(scope):
initial_cell_state = model.param_init_net.ConstantFill(
[],
'initial_cell_state',
shape=[num_units],
value=0.0,
)
initial_hidden_state = model.param_init_net.ConstantFill(
[],
'initial_hidden_state',
shape=[num_units],
value=0.0,
)
cell = rnn_cell.LSTMCell(
input_size=input_size,
hidden_size=num_units,
forget_bias=0.0,
memory_optimization=False,
name=(scope + '/' if scope else '') + 'lstm',
forward_only=forward_only,
)
dropout_ratio = (
None if dropout_keep_prob is None else (1.0 - dropout_keep_prob)
)
if dropout_ratio is not None:
cell = rnn_cell.DropoutCell(
internal_cell=cell,
dropout_ratio=dropout_ratio,
name=(scope + '/' if scope else '') + 'dropout',
forward_only=forward_only,
is_test=False,
)
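    # LSTM output indices: 0 is the per-step hidden output, 1 and 3 are the
    # final hidden and cell states (see the unpacking below). Backpropagation
    # only flows through the outputs requested here.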
outputs_with_grads = []
if return_sequence_output:
outputs_with_grads.append(0)
if return_final_state:
outputs_with_grads.extend([1, 3])
outputs, (_, final_hidden_state, _, final_cell_state) = (
cell.apply_over_sequence(
model=model,
inputs=inputs,
seq_lengths=input_lengths,
initial_states=(initial_hidden_state, initial_cell_state),
outputs_with_grads=outputs_with_grads,
)
)
return outputs, final_hidden_state, final_cell_state
def rnn_bidirectional_layer(
model,
inputs,
input_lengths,
input_size,
num_units,
dropout_keep_prob,
forward_only,
return_sequence_output,
return_final_state,
scope=None,
):
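    """Bidirectional LSTM layer.

    Runs a forward and a backward unidirectional layer and concatenates their
    sequence outputs and final states along the feature axis.
    """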
outputs_fw, final_hidden_fw, final_cell_fw = rnn_unidirectional_layer(
model,
inputs,
input_lengths,
input_size,
num_units,
dropout_keep_prob,
forward_only,
return_sequence_output,
return_final_state,
scope=(scope + '/' if scope else '') + 'fw',
)
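    # The backward direction runs over time-reversed inputs; its outputs are
    # reversed again below so that they align with the forward outputs.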
with core.NameScope(scope):
reversed_inputs = model.net.ReversePackedSegs(
[inputs, input_lengths],
['reversed_inputs'],
)
outputs_bw, final_hidden_bw, final_cell_bw = rnn_unidirectional_layer(
model,
reversed_inputs,
input_lengths,
input_size,
num_units,
dropout_keep_prob,
forward_only,
return_sequence_output,
return_final_state,
scope=(scope + '/' if scope else '') + 'bw',
)
with core.NameScope(scope):
outputs_bw = model.net.ReversePackedSegs(
[outputs_bw, input_lengths],
['outputs_bw'],
)
# Concatenate forward and backward results
if return_sequence_output:
with core.NameScope(scope):
outputs, _ = model.net.Concat(
[outputs_fw, outputs_bw],
['outputs', 'outputs_dim'],
axis=2,
)
else:
outputs = None
if return_final_state:
with core.NameScope(scope):
final_hidden_state, _ = model.net.Concat(
[final_hidden_fw, final_hidden_bw],
['final_hidden_state', 'final_hidden_state_dim'],
axis=2,
)
final_cell_state, _ = model.net.Concat(
[final_cell_fw, final_cell_bw],
['final_cell_state', 'final_cell_state_dim'],
axis=2,
)
else:
final_hidden_state = None
final_cell_state = None
return outputs, final_hidden_state, final_cell_state
def build_embeddings(
model,
vocab_size,
embedding_size,
name,
freeze_embeddings,
):
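    """Create a [vocab_size, embedding_size] embedding table, initialized with
    GaussianFill (std=0.1). The table is registered as a trainable parameter
    unless `freeze_embeddings` is set.
    """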
embeddings = model.param_init_net.GaussianFill(
[],
name,
shape=[vocab_size, embedding_size],
std=0.1,
)
if not freeze_embeddings:
model.params.append(embeddings)
return embeddings
def get_layer_scope(scope, layer_type, i):
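    """Return the per-layer blob name prefix '<scope>/<layer_type>/layer<i>'
    (without the leading '<scope>/' when `scope` is empty).
    """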
prefix = (scope + '/' if scope else '') + layer_type
return '{}/layer{}'.format(prefix, i)
def build_embedding_encoder(
model,
encoder_params,
num_decoder_layers,
inputs,
input_lengths,
vocab_size,
embeddings,
embedding_size,
use_attention,
num_gpus=0,
forward_only=False,
scope=None,
):
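    """Embed the encoder inputs and run them through the stack of encoder
    layers described by `encoder_params`.

    Returns (encoder_outputs, weighted_encoder_outputs,
    final_encoder_hidden_states, final_encoder_cell_states,
    encoder_units_per_layer); weighted_encoder_outputs is currently
    always None.
    """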
with core.NameScope(scope or ''):
if num_gpus == 0:
embedded_encoder_inputs = model.net.Gather(
[embeddings, inputs],
['embedded_encoder_inputs'],
)
else:
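            # On GPU, do the embedding lookup under a CPU device scope and
            # then copy the embedded inputs over to the active GPU.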
with core.DeviceScope(core.DeviceOption(caffe2_pb2.CPU)):
embedded_encoder_inputs_cpu = model.net.Gather(
[embeddings, inputs],
['embedded_encoder_inputs_cpu'],
)
embedded_encoder_inputs = model.CopyCPUToGPU(
embedded_encoder_inputs_cpu,
'embedded_encoder_inputs',
)
layer_inputs = embedded_encoder_inputs
layer_input_size = embedding_size
encoder_units_per_layer = []
final_encoder_hidden_states = []
final_encoder_cell_states = []
num_encoder_layers = len(encoder_params['encoder_layer_configs'])
use_bidirectional_encoder = encoder_params.get(
'use_bidirectional_encoder',
False,
)
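    # Stack the encoder layers. Only the first layer may be bidirectional,
    # which doubles its output dimension.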
for i, layer_config in enumerate(encoder_params['encoder_layer_configs']):
if use_bidirectional_encoder and i == 0:
layer_func = rnn_bidirectional_layer
output_dims = 2 * layer_config['num_units']
else:
layer_func = rnn_unidirectional_layer
output_dims = layer_config['num_units']
encoder_units_per_layer.append(output_dims)
is_final_layer = (i == num_encoder_layers - 1)
dropout_keep_prob = layer_config.get(
'dropout_keep_prob',
None,
)
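        # Only the top `num_decoder_layers` encoder layers return their final
        # states; they are collected below for use by the decoder.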
return_final_state = i >= (num_encoder_layers - num_decoder_layers)
(
layer_outputs,
final_layer_hidden_state,
final_layer_cell_state,
) = layer_func(
model=model,
inputs=layer_inputs,
input_lengths=input_lengths,
input_size=layer_input_size,
num_units=layer_config['num_units'],
dropout_keep_prob=dropout_keep_prob,
forward_only=forward_only,
return_sequence_output=(not is_final_layer) or use_attention,
return_final_state=return_final_state,
scope=get_layer_scope(scope, 'encoder', i),
)
if not is_final_layer:
layer_inputs = layer_outputs
layer_input_size = output_dims
final_encoder_hidden_states.append(final_layer_hidden_state)
final_encoder_cell_states.append(final_layer_cell_state)
encoder_outputs = layer_outputs
weighted_encoder_outputs = None
return (
encoder_outputs,
weighted_encoder_outputs,
final_encoder_hidden_states,
final_encoder_cell_states,
encoder_units_per_layer,
)
class LSTMWithAttentionDecoder(object):
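    """LSTM decoder that attends over the encoder outputs."""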
def scope(self, name):
return self.name + '/' + name if self.name is not None else name
def _get_attention_type(self, attention_type_as_string):
if attention_type_as_string == 'regular':
return attention.AttentionType.Regular
elif attention_type_as_string == 'recurrent':
return attention.AttentionType.Recurrent
else:
            assert False, 'Unknown attention type: ' + attention_type_as_string
def __init__(
self,
encoder_outputs,
encoder_output_dim,
encoder_lengths,
vocab_size,
attention_type,
embedding_size,
decoder_num_units,
decoder_cells,
residual_output_layers=None,
name=None,
weighted_encoder_outputs=None,