Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

/ python / operator_test / gather_ranges_op_test.py



import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import numpy as np
from caffe2.python import core, workspace
from hypothesis import given, settings, strategies as st


def batched_boarders_and_data(
    data_min_size=5,
    data_max_size=10,
    examples_min_number=1,
    examples_max_number=4,
    example_min_size=1,
    example_max_size=3,
    dtype=np.float32,
    elements=None,
):
    """Build a strategy yielding ``(boarders, data)`` array pairs.

    Three integers are drawn first: the data size, the number of examples
    and the number of boarder pairs per example, each within the matching
    ``*_min_*`` / ``*_max_*`` bounds. From these the strategy produces:

    * ``boarders``: an int32 array of shape
      ``[num_examples, pairs_per_example, 2]`` whose entries lie in
      ``[0, data_size]``;
    * ``data``: an array of shape ``[data_size]`` built by ``hu.arrays``
      from the given ``dtype`` and ``elements``.
    """
    size_triples = st.tuples(
        st.integers(min_value=data_min_size, max_value=data_max_size),
        st.integers(min_value=examples_min_number, max_value=examples_max_number),
        st.integers(min_value=example_min_size, max_value=example_max_size),
    )

    def arrays_for(sizes):
        data_size, num_examples, pairs_per_example = sizes
        boarders = hu.arrays(
            [num_examples, pairs_per_example, 2],
            dtype=np.int32,
            # Positions may equal data_size so a pair can end at the data's end.
            elements=st.integers(min_value=0, max_value=data_size),
        )
        data = hu.arrays([data_size], dtype, elements)
        return st.tuples(boarders, data)

    return size_triples.flatmap(arrays_for)


@st.composite
def _tensor_splits(draw):
    """Draw consistent inputs for the dense gather tests.

    Returns a tuple ``(data, ranges, lengths, key)``:

    * ``lengths``: 1 to 10 per-range lengths, each between 1 and 5;
    * ``ranges``: a ``batch_size x len(lengths)`` grid of
      ``(offset, length)`` pairs. Slots are filled in a random order with
      back-to-back offsets; one randomly chosen slot is left as ``(0, 0)``;
    * ``data``: float32 values in ``[-1, 1]``, one per covered position;
    * ``key``: an int64 permutation of ``range(len(data))``.
    """
    lengths = draw(st.lists(st.integers(1, 5), min_size=1, max_size=10))
    batch_size = draw(st.integers(1, 5))
    num_ranges = len(lengths)

    slots = [(row, col) for row in range(batch_size) for col in range(num_ranges)]
    # Shuffle the slots and drop the last one so that one range stays empty.
    filled_slots = draw(st.permutations(slots))[:-1]

    ranges = [[(0, 0)] * num_ranges for _ in range(batch_size)]
    total = 0
    for row, col in filled_slots:
        span = lengths[col]
        ranges[row][col] = (total, span)
        total += span

    values = draw(
        st.lists(
            st.floats(min_value=-1.0, max_value=1.0), min_size=total, max_size=total
        )
    )

    key = draw(st.permutations(range(total)))

    data_array = np.array(values).astype(np.float32)
    key_array = np.array(key).astype(np.int64)
    return (data_array, np.array(ranges), np.array(lengths), key_array)


@st.composite
def _bad_tensor_splits(draw):
    """Draw dense-gather inputs that deliberately contain bad ranges.

    The batch size is fixed at 4 and there are 4 ranges whose nominal
    lengths are between 4 and 6. Returns ``(data, ranges, lengths, key)``
    in the same layout as ``_tensor_splits``.
    """
    lengths = draw(st.lists(st.integers(4, 6), min_size=4, max_size=4))
    batch_size = 4
    num_ranges = len(lengths)

    slots = [(row, col) for row in range(batch_size) for col in range(num_ranges)]
    shuffled_slots = draw(st.permutations(slots))

    ranges = [[(0, 0)] * num_ranges for _ in range(batch_size)]
    total = 0

    # Bad samples are injected per batch row:
    #   row 2      -> length 0, so 25% of the samples are empty;
    #   rows 0, 1  -> half the nominal length, so 50% of the samples have a
    #                 mismatched length;
    #   row 3      -> the nominal length, i.e. a well-formed sample.
    for row, col in shuffled_slots:
        if row == 2:
            span = 0
        elif row < 2:
            span = lengths[col] // 2
        else:
            span = lengths[col]
        ranges[row][col] = (total, span)
        total += span

    values = draw(
        st.lists(
            st.floats(min_value=-1.0, max_value=1.0), min_size=total, max_size=total
        )
    )

    key = draw(st.permutations(range(total)))

    data_array = np.array(values).astype(np.float32)
    key_array = np.array(key).astype(np.int64)
    return (data_array, np.array(ranges), np.array(lengths), key_array)


def gather_ranges(data, ranges):
    """Reference implementation used to check the ``GatherRanges`` operator.

    Args:
        data: Sliceable 1-D sequence the ranges refer to.
        ranges: Iterable of examples, each an iterable of
            ``(start, length)`` pairs into ``data``.

    Returns:
        A tuple ``(output, lengths)`` where ``output`` is a flat list of the
        gathered elements in example/range order and ``lengths`` holds, per
        example, the sum of its range lengths.
    """
    output = []
    lengths = []
    for example_ranges in ranges:
        total = 0
        # The loop variable is deliberately not called ``range`` so that the
        # builtin stays usable inside this function.
        for span in example_ranges:
            assert len(span) == 2
            start, size = span
            output.extend(data[start : start + size])
            total += size
        lengths.append(total)
    return output, lengths


def gather_ranges_to_dense(data, ranges, lengths):
    """Reference implementation for ``GatherRangesToDense`` without a key.

    Args:
        data: 1-D sliceable sequence of values.
        ranges: Non-empty array of shape ``[batch, num_ranges, 2]`` holding
            ``(start, length)`` pairs.
        lengths: Expected length of each range column.

    Returns:
        A list with one array per range column; row ``j`` of array ``i`` is
        the slice of ``data`` selected by ``ranges[j][i]``, or
        ``lengths[i]`` zeros when that range has length 0.
    """
    assert len(ranges)
    assert len(ranges[0])
    assert ranges.shape[2] == 2
    num_examples, num_columns = len(ranges), len(ranges[0])

    dense = []
    for col in range(num_columns):
        rows = []
        for row in range(num_examples):
            start, size = ranges[row][col]
            if size:
                # A non-empty range must have exactly the declared length.
                assert size == lengths[col]
                rows.append(data[start : start + size])
            else:
                # An empty range is represented by a row of zeros.
                rows.append([0] * lengths[col])
        dense.append(np.array(rows))
    return dense


def gather_ranges_to_dense_with_key(data, ranges, key, lengths):
    """Reference implementation for ``GatherRangesToDense`` with a key input.

    Args:
        data: 1-D sliceable sequence of values.
        ranges: Non-empty array of shape ``[batch, num_ranges, 2]`` holding
            ``(start, length)`` pairs.
        key: 1-D sequence aligned with ``data``; the values of each range
            are emitted in ascending order of their keys.
        lengths: Expected length of each range column.

    Returns:
        A list with one array per range column; row ``j`` of array ``i``
        holds the key-ordered values selected by ``ranges[j][i]``, or
        ``lengths[i]`` zeros when that range has length 0.
    """
    assert len(ranges)
    assert len(ranges[0])
    assert ranges.shape[2] == 2
    num_examples, num_columns = len(ranges), len(ranges[0])

    dense = []
    for col in range(num_columns):
        rows = []
        for row in range(num_examples):
            start, size = ranges[row][col]
            if size:
                # A non-empty range must have exactly the declared length.
                assert size == lengths[col]
                stop = start + size
                keyed_values = sorted(
                    zip(key[start:stop], data[start:stop]),
                    key=lambda pair: pair[0],
                )
                rows.append([value for _, value in keyed_values])
            else:
                # An empty range is represented by a row of zeros.
                rows.append([0] * lengths[col])
        dense.append(np.array(rows))
    return dense


class TestGatherRanges(serial.SerializedTestCase):
    """Tests for the ``GatherRanges`` and ``GatherRangesToDense`` operators."""

    @given(boarders_and_data=batched_boarders_and_data(), **hu.gcs_cpu_only)
    @settings(deadline=1000)
    def test_gather_ranges(self, boarders_and_data, gc, dc):
        """Compare ``GatherRanges`` with the ``gather_ranges`` reference."""
        boarders, data = boarders_and_data

        def to_start_and_length(pair):
            # The two drawn positions are unordered; sort them and express
            # the pair as [start, length].
            assert len(pair) == 2
            low, high = sorted(pair)
            return [low, high - low]

        ranges = np.apply_along_axis(to_start_and_length, 2, boarders)
        gather_op = core.CreateOperator(
            "GatherRanges", ["data", "ranges"], ["output", "lengths"]
        )

        self.assertReferenceChecks(
            device_option=gc,
            op=gather_op,
            inputs=[data, ranges],
            reference=gather_ranges,
        )

    @given(tensor_splits=_tensor_splits(), **hu.gcs_cpu_only)
    @settings(deadline=1000)
    def test_gather_ranges_split(self, tensor_splits, gc, dc):
        """Compare ``GatherRangesToDense`` (no key) with its reference."""
        data, ranges, lengths, _ = tensor_splits

        # One dense output blob per entry in ``lengths``.
        output_names = ["X_{}".format(i) for i in range(len(lengths))]
        dense_op = core.CreateOperator(
            "GatherRangesToDense",
            ["data", "ranges"],
            output_names,
            lengths=lengths,
        )

        self.assertReferenceChecks(
            device_option=gc,
            op=dense_op,
            inputs=[data, ranges, lengths],
            reference=gather_ranges_to_dense,
        )

    @given(tensor_splits=_tensor_splits(), **hu.gcs_cpu_only)
    def test_gather_ranges_with_key_split(self, tensor_splits, gc, dc):
        """Compare ``GatherRangesToDense`` (with key) with its reference."""
        data, ranges, lengths, key = tensor_splits

        # One dense output blob per entry in ``lengths``.
        output_names = ["X_{}".format(i) for i in range(len(lengths))]
        dense_op = core.CreateOperator(
            "GatherRangesToDense",
            ["data", "ranges", "key"],
            output_names,
            lengths=lengths,
        )

        self.assertReferenceChecks(
            device_option=gc,
            op=dense_op,
            inputs=[data, ranges, key, lengths],
            reference=gather_ranges_to_dense_with_key,
        )

    def test_shape_and_type_inference(self):
        """Check the shapes and types inferred for ``GatherRanges`` outputs."""
        with hu.temp_workspace("shape_type_inf_int32"):
            net = core.Net("test_net")
            net.ConstantFill([], "ranges", shape=[3, 5, 2], dtype=core.DataType.INT32)
            net.ConstantFill([], "values", shape=[64], dtype=core.DataType.INT64)
            net.GatherRanges(["values", "ranges"], ["values_output", "lengths_output"])
            inferred_shapes, inferred_types = workspace.InferShapesAndTypes([net], {})

            expectations = (
                ("values_output", [64], core.DataType.INT64),
                ("lengths_output", [3], core.DataType.INT32),
            )
            for blob, expected_shape, expected_type in expectations:
                self.assertEqual(inferred_shapes[blob], expected_shape)
                self.assertEqual(inferred_types[blob], expected_type)

    @given(tensor_splits=_bad_tensor_splits(), **hu.gcs_cpu_only)
    @settings(deadline=10000)
    def test_empty_range_check(self, tensor_splits, gc, dc):
        """Exercise the mismatch / empty-ratio thresholds on bad inputs."""
        data, ranges, lengths, key = tensor_splits

        for blob_name, blob_value in (("data", data), ("ranges", ranges), ("key", key)):
            workspace.FeedBlob(blob_name, blob_value)

        output_names = ["X_{}".format(i) for i in range(len(lengths))]

        def make_op(min_observation=2, max_mismatched_ratio=0.5, max_empty_ratio=None):
            return core.CreateOperator(
                "GatherRangesToDense",
                ["data", "ranges", "key"],
                output_names,
                lengths=lengths,
                min_observation=min_observation,
                max_mismatched_ratio=max_mismatched_ratio,
                max_empty_ratio=max_empty_ratio,
            )

        # Default thresholds: the operator is expected to run without error.
        workspace.RunOperatorOnce(make_op())

        # A stricter mismatch ratio is still expected to pass here.
        # NOTE(review): presumably because min_observation=50 is never
        # reached with this input size -- confirm against the operator.
        workspace.RunOperatorOnce(
            make_op(max_mismatched_ratio=0.3, min_observation=50)
        )

        # Same ratio with a small min_observation: expected to be rejected.
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(
                make_op(max_mismatched_ratio=0.3, min_observation=5)
            )

        # A very small allowed empty ratio: expected to be rejected.
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(
                make_op(min_observation=50, max_empty_ratio=0.01)
            )


if __name__ == "__main__":
    # Run this module's tests when it is executed directly as a script.
    from unittest import main as run_tests

    run_tests()