Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ python / data_parallel_model_test.py





from future.utils import viewkeys
from multiprocessing import Process, Queue
import numpy as np
import os
import shutil
import tempfile
import unittest
import time
from mock import Mock
from hypothesis import assume, given, settings
import hypothesis.strategies as st

from caffe2.proto import caffe2_pb2
from caffe2.python import brew, core, cnn, data_parallel_model, dyndep, \
    model_helper, optimizer, rnn_cell, workspace
from caffe2.python.test_util import TestCase


# Load the file-store-handler operator library when this module is imported.
# NOTE(review): presumably this is what supplies the "FileStoreHandlerCreate"
# operator used by test_synchronization_barrier -- confirm.
dyndep.InitOpsLibrary("@/caffe2/caffe2/distributed:file_store_handler_ops")


class TemporaryDirectory:
    '''
    Context manager yielding the path of a freshly created temporary
    directory; the directory tree is removed when the block exits.
    '''

    def __enter__(self):
        # Keep the path on the instance so __exit__ knows what to delete.
        created = tempfile.mkdtemp()
        self.tmpdir = created
        return created

    def __exit__(self, type, value, traceback):
        # Returns None, so an exception raised inside the block propagates.
        shutil.rmtree(self.tmpdir)

# Note(jiayq): we have yet to find out why Travis raises an error in gloo
# like:
# RuntimeError: [enforce fail at /home/travis/build/caffe2/caffe2/third_party/gloo/gloo/transport/tcp/device.cc:113] ifa != nullptr. Unable to find interface for: [127.0.1.1]
# See for example https://travis-ci.org/caffe2/caffe2/jobs/262433866
# As a result, we check whether we are running on Travis and, if so, skip this test.
@unittest.skipIf(os.environ.get("TRAVIS"), "DPMTest has a known issue with Travis.")
class DataParallelModelTest(TestCase):

    def run_model(self, devices, gpu):
        '''
        Helper function for test_equiv.

        Builds a small FC model parallelized over `devices` (CPU devices when
        `gpu` is false), runs 10 iterations on fixed-seed random inputs that
        are sliced evenly across the devices, checks after every iteration
        that the "sync_num" blob fed to device 0 is visible on every device,
        and returns the "<device_prefix>_0/fc_w" blob.
        '''
        def input_builder_fun(model):
            # Inputs are fed straight into the workspace further down.
            return None

        def model_build_fun(model, loss_scale):
            fc = model.FC(
                "data", "fc", 16, 1,
                ("ConstantFill", {}), ("ConstantFill", {}),
            )
            flattened = model.FlattenToVec(fc, "fc_fl")
            activated = model.Sigmoid(flattened, "sigm")
            distance = model.SquaredL2Distance([activated, "label"], "sq")
            averaged = model.AveragedLoss(distance, "loss")
            scaled = model.Scale(averaged, scale=loss_scale)

            # For testing explicit sync
            model.param_init_net.UniformFill([], ["sync_num"], shape=[1])
            return [scaled]

        def add_optimizer(model):
            return optimizer.build_sgd(
                model,
                0.1,
                policy="fixed",
                max_gradient_norm=5.0,
                allow_lr_injection=True,
            )

        workspace.ResetWorkspace()
        model = cnn.CNNModelHelper(
            order="NHWC",
            name="test{}".format(devices),
        )
        use_cpu = not gpu
        data_parallel_model.Parallelize(
            model,
            input_builder_fun=input_builder_fun,
            forward_pass_builder_fun=model_build_fun,
            optimizer_builder_fun=add_optimizer,
            devices=devices,
            cpu_device=use_cpu,
            shared_model=use_cpu,
            combine_spatial_bn=use_cpu,
        )
        data_parallel_model.AddBlobSync(model, ["sync_num"])

        # Light test for LR names
        lr_names = data_parallel_model.GetLearningRateBlobNames(model)
        self.assertGreater(len(lr_names), 0)

        np.random.seed(2603)

        # Every run sees the same full batch regardless of the device count;
        # each device receives one contiguous slice of it.
        batch_size = 64
        per_device = batch_size // len(devices)
        for step in range(10):
            full_data = np.random.rand(batch_size, 16)
            full_labels = np.round(full_data[:, 0])

            for index, device in enumerate(devices):
                begin = index * per_device
                end = begin + per_device
                data = full_data[begin:end, :].astype(np.float32)
                labels = full_labels[begin:end].astype(np.float32)
                blob_prefix = "{}_{}".format(model._device_prefix, device)
                device_opt = core.DeviceOption(model._device_type, device)
                with core.DeviceScope(device_opt):
                    workspace.FeedBlob(blob_prefix + "/data", data)
                    workspace.FeedBlob(blob_prefix + "/label", labels)

            # Nets are initialized and created once, after the first feed.
            if step == 0:
                workspace.RunNetOnce(model.param_init_net)
                workspace.CreateNet(model.net)

            expected_sync = step * 2
            workspace.FeedBlob(
                model._device_prefix + "_0/sync_num",
                np.array([expected_sync]).astype(np.float32),
                device_option=core.DeviceOption(model._device_type, 0))
            workspace.RunNet(model.net.Proto().name)

            # Test AddBlobSync
            for device in model._devices:
                sync = workspace.FetchBlob(
                    model._device_prefix + "_{}/sync_num".format(device))[0]
                self.assertTrue(abs(sync - expected_sync) < 0.01)

        return workspace.FetchBlob("{}_0/fc_w".format(model._device_prefix))

    def run_test_locally(self, fn, device_option=None, **kwargs):
        '''
        Run `fn` in kwargs['comm_size'] background processes and re-raise
        the first exception any of them reports.

        Each process calls fn(**kwargs) with kwargs['comm_rank'] set to its
        rank in range(comm_size), inside core.DeviceScope(device_option) when
        a device option is given, and then resets the workspace.

        Args:
            fn: callable invoked once per subprocess.
            device_option: optional device option to scope each call with.
            **kwargs: forwarded to `fn`; must contain 'comm_size'.

        Raises:
            KeyError: if 'comm_size' is not supplied.
            Exception: the exception object a subprocess put on the queue.
        '''
        # Queue for assertion errors on subprocesses
        queue = Queue()

        # Capture any exception thrown by the subprocess
        def run_fn(*args, **kwargs):
            try:
                if device_option is None:
                    fn(*args, **kwargs)
                    workspace.ResetWorkspace()
                else:
                    with core.DeviceScope(device_option):
                        fn(*args, **kwargs)
                        workspace.ResetWorkspace()
            except Exception as ex:
                queue.put(ex)

        # Start N processes in the background, each with its own kwargs copy
        procs = []
        for rank in range(kwargs['comm_size']):
            proc = Process(
                target=run_fn,
                kwargs=dict(kwargs, comm_rank=rank))
            proc.start()
            procs.append(proc)

        try:
            # Test complete, join background processes
            for proc in procs:
                while proc.is_alive():
                    proc.join(1)

                    # Fail fast if any subprocess has already reported.
                    if not queue.empty():
                        raise queue.get()

            # A subprocess that failed and exited before it was polled never
            # enters the loop body above, so check once more after all joins.
            if not queue.empty():
                raise queue.get()
        finally:
            # Only reachable with live processes when an exception is being
            # re-raised: do not leave the remaining ranks running.
            for proc in procs:
                if proc.is_alive():
                    proc.terminate()
                    proc.join()

    def test_equiv(self):
        '''
        Test that, for a fixed total batch size, the trained weights are
        close (np.allclose) no matter how many devices the model is split
        across. The GPU variant is skipped without GPU support or with fewer
        than 2 CUDA devices; larger device counts run only when available.
        '''
        for gpu in (True, False):
            if gpu and not (workspace.has_gpu_support and
                            workspace.NumCudaDevices() >= 2):
                continue

            two_device_result = self.run_model([0, 1], gpu=gpu)
            baseline = self.run_model([0], gpu=gpu)
            self.assertTrue(np.allclose(baseline, two_device_result))

            # CPU runs always cover every count; GPU runs need the hardware.
            for num_devices in (4, 8, 16):
                if gpu and workspace.NumCudaDevices() < num_devices:
                    continue
                result = self.run_model(list(range(num_devices)), gpu=gpu)
                self.assertTrue(np.allclose(baseline, result))

    def test_checkpoint_params(self):
        '''
        Check which blobs GetCheckpointParams reports for a CPU-parallel
        model built on devices [1, 2, 3]: params, momentum blobs and computed
        params of cpu_1 plus "optimizer_iteration" are expected, while cpu_2
        params/computed params and the "cpu_1/data" input are not.
        '''
        def add_input_ops(model):
            pass

        def add_model_ops(model, loss_scale):
            model.NHWC2NCHW("data", "data_nchw")
            model.Conv("data_nchw", 'conv1', 3, 64,
                       weight_init=("MSRAFill", {}), kernel=7,
                       stride=2, pad=3, no_bias=0)
            model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
            model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
            model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
            model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=100)
            model.Sigmoid('fc', 'fc_sigm')
            model.Softmax('fc_sigm', 'softmax')
            model.LabelCrossEntropy(['softmax', 'label'], 'xent')
            loss = model.AveragedLoss('xent', 'loss')

            # Add a duplicate param init to ensure it does not cause issues
            model.param_init_net.ConstantFill(
                [], ["fc_w"], shape=((64 * 56 * 56), 1000)
            )
            return [loss]

        def add_optimizer(model):
            optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)

        model = cnn.CNNModelHelper(
            order="NHWC",
            name="test",
        )
        data_parallel_model.Parallelize_CPU(
            model,
            input_builder_fun=add_input_ops,
            forward_pass_builder_fun=add_model_ops,
            optimizer_builder_fun=add_optimizer,
            devices=[1, 2, 3],
        )

        # Only cpu_1 params should be returned (cpu_1 is the first device)
        checkpoint_params = data_parallel_model.GetCheckpointParams(model)
        for p in model.GetParams("cpu_1/"):
            self.assertIn(p, checkpoint_params)
            self.assertIn(p + "_momentum", checkpoint_params)
        for p in model.GetParams("cpu_2/"):
            self.assertNotIn(p, checkpoint_params)
        self.assertIn(
            core.BlobReference("cpu_1/fc_w_momentum"), checkpoint_params)
        for c in model.GetComputedParams("cpu_1/"):
            self.assertIn(c, checkpoint_params)
        for c in model.GetComputedParams("cpu_2/"):
            self.assertNotIn(c, checkpoint_params)
        self.assertNotIn(core.BlobReference("cpu_1/data"), checkpoint_params)
        self.assertIn(
            core.BlobReference("optimizer_iteration"), checkpoint_params)

    def test_net_conversion_and_append_net(self):
        '''
        Build a CPU-parallel model whose forward pass appends the nets of a
        separately built ModelHelper (converted with ConvertNetForDevice),
        then initialize, create and run it. The test passes if no exception
        is thrown; no outputs are inspected.
        '''
        other = model_helper.ModelHelper()
        fc1 = brew.fc(other, "data", "other_fc1", dim_in=3*227*227, dim_out=10)
        fc2 = brew.fc(other, fc1, "other_fc2", dim_in=10, dim_out=10)
        brew.fc(other, fc2, "other_fc3", dim_in=10, dim_out=10)

        def add_input_ops(model):
            model.net.UniformFill([], ["data"], shape=[4, 227, 227, 3])
            model.net.UniformFill([], ["label"], shape=[4])

        def add_model_ops(model, loss_scale):
            model.NHWC2NCHW("data", "data_nchw")
            model.Conv("data_nchw", 'conv1', 3, 64,
                       weight_init=("MSRAFill", {}), kernel=7,
                       stride=2, pad=3, no_bias=0)
            model.SpatialBN('conv1', 'conv1_spatbn_relu', 64, epsilon=1e-3, is_test=False)
            model.Relu('conv1_spatbn_relu', 'conv1_spatbn_relu')
            model.MaxPool('conv1_spatbn_relu', 'pool1', kernel=3, stride=2)
            model.FC('pool1', 'fc', dim_in=(64 * 56 * 56), dim_out=10)

            # Append the net and param_init_net of the other model
            appendnet = data_parallel_model.ConvertNetForDevice(other.net)
            model.net.AppendNet(appendnet)

            model.param_init_net.AppendNet(
                data_parallel_model.ConvertNetForDevice(other.param_init_net))

            model.Sigmoid('fc', 'fc_sigm')
            model.Softmax('fc_sigm', 'softmax')
            loss = model.AveragedLoss('softmax', 'loss')
            return [loss]

        def add_optimizer(model):
            optimizer.build_sgd(model, 0.1, policy="fixed", momentum=0.9)

        model = cnn.CNNModelHelper(
            order="NCHW",
            name="test",
        )
        data_parallel_model.Parallelize_CPU(
            model,
            input_builder_fun=add_input_ops,
            forward_pass_builder_fun=add_model_ops,
            optimizer_builder_fun=add_optimizer,
            # Pass a concrete list, as test_equiv does, rather than a lazy
            # range object.
            devices=list(range(4)),
        )

        # Just create and run net and confirm no exception is thrown
        workspace.RunNetOnce(model.param_init_net)
        workspace.CreateNet(model.net)
        workspace.RunNet(model.net)

    @unittest.skip("Test fails on GPU/RE")
    def test_synchronization_barrier(self):
        def run(comm_rank, comm_size, tmpdir):
            def add_input_ops(model):
                pass

            def add_model_ops(model, loss_scale):
                return []

            def add_optimizer(model):
                pass

            store_handler = "store_handler"
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate",
                    [],
                    [store_handler],
                    path=tmpdir))
            rendezvous = dict(
                kv_handler=store_handler,
                shard_id=comm_rank,
                num_shards=comm_size,
                engine='GLOO',
            )

            model = cnn.CNNModelHelper(
                order="NHWC",
                name="test",
            )
            data_parallel_model.Parallelize_CPU(
                model,
                input_builder_fun=add_input_ops,
                forward_pass_builder_fun=add_model_ops,
                optimizer_builder_fun=add_optimizer,
                devices=[1, 2, 3],
                rendezvous=rendezvous
            )
            data_parallel_model.RunInitNet(model)

            for _ in range(2):
Loading ...