# Learn more » Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, NuGet packages
#
# neilisaac / torch (python)
#
# Repository URL to install this package:
#
# / python / operator_test / specialized_segment_ops_test.py



import unittest

from caffe2.proto import caffe2_pb2
from caffe2.python import core
import caffe2.python.hip_test_util as hiputl
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from hypothesis import given, assume, settings


class TestSpecializedSegmentOps(hu.HypothesisTestCase):
    @given(
        batchsize=st.integers(1, 20),
        fptype=st.sampled_from([np.float16, np.float32]),
        fp16asint=st.booleans(),
        blocksize=st.sampled_from([8, 16, 32, 64, 85, 96, 128, 163]),
        normalize_by_lengths=st.booleans(),
        empty_indices=st.booleans(),
        **hu.gcs
    )
    def test_sparse_lengths_sum_cpu(
        self,
        batchsize,
        fptype,
        fp16asint,
        blocksize,
        normalize_by_lengths,
        empty_indices,
        gc,
        dc,
    ):
        """Check SparseLengthsSum / SparseLengthsMean against a NumPy reference.

        The operator is "SparseLengthsMean" when ``normalize_by_lengths`` is
        set and "SparseLengthsSum" otherwise. Inputs are random: a 300-row
        table with ``blocksize`` columns in ``fptype``, ``batchsize`` segment
        lengths (all zero when ``empty_indices`` is set, otherwise drawn from
        [1, 30)) and flat int64 row indices, one per pooled element.

        For non-float32 tables the example is discarded unless ``gc`` is a CPU
        device, the run is not a HIP run, and no CUDA device appears in ``dc``.
        """
        is_fp32 = fptype == np.float32
        if not is_fp32:
            # `and` short-circuits, so the checks run in the same order as
            # three consecutive assume() calls would.
            assume(
                gc.device_type == caffe2_pb2.CPU
                and not hiputl.run_in_hip(gc, dc)
                and caffe2_pb2.CUDA not in {d.device_type for d in dc}
            )

        print(
            "<test_sparse_lengths_sum_mean_cpu>"
            if normalize_by_lengths
            else "<test_sparse_lengths_sum_cpu>"
        )

        num_rows = 300
        if is_fp32:
            table = np.random.rand(num_rows, blocksize).astype(np.float32)
            atol = 1e-5
        elif fp16asint:
            # Integer-valued half-precision entries in [0, 10].
            table = np.round(10.0 * np.random.rand(num_rows, blocksize)).astype(
                np.float16
            )
            atol = 1e-3
        else:
            table = np.random.rand(num_rows, blocksize).astype(np.float16)
            atol = 1e-1

        # One length per output row.
        if empty_indices:
            lengths = np.zeros(batchsize, dtype=np.int32)
        else:
            lengths = np.random.randint(1, 30, size=batchsize, dtype=np.int32)
        # Flat list of table rows to pool, segment after segment.
        indices = np.random.randint(
            0, num_rows, size=int(lengths.sum()), dtype=np.int64
        )

        op_type = "SparseLengths" + ("Mean" if normalize_by_lengths else "Sum")
        op = core.CreateOperator(op_type, ["Tbl", "Indices", "Lengths"], "out")

        def sparse_lengths_sum_ref(tbl, idx, lens):
            """Pool table rows per segment; divide by the length for the mean."""
            bounds = np.cumsum(np.insert(lens, 0, 0))
            expected = np.zeros((len(lens), blocksize))
            for row, (begin, end) in enumerate(zip(bounds[:-1], bounds[1:])):
                if normalize_by_lengths and lens[row] == 0:
                    # Mean of an empty segment keeps the zero fill.
                    continue
                pooled = tbl[idx[begin:end]].sum(axis=0)
                if normalize_by_lengths:
                    pooled = pooled / float(lens[row])
                expected[row] = pooled
            return [expected.astype(np.float32)]

        self.assertReferenceChecks(
            gc,
            op,
            [table, indices, lengths],
            sparse_lengths_sum_ref,
            threshold=1e-3,
            atol=atol,
        )

    @given(
        batchsize=st.integers(1, 20),
        fptype=st.sampled_from([np.float16, np.float32]),
        fp16asint=st.booleans(),
        blocksize=st.sampled_from([8, 16, 32, 64, 85, 96, 128, 163]),
        empty_indices=st.booleans(),
        **hu.gcs
    )
    def test_sparse_lengths_weightedsum_cpu(
        self, batchsize, fptype, fp16asint, blocksize, empty_indices, gc, dc
    ):
        """Check SparseLengthsWeightedSum against a NumPy reference.

        Inputs are random: a 300-row table with ``blocksize`` columns in
        ``fptype``, ``batchsize`` segment lengths (all zero when
        ``empty_indices`` is set, otherwise drawn from [1, 30)), flat int64
        row indices and one float32 weight per index.

        For non-float32 tables the example is discarded unless ``gc`` is a CPU
        device, the run is not a HIP run, and no CUDA device appears in ``dc``.
        """
        is_fp32 = fptype == np.float32
        if not is_fp32:
            # `and` short-circuits, so the checks run in the same order as
            # three consecutive assume() calls would.
            assume(
                gc.device_type == caffe2_pb2.CPU
                and not hiputl.run_in_hip(gc, dc)
                and caffe2_pb2.CUDA not in {d.device_type for d in dc}
            )

        print("<test_sparse_lengths_weightedsum_cpu>")

        num_rows = 300
        if is_fp32:
            table = np.random.rand(num_rows, blocksize).astype(np.float32)
            atol = 1e-5
        elif fp16asint:
            # Integer-valued half-precision entries in [0, 10].
            table = np.round(10.0 * np.random.rand(num_rows, blocksize)).astype(
                np.float16
            )
            atol = 1e-3
        else:
            table = np.random.rand(num_rows, blocksize).astype(np.float16)
            atol = 1e-1

        # One length per output row.
        if empty_indices:
            lengths = np.zeros(batchsize, dtype=np.int32)
        else:
            lengths = np.random.randint(1, 30, size=batchsize, dtype=np.int32)
        total = int(lengths.sum())
        # Flat list of table rows to pool, with a matching weight for each.
        indices = np.random.randint(0, num_rows, size=total, dtype=np.int64)
        weights = np.random.rand(total).astype(np.float32)

        op = core.CreateOperator(
            "SparseLengthsWeightedSum", ["Tbl", "Weights", "Indices", "Lengths"], "out"
        )

        def sparse_lengths_weightedsum_ref(tbl, wts, idx, lens):
            """Sum the weight-scaled table rows of every segment."""
            bounds = np.cumsum(np.insert(lens, 0, 0))
            expected = np.zeros((len(lens), blocksize))
            for row, (begin, end) in enumerate(zip(bounds[:-1], bounds[1:])):
                gathered = tbl[idx[begin:end]]
                column_weights = wts[begin:end].reshape(-1, 1)
                expected[row] = (gathered * column_weights).sum(axis=0)
            return [expected.astype(np.float32)]

        self.assertReferenceChecks(
            gc,
            op,
            [table, weights, indices, lengths],
            sparse_lengths_weightedsum_ref,
            threshold=1e-3,
            atol=atol,
        )

    @given(
        batchsize=st.integers(1, 20),
        blocksize=st.sampled_from([8, 16, 17, 26, 32, 64, 85, 96, 128, 148, 163]),
        normalize_by_lengths=st.booleans(),
        empty_indices=st.booleans(),
        **hu.gcs_cpu_only
    )
    def test_sparse_lengths_weightedsum_8BitsRowwiseOp_cpu(
        self, batchsize, blocksize, normalize_by_lengths, empty_indices, gc, dc
    ):
        """Check SparseLengthsWeighted{Sum,Mean}8BitsRowwise against NumPy.

        The table holds random uint8 values below 7; every table row has a
        float32 (scale, bias) pair in a separate ``(300, 2)`` array. The
        reference rebuilds each gathered row as ``scale * row + bias``,
        multiplies it by its weight, sums per segment and, for the "Mean"
        variant, divides non-empty segments by their length.
        """
        if normalize_by_lengths:
            banner = (
                "<test_sparse_lengths_weightedsum_SparseLengthsWeightedMean8BitsRowwise_cpu>"
            )
        else:
            banner = (
                "<test_sparse_lengths_weightedsum_SparseLengthsWeightedSum8BitsRowwise_cpu>"
            )
        print(banner)

        num_rows = 300
        table = np.random.randint(7, size=(num_rows, blocksize), dtype=np.uint8)

        # One length per output row.
        if empty_indices:
            lengths = np.zeros(batchsize, dtype=np.int32)
        else:
            lengths = np.random.randint(1, 30, size=batchsize, dtype=np.int32)
        total = int(lengths.sum())
        # Flat list of table rows to pool, with a matching weight for each.
        indices = np.random.randint(0, num_rows, size=total, dtype=np.int64)
        weights = np.random.rand(total).astype(np.float32)
        scale_bias = np.random.rand(num_rows, 2).astype(np.float32)

        pooling = "Mean" if normalize_by_lengths else "Sum"
        op = core.CreateOperator(
            "SparseLengthsWeighted" + pooling + "8BitsRowwise",
            ["Tbl", "Weights", "Indices", "Lengths", "Scale_Bias"],
            "out",
        )

        def sparse_lengths_weightedsum_8BitsRowwiseOp_cpu_ref(
            tbl, wts, idx, lens, sb
        ):
            """Weighted per-segment sum (or mean) of ``scale * row + bias``."""
            bounds = np.cumsum(np.insert(lens, 0, 0))
            expected = np.zeros((len(lens), blocksize))
            for row, (begin, end) in enumerate(zip(bounds[:-1], bounds[1:])):
                picked = idx[begin:end]
                params = sb[picked]
                # Column slices keep a trailing axis for row-wise broadcasting.
                scale = params[:, 0:1]
                bias = params[:, 1:2]
                column_weights = wts[begin:end].reshape(-1, 1)
                if normalize_by_lengths and lens[row] != 0:
                    factor = 1.0 / float(lens[row])
                else:
                    factor = 1.0
                weighted = column_weights * (scale * tbl[picked] + bias)
                expected[row] = weighted.sum(axis=0) * factor
            return [expected.astype(np.float32)]

        self.assertReferenceChecks(
            gc,
            op,
            [table, weights, indices, lengths, scale_bias],
            sparse_lengths_weightedsum_8BitsRowwiseOp_cpu_ref,
            threshold=1e-3,
            atol=1e-5,
        )

    @given(
        batchsize=st.integers(1, 20),
        blocksize=st.sampled_from([8, 16, 17, 26, 32, 64, 85, 96, 128, 148, 163]),
        normalize_by_lengths=st.booleans(),
        empty_indices=st.booleans(),
        **hu.gcs_cpu_only
    )
    def test_sparse_lengths_sum_8BitsRowwiseOp_cpu(
        self, batchsize, blocksize, normalize_by_lengths, empty_indices, gc, dc
    ):
        """Check SparseLengths{Sum,Mean}8BitsRowwise against a NumPy reference.

        The table holds random uint8 values below 7; every table row has a
        float32 (scale, bias) pair in a separate ``(300, 2)`` array. The
        reference rebuilds each gathered row as ``scale * row + bias``, sums
        per segment and, for the "Mean" variant, divides non-empty segments by
        their length.
        """
        print(
            "<test_sparse_lengths_sum_SparseLengthsMean8BitsRowwise_cpu>"
            if normalize_by_lengths
            else "<test_sparse_lengths_sum_SparseLengthsSum8BitsRowwise_cpu>"
        )

        num_rows = 300
        table = np.random.randint(7, size=(num_rows, blocksize), dtype=np.uint8)

        # One length per output row.
        if empty_indices:
            lengths = np.zeros(batchsize, dtype=np.int32)
        else:
            lengths = np.random.randint(1, 30, size=batchsize, dtype=np.int32)
        # Flat list of table rows to pool, segment after segment.
        indices = np.random.randint(
            0, num_rows, size=int(lengths.sum()), dtype=np.int64
        )
        scale_bias = np.random.rand(num_rows, 2).astype(np.float32)

        pooling = "Mean" if normalize_by_lengths else "Sum"
        op = core.CreateOperator(
            "SparseLengths" + pooling + "8BitsRowwise",
            ["Tbl", "Indices", "Lengths", "Scale_Bias"],
            "out",
        )

        def sparse_lengths_sum_8BitsRowwiseOp_cpu_ref(tbl, idx, lens, sb):
            """Per-segment sum (or mean) of ``scale * row + bias``."""
            bounds = np.cumsum(np.insert(lens, 0, 0))
            expected = np.zeros((len(lens), blocksize))
            for row, (begin, end) in enumerate(zip(bounds[:-1], bounds[1:])):
                picked = idx[begin:end]
                params = sb[picked]
                # Column slices keep a trailing axis for row-wise broadcasting.
                scale = params[:, 0:1]
                bias = params[:, 1:2]
                if normalize_by_lengths and lens[row] != 0:
                    factor = 1.0 / float(lens[row])
                else:
                    factor = 1.0
                expected[row] = (scale * tbl[picked] + bias).sum(axis=0) * factor
            return [expected.astype(np.float32)]

        self.assertReferenceChecks(
            gc,
            op,
            [table, indices, lengths, scale_bias],
            sparse_lengths_sum_8BitsRowwiseOp_cpu_ref,
            threshold=1e-3,
            atol=1e-5,
        )

    @given(
        batchsize=st.integers(1, 20),
        blocksize=st.sampled_from([8, 16, 17, 26, 32, 64, 85, 96, 128, 148, 163]),
        normalize_by_lengths=st.booleans(),
        **hu.gcs_cpu_only
    )
    @settings(deadline=10000)
    def test_sparse_lengths_sum_8BitsRowwiseOp_cpu_invalid_index(
        self, batchsize, blocksize, normalize_by_lengths, gc, dc
    ):
        """Expect RuntimeError when an index points past the end of the table.

        Builds the same inputs as the SparseLengths{Sum,Mean}8BitsRowwise
        checks, always with non-empty segments, then shifts the first index by
        1000 so that it exceeds the 300-row table before running the operator
        in ``self.ws``.
        """
        num_rows = 300
        table = np.random.randint(7, size=(num_rows, blocksize), dtype=np.uint8)

        # One length per output row; each is at least 1, so indices is non-empty.
        lengths = np.random.randint(1, 30, size=batchsize, dtype=np.int32)
        # Flat list of table rows to pool, segment after segment.
        indices = np.random.randint(
            0, num_rows, size=int(lengths.sum()), dtype=np.int64
        )
        # Push the first index out of range.
        indices[0] += 1000
        scale_bias = np.random.rand(num_rows, 2).astype(np.float32)

        pooling = "Mean" if normalize_by_lengths else "Sum"
        op = core.CreateOperator(
            "SparseLengths" + pooling + "8BitsRowwise",
            ["Tbl", "Indices", "Lengths", "Scale_Bias"],
            "out",
        )

        blobs = (
            ("Tbl", table),
            ("Indices", indices),
            ("Lengths", lengths),
            ("Scale_Bias", scale_bias),
        )
        for blob_name, value in blobs:
            self.ws.create_blob(blob_name).feed(value)

        self.assertRaises(RuntimeError, self.ws.run, op)


# Run this module's tests through unittest when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()