Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / onnx / test_onnxifi.py






import numpy as np
import time
import unittest

import onnx
import onnx.defs
from onnx.backend.base import namedtupledict
from onnx.helper import make_node, make_graph, make_tensor_value_info, make_model
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace
from caffe2.python.models.download import ModelDownloader
from caffe2.python.onnx.onnxifi import onnxifi_caffe2_net
from caffe2.python.onnx.tests.test_utils import TestCase

ONNXIFI_DATATYPE_FLOAT32 = 1


def _print_net(net):
    for i in net.external_input:
        print("Input: {}".format(i))
    for i in net.external_output:
        print("Output: {}".format(i))
    for op in net.op:
        print("Op {}".format(op.type))
        for x in op.input:
            print("  input: {}".format(x))
        for y in op.output:
            print("  output: {}".format(y))


class OnnxifiTest(TestCase):
    @unittest.skip("Need ONNXIFI backend support")
    def test_relu_graph(self):
        batch_size = 1
        X = np.random.randn(batch_size, 1, 3, 2).astype(np.float32)
        graph_def = make_graph(
            [make_node("Relu", ["X"], ["Y"])],
            name="test",
            inputs=[make_tensor_value_info("X", onnx.TensorProto.FLOAT,
                [batch_size, 1, 3, 2])],
            outputs=[make_tensor_value_info("Y", onnx.TensorProto.FLOAT,
                [batch_size, 1, 3, 2])])
        model_def = make_model(graph_def, producer_name='relu-test')
        op = core.CreateOperator(
            "Onnxifi",
            ["X"],
            ["Y"],
            onnx_model=model_def.SerializeToString(),
            input_names=["X"],
            output_names=["Y"],
            output_shape_hint_0=[ONNXIFI_DATATYPE_FLOAT32, batch_size, 1, 3, 2])
        workspace.FeedBlob("X", X)
        workspace.RunOperatorOnce(op)
        Y = workspace.FetchBlob("Y")
        np.testing.assert_almost_equal(Y, np.maximum(X, 0))

    @unittest.skip("Need ONNXIFI backend support")
    def test_conv_graph(self):
        X = np.array([[[[0., 1., 2., 3., 4.],  # (1, 1, 5, 5) input tensor
                        [5., 6., 7., 8., 9.],
                        [10., 11., 12., 13., 14.],
                        [15., 16., 17., 18., 19.],
                        [20., 21., 22., 23., 24.]]]]).astype(np.float32)
        W = np.array([[[[1., 1., 1.],  # (1, 1, 3, 3) tensor for convolution weights
                        [1., 1., 1.],
                        [1., 1., 1.]]]]).astype(np.float32)
        Y_without_padding = np.array([[[[54., 63., 72.],  # (1, 1, 3, 3) output tensor
                                        [99., 108., 117.],
                                        [144., 153., 162.]]]]).astype(np.float32)
        graph_def = make_graph(
            [make_node(
                'Conv',
                inputs=['X', 'W'],
                outputs=['Y'],
                kernel_shape=[3, 3],
                # Default values for other attributes: strides=[1, 1], dilations=[1, 1], groups=1
                pads=[0, 0, 0, 0],
            )],
            name="test",
            inputs=[make_tensor_value_info("X", onnx.TensorProto.FLOAT, [1, 1, 5, 5]),
                make_tensor_value_info("W", onnx.TensorProto.FLOAT, [1, 1, 3, 3]),
            ],
            outputs=[make_tensor_value_info("Y", onnx.TensorProto.FLOAT,
                [1, 1, 3, 3])])
        model_def = make_model(graph_def, producer_name='conv-test')
        # We intentional rewrite the input/output name so test that the
        # input/output binding of c2 op is positional
        op = core.CreateOperator(
            "Onnxifi",
            ["X0"],
            ["Y0"],
            onnx_model=model_def.SerializeToString(),
            initializers=["W", "W0"],
            input_names=["X"],
            output_names=["Y"],
            output_shape_hint_0=[ONNXIFI_DATATYPE_FLOAT32, 1, 1, 3, 3])
        workspace.FeedBlob("X0", X)
        workspace.FeedBlob("W0", W)
        workspace.RunOperatorOnce(op)
        Y = workspace.FetchBlob("Y0")
        np.testing.assert_almost_equal(Y, Y_without_padding)


class OnnxifiTransformTest(TestCase):
    def setUp(self):
        self.model_downloader = ModelDownloader()

    def _add_head_tail(self, pred_net, new_head, new_tail):
        orig_head = pred_net.external_input[0]
        orig_tail = pred_net.external_output[0]

        # Add head
        head = caffe2_pb2.OperatorDef()
        head.type = "Copy"
        head.input.append(new_head)
        head.output.append(orig_head)
        dummy = caffe2_pb2.NetDef()
        dummy.op.extend(pred_net.op)
        del pred_net.op[:]
        pred_net.op.extend([head])
        pred_net.op.extend(dummy.op)
        pred_net.external_input[0] = new_head

        # Add tail
        tail = caffe2_pb2.OperatorDef()
        tail.type = "Copy"
        tail.input.append(orig_tail)
        tail.output.append(new_tail)
        pred_net.op.extend([tail])
        pred_net.external_output[0] = new_tail

    @unittest.skip("Need ONNXIFI backend support")
    def test_resnet50_core(self):
        N = 1
        repeat = 1
        print("Batch size: {}, repeat inference {} times".format(N, repeat))
        init_net, pred_net, _ = self.model_downloader.get_c2_model('resnet50')
        self._add_head_tail(pred_net, 'real_data', 'real_softmax')
        input_blob_dims = (N, 3, 224, 224)
        input_name = "real_data"

        device_option = core.DeviceOption(caffe2_pb2.CPU, 0)
        init_net.device_option.CopyFrom(device_option)
        pred_net.device_option.CopyFrom(device_option)
        for op in pred_net.op:
            op.device_option.CopyFrom(device_option)
        net_outputs = pred_net.external_output
        Y_c2 = None
        data = np.random.randn(*input_blob_dims).astype(np.float32)
        c2_time = 1
        workspace.SwitchWorkspace("onnxifi_test", True)
        with core.DeviceScope(device_option):
            workspace.FeedBlob(input_name, data)
            workspace.RunNetOnce(init_net)
            workspace.CreateNet(pred_net)
            start = time.time()
            for _ in range(repeat):
                workspace.RunNet(pred_net.name)
            end = time.time()
            c2_time = end - start
            output_values = [workspace.FetchBlob(name) for name in net_outputs]
            Y_c2 = namedtupledict('Outputs', net_outputs)(*output_values)
        workspace.ResetWorkspace()

        # Fill the workspace with the weights
        with core.DeviceScope(device_option):
            workspace.RunNetOnce(init_net)

        # Cut the graph
        start = time.time()
        pred_net_cut = onnxifi_caffe2_net(pred_net,
                                          {input_name: input_blob_dims},
                                          infer_shapes=True)
        del init_net, pred_net
        #_print_net(pred_net_cut)

        Y_trt = None
        input_name = pred_net_cut.external_input[0]
        print("C2 runtime: {}s".format(c2_time))
        with core.DeviceScope(device_option):
            workspace.FeedBlob(input_name, data)
            workspace.CreateNet(pred_net_cut)
            end = time.time()
            print("Conversion time: {:.2f}s".format(end - start))

            start = time.time()
            for _ in range(repeat):
                workspace.RunNet(pred_net_cut.name)
            end = time.time()
            trt_time = end - start
            print("Onnxifi runtime: {}s, improvement: {}%".format(trt_time, (c2_time - trt_time) / c2_time * 100))
            output_values = [workspace.FetchBlob(name) for name in net_outputs]
            Y_trt = namedtupledict('Outputs', net_outputs)(*output_values)
        np.testing.assert_allclose(Y_c2, Y_trt, rtol=1e-3)