import hypothesis.strategies as st
import numpy as np
import numpy.testing as npt
from hypothesis import given, settings
import caffe2.python.hypothesis_test_util as hu
from caffe2.python import (
layer_model_instantiator,
core,
schema,
workspace,
)
from caffe2.python.layers.layers import (
AccessedFeatures,
almost_equal_schemas,
get_key,
IdList,
IdScoreList,
InstantiationContext,
is_request_only_scalar,
set_request_only,
)
from caffe2.python.layers.tags import Tags
from caffe2.python.layer_test_util import (
LayersTestCase,
OpSpec,
)
import logging
logger = logging.getLogger(__name__)
class TestLayers(LayersTestCase):
def testSparseDropoutWithReplacement(self):
input_record = schema.NewRecord(self.model.net, IdList)
self.model.output_schema = schema.Struct()
lengths_blob = input_record.field_blobs()[0]
values_blob = input_record.field_blobs()[1]
lengths = np.array([1] * 10).astype(np.int32)
values = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).astype(np.int64)
workspace.FeedBlob(lengths_blob, lengths)
workspace.FeedBlob(values_blob, values)
out = self.model.SparseDropoutWithReplacement(
input_record, 0.0, 0.5, 1.0, -1, output_names_or_num=1)
self.assertEqual(schema.List(schema.Scalar(np.int64,)), out)
train_init_net, train_net = self.get_training_nets()
eval_net = self.get_eval_net()
predict_net = self.get_predict_net()
workspace.RunNetOnce(train_init_net)
workspace.RunNetOnce(train_net)
out_values = workspace.FetchBlob(out.items())
out_lengths = workspace.FetchBlob(out.lengths())
self.assertBlobsEqual(out_values, values)
self.assertBlobsEqual(out_lengths, lengths)
workspace.RunNetOnce(eval_net)
workspace.RunNetOnce(predict_net)
predict_values = workspace.FetchBlob("values_auto_0")
predict_lengths = workspace.FetchBlob("lengths_auto_0")
self.assertBlobsEqual(predict_values, np.array([-1] * 10).astype(np.int64))
self.assertBlobsEqual(predict_lengths, lengths)
    def testAddLoss(self):
        """add_loss registers losses under 'unnamed' when no name is given,
        under the provided name otherwise, and auto-suffixes colliding
        names with '_auto_N'; it accepts Scalar, Struct and NamedTuple
        records and exposes their blobs via self.model.loss.field_blobs().
        """
        input_record_LR = self.new_record(
            schema.Struct(
                ('label', schema.Scalar((np.float64, (1, )))),
                ('logit', schema.Scalar((np.float32, (2, )))),
                ('weight', schema.Scalar((np.float64, (1, ))))
            )
        )
        loss_LR = self.model.BatchLRLoss(input_record_LR)

        # No name given -> registered under the default name 'unnamed'.
        self.model.add_loss(loss_LR)
        assert 'unnamed' in self.model.loss
        self.assertEqual(
            schema.Scalar((np.float32, tuple())), self.model.loss.unnamed
        )
        self.assertEqual(loss_LR, self.model.loss.unnamed)

        # Explicit name -> registered under that name.
        self.model.add_loss(loss_LR, 'addLoss')
        assert 'addLoss' in self.model.loss
        self.assertEqual(
            schema.Scalar((np.float32, tuple())), self.model.loss.addLoss
        )
        self.assertEqual(loss_LR, self.model.loss.addLoss)

        # 'addLoss' already exists -> first collision becomes 'addLoss_auto_0'.
        self.model.add_loss(
            schema.Scalar(
                dtype=np.float32, blob=core.BlobReference('loss_blob_1')
            ), 'addLoss'
        )
        assert 'addLoss_auto_0' in self.model.loss
        self.assertEqual(
            schema.Scalar((np.float32, tuple())), self.model.loss.addLoss_auto_0
        )
        assert core.BlobReference('loss_blob_1') in self.model.loss.field_blobs()

        # Struct-valued loss; second collision -> 'addLoss_auto_1'.
        self.model.add_loss(
            schema.Struct(
                (
                    'structName', schema.Scalar(
                        dtype=np.float32,
                        blob=core.BlobReference('loss_blob_2')
                    )
                )
            ), 'addLoss'
        )
        assert 'addLoss_auto_1' in self.model.loss
        self.assertEqual(
            schema.Struct(('structName', schema.Scalar((np.float32, tuple())))),
            self.model.loss.addLoss_auto_1
        )
        assert core.BlobReference('loss_blob_2') in self.model.loss.field_blobs()

        # NamedTuple-valued loss; third collision -> 'addLoss_auto_2'.
        loss_in_tuple_0 = schema.Scalar(
            dtype=np.float32, blob=core.BlobReference('loss_blob_in_tuple_0')
        )
        loss_in_tuple_1 = schema.Scalar(
            dtype=np.float32, blob=core.BlobReference('loss_blob_in_tuple_1')
        )
        loss_tuple = schema.NamedTuple(
            'loss_in_tuple', * [loss_in_tuple_0, loss_in_tuple_1]
        )
        self.model.add_loss(loss_tuple, 'addLoss')
        assert 'addLoss_auto_2' in self.model.loss
        self.assertEqual(
            schema.Struct(
                ('loss_in_tuple_0', schema.Scalar((np.float32, tuple()))),
                ('loss_in_tuple_1', schema.Scalar((np.float32, tuple())))
            ), self.model.loss.addLoss_auto_2
        )
        # Both tuple members' blobs must be reachable from the loss record.
        assert core.BlobReference('loss_blob_in_tuple_0')\
            in self.model.loss.field_blobs()
        assert core.BlobReference('loss_blob_in_tuple_1')\
            in self.model.loss.field_blobs()
def testFilterMetricSchema(self):
self.model.add_metric_field("a:b", schema.Scalar())
self.model.add_metric_field("a:c", schema.Scalar())
self.model.add_metric_field("d", schema.Scalar())
self.assertEqual(
self.model.metrics_schema,
schema.Struct(
("a", schema.Struct(
("b", schema.Scalar()),
("c", schema.Scalar()),
)),
("d", schema.Scalar()),
))
self.model.filter_metrics_schema({"a:b", "d"})
self.assertEqual(
self.model.metrics_schema,
schema.Struct(
("a", schema.Struct(
("b", schema.Scalar()),
)),
("d", schema.Scalar()),
))
def testAddOutputSchema(self):
# add the first field
self.model.add_output_schema('struct', schema.Struct())
expected_output_schema = schema.Struct(('struct', schema.Struct()))
self.assertEqual(
self.model.output_schema,
expected_output_schema,
)
# add the second field
self.model.add_output_schema('scalar', schema.Scalar(np.float64))
expected_output_schema = schema.Struct(
('struct', schema.Struct()),
('scalar', schema.Scalar(np.float64)),
)
self.assertEqual(
self.model.output_schema,
expected_output_schema,
)
# overwrite a field should raise
with self.assertRaises(AssertionError):
self.model.add_output_schema('scalar', schema.Struct())
def _test_net(self, net, ops_list):
'''
Helper function to assert the net contains some set of operations and
then to run the net.
Inputs:
net -- the network to test and run
ops_list -- the list of operation specifications to check for
in the net
'''
ops_output = self.assertNetContainOps(net, ops_list)
workspace.RunNetOnce(net)
return ops_output
def testFCWithoutBias(self):
output_dims = 2
fc_without_bias = self.model.FCWithoutBias(
self.model.input_feature_schema.float_features, output_dims)
self.model.output_schema = fc_without_bias
self.assertEqual(
schema.Scalar((np.float32, (output_dims, ))),
fc_without_bias
)
train_init_net, train_net = self.get_training_nets()
init_ops = self.assertNetContainOps(
train_init_net,
[
OpSpec("UniformFill", None, None),
]
)
mat_mul_spec = OpSpec(
"MatMul",
[
self.model.input_feature_schema.float_features(),
init_ops[0].output[0],
],
fc_without_bias.field_blobs()
)
self.assertNetContainOps(train_net, [mat_mul_spec])
predict_net = self.get_predict_net()
self.assertNetContainOps(predict_net, [mat_mul_spec])
def testFCWithBootstrap(self):
output_dims = 1
fc_with_bootstrap = self.model.FCWithBootstrap(
self.model.input_feature_schema.float_features,
output_dims=output_dims,
num_bootstrap=2,
max_fc_size=-1
)
self.model.output_schema = fc_with_bootstrap
self.assertEqual(len(fc_with_bootstrap), 4)
# must be in this order
assert (
core.BlobReference("fc_with_bootstrap/bootstrap_iteration_0/indices") == fc_with_bootstrap[0].field_blobs()[0]
)
assert (
core.BlobReference("fc_with_bootstrap/bootstrap_iteration_0/preds") == fc_with_bootstrap[1].field_blobs()[0]
)
assert (
core.BlobReference("fc_with_bootstrap/bootstrap_iteration_1/indices") == fc_with_bootstrap[2].field_blobs()[0]
)
assert (
core.BlobReference("fc_with_bootstrap/bootstrap_iteration_1/preds") == fc_with_bootstrap[3].field_blobs()[0]
)
train_init_net, train_net = self.get_training_nets()
predict_net = layer_model_instantiator.generate_predict_net(self.model)
train_proto = train_net.Proto()
eval_proto = predict_net.Proto()
train_ops = train_proto.op
eval_ops = eval_proto.op
master_train_ops = [
"Shape",
"GivenTensorInt64Fill",
"Gather",
"GivenTensorIntFill",
"GivenTensorIntFill",
"Cast",
"Sub",
"UniformIntFill",
"Gather",
"FC",
"UniformIntFill",
"Gather",
"FC",
]
master_eval_ops = [
"Shape",
"GivenTensorInt64Fill",
"Gather",
"GivenTensorIntFill",
"GivenTensorIntFill",
"Cast",
"Sub",
"UniformIntFill",
"FC",
"UniformIntFill",
"FC",
]
assert len(train_ops) == len(master_train_ops)
assert len(eval_ops) == len(master_eval_ops)
assert train_proto.external_input == eval_proto.external_input
assert train_proto.external_output == list()
# make sure all the ops are present and unchanged for train_net and eval_net
for idx, op in enumerate(master_train_ops):
assert train_ops[idx].type == op
for idx, op in enumerate(master_eval_ops):
assert eval_ops[idx].type == op
def testFCwithAxis2(self):
input_dim = 10
output_dim = 30
max_length = 20
input_record = self.new_record(
schema.Struct(
('history_sequence', schema.Scalar((np.float32, (max_length,
input_dim)))),
)
)
fc_out = self.model.FC(
input_record.history_sequence, output_dim,
axis=2)
self.model.output_schema = fc_out
self.assertEqual(
schema.Scalar((np.float32, (max_length, output_dim))),
fc_out
)
train_init_net, train_net = self.get_training_nets()
Loading ...