Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / regularizer_test.py



import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
import numpy.testing as npt
from caffe2.python import core, layer_model_instantiator, regularizer, schema, workspace
from caffe2.python.layer_test_util import LayersTestCase
from caffe2.python.optimizer import SgdOptimizer
from caffe2.python.regularizer import L1Norm, RegularizationBy
from caffe2.python.regularizer_context import RegularizerContext, UseRegularizer
from hypothesis import given


class TestRegularizerContext(LayersTestCase):
    @given(X=hu.arrays(dims=[2, 5]))
    def test_regularizer_context(self, X):
        weight_reg_out = L1Norm(0.2)
        bias_reg_out = L1Norm(0)
        regularizers = {"WEIGHT": weight_reg_out, "BIAS": bias_reg_out}

        output_dims = 2
        input_record = self.new_record(schema.Scalar((np.float32, (5,))))
        schema.FeedRecord(input_record, [X])

        with UseRegularizer(regularizers):
            weight_reg = RegularizerContext.current().get_regularizer("WEIGHT")
            bias_reg = RegularizerContext.current().get_regularizer("BIAS")
            optim = SgdOptimizer(0.15)

            assert (
                weight_reg == weight_reg_out
            ), "fail to get correct weight reg from context"
            assert bias_reg == bias_reg_out, "fail to get correct bias reg from context"
            fc_output = self.model.FC(
                input_record,
                output_dims,
                weight_optim=optim,
                bias_optim=optim,
                weight_reg=weight_reg,
                bias_reg=bias_reg,
            )
            # model.output_schema has to a struct
            self.model.output_schema = schema.Struct(("fc_output", fc_output))

            self.assertEqual(schema.Scalar((np.float32, (output_dims,))), fc_output)

            _, train_net = layer_model_instantiator.generate_training_nets(self.model)
            ops = train_net.Proto().op
            ops_type_list = [ops[i].type for i in range(len(ops))]
            assert ops_type_list.count("LpNorm") == 2
            assert ops_type_list.count("Scale") == 4
            assert ops_type_list.count("LpNormGradient") == 2


class TestRegularizer(LayersTestCase):
    @given(X=hu.arrays(dims=[2, 5], elements=hu.floats(min_value=-1.0, max_value=1.0)))
    def test_log_barrier(self, X):
        param = core.BlobReference("X")
        workspace.FeedBlob(param, X)
        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.LogBarrier(1.0)
        output = reg(train_net, train_init_net, param, by=RegularizationBy.ON_LOSS)
        reg(
            train_net,
            train_init_net,
            param,
            grad=None,
            by=RegularizationBy.AFTER_OPTIMIZER,
        )
        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)

        def ref(X):
            return (
                np.array(np.sum(-np.log(np.clip(X, 1e-9, None))) * 0.5).astype(
                    np.float32
                ),
                np.clip(X, 1e-9, None),
            )

        for x, y in zip(workspace.FetchBlobs([output, param]), ref(X)):
            npt.assert_allclose(x, y, rtol=1e-3)

    @given(
        X=hu.arrays(dims=[2, 5], elements=hu.floats(min_value=-1.0, max_value=1.0)),
        left_open=st.booleans(),
        right_open=st.booleans(),
        eps=hu.floats(min_value=1e-6, max_value=1e-4),
        ub=hu.floats(min_value=-1.0, max_value=1.0),
        lb=hu.floats(min_value=-1.0, max_value=1.0),
        **hu.gcs_cpu_only
    )
    def test_bounded_grad_proj(self, X, left_open, right_open, eps, ub, lb, gc, dc):
        if ub - (eps if right_open else 0.) < lb + (eps if left_open else 0.):
            return
        param = core.BlobReference("X")
        workspace.FeedBlob(param, X)
        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.BoundedGradientProjection(
            lb=lb, ub=ub, left_open=left_open, right_open=right_open, epsilon=eps
        )
        output = reg(train_net, train_init_net, param, by=RegularizationBy.ON_LOSS)
        reg(
            train_net,
            train_init_net,
            param,
            grad=None,
            by=RegularizationBy.AFTER_OPTIMIZER,
        )
        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)

        def ref(X):
            return np.clip(
                X, lb + (eps if left_open else 0.), ub - (eps if right_open else 0.)
            )

        assert output is None
        npt.assert_allclose(workspace.blobs[param], ref(X), atol=1e-7)

    @given(
        output_dim=st.integers(1, 10),
        input_num=st.integers(3, 30),
        reg_weight=st.integers(0, 10)
    )
    def test_group_l1_norm(self, output_dim, input_num, reg_weight):
        """
        1. create a weight blob
        2. create random group splits
        3. run group_l1_nrom with the weight blob
        4. run equivalent np operations to calculate group l1 norm
        5. compare if the results from 3 and 4 are equal
        """
        def compare_reference(weight, group_boundaries, reg_lambda, output):
            group_splits = np.hsplit(weight, group_boundaries[1:-1])
            l2_reg = np.sqrt([np.sum(np.square(g)) for g in group_splits])
            l2_normalized = np.multiply(l2_reg,
                np.array([np.sqrt(g.shape[1]) for g in group_splits]))
            result = np.multiply(np.sum(l2_normalized), reg_lambda)
            npt.assert_almost_equal(result, workspace.blobs[output], decimal=2)

        weight = np.random.rand(output_dim, input_num).astype(np.float32)

        feature_num = np.random.randint(low=1, high=input_num - 1)
        group_boundaries = [0]
        group_boundaries = np.append(
            group_boundaries,
            np.sort(
                np.random.choice(range(1, input_num - 1), feature_num, replace=False)
            ),
        )
        group_boundaries = np.append(group_boundaries, [input_num])
        split_info = np.diff(group_boundaries)

        weight_blob = core.BlobReference("weight_blob")
        workspace.FeedBlob(weight_blob, weight)

        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.GroupL1Norm(reg_weight * 0.1, split_info.tolist())
        output = reg(
            train_net, train_init_net, weight_blob, by=RegularizationBy.ON_LOSS
        )

        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)
        compare_reference(weight, group_boundaries, reg_weight * 0.1, output)

    @given(
        param_dim=st.integers(10, 30),
        k=st.integers(5, 9),
        reg_weight=st.integers(0, 10)
    )
    def test_l1_norm_trimmed(self, param_dim, k, reg_weight):
        weight = np.random.rand(param_dim).astype(np.float32)
        weight_blob = core.BlobReference("weight_blob")
        workspace.FeedBlob(weight_blob, weight)

        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.L1NormTrimmed(reg_weight * 0.1, k)
        output = reg(
            train_net, train_init_net, weight_blob, by=RegularizationBy.ON_LOSS
        )

        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)
        result = np.sum(np.sort(np.absolute(weight))[:(param_dim - k)]) * reg_weight * 0.1
        npt.assert_almost_equal(result, workspace.blobs[output], decimal=2)

    @given(
        param_dim=st.integers(10, 30),
        k=st.integers(5, 9),
        l1=st.integers(0, 10),
        l2=st.integers(0, 10)
    )
    def test_elastic_l1_norm_trimmed(self, param_dim, k, l1, l2):
        weight = np.random.rand(param_dim).astype(np.float32)
        weight_blob = core.BlobReference("weight_blob")
        workspace.FeedBlob(weight_blob, weight)

        train_init_net, train_net = self.get_training_nets()
        reg = regularizer.ElasticNetL1NormTrimmed(l1 * 0.1, l2 * 0.1, k)
        output = reg(
            train_net, train_init_net, weight_blob, by=RegularizationBy.ON_LOSS
        )

        workspace.RunNetOnce(train_init_net)
        workspace.RunNetOnce(train_net)
        l1_norm = np.sum(np.sort(np.absolute(weight))[:(param_dim - k)])
        l2_norm = np.sum(np.square(weight))
        result = l1_norm * l1 * 0.1 + l2_norm * l2 * 0.1
        npt.assert_almost_equal(result, workspace.blobs[output], decimal=2)