Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / operator_test / bisect_percentile_op_test.py






import hypothesis.strategies as st

from caffe2.python import core, workspace
from hypothesis import given
import caffe2.python.hypothesis_test_util as hu

import bisect
import numpy as np


class TestBisectPercentileOp(hu.HypothesisTestCase):
    """Compare the BisectPercentile operator with a pure-Python reference."""

    def compare_reference(
            self,
            raw_data,
            pct_raw_data,
            pct_mapping,
            pct_upper,
            pct_lower,
            lengths,
    ):
        """Run BisectPercentile on ``raw_data`` and check it against a reference.

        The percentile tables are flat arrays holding one segment per column
        of ``raw_data``; ``lengths[j]`` is the size of column ``j``'s segment.

        NOTE: positionally ``pct_upper`` precedes ``pct_lower`` in this
        signature. Pass both by keyword so the bounds cannot be swapped
        silently.

        Args:
            raw_data: 2-D array of values to convert.
            pct_raw_data: flat array of sorted raw values, per segment.
            pct_mapping: flat array of results used on an exact match.
            pct_upper: flat array of upper bounds, per raw value.
            pct_lower: flat array of lower bounds, per raw value.
            lengths: segment size for each column of ``raw_data``.

        Raises:
            AssertionError: if the operator output differs from the reference.
        """
        def bisect_percentile_op_ref(
            raw_data,
            pct_raw_data,
            pct_mapping,
            pct_lower,
            pct_upper,
            lengths
        ):
            """Pure-Python computation the operator output is compared to."""
            results = np.zeros_like(raw_data)
            # offsets[j]:offsets[j + 1] delimits column j's segment in the
            # flat percentile tables.
            offsets = [0]
            for length in lengths:
                offsets.append(offsets[-1] + length)
            for i, row in enumerate(raw_data):
                for j, val in enumerate(row):
                    start, end = offsets[j], offsets[j + 1]
                    pct_raw_data_i = pct_raw_data[start:end]
                    pct_lower_i = pct_lower[start:end]
                    pct_upper_i = pct_upper[start:end]
                    pct_mapping_i = pct_mapping[start:end]

                    # Corner cases: values outside the table clamp to 0 / 1.
                    if val < pct_raw_data_i[0]:
                        results[i][j] = 0
                        continue
                    if val > pct_raw_data_i[-1]:
                        results[i][j] = 1.
                        continue

                    k = bisect.bisect_left(pct_raw_data_i, val)
                    if pct_raw_data_i[k] == val:
                        # Exact hit: use the mapped value directly.
                        results[i][j] = pct_mapping_i[k]
                    else:
                        # Interpolate linearly from the upper bound of the
                        # left neighbour to the lower bound of the right one.
                        k = k - 1
                        slope = ((pct_lower_i[k + 1] - pct_upper_i[k])
                            / (pct_raw_data_i[k + 1] - pct_raw_data_i[k]))
                        results[i][j] = pct_upper_i[k] + \
                            slope * (val - pct_raw_data_i[k])

            return results

        workspace.ResetWorkspace()
        workspace.FeedBlob("raw_data", raw_data)

        op = core.CreateOperator(
            "BisectPercentile",
            ["raw_data"],
            ["pct_output"],
            percentile_raw=pct_raw_data,
            percentile_mapping=pct_mapping,
            percentile_lower=pct_lower,
            percentile_upper=pct_upper,
            lengths=lengths
        )
        workspace.RunOperatorOnce(op)

        expected_output = bisect_percentile_op_ref(
            raw_data=raw_data,
            pct_raw_data=pct_raw_data,
            pct_mapping=pct_mapping,
            pct_lower=pct_lower,
            pct_upper=pct_upper,
            lengths=lengths,
        )
        output = workspace.blobs['pct_output']
        np.testing.assert_array_almost_equal(output, expected_output)

    def test_bisect_percentil_op_simple(self):
        """Check a small hand-written table with two columns."""
        raw_data = np.array([
            [1, 1],
            [2, 2],
            [3, 3],
            [3, 1],
            [9, 10],
            [1.5, 5],
            [1.32, 2.4],
            [2.9, 5.7],
            [-1, -1],
            [3, 7]
        ], dtype=np.float32)
        pct_raw_data = np.array([1, 2, 3, 2, 7], dtype=np.float32)
        pct_lower = np.array([0.1, 0.2, 0.9, 0.1, 0.5], dtype=np.float32)
        pct_upper = np.array([0.1, 0.8, 1.0, 0.4, 1.0], dtype=np.float32)
        pct_mapping = np.array([0.1, 0.5, 0.95, 0.25, 0.75], dtype=np.float32)
        lengths = np.array([3, 2], dtype=np.int32)
        # Keyword arguments keep the lower/upper tables from being swapped by
        # compare_reference's positional order.
        self.compare_reference(
            raw_data=raw_data,
            pct_raw_data=pct_raw_data,
            pct_mapping=pct_mapping,
            pct_lower=pct_lower,
            pct_upper=pct_upper,
            lengths=lengths,
        )

    @given(
        N=st.integers(min_value=20, max_value=100),
        lengths=st.lists(
            elements=st.integers(min_value=2, max_value=10),
            min_size=2,
            max_size=5,
        ),
        max_value=st.integers(min_value=100, max_value=1000),
        discrete=st.booleans(),
        p=st.floats(min_value=0, max_value=0.9),
        **hu.gcs_cpu_only
    )
    def test_bisect_percentil_op_large(
        self, N, lengths, max_value, discrete, p, gc, dc
    ):
        """Check randomly generated tables with N rows and len(lengths) columns.

        ``discrete`` selects integer-valued data drawn below ``max_value``
        instead of normally distributed data; ``p`` is the probability that a
        table entry gets an upper bound strictly above its lower bound.
        """
        lengths = np.array(lengths, dtype=np.int32)
        D = len(lengths)

        if discrete:
            raw_data = np.random.randint(0, max_value, size=(N, D))
        else:
            raw_data = np.random.randn(N, D)

        # Build valid pct_lower / pct_upper tables, one segment per column.
        pct_lower = []
        pct_upper = []
        pct_raw_data = []
        for i in range(D):
            pct_lower_val = 0.
            pct_upper_val = 0.
            pct_lower_cur = []
            pct_upper_cur = []
            # pct_raw_data must not contain duplicated values in a segment.
            if discrete:
                pct_raw_data_cur = np.random.choice(
                    np.arange(max_value), size=lengths[i], replace=False)
            else:
                pct_raw_data_cur = np.random.randn(lengths[i])
                while len(set(pct_raw_data_cur)) < lengths[i]:
                    pct_raw_data_cur = np.random.randn(lengths[i])
            pct_raw_data_cur = np.sort(pct_raw_data_cur)
            for _ in range(lengths[i]):
                # Each lower bound sits just above the previous upper bound;
                # the upper bound widens the interval with probability p.
                pct_lower_val = pct_upper_val + 0.01
                pct_lower_cur.append(pct_lower_val)
                pct_upper_val = pct_lower_val + \
                    0.01 * np.random.randint(1, 20) * (np.random.uniform() < p)
                pct_upper_cur.append(pct_upper_val)
            # Normalize so that the last upper bound of the segment equals 1.
            pct_lower_cur = np.array(pct_lower_cur, np.float32) / pct_upper_val
            pct_upper_cur = np.array(pct_upper_cur, np.float32) / pct_upper_val
            pct_lower.extend(pct_lower_cur)
            pct_upper.extend(pct_upper_cur)
            pct_raw_data.extend(pct_raw_data_cur)

        pct_lower = np.array(pct_lower, dtype=np.float32)
        pct_upper = np.array(pct_upper, dtype=np.float32)
        pct_mapping = (pct_lower + pct_upper) / 2.
        raw_data = np.array(raw_data, dtype=np.float32)
        pct_raw_data = np.array(pct_raw_data, dtype=np.float32)

        # Keyword arguments keep the lower/upper tables from being swapped by
        # compare_reference's positional order.
        self.compare_reference(
            raw_data=raw_data,
            pct_raw_data=pct_raw_data,
            pct_mapping=pct_mapping,
            pct_lower=pct_lower,
            pct_upper=pct_upper,
            lengths=lengths,
        )


# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    import unittest
    unittest.main()