from hypothesis import given, settings
import hypothesis.strategies as st
import unittest
from caffe2.proto import caffe2_pb2
from caffe2.python import core, test_util, workspace
from caffe2.python.core import CreateOperator, GradientRegistry, IR
import numpy as np
# First, we will set up a few gradient registry entries so that we can manually
# construct some test cases.
def NeedAll(op, g_output):
"""A sanity check to make sure that all the gradient are given."""
for name, g in zip(op.output, g_output):
if g is None:
raise RuntimeError(
'Need gradient for "%s" but it is not provided.' % name)
return g_output
def GIS(op):
"""A test util function to generate the gradient name for input."""
return [s + '_grad' for s in op.input]
def CopyDeviceOption(op, src_op):
if src_op.HasField('device_option'):
op.device_option.CopyFrom(src_op.device_option)
return op
# First gradient: (in -> out) leading to (out_grad -> in_grad)
@GradientRegistry.RegisterGradient('Direct')
def AddDirectGradient(op, g_output):
return (
CopyDeviceOption(
CreateOperator('DirectGradient', NeedAll(op, g_output), GIS(op)),
op),
GIS(op)
)
# Second gradient: (in -> out) leading to (out, out_grad -> in_grad)
@GradientRegistry.RegisterGradient('UseOutput')
def AddUseOutputGradient(op, g_output):
return (
CopyDeviceOption(
CreateOperator(
'UseOutputGradient',
list(op.output) + NeedAll(op, g_output), GIS(op)),
op),
GIS(op)
)
@GradientRegistry.RegisterGradient('UseInput')
def AddUseInputGradient(op, g_output):
return (
CopyDeviceOption(
CreateOperator(
'UseInputGradient',
list(op.input) + NeedAll(op, g_output), GIS(op)),
op),
GIS(op)
)
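# Fourth gradient: an operator with no gradient. It contributes no gradient
# operators and returns None for every input gradient.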
@GradientRegistry.RegisterGradient('Nogradient')
def AddNogradient(op, g_output):
return (
[],
        [None for _ in op.input]
)
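# For example, given op = CreateOperator('Direct', 'x', 'y') and
# g_output = ['y_grad'], AddDirectGradient above returns a DirectGradient
# operator mapping 'y_grad' -> 'x_grad', plus ['x_grad'] as the gradient
# names for the op's inputs.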
class TestGradientCalculation(test_util.TestCase):
def assertOperatorListEqual(self, operatorDefList1, operatorDefList2):
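        # Normalize fields that are irrelevant to the comparison
        # (debug_info and device_option.extra_info) before checking equality.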
for op in operatorDefList1:
op.debug_info = ""
if op.device_option:
del op.device_option.extra_info[:]
for op in operatorDefList2:
op.debug_info = ""
if op.device_option:
del op.device_option.extra_info[:]
self.assertEqual(operatorDefList1, operatorDefList2)
@given(device_option=st.sampled_from([
None,
core.DeviceOption(workspace.GpuDeviceType, 1)]))
@settings(deadline=10000)
def testDirect(self, device_option):
operators = [
CreateOperator('Direct', 'in', 'hidden'),
CreateOperator('Direct', 'hidden', 'out'),
]
if device_option:
for op in operators:
op.device_option.CopyFrom(device_option)
desired_grad_operators = [
CreateOperator('DirectGradient', 'out_grad', 'hidden_grad'),
CreateOperator('DirectGradient', 'hidden_grad', 'in_grad'),
]
if device_option:
for op in desired_grad_operators:
op.device_option.CopyFrom(device_option)
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'out': 'out_grad'})
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testDirectImplicitGradientSource(self):
operators = [
CreateOperator('Direct', 'in', 'hidden'),
CreateOperator('Direct', 'hidden', 'out'),
]
desired_grad_operators = [
CreateOperator(
"ConstantFill", 'out', "out_autogen_grad", value=1.0),
CreateOperator(
'DirectGradient', 'out_autogen_grad', 'hidden_grad'),
CreateOperator('DirectGradient', 'hidden_grad', 'in_grad'),
]
for op in desired_grad_operators:
op.debug_info = ""
gradients, _ = GradientRegistry.GetBackwardPass(
operators, ['out'])
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testDoesNotGenerateUnnecessaryGradients(self):
operators = [
CreateOperator('Direct', 'in', 'hidden'),
CreateOperator('Direct', 'hidden', 'out'),
]
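        # Gradients are requested starting from 'hidden', so no gradient
        # should be generated for the downstream 'hidden' -> 'out' op.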
desired_grad_operators = [
CreateOperator('DirectGradient', 'hidden_grad', 'in_grad'),
]
for op in desired_grad_operators:
op.debug_info = ""
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'hidden': 'hidden_grad'})
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testDirectButNoOutputGradientGiven(self):
operators = [
CreateOperator('Direct', 'in', 'hidden'),
CreateOperator('Direct', 'hidden', 'out'),
]
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {})
self.assertOperatorListEqual(gradients, [])
def testDirectInPlace(self):
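        # An in-place op ('in' -> 'in') should yield an in-place gradient op
        # ('in_grad' -> 'in_grad').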
operators = [
CreateOperator('Direct', 'in', 'in'),
CreateOperator('Direct', 'in', 'out'),
]
desired_grad_operators = [
CreateOperator('DirectGradient', 'out_grad', 'in_grad'),
CreateOperator('DirectGradient', 'in_grad', 'in_grad'),
]
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'out': 'out_grad'})
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testVersionMismatch(self):
operators = [
CreateOperator('Direct', 'x', 'x'),
CreateOperator('Direct', 'y', 'x'),
CreateOperator('Direct', 'x', 'y'),
]
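        # Each rewrite of a blob bumps its version. Here the gradient of 'y'
        # would need a version of 'x' that has since been overwritten, so the
        # backward pass should fail with a version-mismatch error.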
        with self.assertRaises(RuntimeError) as ctx:
            GradientRegistry.GetBackwardPass(
                operators, {'y': 'y_grad'})
        self.assertIn("version", str(ctx.exception))
def testUseOutput(self):
operators = [
CreateOperator('UseOutput', 'in', 'hidden'),
CreateOperator('UseOutput', 'hidden', 'out'),
CreateOperator('Direct', 'out', 'sink'),
]
desired_grad_operators = [
CreateOperator('DirectGradient', 'sink_grad', 'out_grad'),
CreateOperator(
'UseOutputGradient',
['out', 'out_grad'], 'hidden_grad'
),
CreateOperator(
'UseOutputGradient',
['hidden', 'hidden_grad'], 'in_grad'
),
]
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'sink': 'sink_grad'})
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testUseOutputInPlace(self):
operators = [
CreateOperator('UseOutput', 'in', 'in'),
CreateOperator('UseOutput', 'in', 'out'),
CreateOperator('Direct', 'out', 'sink'),
]
desired_grad_operators = [
CreateOperator('DirectGradient', 'sink_grad', 'out_grad'),
CreateOperator(
'UseOutputGradient',
['out', 'out_grad'], 'in_grad'
),
CreateOperator(
'UseOutputGradient',
['in', 'in_grad'], 'in_grad'
),
]
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'sink': 'sink_grad'})
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testUseOutputButOutputHasBeenChanged(self):
operators = [
CreateOperator('UseOutput', 'in', 'hidden'),
# Note here: we overwrite hidden, but hidden will be needed by the
# gradient calculation of the first operator, so the gradient
# registry should return an error.
CreateOperator('Direct', 'hidden', 'hidden'),
CreateOperator('UseOutput', 'hidden', 'out'),
CreateOperator('Direct', 'out', 'sink'),
]
with self.assertRaises(RuntimeError):
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'sink': 'sink_grad'})
def testUseInput(self):
operators = [
CreateOperator('Direct', 'in', 'hidden'),
CreateOperator('UseInput', 'hidden', 'out'),
CreateOperator('Direct', 'out', 'sink'),
]
desired_grad_operators = [
CreateOperator('DirectGradient', 'sink_grad', 'out_grad'),
CreateOperator(
'UseInputGradient',
['hidden', 'out_grad'], 'hidden_grad'
),
CreateOperator(
'DirectGradient',
'hidden_grad', 'in_grad'
),
]
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'sink': 'sink_grad'})
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testUseInputButInputHasBeenChanged(self):
"""Test gradient for the following case:
in -> out, with UseInput
in -> in
        Since op#1 overwrites 'in', but the original 'in' is needed by the
        gradient calculation of op#0, the gradient registry should raise an
        error.
"""
operators = [
CreateOperator('UseInput', 'in', 'out'),
CreateOperator('Direct', 'in', 'in'),
]
with self.assertRaises(RuntimeError):
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {'out': 'out_grad'})
@given(device_option=st.sampled_from([
None,
core.DeviceOption(workspace.GpuDeviceType, 1)]))
@settings(deadline=10000)
def testMultiUseInput(self, device_option):
"""Test gradient for the following case:
in -> hidden1
in -> hidden2
hidden1, hidden2 -> out
"""
operators = [
CreateOperator('Direct', 'in', 'hidden1'),
CreateOperator('Direct', 'in', 'hidden2'),
CreateOperator('Direct', ['hidden1', 'hidden2'], 'out'),
]
if device_option:
for op in operators:
op.device_option.CopyFrom(device_option)
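        # 'in' feeds two ops, so its gradient arrives in two pieces: one piece
        # is written to an autosplit blob and then accumulated into 'in_grad'
        # with a Sum op, as the expected operator list below shows.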
desired_grad_operators = [
CreateOperator(
'DirectGradient',
'out_grad', ['hidden1_grad', 'hidden2_grad']
),
CreateOperator(
'DirectGradient',
'hidden2_grad', 'in_grad'
),
CreateOperator(
'DirectGradient',
'hidden1_grad', '_in_grad_autosplit_0'
),
CreateOperator(
'Sum',
['in_grad', '_in_grad_autosplit_0'], 'in_grad'
),
]
if device_option:
for op in desired_grad_operators:
op.device_option.CopyFrom(device_option)
gradients, _ = GradientRegistry.GetBackwardPass(
operators, {"out": "out_grad"})
self.assertOperatorListEqual(gradients, desired_grad_operators)
def testMultiUseInputButWithNoGradient(self):
"""Test gradient for the following case:
in -> hidden1
in -(no gradient)-> hidden2
hidden1, hidden2 -> out
"""
operators = [
CreateOperator('Direct', 'in', 'hidden1'),
CreateOperator('Nogradient', 'in', 'hidden2'),
CreateOperator('Direct', ['hidden1', 'hidden2'], 'out'),
]
desired_grad_operators = [
CreateOperator(
'DirectGradient',
'out_grad', ['hidden1_grad', 'hidden2_grad']
),
CreateOperator(
'DirectGradient',
                'hidden1_grad', 'in_grad'
            ),
        ]
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'out': 'out_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)