import struct
import unittest
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
import torch
from caffe2.python import core, workspace
from hypothesis import given, settings
from scipy.stats import norm
def generate_rois(roi_counts, im_dims):
assert len(roi_counts) == len(im_dims)
all_rois = []
for i, num_rois in enumerate(roi_counts):
if num_rois == 0:
continue
# [batch_idx, x1, y1, x2, y2]
rois = np.random.uniform(0, im_dims[i], size=(roi_counts[i], 5)).astype(
np.float32
)
rois[:, 0] = i # batch_idx
# Swap (x1, x2) if x1 > x2
rois[:, 1], rois[:, 3] = (
np.minimum(rois[:, 1], rois[:, 3]),
np.maximum(rois[:, 1], rois[:, 3]),
)
# Swap (y1, y2) if y1 > y2
rois[:, 2], rois[:, 4] = (
np.minimum(rois[:, 2], rois[:, 4]),
np.maximum(rois[:, 2], rois[:, 4]),
)
all_rois.append(rois)
if len(all_rois) > 0:
return np.vstack(all_rois)
return np.empty((0, 5)).astype(np.float32)
def generate_rois_rotated(roi_counts, im_dims):
rois = generate_rois(roi_counts, im_dims)
# [batch_id, ctr_x, ctr_y, w, h, angle]
rotated_rois = np.empty((rois.shape[0], 6)).astype(np.float32)
rotated_rois[:, 0] = rois[:, 0] # batch_id
rotated_rois[:, 1] = (rois[:, 1] + rois[:, 3]) / 2.0 # ctr_x = (x1 + x2) / 2
rotated_rois[:, 2] = (rois[:, 2] + rois[:, 4]) / 2.0 # ctr_y = (y1 + y2) / 2
rotated_rois[:, 3] = rois[:, 3] - rois[:, 1] + 1.0 # w = x2 - x1 + 1
rotated_rois[:, 4] = rois[:, 4] - rois[:, 2] + 1.0 # h = y2 - y1 + 1
rotated_rois[:, 5] = np.random.uniform(-90.0, 90.0) # angle in degrees
return rotated_rois
def create_bbox_transform_inputs(roi_counts, num_classes, rotated):
batch_size = len(roi_counts)
total_rois = sum(roi_counts)
im_dims = np.random.randint(100, 600, batch_size)
rois = (
generate_rois_rotated(roi_counts, im_dims)
if rotated
else generate_rois(roi_counts, im_dims)
)
box_dim = 5 if rotated else 4
deltas = np.random.randn(total_rois, box_dim * num_classes).astype(np.float32)
im_info = np.zeros((batch_size, 3)).astype(np.float32)
im_info[:, 0] = im_dims
im_info[:, 1] = im_dims
im_info[:, 2] = 1.0
return rois, deltas, im_info
# Eigen/Python round 0.5 away from 0, Numpy rounds to even
round_to_nearest = np.vectorize(round)
def bytes_to_floats(byte_matrix):
floats = np.empty([np.shape(byte_matrix)[0], 1], dtype=np.float32)
for i, byte_values in enumerate(byte_matrix):
(floats[i],) = struct.unpack("f", bytearray(byte_values))
return floats
def floats_to_bytes(floats):
byte_matrix = np.empty([np.shape(floats)[0], 4], dtype=np.uint8)
for i, value in enumerate(floats):
assert isinstance(value, np.float32), (value, floats)
as_bytes = struct.pack("f", value)
# In Python3 bytes will be a list of int, in Python2 a list of string
if isinstance(as_bytes[0], int):
byte_matrix[i] = list(as_bytes)
else:
byte_matrix[i] = [ord(i) for i in as_bytes]
return byte_matrix
def fused_rowwise_8bit_quantize_reference(data):
minimum = np.min(data, axis=1, keepdims=True)
maximum = np.max(data, axis=1, keepdims=True)
span = maximum - minimum
bias = minimum
scale = span / 255.0
inverse_scale = 255.0 / (span + 1e-8)
quantized_data = round_to_nearest((data - bias) * inverse_scale)
scale_bytes = floats_to_bytes(scale.reshape(-1))
bias_bytes = floats_to_bytes(bias.reshape(-1))
return np.concatenate([quantized_data, scale_bytes, bias_bytes], axis=1)
def fused_rowwise_8bit_quantize_dequantize_reference(data):
fused_quantized = fused_rowwise_8bit_quantize_reference(data)
scale = bytes_to_floats(fused_quantized[:, -8:-4].astype(np.uint8))
bias = bytes_to_floats(fused_quantized[:, -4:].astype(np.uint8))
quantized_data = fused_quantized[:, :-8]
return quantized_data * scale + bias
class TorchIntegration(hu.HypothesisTestCase):
@given(
roi_counts=st.lists(st.integers(0, 5), min_size=1, max_size=10),
num_classes=st.integers(1, 10),
rotated=st.booleans(),
angle_bound_on=st.booleans(),
clip_angle_thresh=st.sampled_from([-1.0, 1.0]),
**hu.gcs_cpu_only
)
def test_bbox_transform(
self,
roi_counts,
num_classes,
rotated,
angle_bound_on,
clip_angle_thresh,
gc,
dc,
):
"""
Test with rois for multiple images in a batch
"""
rois, deltas, im_info = create_bbox_transform_inputs(
roi_counts, num_classes, rotated
)
def bbox_transform_ref():
ref_op = core.CreateOperator(
"BBoxTransform",
["rois", "deltas", "im_info"],
["box_out"],
apply_scale=False,
rotated=rotated,
angle_bound_on=angle_bound_on,
clip_angle_thresh=clip_angle_thresh,
)
workspace.FeedBlob("rois", rois)
workspace.FeedBlob("deltas", deltas)
workspace.FeedBlob("im_info", im_info)
workspace.RunOperatorOnce(ref_op)
return workspace.FetchBlob("box_out")
box_out = torch.tensor(bbox_transform_ref())
a, b = torch.ops._caffe2.BBoxTransform(
torch.tensor(rois),
torch.tensor(deltas),
torch.tensor(im_info),
[1.0, 1.0, 1.0, 1.0],
False,
rotated,
angle_bound_on,
-90,
90,
clip_angle_thresh,
legacy_plus_one=True,
)
torch.testing.assert_allclose(box_out, a)
@given(
roi_counts=st.lists(st.integers(0, 5), min_size=1, max_size=10),
num_classes=st.integers(1, 10),
rotated=st.booleans(),
angle_bound_on=st.booleans(),
clip_angle_thresh=st.sampled_from([-1.0, 1.0]),
batch_splits_dtype=st.sampled_from([torch.float32, torch.int32]),
**hu.gcs_cpu_only
)
def test_box_with_nms_limits(
self,
roi_counts,
num_classes,
rotated,
angle_bound_on,
clip_angle_thresh,
batch_splits_dtype,
gc,
dc,
):
rotated = False # FIXME remove this after rotation is supported
rois, deltas, im_info = create_bbox_transform_inputs(
roi_counts, num_classes, rotated
)
pred_bbox, batch_splits = [
t.detach().numpy()
for t in torch.ops._caffe2.BBoxTransform(
torch.tensor(rois),
torch.tensor(deltas),
torch.tensor(im_info),
[1.0, 1.0, 1.0, 1.0],
False,
rotated,
angle_bound_on,
-90,
90,
clip_angle_thresh,
legacy_plus_one=True,
)
]
class_prob = np.random.randn(sum(roi_counts), num_classes).astype(np.float32)
score_thresh = 0.5
nms_thresh = 0.5
topk_per_image = sum(roi_counts) / 2
def box_with_nms_limit_ref():
input_blobs = ["class_prob", "pred_bbox", "batch_splits"]
output_blobs = [
"score_nms",
"bbox_nms",
"class_nms",
"batch_splits_nms",
"keeps_nms",
"keeps_size_nms",
]
ref_op = core.CreateOperator(
"BoxWithNMSLimit",
input_blobs,
output_blobs,
score_thresh=float(score_thresh),
nms=float(nms_thresh),
detections_per_im=int(topk_per_image),
soft_nms_enabled=False,
soft_nms_method="linear",
soft_nms_sigma=0.5,
soft_nms_min_score_thres=0.001,
rotated=rotated,
)
workspace.FeedBlob("class_prob", class_prob)
workspace.FeedBlob("pred_bbox", pred_bbox)
workspace.FeedBlob("batch_splits", batch_splits)
workspace.RunOperatorOnce(ref_op)
return (workspace.FetchBlob(b) for b in output_blobs)
output_refs = box_with_nms_limit_ref()
outputs = torch.ops._caffe2.BoxWithNMSLimit(
torch.tensor(class_prob),
torch.tensor(pred_bbox),
torch.tensor(batch_splits, dtype=batch_splits_dtype),
score_thresh=float(score_thresh),
nms=float(nms_thresh),
detections_per_im=int(topk_per_image),
soft_nms_enabled=False,
soft_nms_method="linear",
soft_nms_sigma=0.5,
soft_nms_min_score_thres=0.001,
rotated=rotated,
cls_agnostic_bbox_reg=False,
input_boxes_include_bg_cls=True,
output_classes_include_bg_cls=True,
legacy_plus_one=True,
)
for o, o_ref in zip(outputs, output_refs):
torch.testing.assert_allclose(o, o_ref)
@given(
dim_1=st.integers(min_value=10, max_value=10),
dim_2=st.integers(min_value=3, max_value=3),
dim_3=st.integers(min_value=2, max_value=2),
)
def test_sparse_to_dense_mask(self, dim_1, dim_2, dim_3):
indices = np.array([i + 1 for i in range(dim_1)]).astype(np.int32)
values = np.random.rand(dim_1, dim_2, dim_3).astype(np.float32)
default_value = np.zeros((dim_2, dim_3)).astype(np.float32)
mask = [2, 4, 9]
def sparse_to_dense_mask_ref(return_presence_mask=False):
ref_op = core.CreateOperator(
"SparseToDenseMask",
["indices", "values", "default_value"],
["output", "presence_mask"],
mask=mask,
return_presence_mask=return_presence_mask,
)
workspace.FeedBlob("indices", indices)
workspace.FeedBlob("values", values)
workspace.FeedBlob("default_value", default_value)
workspace.RunOperatorOnce(ref_op)
if return_presence_mask:
return (
workspace.FetchBlob("output"),
workspace.FetchBlob("presence_mask"),
)
return workspace.FetchBlob("output")
# Testing return_presence_mask = False
output = sparse_to_dense_mask_ref()
output = torch.tensor(output)
a, _ = torch.ops._caffe2.SparseToDenseMask(
torch.tensor(indices),
torch.tensor(values),
torch.tensor(default_value),
None,
mask=mask,
)
torch.testing.assert_allclose(output, a)
# Testing return_presence_mask = True
output, presence_mask = sparse_to_dense_mask_ref(return_presence_mask=True)
output = torch.tensor(output)
presence_mask = torch.tensor(presence_mask)
a, b = torch.ops._caffe2.SparseToDenseMask(
torch.tensor(indices),
torch.tensor(values),
torch.tensor(default_value),
None,
mask=mask,
return_presence_mask=True,
)
torch.testing.assert_allclose(output, a)
torch.testing.assert_allclose(presence_mask, b)
@given(
A=st.integers(min_value=4, max_value=4),
H=st.integers(min_value=10, max_value=10),
W=st.integers(min_value=8, max_value=8),
img_count=st.integers(min_value=3, max_value=3),
)
def test_generate_proposals(self, A, H, W, img_count):
scores = np.ones((img_count, A, H, W)).astype(np.float32)
bbox_deltas = (
np.linspace(0, 10, num=img_count * 4 * A * H * W)
Loading ...