# core_gradients_test.py (python/ directory of the neilisaac/torch package, version 1.8.0)

from hypothesis import given, settings
import hypothesis.strategies as st
import unittest

from caffe2.proto import caffe2_pb2
from caffe2.python import core, test_util, workspace
from caffe2.python.core import CreateOperator, GradientRegistry, IR

import numpy as np


# First, we will set up a few gradient registry entries so that we can manually
# construct some test cases.
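#
# Each gradient function registered below follows the GradientRegistry
# convention: given the forward OperatorDef `op` and the list `g_output` of
# gradient blob names for its outputs, it returns a pair of (gradient
# operator(s) to add, gradient blob names for each input of `op`).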


def NeedAll(op, g_output):
    """A sanity check to make sure that all the gradient are given."""
    for name, g in zip(op.output, g_output):
        if g is None:
            raise RuntimeError(
                'Need gradient for "%s" but it is not provided.' % name)
    return g_output


def GIS(op):
    """A test util function to generate the gradient name for input."""
    return [s + '_grad' for s in op.input]


def CopyDeviceOption(op, src_op):
    if src_op.HasField('device_option'):
        op.device_option.CopyFrom(src_op.device_option)
    return op


# First gradient: (in -> out) leading to (out_grad -> in_grad)
@GradientRegistry.RegisterGradient('Direct')
def AddDirectGradient(op, g_output):
    return (
        CopyDeviceOption(
            CreateOperator('DirectGradient', NeedAll(op, g_output), GIS(op)),
            op),
        GIS(op)
    )


# Second gradient: (in -> out) leading to (out, out_grad -> in_grad)
@GradientRegistry.RegisterGradient('UseOutput')
def AddUseOutputGradient(op, g_output):
    return (
        CopyDeviceOption(
            CreateOperator(
                'UseOutputGradient',
                list(op.output) + NeedAll(op, g_output), GIS(op)),
            op),
        GIS(op)
    )


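# Third gradient: (in -> out) leading to (in, out_grad -> in_grad)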
@GradientRegistry.RegisterGradient('UseInput')
def AddUseInputGradient(op, g_output):
    return (
        CopyDeviceOption(
            CreateOperator(
                'UseInputGradient',
                list(op.input) + NeedAll(op, g_output), GIS(op)),
            op),
        GIS(op)
    )


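# Finally, an operator type that produces no gradient for any of its inputs.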
@GradientRegistry.RegisterGradient('Nogradient')
def AddNogradient(op, g_output):
    return (
        [],
        [None for s in op.input]
    )


class TestGradientCalculation(test_util.TestCase):
    def assertOperatorListEqual(self, operatorDefList1, operatorDefList2):
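        # Strip bookkeeping fields (debug_info and device_option.extra_info)
        # before comparing, so the check only looks at operator structure.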
        for op in operatorDefList1:
            op.debug_info = ""
            if op.device_option:
                del op.device_option.extra_info[:]
        for op in operatorDefList2:
            op.debug_info = ""
            if op.device_option:
                del op.device_option.extra_info[:]
        self.assertEqual(operatorDefList1, operatorDefList2)

    @given(device_option=st.sampled_from([
        None,
        core.DeviceOption(workspace.GpuDeviceType, 1)]))
    @settings(deadline=10000)
    def testDirect(self, device_option):
        operators = [
            CreateOperator('Direct', 'in', 'hidden'),
            CreateOperator('Direct', 'hidden', 'out'),
        ]
        if device_option:
            for op in operators:
                op.device_option.CopyFrom(device_option)
        desired_grad_operators = [
            CreateOperator('DirectGradient', 'out_grad', 'hidden_grad'),
            CreateOperator('DirectGradient', 'hidden_grad', 'in_grad'),
        ]
        if device_option:
            for op in desired_grad_operators:
                op.device_option.CopyFrom(device_option)
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'out': 'out_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testDirectImplicitGradientSource(self):
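        # When GetBackwardPass is given a list of blob names instead of a
        # {blob: gradient} map, it generates the seed gradient itself via a
        # ConstantFill of 1.0 (the 'out_autogen_grad' blob below).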
        operators = [
            CreateOperator('Direct', 'in', 'hidden'),
            CreateOperator('Direct', 'hidden', 'out'),
        ]
        desired_grad_operators = [
            CreateOperator(
                "ConstantFill", 'out', "out_autogen_grad", value=1.0),
            CreateOperator(
                'DirectGradient', 'out_autogen_grad', 'hidden_grad'),
            CreateOperator('DirectGradient', 'hidden_grad', 'in_grad'),
        ]
        for op in desired_grad_operators:
            op.debug_info = ""
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, ['out'])
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testDoesNotGenerateUnnecessaryGradients(self):
        operators = [
            CreateOperator('Direct', 'in', 'hidden'),
            CreateOperator('Direct', 'hidden', 'out'),
        ]
        desired_grad_operators = [
            CreateOperator('DirectGradient', 'hidden_grad', 'in_grad'),
        ]
        for op in desired_grad_operators:
            op.debug_info = ""
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'hidden': 'hidden_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testDirectButNoOutputGradientGiven(self):
        operators = [
            CreateOperator('Direct', 'in', 'hidden'),
            CreateOperator('Direct', 'hidden', 'out'),
        ]
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {})
        self.assertOperatorListEqual(gradients, [])

    def testDirectInPlace(self):
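        # The first op writes 'in' in place, so its gradient also reads and
        # writes the same gradient blob ('in_grad' -> 'in_grad').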
        operators = [
            CreateOperator('Direct', 'in', 'in'),
            CreateOperator('Direct', 'in', 'out'),
        ]
        desired_grad_operators = [
            CreateOperator('DirectGradient', 'out_grad', 'in_grad'),
            CreateOperator('DirectGradient', 'in_grad', 'in_grad'),
        ]
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'out': 'out_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testVersionMismatch(self):
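        # 'x' is written by more than one operator, so a single 'x_grad' blob
        # cannot correspond to every version of 'x'; GetBackwardPass should
        # detect the version mismatch and raise.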
        operators = [
            CreateOperator('Direct', 'x', 'x'),
            CreateOperator('Direct', 'y', 'x'),
            CreateOperator('Direct', 'x', 'y'),
        ]
        with self.assertRaises(RuntimeError) as context:
            GradientRegistry.GetBackwardPass(operators, {'y': 'y_grad'})
        self.assertIn("version", str(context.exception))

    def testUseOutput(self):
        operators = [
            CreateOperator('UseOutput', 'in', 'hidden'),
            CreateOperator('UseOutput', 'hidden', 'out'),
            CreateOperator('Direct', 'out', 'sink'),
        ]
        desired_grad_operators = [
            CreateOperator('DirectGradient', 'sink_grad', 'out_grad'),
            CreateOperator(
                'UseOutputGradient',
                ['out', 'out_grad'], 'hidden_grad'
            ),
            CreateOperator(
                'UseOutputGradient',
                ['hidden', 'hidden_grad'], 'in_grad'
            ),
        ]
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'sink': 'sink_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testUseOutputInPlace(self):
        operators = [
            CreateOperator('UseOutput', 'in', 'in'),
            CreateOperator('UseOutput', 'in', 'out'),
            CreateOperator('Direct', 'out', 'sink'),
        ]
        desired_grad_operators = [
            CreateOperator('DirectGradient', 'sink_grad', 'out_grad'),
            CreateOperator(
                'UseOutputGradient',
                ['out', 'out_grad'], 'in_grad'
            ),
            CreateOperator(
                'UseOutputGradient',
                ['in', 'in_grad'], 'in_grad'
            ),
        ]
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'sink': 'sink_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testUseOutputButOutputHasBeenChanged(self):
        operators = [
            CreateOperator('UseOutput', 'in', 'hidden'),
            # Note here: we overwrite hidden, but hidden will be needed by the
            # gradient calculation of the first operator, so the gradient
            # registry should raise an error.
            CreateOperator('Direct', 'hidden', 'hidden'),
            CreateOperator('UseOutput', 'hidden', 'out'),
            CreateOperator('Direct', 'out', 'sink'),
        ]
        with self.assertRaises(RuntimeError):
            gradients, _ = GradientRegistry.GetBackwardPass(
                operators, {'sink': 'sink_grad'})

    def testUseInput(self):
        operators = [
            CreateOperator('Direct', 'in', 'hidden'),
            CreateOperator('UseInput', 'hidden', 'out'),
            CreateOperator('Direct', 'out', 'sink'),
        ]
        desired_grad_operators = [
            CreateOperator('DirectGradient', 'sink_grad', 'out_grad'),
            CreateOperator(
                'UseInputGradient',
                ['hidden', 'out_grad'], 'hidden_grad'
            ),
            CreateOperator(
                'DirectGradient',
                'hidden_grad', 'in_grad'
            ),
        ]
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'sink': 'sink_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testUseInputButInputHasBeenChanged(self):
        """Test gradient for the following case:

        in -> out, with UseInput
        in -> in

        Since we overwrite in in op#1, but in will be needed by the gradient
        calculation of op#0, the gradient registry should raise an error.
        """
        operators = [
            CreateOperator('UseInput', 'in', 'out'),
            CreateOperator('Direct', 'in', 'in'),
        ]
        with self.assertRaises(RuntimeError):
            gradients, _ = GradientRegistry.GetBackwardPass(
                operators, {'out': 'out_grad'})

    @given(device_option=st.sampled_from([
        None,
        core.DeviceOption(workspace.GpuDeviceType, 1)]))
    @settings(deadline=10000)
    def testMultiUseInput(self, device_option):
        """Test gradient for the following case:

        in -> hidden1
        in -> hidden2
        hidden1, hidden2 -> out
        """
        operators = [
            CreateOperator('Direct', 'in', 'hidden1'),
            CreateOperator('Direct', 'in', 'hidden2'),
            CreateOperator('Direct', ['hidden1', 'hidden2'], 'out'),
        ]
        if device_option:
            for op in operators:
                op.device_option.CopyFrom(device_option)
        desired_grad_operators = [
            CreateOperator(
                'DirectGradient',
                'out_grad', ['hidden1_grad', 'hidden2_grad']
            ),
            CreateOperator(
                'DirectGradient',
                'hidden2_grad', 'in_grad'
            ),
            CreateOperator(
                'DirectGradient',
                'hidden1_grad', '_in_grad_autosplit_0'
            ),
            CreateOperator(
                'Sum',
                ['in_grad', '_in_grad_autosplit_0'], 'in_grad'
            ),
        ]
        if device_option:
            for op in desired_grad_operators:
                op.device_option.CopyFrom(device_option)
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {"out": "out_grad"})
        self.assertOperatorListEqual(gradients, desired_grad_operators)

    def testMultiUseInputButWithNoGradient(self):
        """Test gradient for the following case:

        in -> hidden1
        in -(no gradient)-> hidden2
        hidden1, hidden2 -> out
        """
        operators = [
            CreateOperator('Direct', 'in', 'hidden1'),
            CreateOperator('Nogradient', 'in', 'hidden2'),
            CreateOperator('Direct', ['hidden1', 'hidden2'], 'out'),
        ]
        desired_grad_operators = [
            CreateOperator(
                'DirectGradient',
                'out_grad', ['hidden1_grad', 'hidden2_grad']
            ),
            CreateOperator(
                'DirectGradient',
                'hidden1_grad', 'in_grad'
            ),
        ]
        gradients, _ = GradientRegistry.GetBackwardPass(
            operators, {'out': 'out_grad'})
        self.assertOperatorListEqual(gradients, desired_grad_operators)