Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

/ quantization / server / conv_groupwise_dnnlowp_acc16_op_test.py



import collections

import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, utils, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
from caffe2.quantization.server.dnnlowp_test_utils import (
    check_quantized_results_close,
    run_conv_or_fc
)
from hypothesis import assume, given, settings


dyndep.InitOpsLibrary("//caffe2/caffe2/quantization/server:dnnlowp_ops")
workspace.GlobalInit(
    [
        "caffe2",
        "--caffe2_omp_num_threads=11",
        # Increase this threshold to test acc16 with randomly generated data
        "--caffe2_dnnlowp_acc16_density_threshold=0.5",
    ]
)


class GroupWiseDNNLowPOpConvAcc16OpTest(hu.HypothesisTestCase):
    # correctness test with no quantization error in inputs
    @given(
        stride=st.integers(1, 2),
        pad=st.integers(0, 2),
        kernel=st.integers(1, 5),
        dilation=st.integers(1, 2),
        size=st.integers(10, 16),
        group=st.integers(1, 4),
        input_channels_per_group=st.sampled_from([2, 3, 4, 5, 8, 16, 32]),
        output_channels_per_group=st.integers(2, 16),
        batch_size=st.integers(0, 3),
        order=st.sampled_from(["NCHW", "NHWC"]),
        share_col_buffer=st.booleans(),
        preserve_activation_sparsity=st.booleans(),
        preserve_weight_sparsity=st.booleans(),
        **hu.gcs_cpu_only
    )
    @settings(deadline=None)
    def test_groupwise_dnnlowp_conv_acc16_int(
        self,
        stride,
        pad,
        kernel,
        dilation,
        size,
        group,
        input_channels_per_group,
        output_channels_per_group,
        batch_size,
        order,
        share_col_buffer,
        preserve_activation_sparsity,
        preserve_weight_sparsity,
        gc,
        dc,
    ):
        assume(group == 1 or dilation == 1)
        assume(size >= dilation * (kernel - 1) + 1)

        input_channels = input_channels_per_group * group
        output_channels = output_channels_per_group * group

        # X and W have scale 1, so exactly represented after quantization
        # This was made sure by having at least one 0 and one 255 for unsigned
        # 8-bit tensors, and at least one -128 and one 127 for signed 8-bit
        # tensors.
        # Since fbgemm_acc16 accumulates to 16-bit, To avoid overflow, we use
        # small numbers except for those 0, 255, -128, and 127, for this test
        # We also make sure 255, -128, or 127 are not multiplied together by
        # putting them in different input channels and the corresponding input
        # channel in other matrix is 0.
        # For example, we put 255 in input channel 1 in X, so we make the
        # corresponding input channel in W all zeros.
        X_min = 0 if preserve_activation_sparsity else -77
        X_max = X_min + 255
        X = np.random.rand(batch_size, size, size, input_channels) * 4 + X_min
        X = np.round(X).astype(np.float32)
        X[..., 0] = X_min
        if batch_size != 0:
            X[0, 0, 0, 1] = X_max

        if preserve_weight_sparsity:
            W_min = -128
            W_max = 100
        else:
            W_min = -100
            W_max = W_min + 255
        W = (
            np.random.rand(output_channels, kernel, kernel, input_channels_per_group)
            * 4
            - 2
            + W_min
            + 128
        )
        W = np.round(W).astype(np.float32)
        W[..., 1] = W_min + 128  # "zeros"
        for g in range(group):
            W[g * output_channels_per_group, 0, 0, 0] = W_min
            W[g * output_channels_per_group + 1, 0, 0, 0] = W_max
            if not preserve_weight_sparsity:
                W[
                    g * output_channels_per_group : (g + 1) * output_channels_per_group,
                ] += g

        if order == "NCHW":
            X = utils.NHWC2NCHW(X)
            W = utils.NHWC2NCHW(W)

        # No input quantization error in bias
        b = np.round(np.random.randn(output_channels)).astype(np.float32)

        Output = collections.namedtuple("Output", ["Y", "op_type", "engine", "order"])
        outputs = []

        op_engine_list = [
            ("Conv", ""),
            ("Conv", "DNNLOWP_ACC16"),
            ("Int8Conv", "DNNLOWP_ACC16"),
        ]

        for op_type, engine in op_engine_list:
            net = core.Net("test_net")

            do_quantize = "DNNLOWP" in engine
            do_dequantize = "DNNLOWP" in engine

            if do_quantize:
                quantize = core.CreateOperator(
                    "Quantize",
                    ["X"],
                    ["X_q"],
                    preserve_activation_sparsity=preserve_activation_sparsity,
                    engine="DNNLOWP",
                    device_option=gc,
                )
                net.Proto().op.extend([quantize])

            conv = core.CreateOperator(
                op_type,
                ["X_q" if do_quantize else "X", "W", "b"],
                ["Y_q" if do_dequantize else "Y"],
                stride=stride,
                kernel=kernel,
                dilation=dilation,
                pad=pad,
                order=order,
                shared_buffer=(1 if share_col_buffer else 0),
                preserve_activation_sparsity=preserve_activation_sparsity,
                preserve_weight_sparsity=preserve_weight_sparsity,
                engine=engine,
                group=group,
                quantize_groupwise=1,
                device_option=gc,
            )
            if do_dequantize:
                # groupwise quantization only works with static quantization
                # so we need to set quantization parameters
                dnnlowp_utils.add_quantization_param_args(
                    conv, outputs[0][0], preserve_activation_sparsity
                )
            net.Proto().op.extend([conv])

            if do_dequantize:
                dequantize = core.CreateOperator(
                    "Dequantize", ["Y_q"], ["Y"], engine="DNNLOWP", device_option=gc
                )
                net.Proto().op.extend([dequantize])

            run_conv_or_fc(
                self, None, net, X, W, b, op_type, engine, order, gc, outputs
            )

        check_quantized_results_close(outputs, symmetric=preserve_activation_sparsity)

    @given(
        stride=st.integers(1, 2),
        pad=st.integers(0, 2),
        kernel=st.integers(1, 5),
        dilation=st.integers(1, 2),
        size=st.integers(10, 16),
        group=st.integers(1, 4),
        input_channels_per_group=st.sampled_from([2, 3, 4, 5, 8, 16, 32]),
        output_channels_per_group=st.integers(2, 16),
        batch_size=st.integers(0, 3),
        order=st.sampled_from(["NHWC"]),
        prepack_weight=st.booleans(),
        nbits_in_non_outlier=st.sampled_from((0, 1, 6, 8)),
        share_col_buffer=st.booleans(),
        **hu.gcs_cpu_only
    )
    def test_groupwise_dnnlowp_conv_acc16_outlier(
        self,
        stride,
        pad,
        kernel,
        dilation,
        size,
        group,
        input_channels_per_group,
        output_channels_per_group,
        batch_size,
        order,
        prepack_weight,
        nbits_in_non_outlier,
        share_col_buffer,
        gc,
        dc,
    ):
        assume(group == 1 or dilation == 1)
        assume(size >= dilation * (kernel - 1) + 1)

        input_channels = input_channels_per_group * group
        output_channels = output_channels_per_group * group

        X_min = -77
        X_max = X_min + 255
        X = np.random.rand(batch_size, size, size, input_channels) * 4 + X_min
        X = np.round(X).astype(np.float32)
        X[..., 0] = X_min
        if batch_size != 0:
            X[0, 0, 0, 1] = X_max

        W_min = -100
        W_max = W_min + 255
        W = (
            np.random.rand(output_channels, kernel, kernel, input_channels_per_group)
            * 4
            - 2
            + W_min
            + 128
        )
        W = np.round(W).astype(np.float32)
        W[..., 1] = W_min + 128  # "zeros"
        for g in range(group):
            W[g * output_channels_per_group, 0, 0, 0] = W_min
            W[g * output_channels_per_group + 1, 0, 0, 0] = W_max
            W[g * output_channels_per_group : (g + 1) * output_channels_per_group,] += g

        if order == "NCHW":
            X = utils.NHWC2NCHW(X)
            W = utils.NHWC2NCHW(W)

        b = np.round(np.random.randn(output_channels)).astype(np.float32)

        Output = collections.namedtuple("Output", ["Y", "op_type", "engine", "order"])
        outputs = []

        op_engine_list = [
            ("Conv", ""),
            ("Conv", "DNNLOWP_ACC16"),
            ("Int8Conv", "DNNLOWP_ACC16"),
        ]

        for op_type, engine in op_engine_list:
            init_net = core.Net("test_init_net")
            net = core.Net("test_net")

            do_quantize = "DNNLOWP" in engine
            do_dequantize = "DNNLOWP" in engine
            do_prepack_weight = "DNNLOWP" in engine and prepack_weight

            if do_quantize:
                quantize = core.CreateOperator(
                    "Quantize", ["X"], ["X_q"], engine="DNNLOWP", device_option=gc
                )
                net.Proto().op.extend([quantize])

            if do_prepack_weight:
                X_min = 0 if X.size == 0 else X.min()
                X_max = 0 if X.size == 0 else X.max()
                x_q_param = dnnlowp_utils.choose_quantization_params(X_min, X_max)
                inputs = ["W"]
                if do_dequantize:
                    inputs += ["b"]
                pack = core.CreateOperator(
                    "Int8ConvPackWeight",
                    inputs,
                    ["W_packed"],
                    stride=stride,
                    kernel=kernel,
                    dilation=dilation,
                    pad=pad,
                    nbits_in_non_outlier=nbits_in_non_outlier,
                    engine=engine,
                    group=group,
                    quantize_groupwise=1,
                    in_scale=x_q_param.scale,
                )
                init_net.Proto().op.extend([pack])

            conv = core.CreateOperator(
                op_type,
                [
                    "X_q" if do_quantize else "X",
                    "W_packed" if do_prepack_weight else "W",
                    "b",
                ],
                ["Y_q" if do_dequantize else "Y"],
                stride=stride,
                kernel=kernel,
                dilation=dilation,
                pad=pad,
                order=order,
                nbits_in_non_outlier=nbits_in_non_outlier,
                shared_buffer=(1 if share_col_buffer else 0),
                engine=engine,
                group=group,
                quantize_groupwise=1,
                device_option=gc,
            )
            if do_dequantize or do_prepack_weight:
                # groupwise quantization only works with static quantization
                # so we need to set quantization parameters
                dnnlowp_utils.add_quantization_param_args(conv, outputs[0][0])
            net.Proto().op.extend([conv])

            if do_dequantize:
                dequantize = core.CreateOperator(
                    "Dequantize", ["Y_q"], ["Y"], engine="DNNLOWP", device_option=gc
                )
                net.Proto().op.extend([dequantize])

            run_conv_or_fc(
                self, init_net, net, X, W, b, op_type, engine, order, gc, outputs
            )

        check_quantized_results_close(outputs)