import unittest
import caffe2.python.hypothesis_test_util as hu
import numpy as np
from caffe2.python import core, workspace
class TestQuantile(hu.HypothesisTestCase):
def _test_quantile(self, inputs, quantile, abs, tol):
net = core.Net("test_net")
net.Proto().type = "dag"
input_tensors = []
for i, input in enumerate(inputs):
workspace.FeedBlob("t_{}".format(i), input)
input_tensors.append("t_{}".format(i))
net.Quantile(
input_tensors, ["quantile_value"], quantile=quantile, abs=abs, tol=tol
)
workspace.RunNetOnce(net)
quantile_value_blob = workspace.FetchBlob("quantile_value")
assert np.size(quantile_value_blob) == 1
quantile_value = quantile_value_blob[0]
input_cat = np.concatenate([input.flatten() for input in inputs])
input_cat = np.abs(input_cat) if abs == 1 else input_cat
target_cnt = np.ceil(np.size(input_cat) * quantile)
actual_cnt = np.sum(input_cat <= quantile_value)
# prune with return value will remove no less than
# "quantile" portion of elements
assert actual_cnt >= target_cnt
# Expect that (hi-lo) < tol * (|lo| + |hi|)
# if tol < 1.0 -> hi * lo > 0, then we are expecting
# 1. if hi >0,
# |hi|-|lo| < tol * (|lo| + |hi|)
# hi - lo < (2 tol) /(1 + tol) |hi| < 2 tol |hi|
# 2. if hi < 0,
# |lo|- |hi| < tol * (|lo| + |hi|)
# hi - lo < (2 tol) /(1 - tol) |hi| < 2.5 tol |hi| if tol < 0.2
quantile_value_lo = quantile_value - 2.5 * tol * np.abs(quantile_value)
lo_cnt = np.sum(input_cat <= quantile_value_lo)
# prune with a slightly smaller value will remove
# less than "quantile" portion of elements
assert lo_cnt <= target_cnt
def test_quantile_1(self):
inputs = []
num_tensors = 5
for i in range(num_tensors):
dim = np.random.randint(5, 100)
inputs.append(np.random.rand(dim))
self._test_quantile(inputs=inputs, quantile=0.2, abs=1, tol=1e-4)
def test_quantile_2(self):
inputs = []
num_tensors = 5
for i in range(num_tensors):
dim = np.random.randint(5, 100)
inputs.append(np.random.rand(dim))
self._test_quantile(inputs=inputs, quantile=1e-6, abs=0, tol=1e-3)
def test_quantile_3(self):
inputs = []
num_tensors = 5
for i in range(num_tensors):
dim1 = np.random.randint(5, 100)
dim2 = np.random.randint(5, 100)
inputs.append(np.random.rand(dim1, dim2))
self._test_quantile(inputs=inputs, quantile=1 - 1e-6, abs=1, tol=1e-5)
def test_quantile_4(self):
inputs = []
num_tensors = 5
for i in range(num_tensors):
dim1 = np.random.randint(5, 100)
dim2 = np.random.randint(5, 100)
inputs.append(np.random.rand(dim1, dim2))
inputs.append(np.random.rand(dim1))
self._test_quantile(inputs=inputs, quantile=0.168, abs=1, tol=1e-4)
if __name__ == "__main__":
global_options = ["caffe2"]
core.GlobalInit(global_options)
unittest.main()