import numpy as np
import os
import shutil
import tempfile
import unittest
import torch
from caffe2.proto import caffe2_pb2
from caffe2.python import core, test_util, workspace, model_helper, brew
import caffe2.python.hypothesis_test_util as htu
import hypothesis.strategies as st
from hypothesis import given, settings
class TestWorkspace(unittest.TestCase):
    """Tests for the basic ``workspace`` API.

    Covers running operators, nets and plans, and feeding/fetching blobs
    of various dtypes (numeric, bool, string, zero-sized, torch tensors).
    """

    # dtypes round-tripped through FeedBlob/FetchBlob and GetBlobSizeBytes.
    # The builtin ``bool`` is used instead of the ``np.bool`` alias, which
    # newer NumPy releases no longer provide.
    _NUMERIC_DTYPES = (
        np.float16, np.float32, np.float64, bool,
        np.int8, np.int16, np.int32, np.int64,
        np.uint8, np.uint16,
    )

    def setUp(self):
        """Build a one-op net that fills "testblob" and reset the workspace."""
        self.net = core.Net("test-net")
        self.testblob_ref = self.net.ConstantFill(
            [], "testblob", shape=[1, 2, 3, 4], value=1.0)
        workspace.ResetWorkspace()

    def testRootFolder(self):
        """ResetWorkspace with and without an explicit root folder."""
        self.assertEqual(workspace.ResetWorkspace(), True)
        self.assertEqual(workspace.RootFolder(), ".")
        self.assertEqual(
            workspace.ResetWorkspace("/tmp/caffe-workspace-test"), True)
        self.assertEqual(workspace.RootFolder(), "/tmp/caffe-workspace-test")

    def testWorkspaceHasBlobWithNonexistingName(self):
        """HasBlob reports False for a blob that was never created."""
        self.assertEqual(workspace.HasBlob("non-existing"), False)

    def testRunOperatorOnce(self):
        """Running the single ConstantFill op creates exactly "testblob"."""
        serialized_op = self.net.Proto().op[0].SerializeToString()
        self.assertEqual(workspace.RunOperatorOnce(serialized_op), True)
        self.assertEqual(workspace.HasBlob("testblob"), True)
        blobs = workspace.Blobs()
        self.assertEqual(len(blobs), 1)
        self.assertEqual(blobs[0], "testblob")

    def testGetOperatorCost(self):
        """GetOperatorCost reports the expected flop count for a Conv2D op."""
        op = core.CreateOperator(
            "Conv2D",
            ["X", "W"], ["Y"],
            stride_h=1,
            stride_w=1,
            pad_t=1,
            pad_l=1,
            pad_b=1,
            pad_r=1,
            kernel=3,
        )
        X = np.zeros((1, 8, 8, 8))
        W = np.zeros((1, 1, 3, 3))
        workspace.FeedBlob("X", X)
        workspace.FeedBlob("W", W)
        flops, _ = workspace.GetOperatorCost(
            op.SerializeToString(), ["X", "W"])
        self.assertEqual(flops, 1152)

    def testRunNetOnce(self):
        """RunNetOnce on the serialized net creates "testblob"."""
        self.assertEqual(
            workspace.RunNetOnce(self.net.Proto().SerializeToString()), True)
        self.assertEqual(workspace.HasBlob("testblob"), True)

    def testCurrentWorkspaceWrapper(self):
        """``C.Workspace.current.blobs`` tracks blob creation and reset."""
        self.assertNotIn("testblob", workspace.C.Workspace.current.blobs)
        self.assertEqual(
            workspace.RunNetOnce(self.net.Proto().SerializeToString()), True)
        self.assertEqual(workspace.HasBlob("testblob"), True)
        self.assertIn("testblob", workspace.C.Workspace.current.blobs)
        workspace.ResetWorkspace()
        self.assertNotIn("testblob", workspace.C.Workspace.current.blobs)

    def testRunPlan(self):
        """RunPlan on a serialized single-step plan creates "testblob"."""
        plan = core.Plan("test-plan")
        plan.AddStep(core.ExecutionStep("test-step", self.net))
        self.assertEqual(
            workspace.RunPlan(plan.Proto().SerializeToString()), True)
        self.assertEqual(workspace.HasBlob("testblob"), True)

    def testRunPlanInBackground(self):
        """A plan started in the background finishes and succeeds."""
        plan = core.Plan("test-plan")
        plan.AddStep(core.ExecutionStep("test-step", self.net))
        background_plan = workspace.RunPlanInBackground(plan)
        # Busy-wait until the background plan reports completion.
        while not background_plan.is_done():
            pass
        self.assertEqual(background_plan.is_succeeded(), True)
        self.assertEqual(workspace.HasBlob("testblob"), True)

    def testConstructPlanFromSteps(self):
        """RunPlan accepts an ExecutionStep directly."""
        step = core.ExecutionStep("test-step-as-plan", self.net)
        self.assertEqual(workspace.RunPlan(step), True)
        self.assertEqual(workspace.HasBlob("testblob"), True)

    def testResetWorkspace(self):
        """ResetWorkspace removes blobs created by a previous run."""
        self.assertEqual(
            workspace.RunNetOnce(self.net.Proto().SerializeToString()), True)
        self.assertEqual(workspace.HasBlob("testblob"), True)
        self.assertEqual(workspace.ResetWorkspace(), True)
        self.assertEqual(workspace.HasBlob("testblob"), False)

    def testTensorAccess(self):
        """In-place access, init, feed and fetch on a workspace tensor."""
        ws = workspace.C.Workspace()

        # test in-place modification
        ws.create_blob("tensor").feed(np.array([1.1, 1.2, 1.3]))
        tensor = ws.blobs["tensor"].tensor()
        tensor.data[0] = 3.3
        val = np.array([3.3, 1.2, 1.3])
        np.testing.assert_array_equal(tensor.data, val)
        np.testing.assert_array_equal(ws.blobs["tensor"].fetch(), val)

        # test in-place initialization
        tensor.init([2, 3], core.DataType.INT32)
        for x in range(2):
            for y in range(3):
                tensor.data[x, y] = 0
        tensor.data[1, 1] = 100
        val = np.zeros([2, 3], dtype=np.int32)
        val[1, 1] = 100
        np.testing.assert_array_equal(tensor.data, val)
        np.testing.assert_array_equal(ws.blobs["tensor"].fetch(), val)

        # strings cannot be initialized from python
        with self.assertRaises(RuntimeError):
            tensor.init([3, 4], core.DataType.STRING)

        # feed (copy) data into tensor
        # ``object`` is what the removed ``np.object`` alias referred to.
        val = np.array([[b'abc', b'def'], [b'ghi', b'jkl']], dtype=object)
        tensor.feed(val)
        self.assertEqual(tensor.data[0, 0], b'abc')
        np.testing.assert_array_equal(ws.blobs["tensor"].fetch(), val)

        val = np.array([1.1, 10.2])
        tensor.feed(val)
        # Mutating the source array afterwards must not affect the tensor.
        val[0] = 5.2
        self.assertEqual(tensor.data[0], 1.1)

        # fetch (copy) data from tensor
        val = np.array([1.1, 1.2])
        tensor.feed(val)
        val2 = tensor.fetch()
        tensor.data[0] = 5.2
        val3 = tensor.fetch()
        np.testing.assert_array_equal(val, val2)
        self.assertEqual(val3[0], 5.2)

    def testFetchFeedBlob(self):
        """Fetch a blob by name, modify it, feed it back and re-fetch."""
        self.assertEqual(
            workspace.RunNetOnce(self.net.Proto().SerializeToString()), True)
        fetched = workspace.FetchBlob("testblob")
        # check if fetched is correct.
        self.assertEqual(fetched.shape, (1, 2, 3, 4))
        np.testing.assert_array_equal(fetched, 1.0)
        fetched[:] = 2.0
        self.assertEqual(workspace.FeedBlob("testblob", fetched), True)
        fetched_again = workspace.FetchBlob("testblob")
        self.assertEqual(fetched_again.shape, (1, 2, 3, 4))
        np.testing.assert_array_equal(fetched_again, 2.0)

    def testFetchFeedBlobViaBlobReference(self):
        """Same round trip as above, addressing the blob by BlobReference."""
        self.assertEqual(
            workspace.RunNetOnce(self.net.Proto().SerializeToString()), True)
        fetched = workspace.FetchBlob(self.testblob_ref)
        # check if fetched is correct.
        self.assertEqual(fetched.shape, (1, 2, 3, 4))
        np.testing.assert_array_equal(fetched, 1.0)
        fetched[:] = 2.0
        self.assertEqual(workspace.FeedBlob(self.testblob_ref, fetched), True)
        fetched_again = workspace.FetchBlob("testblob")  # fetch by name now
        self.assertEqual(fetched_again.shape, (1, 2, 3, 4))
        np.testing.assert_array_equal(fetched_again, 2.0)

    def testFetchFeedBlobTypes(self):
        """Feed/fetch preserves shape, dtype and values for each dtype."""
        for dtype in self._NUMERIC_DTYPES:
            try:
                rng = np.iinfo(dtype).max * 2
            except ValueError:
                # np.iinfo rejects non-integer dtypes; use a fixed range.
                rng = 1000
            data = ((np.random.rand(2, 3, 4) - 0.5) * rng).astype(dtype)
            self.assertEqual(workspace.FeedBlob("testblob_types", data), True)
            fetched_back = workspace.FetchBlob("testblob_types")
            self.assertEqual(fetched_back.shape, (2, 3, 4))
            self.assertEqual(fetched_back.dtype, dtype)
            np.testing.assert_array_equal(fetched_back, data)

    def testFetchFeedBlobBool(self):
        """Special case for bool to ensure coverage of both true and false."""
        data = np.zeros((2, 3, 4)).astype(bool)
        data.flat[::2] = True
        self.assertEqual(workspace.FeedBlob("testblob_types", data), True)
        fetched_back = workspace.FetchBlob("testblob_types")
        self.assertEqual(fetched_back.shape, (2, 3, 4))
        self.assertEqual(fetched_back.dtype, bool)
        np.testing.assert_array_equal(fetched_back, data)

    def testGetBlobSizeBytes(self):
        """GetBlobSizeBytes matches element count times item size."""
        for dtype in self._NUMERIC_DTYPES:
            data = np.random.randn(2, 3).astype(dtype)
            self.assertTrue(workspace.FeedBlob("testblob_sizeBytes", data))
            self.assertEqual(
                workspace.GetBlobSizeBytes("testblob_sizeBytes"),
                6 * np.dtype(dtype).itemsize)

        strs1 = np.array([b'Hello World!', b'abcd'])
        strs2 = np.array([b'element1', b'element2'])
        strs1_len = sum(len(s) for s in strs1)
        strs2_len = sum(len(s) for s in strs2)
        self.assertTrue(workspace.FeedBlob("testblob_str1", strs1))
        self.assertTrue(workspace.FeedBlob("testblob_str2", strs2))
        # size of blob "testblob_str1" = size_str1 * meta_.itemsize() + strs1_len
        # size of blob "testblob_str2" = size_str2 * meta_.itemsize() + strs2_len
        # Both arrays hold two elements, so the per-element overhead cancels
        # out in the difference.
        self.assertEqual(
            workspace.GetBlobSizeBytes("testblob_str1") -
            workspace.GetBlobSizeBytes("testblob_str2"),
            strs1_len - strs2_len)

    def testFetchFeedBlobZeroDim(self):
        """An array with a zero-length dimension keeps its shape and dtype."""
        data = np.empty(shape=(2, 0, 3), dtype=np.float32)
        self.assertEqual(workspace.FeedBlob("testblob_empty", data), True)
        fetched_back = workspace.FetchBlob("testblob_empty")
        self.assertEqual(fetched_back.shape, (2, 0, 3))
        self.assertEqual(fetched_back.dtype, np.float32)

    def testFetchFeedLongStringTensor(self):
        """Round-trip a tensor of long byte strings, including NUL bytes."""
        # long strings trigger array of object creation
        strs = np.array([
            b' '.join(10 * [b'long string']),
            b' '.join(128 * [b'very long string']),
            b'small \0\1\2 string',
            b"Hello, world! I have special \0 symbols \1!"])
        workspace.FeedBlob('my_str_tensor', strs)
        strs2 = workspace.FetchBlob('my_str_tensor')
        self.assertEqual(strs.shape, strs2.shape)
        for expected, actual in zip(strs, strs2):
            self.assertEqual(expected, actual)

    def testFetchFeedShortStringTensor(self):
        """Round-trip a tensor of short byte strings."""
        # small strings trigger NPY_STRING array
        strs = np.array([b'elem1', b'elem 2', b'element 3'])
        workspace.FeedBlob('my_str_tensor_2', strs)
        strs2 = workspace.FetchBlob('my_str_tensor_2')
        self.assertEqual(strs.shape, strs2.shape)
        for expected, actual in zip(strs, strs2):
            self.assertEqual(expected, actual)

    def testFetchFeedPlainString(self):
        """Round-trip a single bytes value (not a tensor of strings)."""
        # this is actual string, not a tensor of strings
        s = b"Hello, world! I have special \0 symbols \1!"
        workspace.FeedBlob('my_plain_string', s)
        s2 = workspace.FetchBlob('my_plain_string')
        self.assertEqual(s, s2)

    def testFetchBlobs(self):
        """FetchBlobs returns the values in the order the names were given."""
        s1 = b"test1"
        s2 = b"test2"
        workspace.FeedBlob('s1', s1)
        workspace.FeedBlob('s2', s2)
        fetch1, fetch2 = workspace.FetchBlobs(['s1', 's2'])
        self.assertEqual(s1, fetch1)
        self.assertEqual(s2, fetch2)

    def testFetchFeedViaBlobDict(self):
        """``workspace.blobs`` supports get, set, ``in``, ``len`` and iteration."""
        self.assertEqual(
            workspace.RunNetOnce(self.net.Proto().SerializeToString()), True)
        fetched = workspace.blobs["testblob"]
        # check if fetched is correct.
        self.assertEqual(fetched.shape, (1, 2, 3, 4))
        np.testing.assert_array_equal(fetched, 1.0)
        fetched[:] = 2.0
        workspace.blobs["testblob"] = fetched
        fetched_again = workspace.blobs["testblob"]
        self.assertEqual(fetched_again.shape, (1, 2, 3, 4))
        np.testing.assert_array_equal(fetched_again, 2.0)

        self.assertIn("testblob", workspace.blobs)
        self.assertNotIn("non_existant", workspace.blobs)
        self.assertEqual(len(workspace.blobs), 1)
        for key in workspace.blobs:
            self.assertEqual(key, "testblob")

    def testTorchInterop(self):
        """Changes made through torch tensors are visible via FetchBlob."""
        workspace.RunOperatorOnce(core.CreateOperator(
            "ConstantFill", [], "foo", shape=(4,), value=2,
            dtype=core.DataType.INT64))
        t = workspace.FetchTorch("foo")
        t.resize_(5)
        t[4] = t[2] = 777
        expected_foo = np.array([2, 2, 777, 2, 777])
        np.testing.assert_array_equal(t.numpy(), expected_foo)
        np.testing.assert_array_equal(workspace.FetchBlob("foo"), expected_foo)

        z = torch.ones((4,), dtype=torch.int64)
        workspace.FeedBlob('bar', z)
        workspace.RunOperatorOnce(
            core.CreateOperator("Reshape", ['bar'], ['bar', '_'], shape=(2, 2)))
        # The 2-D index below relies on the in-place Reshape of 'bar'
        # having reshaped ``z`` as well.
        z[0, 1] = 123
        expected_bar = np.array([[1, 123], [1, 1]])
        np.testing.assert_array_equal(workspace.FetchBlob("bar"), expected_bar)
        np.testing.assert_array_equal(z, expected_bar)
class TestMultiWorkspaces(unittest.TestCase):
    """Tests for creating and switching between named workspaces."""

    def setUp(self):
        """Switch to the "default" workspace and reset it."""
        workspace.SwitchWorkspace("default")
        workspace.ResetWorkspace()

    def testCreateWorkspace(self):
        """Blobs are per-workspace; switching to an unknown one raises."""
        self.net = core.Net("test-net")
        self.net.ConstantFill([], "testblob", shape=[1, 2, 3, 4], value=1.0)
        self.assertEqual(
            workspace.RunNetOnce(self.net.Proto().SerializeToString()), True
        )
        self.assertEqual(workspace.HasBlob("testblob"), True)

        # Second argument True: the "test" workspace does not exist yet.
        self.assertIsNone(workspace.SwitchWorkspace("test", True))
        self.assertEqual(workspace.HasBlob("testblob"), False)

        self.assertIsNone(workspace.SwitchWorkspace("default"))
        self.assertEqual(workspace.HasBlob("testblob"), True)

        # Switching to a workspace that was never created must fail.
        with self.assertRaises(RuntimeError):
            workspace.SwitchWorkspace("non-existing")

        workspaces = workspace.Workspaces()
        self.assertIn("default", workspaces)
        self.assertIn("test", workspaces)
@unittest.skipIf(not workspace.has_gpu_support, "No gpu support.")
Loading ...