Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

/ python / core_test.py






from inspect import currentframe, getframeinfo
import unittest

import numpy as np

from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, schema, test_util
from caffe2.python.task import Node, Task


class TestScopes(test_util.TestCase):
    def testBlobReferenceIsIndependentFromNameScope(self):
        blob_v = core.BlobReference("v")
        with core.NameScope("foo"):
            blob_w = core.BlobReference("w")
            with core.NameScope("bar"):
                blob_x = core.BlobReference("x")
        self.assertEqual(str(blob_v), "v")
        self.assertEqual(str(blob_w), "w")
        self.assertEqual(str(blob_x), "x")

    def testNameScopeWithOp(self):
        global_x = core.BlobReference("x")
        global_y = core.BlobReference("y")
        with core.NameScope("foo"):
            # Raw strings should have namescope prepended.
            op = core.CreateOperator("Relu", "x", "y")
            self.assertEqual(len(op.input), 1)
            self.assertEqual(op.input[0], "foo/x")
            self.assertEqual(len(op.output), 1)
            self.assertEqual(op.output[0], "foo/y")
            # BlobReferences should not.
            op = core.CreateOperator("Relu", global_x, global_y)
            self.assertEqual(len(op.input), 1)
            self.assertEqual(op.input[0], "x")
            self.assertEqual(len(op.output), 1)
            self.assertEqual(op.output[0], "y")

    def testNameScopeWithReset(self):
        with core.NameScope("foo"):
            # foo/
            op = core.CreateOperator("Relu", "x", "y")
            self.assertEqual(len(op.input), 1)
            self.assertEqual(op.input[0], "foo/x")
            self.assertEqual(len(op.output), 1)
            self.assertEqual(op.output[0], "foo/y")
            with core.NameScope("bar"):
                # foo/bar/
                op = core.CreateOperator("Relu", "x", "y")
                self.assertEqual(len(op.input), 1)
                self.assertEqual(op.input[0], "foo/bar/x")
                self.assertEqual(len(op.output), 1)
                self.assertEqual(op.output[0], "foo/bar/y")
            # Back to foo/
            op = core.CreateOperator("Relu", "x", "y")
            self.assertEqual(len(op.input), 1)
            self.assertEqual(op.input[0], "foo/x")
            self.assertEqual(len(op.output), 1)
            self.assertEqual(op.output[0], "foo/y")
            with core.NameScope("bar", reset=True):
                # bar/
                op = core.CreateOperator("Relu", "x", "y")
                self.assertEqual(len(op.input), 1)
                self.assertEqual(op.input[0], "bar/x")
                self.assertEqual(len(op.output), 1)
                self.assertEqual(op.output[0], "bar/y")
            # Back to foo/
            op = core.CreateOperator("Relu", "x", "y")
            self.assertEqual(len(op.input), 1)
            self.assertEqual(op.input[0], "foo/x")
            self.assertEqual(len(op.output), 1)
            self.assertEqual(op.output[0], "foo/y")

    def testDeviceScope(self):
        # No device
        op = core.CreateOperator("Relu", "x", "y")
        self.assertFalse(op.HasField('device_option'))
        # explicitly setting a device
        device_option = caffe2_pb2.DeviceOption()
        device_option.device_type = workspace.GpuDeviceType
        device_option.device_id = 1
        op = core.CreateOperator("Relu", "x", "y", device_option=device_option)
        self.assertTrue(op.HasField('device_option'))
        self.assertEqual(op.device_option.device_type, workspace.GpuDeviceType)
        self.assertEqual(op.device_option.device_id, 1)
        with core.DeviceScope(device_option):
            # from device scope
            op = core.CreateOperator("Relu", "x", "y")
            self.assertTrue(op.HasField('device_option'))
            self.assertEqual(op.device_option.device_type, workspace.GpuDeviceType)
            self.assertEqual(op.device_option.device_id, 1)
            # from an overridden device option
            override_device = caffe2_pb2.DeviceOption()
            override_device.device_type = caffe2_pb2.CPU
            op = core.CreateOperator(
                "Relu", "x", "y", device_option=override_device)
            self.assertTrue(op.HasField('device_option'))
            self.assertEqual(op.device_option.device_type, caffe2_pb2.CPU)
        # back from normal: no device
        op = core.CreateOperator("Relu", "x", "y")
        self.assertFalse(op.HasField('device_option'))
        device_option = caffe2_pb2.DeviceOption()

    def testNameAndDeviceScopeTogether(self):
        device_option = caffe2_pb2.DeviceOption()
        device_option.device_type = workspace.GpuDeviceType
        device_option.device_id = 1
        with core.DeviceScope(device_option):
            with core.NameScope("foo"):
                op = core.CreateOperator("Relu", "x", "y")
                self.assertTrue(op.HasField('device_option'))
                self.assertEqual(op.device_option.device_type, workspace.GpuDeviceType)
                self.assertEqual(op.device_option.device_id, 1)
                self.assertEqual(len(op.input), 1)
                self.assertEqual(op.input[0], "foo/x")
                self.assertEqual(len(op.output), 1)
                self.assertEqual(op.output[0], "foo/y")


class TestCloneNet(test_util.TestCase):
    def testPartialClone(self):
        params = core.Net('params')
        p1 = params.ConstantFill([], ['p1'])
        workspace.CreateNet(params)
        workspace.RunNetOnce(params)

        n = core.Net('original')
        a1 = n.AddExternalInput('a1')
        a2 = n.AddExternalInput('a2')
        b1, b2 = n.Concat([a1, a2], ['b1', 'b2'], axis=0)
        c1 = n.Sum([b1, p1], ['c1'])
        c2 = n.Sum([b2], ['c2'])
        d = n.Sum([c1, c2], ['d'])

        # test that gradient ops are ignored when partial-cloning
        n.AddGradientOperators([d])

        # test some in-place ops
        k = n.Sum([p1], ['k'])
        e = n.Sum([d], ['e'])
        e = n.Sum([e, k], [e])
        e = n.Sum([e], [e])
        f = n.Sum(e, ['f'])

        def net_assert(net, num_ops, inputs, outputs, internals):
            self.assertEqual(len(net.Proto().op), num_ops)
            self.assertEqual(set(net.Proto().external_input), inputs)
            self.assertEqual(set(net.Proto().external_output), outputs)
            all_blobs = set(net.Proto().external_input)
            all_blobs |= set(net.Proto().external_output)
            for op in net.Proto().op:
                all_blobs |= set(op.input) | set(op.output)
            self.assertEqual(all_blobs, inputs | outputs | internals)
            # create net to make sure its valid
            for input in inputs:
                workspace.FeedBlob(input, np.array([]))
            workspace.CreateNet(net)

        n2, (d22, ) = n.ClonePartial('f1', {a1: 'a11', a2: 'a22'}, [d])
        net_assert(
            n2, 4, {'p1', 'a11', 'a22'}, {'f1/d'},
            {'f1/b1', 'f1/b2', 'f1/c1', 'f1/c2', 'p1'})
        self.assertTrue(isinstance(d22, core.BlobReference))
        self.assertEqual(d22.Net(), n2)
        self.assertEqual(str(d22), 'f1/d')

        n3, (d22, ) = n.ClonePartial('f2', [b1, b2], [d])
        net_assert(
            n3, 3, {'p1', 'b1', 'b2'}, {'f2/d'}, {'f2/c1', 'f2/c2', 'p1'})
        self.assertEqual(str(d22), 'f2/d')

        n4, (c22, ) = n.ClonePartial('f3', [b1], [c1])
        net_assert(n4, 1, {'p1', 'b1'}, {'f3/c1'}, {'p1'})
        self.assertEqual(str(c22), 'f3/c1')

        n5, (c11, c22) = n.ClonePartial('f4', [b1, b2], [c1, c2])
        net_assert(n5, 2, {'p1', 'b1', 'b2'}, {'f4/c1', 'f4/c2'}, {'p1'})
        self.assertEqual(str(c11), 'f4/c1')
        self.assertEqual(str(c22), 'f4/c2')

        with self.assertRaises(AssertionError):
            n.ClonePartial('f4', [a1, a2, c2], [d])

        n6, (e22, ) = n.ClonePartial('f5', [d], [e])
        net_assert(n6, 4, {'p1', 'd'}, {'f5/e'}, {'f5/k', 'p1'})
        self.assertEqual(str(e22), 'f5/e')

        n8, (e22, f22) = n.ClonePartial('f7', [d], [e, f])
        net_assert(n8, 5, {'p1', 'd'}, {'f7/e', 'f7/f'}, {'p1', 'f7/k'})
        self.assertEqual(str(e22), 'f7/e')
        self.assertEqual(str(f22), 'f7/f')

        params._CheckLookupTables()
        n._CheckLookupTables()

    def test_mask_clone_update_external_list(self):
        n = core.Net('original')
        a1 = n.AddExternalInput('a1')
        a2 = n.AddExternalInput('a2')
        p1 = 'p1'
        b1, b2 = n.Concat([a1, a2], ['b1', 'b2'], axis=0)
        c1 = n.Sum([b1, p1], ['c1'])
        c2 = n.Sum([b2], ['c2'])
        n.Sum([c1, c2], ['d'])
        new_net = n.Clone(
            "new", op_id_mask=[0, 1], keep_schema=True, update_external_list=True)
        self.assertEqual(
            sorted(map(str, new_net.external_inputs)),
            ["a1", "a2", "p1"],
            "external input not matched",
        )
        self.assertEqual(
            sorted(map(str, new_net.external_outputs)),
            ["b2", "c1"],
            "external output not matched",
        )
        new_net = n.Clone(
            "new2", op_id_mask=[2, 3], keep_schema=True, update_external_list=True)
        self.assertEqual(
            sorted(map(str, new_net.external_inputs)),
            ["b2", "c1"],
            "external input not matched",
        )
        self.assertEqual(
            sorted(map(str, new_net.external_outputs)),
            ["d"],
            "external output not matched",
        )

    def test_control_op_remap(self):
        # Subnets under If/AsyncIf operators should get name remapping when cloned
        n = core.Net("original")
        then_net = core.Net("a")
        then_net.FC(["inputA"], "fc_a")
        else_net = core.Net("b")
        else_net.FC(["inputB"], "fc_b")
        n.If(
            inputs=[],
            outputs=[],
            then_net=then_net.Proto(),
            else_net=else_net.Proto(),
        )
        copied = n.Clone("copied", blob_remap={"inputA": "inputX"})
        if_op = copied._net.op[0]
        self.assertEqual(if_op.arg[0].n.op[0].input, ["inputX"])
        self.assertEqual(if_op.arg[1].n.op[0].input, ["inputB"])


class TestExternalInputs(test_util.TestCase):
    def testAddExternalInputShouldRaiseIfDuplicate(self):
        net = core.Net("test")
        net.AddExternalInput(
            schema.Struct(("x", schema.Scalar(np.float))),
        )
        with self.assertRaises(AssertionError):
            net.AddExternalInput(
                schema.Struct(("x", schema.Scalar(np.float))),
            )

    def testAddExternalInputShouldRaiseIfDuplicateInSameCall(self):
        net = core.Net("test")
        with self.assertRaises(AssertionError):
            net.AddExternalInput(
                schema.Struct(("x", schema.Scalar(np.float))),
                schema.Struct(("x", schema.Scalar(np.float))),
            )

    def testSetInputRecordWithBlobs(self):
        net = core.Net("test")
        record = schema.NewRecord(net, schema.Struct(
            ("x", schema.Scalar(np.float)),
        ))
        input_record = net.set_input_record(record)
        self.assertTrue(net.BlobIsDefined(input_record.x()))
        self.assertIn(input_record.x(), net.external_inputs)

    def testSetInputRecordWithoutBlobs(self):
        net = core.Net("test")
        record = schema.Struct(("x", schema.Scalar(np.float)))
        input_record = net.set_input_record(record)
        self.assertTrue(net.BlobIsDefined(input_record.x()))
        self.assertIn(input_record.x(), net.external_inputs)


class TestCreateOperator(test_util.TestCase):
    def testCreate(self):
        device_option = caffe2_pb2.DeviceOption()
        device_option.device_type = workspace.GpuDeviceType
        device_option.device_id = 1
        op = core.CreateOperator(
            "Ludicrous", "x", "y", name="ludicrous",
            control_input="z", device_option=device_option,
            engine="WARP", arg1=1, arg2="2", arg3=[1, 2, 3])
        self.assertEqual(op.type, "Ludicrous")
        self.assertEqual(op.name, "ludicrous")
        self.assertEqual(op.engine, "WARP")
        self.assertEqual(len(op.input), 1)
        self.assertEqual(op.input[0], "x")
        self.assertEqual(len(op.output), 1)
        self.assertEqual(op.output[0], "y")
        self.assertEqual(len(op.control_input), 1)
        self.assertEqual(op.control_input[0], "z")
        self.assertTrue(op.HasField('device_option'))
        self.assertEqual(op.device_option.device_type, workspace.GpuDeviceType)
        self.assertEqual(op.device_option.device_id, 1)
        self.assertTrue(len(op.arg), 3)

        # can't guarantee ordering of kwargs, so generate a set of args
        # to test with
        arg_map = {}
        for arg in op.arg:
            arg_map[arg.name] = arg

        # Check all elements exist that should
        self.assertEqual("arg1" in arg_map, True)
        self.assertEqual("arg2" in arg_map, True)
        self.assertEqual("arg3" in arg_map, True)

        # Now test that all args were initialized correctly
        self.assertEqual(arg_map["arg1"].i, 1)
        self.assertEqual(arg_map["arg2"].s, b"2")
        self.assertEqual(list(arg_map["arg3"].ints), [1, 2, 3])


class TestAutoNaming(test_util.TestCase):
    def assertOperatorListEqual(self, operatorDefList1, operatorDefList2):
        for op in operatorDefList1:
            op.debug_info = ""
        for op in operatorDefList2:
            op.debug_info = ""
        self.assertEqual(operatorDefList1, operatorDefList2)
    """
    Test that operators are named with different names, and that automatically
    named blob names don't clash intra or inter networks.
    """
    def test_next_blob(self):
        def create_net():
            net = core.Net('net')
            with core.NameScope('foo'):
                net.Add(['a', 'b'], net.NextScopedBlob('ab'))
Loading ...