Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / operator_test / rand_quantization_op_test.py






import numpy as np
import struct
import unittest

from hypothesis import given, example
import hypothesis.strategies as st

from caffe2.python import core, workspace
import caffe2.python.hypothesis_test_util as hu

np.set_printoptions(precision=6)

class TestFloatToFusedRandRowwiseQuantized(hu.HypothesisTestCase):
    @given(X=hu.tensor(min_dim=2, max_dim=2,
                        min_value=1, max_value=17),  # only matrix is supported
           bitwidth_=st.sampled_from([1, 2, 4, 8]),
           random_=st.booleans(),
           **hu.gcs)
    @example(X=np.array([[0., 0., 0., 0.264019]]).astype(np.float32),
            bitwidth_=2,
            random_=False,
            **hu.gcs)
    @unittest.skip("Test is flaky, see https://github.com/pytorch/pytorch/issues/28550")
    def test_rand_quantization(self, X, bitwidth_, random_, gc, dc):

        # python reference of encoder
        def quantization_ref(X):
            in_shape = X.shape
            data_per_byte = 8 // bitwidth_
            output_cols = 10 + in_shape[1] // data_per_byte
            tail = 0
            if in_shape[1] % data_per_byte:
                output_cols += 1
                tail = data_per_byte - in_shape[1] % data_per_byte
            segment = output_cols - 10
            out = np.zeros((in_shape[0], output_cols), dtype=np.uint8)
            for r in range(in_shape[0]):
                out[r][0] = bitwidth_
                out[r][1] = tail
                min_fval = np.amin(X[r])
                max_fval = np.amax(X[r])
                min_bvals = bytearray(struct.pack("f", min_fval))
                max_bvals = bytearray(struct.pack("f", max_fval))
                for idx in range(4):
                    out[r][2 + idx] = min_bvals[idx]
                    out[r][6 + idx] = max_bvals[idx]
                for c in range(in_shape[1]):
                    fval = X[r][c]
                    gap = (max_fval - min_fval) / ((1 << bitwidth_) - 1)
                    thetimes = (fval - min_fval) / (gap + 1e-8)
                    thetimes = np.around(thetimes).astype(np.uint8)
                    out[r][10 + c % segment] += \
                        (thetimes << (bitwidth_ * (c // segment)))
            print("pY:\n{}\n".format(out))
            return (out,)

        # the maximum quantization error
        def get_allowed_errors(X):
            out = np.zeros_like(X)
            for r in range(X.shape[0]):
                min_fval = np.amin(X[r])
                max_fval = np.amax(X[r])
                gap = (max_fval - min_fval) / (2 ** bitwidth_ - 1) + 1e-8
                out[r] = gap
            return out

        # python reference of decoder
        def dec_ref(Y):
            in_shape = Y.shape
            bitwidth = Y[0][0]
            tail = Y[0][1]
            mask = np.array([(1 << bitwidth) - 1]).astype(np.uint8)[0]
            data_per_byte = 8 // bitwidth
            output_cols = (in_shape[1] - 10) * data_per_byte - tail
            segment = in_shape[1] - 10
            out = np.zeros((in_shape[0], output_cols), dtype=np.float)
            for r in range(in_shape[0]):
                min_fval = struct.unpack('f', Y[r][2:6])[0]
                max_fval = struct.unpack('f', Y[r][6:10])[0]
                print(min_fval, max_fval)
                gap = (max_fval - min_fval) / (2 ** bitwidth - 1.) + 1e-8
                for out_c in range(output_cols):
                    bit_start = (out_c // segment) * bitwidth
                    out[r][out_c] = min_fval + gap * \
                        ((Y[r][10 + out_c % segment] >> bit_start) & mask)
            print("pdecX:\n{}\n".format(out))
            return (out,)

        enc_op = core.CreateOperator(
            "FloatToFusedRandRowwiseQuantized",
            ["X"], ["Y"],
            bitwidth=bitwidth_,
            random=random_)
        dec_op = core.CreateOperator(
            "FusedRandRowwiseQuantizedToFloat",
            ["Y"], ["decX"])
        workspace.FeedBlob("X", X)
        workspace.RunOperatorOnce(enc_op)
        print("X:\n{}\n".format(workspace.FetchBlob("X")))
        Y = workspace.FetchBlob("Y")
        print("Y:\n{}\n".format(Y))

        pY = quantization_ref(X)[0]
        pdecX = dec_ref(Y)[0]

        # The equality check of encoded floating values in bytes may occasionally fail
        # because of precision.
        # Refer to the format of floating values here:
        #   https://en.wikipedia.org/wiki/Single-precision_floating-point_format
        # Instead of using self.assertReferenceChecks(gc, op, [X], quantization_ref),
        # we do the following
        if not random_:
            for r in range(Y.shape[0]):
                # compare min
                np.testing.assert_almost_equal(
                    struct.unpack('f', Y[r][2:6])[0],
                    struct.unpack('f', pY[r][2:6])[0])
                # compare max
                np.testing.assert_almost_equal(
                    struct.unpack('f', Y[r][6:10])[0],
                    struct.unpack('f', pY[r][6:10])[0])
            np.testing.assert_array_equal(Y[:, 0:2], pY[:, 0:2])
            np.testing.assert_array_equal(Y[:, 10:], pY[:, 10:])

        # check decoded floating values are within error thresholds
        workspace.RunOperatorOnce(dec_op)
        decX = workspace.FetchBlob("decX")
        print("decX:\n{}\n".format(decX))
        if random_:
            err_thre = get_allowed_errors(X) + 1e-6
        else:
            err_thre = get_allowed_errors(X) / 2.0 + 1e-6
        err = decX - X
        print("err_thre:\n{}\n".format(err_thre))
        print("err:\n{}\n".format(err))
        np.testing.assert_almost_equal(decX, pdecX, decimal=6)

        np.testing.assert_array_less(
            np.absolute(err),
            err_thre)

        # test the expectation of stochastic quantized values
        # are near to the floating values
        # Warning: it can fail the unit test with small probability
        test_stochastic_quantization = True
        if random_ and test_stochastic_quantization:
            X_sum = np.zeros_like(X)
            test_times = 2000
            for _ in range(test_times):
                workspace.RunOperatorOnce(enc_op)
                workspace.RunOperatorOnce(dec_op)
                X_sum += workspace.FetchBlob("decX")
            X_avg = X_sum / test_times
            print("X_avg:\n{}".format(X_avg))
            print("X    :\n{}".format(X))
            np.testing.assert_array_less(
                np.absolute(X_avg - X),
                5e-2 * get_allowed_errors(X) + 1e-4
            )

        # Check over multiple devices -- CUDA implementation is pending
        # self.assertDeviceChecks(dc, op, [X], [0])


if __name__ == "__main__":
    import unittest
    unittest.main()