Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / onnx / tests / conversion_test.py

## @package onnx
# Module caffe2.python.onnx.tests.conversion_test





import json
import tempfile
import textwrap
import traceback
import unittest
import zipfile

from caffe2.proto import caffe2_pb2
from caffe2.python import brew, core
from caffe2.python.model_helper import ModelHelper
from click.testing import CliRunner
import numpy as np
from onnx import helper, ModelProto, TensorProto
from caffe2.python.onnx.helper import c2_native_run_net

from caffe2.python.onnx.bin.conversion import caffe2_to_onnx, onnx_to_caffe2
import caffe2.python.onnx.backend as c2
from caffe2.python.onnx.tests.test_utils import TestCase


class TestConversion(TestCase):
    def _run_command(self, cmd, *args, **kwargs):
        runner = CliRunner()
        result = runner.invoke(cmd, *args, **kwargs)
        self.assertEqual(result.exit_code, 0, textwrap.dedent('''
        Command exited with non-zero exit code:
        output: {}
        exception: {}
        exc_info: {}
        '''.format(result.output,
                   result.exception,
                   traceback.format_exception(*result.exc_info))))
        return result

    def test_caffe2_to_onnx(self):
        caffe2_net = tempfile.NamedTemporaryFile()
        caffe2_init_net = tempfile.NamedTemporaryFile()
        output = tempfile.NamedTemporaryFile()

        model = ModelHelper(name='caffe2-to-onnx-test')
        brew.relu(model, ["X"], "Y")
        caffe2_net.write(model.net.Proto().SerializeToString())
        caffe2_net.flush()

        init_model = ModelHelper(name='caffe2-to-onnx-init-test')
        init_model.net.GivenTensorFill([], 'X', shape=[2, 2],
                                       values=np.zeros((2, 2)).flatten().astype(float))
        caffe2_init_net.write(init_model.net.Proto().SerializeToString())
        caffe2_init_net.flush()

        self._run_command(
            caffe2_to_onnx, [
                caffe2_net.name,
                '--caffe2-init-net', caffe2_init_net.name,
                '--output', output.name,
            ],
            catch_exceptions=False,
        )

        onnx_model = ModelProto()
        onnx_model.ParseFromString(output.read())
        self.assertEqual(len(onnx_model.graph.node), 1)
        self.assertEqual(onnx_model.graph.node[0].op_type, 'Relu')
        self.assertEqual(len(onnx_model.graph.initializer), 1)
        self.assertEqual(onnx_model.graph.initializer[0].name, onnx_model.graph.input[0].name)

    def test_caffe2_to_onnx_value_info(self):
        caffe2_net = tempfile.NamedTemporaryFile()
        output = tempfile.NamedTemporaryFile()

        model = ModelHelper(name='caffe2-to-onnx-test')
        brew.relu(model, ["X"], "Y")
        caffe2_net.write(model.net.Proto().SerializeToString())
        caffe2_net.flush()

        args = [caffe2_net.name, '--output', output.name]
        self.assertRaisesRegex(Exception,
                               'value info',
                               self._run_command, caffe2_to_onnx, args)

        args.extend([
            '--value-info',
            json.dumps({
                'X': (TensorProto.FLOAT, (2, 2)),
            })])
        self._run_command(caffe2_to_onnx, args)

        onnx_model = ModelProto()
        onnx_model.ParseFromString(output.read())
        self.assertEqual(len(onnx_model.graph.node), 1)
        self.assertEqual(onnx_model.graph.node[0].op_type, 'Relu')
        self.assertEqual(len(onnx_model.graph.initializer), 0)

    def test_onnx_to_caffe2(self):
        onnx_model = tempfile.NamedTemporaryFile()
        output = tempfile.NamedTemporaryFile()
        init_net_output = tempfile.NamedTemporaryFile()

        node_def = helper.make_node(
            "Mul", ["X", "W"], ["Y"])
        graph_def = helper.make_graph(
            [node_def],
            "test",
            [helper.make_tensor_value_info("X", TensorProto.FLOAT, (2, 3)),
             helper.make_tensor_value_info("W", TensorProto.FLOAT, (1, 3))],
            [helper.make_tensor_value_info("Y", TensorProto.FLOAT, (2, 3))],
            initializer=[helper.make_tensor("W",
                                            TensorProto.FLOAT,
                                            [1, 3],
                                            np.zeros((1, 3)).flatten().astype(float))])
        model_def = helper.make_model(graph_def, producer_name='onnx-to-caffe2-test')
        onnx_model.write(model_def.SerializeToString())
        onnx_model.flush()

        self._run_command(
            onnx_to_caffe2, [
                onnx_model.name,
                '--output', output.name,
                '--init-net-output', init_net_output.name,
            ])

        caffe2_net = caffe2_pb2.NetDef()
        caffe2_net.ParseFromString(output.read())
        self.assertEqual(len(caffe2_net.op), 1)
        self.assertEqual(caffe2_net.op[0].type, 'Mul')

        caffe2_init_net = caffe2_pb2.NetDef()
        caffe2_init_net.ParseFromString(init_net_output.read())
        self.assertEqual(len(caffe2_init_net.op), 1)
        self.assertEqual(set(sum([list(init_op.output)
                                  for init_op in caffe2_init_net.op], [])),
                         {'W'})

    def test_onnx_to_caffe2_zipfile(self):
        buf = tempfile.NamedTemporaryFile()
        onnx_model = zipfile.ZipFile(buf, 'w')

        node_def = helper.make_node(
            "MatMul", ["X", "W"], ["Y"])
        X = np.random.rand(2, 3).astype(np.float32)
        W = np.random.rand(3, 2).flatten().astype(np.float32)
        graph_def = helper.make_graph(
            [node_def],
            "test",
            [helper.make_tensor_value_info("X", TensorProto.FLOAT, (2, 3)),
             helper.make_tensor_value_info("W", TensorProto.FLOAT, (3, 2))],
            [helper.make_tensor_value_info("Y", TensorProto.FLOAT, (2, 2))],
            initializer=[helper.make_tensor("W",
                                            TensorProto.FLOAT,
                                            [3, 2],
                                            W.tobytes(),
                                            raw=True)])
        model_def = helper.make_model(graph_def, producer_name='onnx-to-caffe2-test')
        onnx_model.writestr('__MODEL_PROTO', model_def.SerializeToString())
        onnx_model.writestr('W', W.tobytes())
        onnx_model.close()

        W = W.reshape((3, 2))
        Y_expect = np.matmul(X, W)

        c2_model = c2.prepare_zip_archive(buf)
        Y = c2_model.run(X).Y
        np.testing.assert_allclose(Y, Y_expect)

    def _make_fake_if_op(self, true_nodes, false_nodes, output_types):
        true = helper.make_tensor("condition", TensorProto.BOOL, (), [True])
        true_graph = helper.make_graph(true_nodes, "true_graph", [], [
            helper.make_tensor_value_info("Y", TensorProto.FLOAT, (2, 2)),
        ])
        false_graph = helper.make_graph(false_nodes, "false_graph", [], [
            helper.make_tensor_value_info("Y", TensorProto.FLOAT, (2, 2)),
        ])
        if_inputs = ["condition"]
        if_outputs = [name for _, _, name in output_types]
        retval_nodes = [
            helper.make_node("Constant", [], ["condition"], value=true),
            helper.make_node("If", if_inputs, if_outputs, then_branch=true_graph,
                             else_branch=false_graph)
        ]
        return retval_nodes

    def test_onnx_to_caffe2_if(self):
        true_nodes = [helper.make_node(
            "MatMul", ["X", "W"], ["Y"])]
        false_nodes = [helper.make_node("Slice", ["X"], ["Y"], axes=[0, 1],
                                        starts=[0, 0], ends=[2, 2])]
        nodes = self._make_fake_if_op(true_nodes, false_nodes, [(TensorProto.FLOAT, (2, 2), "Y")])
        X = np.random.rand(2, 3).astype(np.float32)
        W = np.random.rand(3, 2).flatten().astype(np.float32)
        graph_def = helper.make_graph(
            nodes,
            "test",
            [helper.make_tensor_value_info("X", TensorProto.FLOAT, (2, 3)),
             helper.make_tensor_value_info("W", TensorProto.FLOAT, (3, 2))],
            [helper.make_tensor_value_info("Y", TensorProto.FLOAT, (2, 2))],
            initializer=[helper.make_tensor("W",
                                            TensorProto.FLOAT,
                                            [3, 2],
                                            W.tolist())]
        )
        onnx_id = helper.make_opsetid("", 9)
        model_def = helper.make_model(graph_def, producer_name='onnx-to-caffe2-test',
                                      opset_imports=[onnx_id])

        p = c2.prepare(model_def)
        Y = np.matmul(X, W.reshape(3, 2))
        out = p.run(X)
        np.testing.assert_allclose(out.Y, Y)

    # input_types and output_types are lists of triples of (name, type, shape)
    def _make_fake_loop_op(self, body_nodes, input_types, output_types):
        ten = helper.make_tensor("trip_count_value", TensorProto.INT64, (1,), [10])
        true = helper.make_tensor("condition", TensorProto.BOOL, (1,), [True])
        # lcd is a dummy loop-carried dependency that only exists because
        # right now the schema checker is broken and assumes a variadic
        # input needs at least one value.
        graph_inputs = [helper.make_tensor_value_info("i", TensorProto.INT64, (1,)),
                        helper.make_tensor_value_info("cond", TensorProto.BOOL, (1,))]
        for type, shape, name in input_types:
            graph_inputs.append(helper.make_tensor_value_info("_" + name, type, shape))
        graph_outputs = [helper.make_tensor_value_info("cond", TensorProto.BOOL, (1,))]
        for type, shape, name in output_types:
            graph_outputs.append(helper.make_tensor_value_info("_" + name, type, shape))
        body_graph = helper.make_graph(body_nodes, "body_graph", graph_inputs,
                                       graph_outputs)
        loop_inputs = ["trip_count", "condition"]
        loop_inputs.extend([name for _, _, name in input_types])
        loop_outputs = [name for _, _, name in output_types]
        retval_nodes = [
            helper.make_node("Constant", [], ["trip_count"], value=ten),
            helper.make_node("Constant", [], ["condition"], value=true),
            helper.make_node("Loop", loop_inputs, loop_outputs, body=body_graph)
        ]
        return retval_nodes

    def test_onnx_to_caffe2_loop(self):
        body_nodes = [helper.make_node(
            "MatMul", ["_X", "W"], ["_Y"])]
        nodes = self._make_fake_loop_op(body_nodes,
                                        [(TensorProto.FLOAT, (2, 2), "X")],
                                        [(TensorProto.FLOAT, (2, 2), "Y")])
        X = np.random.rand(2, 2).astype(np.float32)
        W = np.random.rand(2, 2).flatten().astype(np.float32)
        graph_def = helper.make_graph(
            nodes,
            "test",
            [helper.make_tensor_value_info("X", TensorProto.FLOAT, (2, 2)),
             helper.make_tensor_value_info("W", TensorProto.FLOAT, (2, 2))],
            [helper.make_tensor_value_info("Y", TensorProto.FLOAT, (2, 2))],
            initializer=[helper.make_tensor("W",
                                            TensorProto.FLOAT,
                                            [2, 2],
                                            W.tolist())]
        )
        model_def = helper.make_model(graph_def, producer_name='onnx-to-caffe2-test')
        Y = X
        for _ in range(10):
            Y = np.matmul(Y, W.reshape(2, 2))
        p = c2.prepare(model_def)
        out = p.run(X)
        np.testing.assert_allclose(out.Y, Y)

    # TODO investigate why this is failing after changing Reshape
    # operator from taking the new shape as attribute to as input
    @unittest.skip('Start failing after Reshape op change')
    def test_convert_end2end(self):
        predict_net_f = tempfile.NamedTemporaryFile()
        init_net_f = tempfile.NamedTemporaryFile()
        onnx_model_f = tempfile.NamedTemporaryFile()

        x = 'X'
        w = 'W'
        b = 'b'
        y = 'Y'

        predict_net = caffe2_pb2.NetDef()
        predict_net.name = 'test-convert-end2end'
        predict_net.external_input[:] = [x, w, b]
        predict_net.external_output[:] = [y]
        predict_net.op.extend([
            core.CreateOperator(
                'FC',
                inputs=[x, w, b],
                outputs=[y],
                axis=2,
            ),
        ])
        predict_net_f.write(predict_net.SerializeToString())
        predict_net_f.flush()

        init_net = caffe2_pb2.NetDef()
        init_net.name = 'test-convert-end2end-init'
        init_net.external_output[:] = [w, b]
        x_val = np.random.randn(1, 3, 2).astype(np.float32)
        w_val = np.random.randn(4, 2).astype(np.float32)
        b_val = np.random.randn(4).astype(np.float32)
        init_net.op.extend([
            core.CreateOperator(
                'GivenTensorFill',
                [],
                [w],
                values=w_val,
                shape=w_val.shape,
            ),
            core.CreateOperator(
                'GivenTensorFill',
                [],
                [b],
                values=b_val,
                shape=b_val.shape,
            ),
        ])
        init_net_f.write(init_net.SerializeToString())
        init_net_f.flush()

        y_val = np.matmul(x_val, w_val.transpose()) + b_val
        for _ in range(5):
            self._run_command(
                caffe2_to_onnx, [
                    predict_net_f.name,
                    '--caffe2-init-net', init_net_f.name,
                    '--output', onnx_model_f.name,
                    '--value-info',
                    json.dumps({
                        x: (TensorProto.FLOAT, (1, 3, 2)),
                    }),
                ],
                catch_exceptions=False,
            )

            onnx_model_f.seek(0)
            onnx_model = ModelProto()
            onnx_model.ParseFromString(onnx_model_f.read())
            np.testing.assert_almost_equal(
                c2.run_model(
                    onnx_model, {onnx_model.graph.input[0].name: x_val}),
                [y_val])
Loading ...