Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

/ python / operator_test / crf_test.py





from caffe2.python import workspace, crf, brew
from caffe2.python.model_helper import ModelHelper
import numpy as np
from scipy.special import logsumexp
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
from hypothesis import given, settings


class TestCRFOp(hu.HypothesisTestCase):

    @given(num_tags=st.integers(2, 4),
           num_words=st.integers(2, 15))
    @settings(deadline=1000)
    def test_crf_with_loss_op(self, num_tags, num_words):
        """Compare the CRF loss produced by the net with a NumPy reference.

        Builds a net made of one FC layer feeding ``crf.CRFWithLoss``, runs it
        once on random embeddings, labels and transitions, then checks the
        fetched loss against ``_compute_loss_manual`` evaluated on the fetched
        FC output.

        Args:
            num_tags: output width of the FC layer; the transition matrix is
                ``(num_tags + 2, num_tags + 2)``.
            num_words: number of rows in the embeddings and entries in labels.
        """
        model = ModelHelper(name='external')
        embeddings_dim = 200
        embeddings = np.random.randn(num_words, embeddings_dim).astype(np.float32)
        transitions = np.random.uniform(
            low=-1, high=1, size=(num_tags + 2, num_tags + 2)
        ).astype(np.float32)
        labels = np.random.randint(num_tags, size=(num_words)).astype(np.int64)
        embeddings_blob, labels_blob, transitions_blob = (
            model.net.AddExternalInputs(
                'embeddings_blob',
                'labels_blob',
                'crf_transitions')
        )
        workspace.FeedBlob(str(embeddings_blob), embeddings)
        workspace.FeedBlob(str(labels_blob), labels)
        workspace.FeedBlob(str(transitions_blob), transitions)
        # An initializer spec is an (operator_name, kwargs) pair, so both
        # bounds go into the single kwargs dict instead of a stray third
        # tuple element.
        predictions_blob = brew.fc(
            model,
            embeddings_blob, "fc_0",
            embeddings_dim, num_tags,
            ('UniformFill', {'min': -1.0, 'max': 1.0}),
            ('UniformFill', {'min': -1.0, 'max': 1.0})
        )
        crf_layer = crf.CRFWithLoss(model, num_tags, transitions_blob)
        crf_loss = crf_layer.crf_loss(predictions_blob, labels_blob)
        model.net.AddGradientOperators([crf_loss])
        workspace.RunNetOnce(model.param_init_net)
        workspace.RunNetOnce(model.net)
        loss = workspace.FetchBlob(str(crf_loss))
        predictions = workspace.FetchBlob(str(predictions_blob))
        np.testing.assert_allclose(
            loss,
            self._compute_loss_manual(
                predictions, num_tags, labels, transitions
            ),
            atol=0.001,
            rtol=0.001,
            err_msg='CRF LOSS is not matching the reference'
        )

    @given(num_tags=st.integers(1, 4),
           num_words=st.integers(2, 4))
    @settings(deadline=10000)
    def test_crf_gradient(self, num_tags, num_words):
        """Gradient-check the last operator added by ``build_crf_net``.

        Random predictions, transitions and an initial state are fed into the
        workspace, the CRF net is built, and ``assertGradientChecks`` is run
        once per input of the net's final operator, skipping the input named
        ``crf_net/zero_segment_id``.

        Args:
            num_tags: tag count handed to ``crf.CRFWithLoss``; every random
                array uses ``num_tags + 2`` along its tag axis.
            num_words: size of the leading axis of the predictions array.
        """
        base_model = ModelHelper(name='base_model')
        padded_tags = num_tags + 2
        transitions = np.random.randn(padded_tags, padded_tags).astype(
            np.float32
        )
        predictions = np.random.randn(num_words, 1, padded_tags).astype(
            np.float32
        )
        initial = np.random.randn(1, padded_tags).astype(np.float32)

        predictions_blob, transitions_blob, initial_blob = (
            base_model.net.AddExternalInputs(
                'predictions_blob', 'crf_transitions', 'inital_blob'
            )
        )
        fed_values = (
            (predictions_blob, predictions),
            (transitions_blob, transitions),
            (initial_blob, initial),
        )
        for blob, value in fed_values:
            workspace.FeedBlob(str(blob), value)

        crf_layer = crf.CRFWithLoss(base_model, num_tags, transitions_blob)
        crf_layer.build_crf_net(
            predictions_blob, initial_blob, transitions_blob
        )
        last_op = base_model.net._net.op[-1]
        workspace.RunNetOnce(base_model.param_init_net)

        input_arrays = [workspace.FetchBlob(name) for name in last_op.input]
        for input_index, input_name in enumerate(last_op.input):
            if input_name == "crf_net/zero_segment_id":
                continue
            # The keyword is called outputs_to_check, but the value passed
            # here is the position of the operator input being checked.
            self.assertGradientChecks(
                device_option=hu.cpu_do,
                op=last_op,
                inputs=input_arrays,
                outputs_to_check=input_index,
                outputs_with_grads=[1],
                threshold=0.05,
                stepsize=0.001,
            )

    def _compute_loss_manual(self, predictions, num_tags, labels, transitions):
        low_score = -1000
        b_s = np.array(
            [[low_score] * num_tags + [0, low_score]]
        ).astype(np.float32)
        e_s = np.array(
            [[low_score] * num_tags + [low_score, 0]]
        ).astype(np.float32)
        predictions = np.concatenate(
            [predictions, low_score * np.ones((predictions.shape[0], 2))],
            axis=1
        )
        predictions = np.concatenate(
            [b_s, predictions, e_s],
            axis=0
        )
        b_id = np.array([num_tags], dtype=np.int32)
        e_id = np.array([num_tags + 1], dtype=np.int32)
        labels = np.concatenate(
            [b_id, labels, e_id],
            axis=0
        )
        curr_state = predictions[0]
        input_states = predictions[1:]

        for input_state in input_states:
            prev = np.expand_dims(curr_state, axis=1)
            curr_input = np.expand_dims(input_state, axis=0)
            curr_state = logsumexp(prev + curr_input + transitions, axis=0)

        total_score = logsumexp(curr_state, axis=0)
        # Compute best path score
        unary_scores = sum(w[labels[i]] for i, w in enumerate(predictions))
        binary_scores = sum(
            transitions[a][b] for a, b in zip(labels[:-1], labels[1:])
        )
        loss = total_score - (binary_scores + unary_scores)
        return loss