Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
workloadmgr / workloadmgr / virt / libvirt / migration_vm_flow.py
Size: Mime:
import datetime
import hashlib
import json
import math
import os
import pickle
from queue import Queue
import re
import shutil
import ssl
import socket
import subprocess
from subprocess import check_output
import tempfile
import time
from urllib.parse import urlparse
import uuid

from oslo_config import cfg
from oslo_messaging._drivers import amqp

from taskflow import engines
from taskflow import task
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow.patterns import linear_flow as lf

from pyVmomi import vim, vmodl

from novaclient.exceptions import Unauthorized as nova_unauthorized

from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr.virt import driver
from workloadmgr.virt import qemuimages
from workloadmgr.virt import power_state
from workloadmgr.virt import driver

from workloadmgr.openstack.common import log as logging
from workloadmgr.openstack.common import fileutils
from workloadmgr.openstack.common import jsonutils
from workloadmgr.openstack.common import timeutils

from workloadmgr.image import glance
from workloadmgr.volume import cinder
from workloadmgr.compute import nova
from workloadmgr.network import neutron
from workloadmgr.keymanager import barbican

from workloadmgr.datamover import contego

from workloadmgr.network import neutron
from workloadmgr.workloads import workload_utils
from workloadmgr.workflows import vmtasks_openstack

from workloadmgr.vault import vault

from workloadmgr import utils
from workloadmgr import flags
from workloadmgr import autolog
from workloadmgr import exception
from workloadmgr.workflows.vmtasks import FreezeVM, ThawVM
from workloadmgr.utils import (get_vcenter_service_instance,
        get_vcenter_snapshots_by_name_recursively)
from workloadmgr.pyvmomi_tools.tasks import wait_for_tasks


LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)
FLAGS = flags.FLAGS
CONF = cfg.CONF


def get_new_volume_type(instance_options, volume_id, volume_type):
    """Resolve the Cinder volume type to use for a migrated disk.

    Looks for a per-disk override in ``instance_options['vdisks']``
    (matching the entry's id, lower-cased, against ``volume_id``); when
    found, the override replaces ``volume_type``.  A final value of
    ``None`` or the literal string ``'none'`` (any case) normalizes to
    ``None`` so Cinder picks its default type.

    :param instance_options: per-instance migration options; may be None.
    :param volume_id: volume id to match against the per-disk options.
    :param volume_type: fallback volume type when no override exists.
    :returns: the resolved volume type string, or None.
    """
    disk_options = (instance_options or {}).get('vdisks', [])
    for disk_option in disk_options:
        if disk_option['id'].lower() == volume_id:
            volume_type = disk_option['new_volume_type']
            break

    if volume_type is None or volume_type.lower() == 'none':
        return None
    return volume_type


def is_supported_backend(volume_type):
    """Report whether the given Cinder volume type is migratable.

    Every backend is currently treated as supported; the parameter is
    kept for interface compatibility and future filtering.
    """
    return True


def get_availability_zone(instance_options, volume_id=None, az=None):
    """Resolve the availability zone for a migrated VM or volume.

    For a volume (``volume_id`` given), resolution order is:
      1. the matching ``instance_options['vdisks']`` entry when it carries
         a non-empty ``availability_zone``;
      2. the ``az`` argument (the source volume's AZ) when non-empty;
      3. ``None`` (let Cinder choose).

    For a VM (``volume_id`` is None):
      1. a non-empty ``instance_options['availability_zone']``;
      2. ``CONF.default_production_availability_zone``, unless that is
         the literal string ``'None'``.

    An empty-string result always normalizes to ``None``.
    """
    availability_zone = None
    # find the mapping for volume
    if volume_id is not None and 'vdisks' in instance_options and len(
            instance_options['vdisks']) > 0:
        for vdisk in instance_options['vdisks']:
            if vdisk['id'] == volume_id:
                if 'availability_zone' in vdisk and vdisk['availability_zone'] != '':
                    availability_zone = vdisk.get('availability_zone')
                elif az is not None:
                    availability_zone = az
                else:
                    availability_zone = None
                break
    elif volume_id is not None and az is not None and az != '':
        # No per-disk mappings; fall back to the source volume's AZ.
        return az
    else:
        # else find the mapping for VM
        if volume_id is not None and az == '':
            # Volume with no mapping and an explicitly empty AZ.
            availability_zone = None
        elif instance_options and 'availability_zone' in instance_options and \
                instance_options['availability_zone'] != '':
            availability_zone = instance_options['availability_zone']
        else:
            # The string 'None' in config means "no default AZ".
            if CONF.default_production_availability_zone == 'None':
                availability_zone = None
            else:
                availability_zone = CONF.default_production_availability_zone

    if availability_zone == '':
        return None
    return availability_zone


class MigrateTask(task.Task):
    """Base class for migration flow tasks.

    Adds a pre-execute cancellation check so every task in the flow can
    abort promptly when the user requests a cancel.
    """

    def check_migrate_cancellation(self, cancel_file_path):
        """Raise if the user has requested cancellation of the migration.

        Cancellation is signalled by the presence of a marker file; each
        task calls this before doing any real work.

        :param cancel_file_path: path to the cancel marker file, or None
            when cancellation is not in effect for this flow.
        :raises Exception: when the marker file exists.
        """
        if cancel_file_path and os.path.exists(cancel_file_path):
            # LOG.error (not LOG.exception): we are not inside an except
            # handler here, so there is no traceback to attach.  Message
            # typo fixed ("Use has" -> "User has").
            LOG.error(_("User has initiated the migration cancel operation"))
            raise Exception('User initiated cancel request')


class MigrateVolume(MigrateTask):
    """
       Migrate VMDK file to a Cinder Disk
       volumes including iscsi and fc channel volumes
    """

    def execute(self, context, migration_id, instance_options, volume_type,
                vm_resource_id, image_id=None, cancel_file_path=None):
        # Honour a user-initiated cancel before doing any work.
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(
            context, migration_id, instance_options,
            volume_type, vm_resource_id, image_id=image_id)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'MigrateVolume.execute')
    def execute_with_log(
        self, context, migration_id, instance_options,
        volume_type, vm_resource_id, image_id=None):
        """Create the Cinder volume for one disk of the migration plan.

        Volume attributes (size, name, description, metadata, image
        metadata, availability zone) are read from the plan's vm_resource
        metadata.  The volume is created from ``image_id`` only for the
        boot disk ("Hard disk 1"); for all other disks the image is
        ignored.  Polls until the volume settles, then records a
        migration vm_resource row.

        :returns: id of the created Cinder volume.
        :raises Exception: when volume creation fails or ends in 'error'.
        :raises exception.ErrorOccurred: when the ~4-minute wait times out.
        """
        self.migrated_volume = None
        self.db = db = WorkloadMgrDB().db
        self.cntx = amqp.RpcContext.from_dict(context)
        self.volume_service = volume_service = cinder.API()
        migration_obj = db.migration_get(self.cntx, migration_id)
        migration_plan_obj = db.migration_plan_get(
            self.cntx, migration_obj.migration_plan_id)
        migration_plan_vm_resource = db.migration_plan_vm_resource_get(
            self.cntx, vm_resource_id)

        # NOTE(review): time_offset is computed but never used below.
        time_offset = datetime.datetime.now() - datetime.datetime.utcnow()
        volume_size = db.get_metadata_value(
            migration_plan_vm_resource.metadata, 'volume_size')
        volume_name = db.get_metadata_value(
            migration_plan_vm_resource.metadata, 'volume_name')
        volume_description = db.get_metadata_value(
            migration_plan_vm_resource.metadata, 'volume_description')
        volume_id = db.get_metadata_value(
            migration_plan_vm_resource.metadata, 'volume_id')
        volume_metadata = json.loads(db.get_metadata_value(
            migration_plan_vm_resource.metadata, 'metadata', "{}"))
        volume_image_metadata = json.loads(db.get_metadata_value(
            migration_plan_vm_resource.metadata, 'volume_image_metadata', "{}"))
        az = ''
        if db.get_metadata_value(
                migration_plan_vm_resource.metadata, 'availability_zone'):
            az = db.get_metadata_value(
                migration_plan_vm_resource.metadata,
                'availability_zone')
        availability_zone = get_availability_zone(instance_options,
                                                  volume_id=volume_id,
                                                  az=az)

        progressmsg = _('Migrating Volume %s in migration plan %s' %
                        (volume_name, migration_plan_obj.id))
        LOG.debug(progressmsg)
        db.migration_update(
            self.cntx, migration_id, {
                'progress_msg': progressmsg, 'status': 'uploading'})

        # Only the boot disk is created from the glance image.
        if volume_name != "Hard disk 1":
            image_id = None
        self.migrated_volume = volume_service.create(
            self.cntx, volume_size, volume_name, volume_description,
            volume_type=volume_type, metadata=volume_metadata,
            image_id=image_id, availability_zone=availability_zone)

        if not self.migrated_volume:
            raise Exception("Failed to create volume type " + volume_type)

        volume_service.set_volume_image_metadata(
            self.cntx, self.migrated_volume['id'], volume_image_metadata
        )

        # Poll every 10s until the volume reaches 'available' or 'error',
        # giving up after roughly 4 minutes.
        start_time = timeutils.utcnow()
        while True:
            time.sleep(10)
            self.migrated_volume = volume_service.get(
                self.cntx, self.migrated_volume['id'])
            if self.migrated_volume['status'].lower() == 'available' or\
                    self.migrated_volume['status'].lower() == 'error':
                break
            now = timeutils.utcnow()
            if (now - start_time) > datetime.timedelta(minutes=4):
                raise exception.ErrorOccurred(
                    reason='Timeout migrating Volume')

        if self.migrated_volume['status'].lower() == 'error':
            raise Exception("Failed to create volume type " + volume_type)

        db.migration_vm_resource_create(self.cntx,
                                           {'migration_id': migration_id,
                                            'vm_id': migration_plan_vm_resource.vm_id,
                                            'resource_type': 'disk',
                                            'resource_name': volume_name,
                                            'status': self.migrated_volume['status'].lower(),
                                            'size': volume_size,
                                            'time_taken': int((timeutils.utcnow() - migration_obj.created_at).total_seconds())})
        return self.migrated_volume['id']

    @autolog.log_method(Logger, 'MigrateVolume.revert')
    def revert_with_log(self, *args, **kwargs):
        """Best-effort cleanup: delete the volume created by execute."""
        try:
            if self.migrated_volume:
                self.volume_service.delete(self.cntx, self.migrated_volume)
        except BaseException as bex:
            LOG.exception(bex)


class MigrateInstanceFromVolume(MigrateTask):
    """
       Migrate instance to cinder volume
    """

    def execute(self, context, vmname, migration_id,
                volume_id, migration_type, instance_options,
                migrated_security_groups, migrated_nics,
                migrated_compute_flavor_id, keyname,
                config_drive, ordered_interfaces, userdata, cancel_file_path=None):
        # Honour a user-initiated cancel before doing any work.
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, vmname, migration_id,
                                     volume_id, migration_type, instance_options,
                                     migrated_security_groups, migrated_nics,
                                     migrated_compute_flavor_id, keyname,
                                     config_drive, ordered_interfaces, userdata)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'MigrateInstanceFromVolume.execute')
    def execute_with_log(self, context, vmname, migration_id,
                         volume_id, migration_type, instance_options,
                         migrated_security_groups, migrated_nics,
                         migrated_compute_flavor_id, keyname,
                         config_drive, ordered_interfaces, userdata):
        """Boot a new nova instance from the migrated boot volume.

        Marks the volume bootable, maps it to ``vda``, creates the server
        (security groups are attached later by AdjustSG), then polls
        every 10 seconds until the instance is ACTIVE.

        :returns: id of the created nova instance.
        :raises Exception: when server creation fails or it goes to ERROR.
        :raises exception.ErrorOccurred: when the ~4-minute wait times out.
        """
        self.migrated_instance = None
        self.db = db = WorkloadMgrDB().db
        self.cntx = amqp.RpcContext.from_dict(context)
        self.compute_service = compute_service = nova.API(
            production=True)

        migration_obj = db.migration_get(self.cntx, migration_id)
        migration_plan_obj = db.migration_plan_get(
            self.cntx, migration_obj.migration_plan_id)

        # User-supplied name (if any) overrides the source VM name.
        migrated_instance_name = vmname
        if instance_options and 'name' in instance_options:
            migrated_instance_name = instance_options['name']

        LOG.debug('Creating Instance ' + migrated_instance_name)
        migration_plan_obj = db.migration_plan_update(
            self.cntx, migration_plan_obj.id,
            {
                'progress_msg': 'Creating Instance: ' + migrated_instance_name,
                'status': 'migrating'
            })
        availability_zone = get_availability_zone(instance_options)

        # Reorder nics to match the comma-separated list of backup IPs,
        # so interface ordering on the new instance matches the source.
        if ordered_interfaces:
            ordered_nics = []
            for ip in ordered_interfaces.split(','):
                for nic in migrated_nics:
                    if 'backup_ip_address' in nic and nic['backup_ip_address'] == ip:
                        ordered_nics.append(nic)
            migrated_nics = ordered_nics

        # backup_ip_address is bookkeeping only; nova must not see it.
        for nic in migrated_nics:
            nic.pop('backup_ip_address', None)

        migrated_compute_flavor = compute_service.get_flavor_by_id(
            self.cntx, migrated_compute_flavor_id)

        self.volume_service = volume_service = cinder.API()
        migrated_volume = volume_service.get(self.cntx, volume_id)
        try:
            volume_service.set_bootable(self.cntx, migrated_volume)
        except Exception as ex:
            # Best effort: some backends refuse the bootable flag change.
            LOG.exception(ex)

        # TODO: Is this vda or sda. How do we determine. Looks at nova libvirt code
        block_device_mapping = {'vda': volume_id + ":vol"}
        server_kwargs = {}
        if userdata:
            server_kwargs["userdata"] = userdata
        if config_drive:
            server_kwargs["config_drive"] = config_drive
        self.migrated_instance = migrated_instance = \
            compute_service.create_server(self.cntx, migrated_instance_name,
                                          None, migrated_compute_flavor,
                                          nics=migrated_nics,
                                          block_device_mapping=block_device_mapping,
                                          security_groups=[],
                                          key_name=keyname,
                                          availability_zone=availability_zone,
                                          **server_kwargs)

        if not migrated_instance:
            raise Exception("Cannot create instance from volume")

        # Poll every 10s until ACTIVE; bail out on ERROR or after ~4 min.
        start_time = timeutils.utcnow()
        while hasattr(
                migrated_instance,
                'status') == False or migrated_instance.status != 'ACTIVE':
            LOG.debug(
                'Waiting for the instance ' +
                migrated_instance.id +
                ' to boot')
            time.sleep(10)
            migrated_instance = compute_service.get_server_by_id(
                self.cntx, migrated_instance.id)
            if hasattr(migrated_instance, 'status'):
                if migrated_instance.status == 'ERROR':
                    raise Exception(
                        _("Error creating instance " + migrated_instance.id))
            now = timeutils.utcnow()
            if (now - start_time) > datetime.timedelta(minutes=4):
                raise exception.ErrorOccurred(
                    reason='Timeout waiting for the instance to boot from volume')

        self.migrated_instance = migrated_instance
        return migrated_instance.id

    @autolog.log_method(Logger, 'MigrateInstanceFromVolume.revert')
    def revert_with_log(self, *args, **kwargs):
        """Best-effort cleanup: delete the instance created by execute."""
        try:
            if self.migrated_instance:
                self.compute_service.delete(self.cntx, self.migrated_instance.id)
        except BaseException as bex:
            LOG.exception(bex)


class AdjustSG(MigrateTask):
    """
       Adjust security groups on the migrated instance to match the
       requested set.
    """
    def execute(self, context, migrated_instance_id,
                migrated_security_groups, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, migrated_instance_id,
                                     migrated_security_groups)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'AdjustSG.execute')
    def execute_with_log(self, context, migrated_instance_id,
                         migrated_security_groups):
        """Reconcile the instance's security groups with the requested set.

        Removes groups that were not requested, then adds missing ones.
        A failure to add a group because port security is disabled on the
        network is logged and tolerated; any other failure aborts the task.

        :param migrated_security_groups: mapping whose values are the
            security-group ids that should end up on the instance.
        :raises Exception: re-raised from any non-tolerated failure.
        """
        try:
            self.db = db = WorkloadMgrDB().db
            self.cntx = amqp.RpcContext.from_dict(context)

            # refresh the token
            self.cntx = nova._get_tenant_context(self.cntx)

            self.compute_service = compute_service = nova.API(
                production=True)
            sec_groups = compute_service.list_security_group(
                self.cntx, migrated_instance_id)
            sec_group_ids = [sec.id for sec in sec_groups]
            ids_to_remove = set(sec_group_ids) - \
                set(migrated_security_groups.values())
            ids_to_add = set(
                migrated_security_groups.values()) - set(sec_group_ids)
            # remove security groups that were not asked for
            for sec in ids_to_remove:
                compute_service.remove_security_group(
                    self.cntx, migrated_instance_id, sec)

            for sec in ids_to_add:
                try:
                    compute_service.add_security_group(
                        self.cntx, migrated_instance_id, sec)
                except Exception as ex:
                    # Tolerate only the "port security disabled on this
                    # network" failure.  Previously ANY exception carrying
                    # a .message attribute was silently swallowed, even
                    # when unrelated; now everything else propagates.
                    if hasattr(ex, 'message') and all(
                            text in ex.message.lower()
                            for text in ('port_security_enabled', 'network')):
                        LOG.warning("Could not add security group: {} to the VM: {}".format(sec, migrated_instance_id))
                        LOG.warning("Error: {}".format(ex))
                    else:
                        # bare raise preserves the original traceback
                        raise
        except Exception as ex:
            LOG.exception(ex)
            msg = "Could not update security groups on the " \
                  "migrated instance %s" % migrated_instance_id
            LOG.warning(msg)
            raise

    @autolog.log_method(Logger, 'AdjustSG.revert')
    def revert_with_log(self, *args, **kwargs):
        pass


def AdjustInstanceSecurityGroups(context, migration_id):
    """Build a linear flow that reconciles security groups on the
    migrated instance.

    :param context: request context (tasks fetch their inputs from flow
        storage at run time).
    :param migration_id: migration id used to name the per-migration
        cancel-marker storage key.
    :returns: a taskflow linear flow containing a single AdjustSG task.
    """
    # The unused ``db = WorkloadMgrDB().db`` local was removed.
    flow = lf.Flow("adjustinstancesecuritygrouplf")
    flow.add(AdjustSG("AdjustSG", rebind=dict(
        cancel_file_path='cancel_file_path_' + migration_id)))
    return flow


def MigrateVolumes(context, instance, migration_planobj, migration_id):
    """Build a linear flow with one MigrateVolume task per disk resource
    of the VM that has a recorded volume_id.

    Each task is rebound to its resource-specific storage keys and
    provides ``volume_id_<resource id>`` for downstream tasks.
    """
    flow = lf.Flow("migratevolumeslf")

    db = WorkloadMgrDB().db
    migration_plan_vm_resources = db.migration_plan_vm_resources_get(
        context, instance['vm_id'], migration_planobj.id)

    for migration_plan_vm_resource in migration_plan_vm_resources:
        if migration_plan_vm_resource.resource_type != 'disk':
            continue

        if db.get_metadata_value(migration_plan_vm_resource.metadata, 'volume_id'):
            # NOTE(review): volume_type and volume_id below are computed
            # but never used -- the task's volume_type comes from flow
            # storage via rebind instead.  Confirm this is dead code.
            volume_type = db.get_metadata_value(
                migration_plan_vm_resource.metadata, 'volume_type')
            if volume_type:
                volume_type = volume_type.lower()
            else:
                volume_type = '__DEFAULT__'
            volume_id = db.get_metadata_value(
                migration_plan_vm_resource.metadata, 'volume_id').lower()

            flow.add(MigrateVolume("MigrateVolume" + migration_plan_vm_resource.id,
                                   rebind=dict(vm_resource_id=migration_plan_vm_resource.id,
                                               volume_type='volume_type_' + migration_plan_vm_resource.id,
                                               cancel_file_path='cancel_file_path_' + migration_id),
                                   provides='volume_id_' + str(migration_plan_vm_resource.id)))

    return flow


def MigrateInstance(context, instance, migration_planobj, migration_id):
    """Build a linear flow that boots the migrated instance from its boot
    disk ("Hard disk 1") volume.

    :raises Exception: when the plan has no vm resources (DiscoverVM was
        not run before the migration).
    """
    flow = lf.Flow("migrationinstancelf")
    db = WorkloadMgrDB().db
    migration_plan_vm_resources = db.migration_plan_vm_resources_get(
        context, instance['vm_id'], migration_planobj.id)

    if not migration_plan_vm_resources:
        raise Exception("Migrating VM instance failed. Please make sure to perform DiscoverVM's before Migration")

    # Collect boot-disk candidates by well-known resource name.
    bootdisk = set()
    for migration_plan_vm_resource in migration_plan_vm_resources:
        if migration_plan_vm_resource.resource_type != 'disk':
            continue
        if migration_plan_vm_resource.resource_name in ('Hard disk 1',):
            bootdisk.add(migration_plan_vm_resource.resource_name)

    # NOTE(review): bootdisk can only ever contain 'Hard disk 1' given the
    # single-element tuple above, so this branch is unreachable -- it looks
    # like a leftover from a version that recognized two candidate names.
    if len(bootdisk) == 2:
        bootdisk = set(['Hard disk 1'])

    for migration_plan_vm_resource in migration_plan_vm_resources:
        if migration_plan_vm_resource.resource_type != 'disk':
            continue
        if not migration_plan_vm_resource.resource_name in bootdisk:
            continue

        flow.add(MigrateInstanceFromVolume(
            "MigrateInstanceFromVolume" + instance['vm_id'],
            rebind=dict(volume_id='volume_id_' + str(migration_plan_vm_resource.id),
                        cancel_file_path='cancel_file_path_' + migration_id),
            provides='migrated_instance_id'))

    return flow


class PowerOffInstance(MigrateTask):
    """
       Power Off migrated instance
    """

    def execute(self, context, migrated_instance_id, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, migrated_instance_id)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'PowerOffInstance.execute')
    def execute_with_log(self, context, migrated_instance_id):
        """Stop the migrated instance and wait until it reaches SHUTOFF.

        Polls nova every 10 seconds.

        :raises Exception: if the instance transitions to ERROR.
        :raises exception.ErrorOccurred: after ~4 minutes without SHUTOFF.
        """
        self.cntx = amqp.RpcContext.from_dict(context)
        self.compute_service = compute_service = \
            nova.API(production=True)

        compute_service.stop(self.cntx, migrated_instance_id)

        migrated_instance = compute_service.get_server_by_id(
            self.cntx, migrated_instance_id)
        start_time = timeutils.utcnow()
        while not hasattr(migrated_instance, 'status') or \
                migrated_instance.status != 'SHUTOFF':
            LOG.debug('Waiting for the instance ' + migrated_instance_id +
                      ' to shutdown')
            time.sleep(10)
            migrated_instance = compute_service.get_server_by_id(
                self.cntx, migrated_instance_id)
            if hasattr(migrated_instance, 'status'):
                if migrated_instance.status == 'ERROR':
                    # message fixed: this path stops an instance, it does
                    # not create one
                    raise Exception(_("Error stopping instance " +
                                      migrated_instance_id))
            now = timeutils.utcnow()
            if (now - start_time) > datetime.timedelta(minutes=4):
                # message fixed: copy-paste said "boot from volume" even
                # though this loop waits for shutdown
                raise exception.ErrorOccurred(
                    reason='Timeout waiting for '
                    'the instance to shut down')

        self.migrated_instance = migrated_instance
        return

    @autolog.log_method(Logger, 'PowerOffInstance.revert')
    def revert_with_log(self, *args, **kwargs):
        pass


class PowerOnInstance(MigrateTask):
    """
       Power On migrated instance
    """

    def execute(self, context, migrated_instance_id, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, migrated_instance_id)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'PowerOnInstance.execute')
    def execute_with_log(self, context, migrated_instance_id):
        """Start the migrated instance and wait for it to become ACTIVE.

        Polls nova every 10 seconds.  If the instance is still not ACTIVE
        after 5 minutes, one more start request is issued and the flow
        proceeds best-effort instead of failing the migration.

        :raises Exception: if the instance transitions to ERROR.
        """
        self.cntx = amqp.RpcContext.from_dict(context)
        self.compute_service = compute_service = \
            nova.API(production=True)

        compute_service.start(self.cntx, migrated_instance_id)

        migrated_instance = compute_service.get_server_by_id(
            self.cntx, migrated_instance_id)
        start_time = timeutils.utcnow()
        while not hasattr(migrated_instance, 'status') or \
                migrated_instance.status != 'ACTIVE':
            LOG.debug('Waiting for the instance ' + migrated_instance_id +
                      ' to boot')
            time.sleep(10)
            migrated_instance = compute_service.get_server_by_id(
                self.cntx, migrated_instance_id)
            if hasattr(migrated_instance, 'status'):
                if migrated_instance.status == 'ERROR':
                    # message fixed: this path starts an instance, it does
                    # not create one
                    raise Exception(_("Error starting instance " +
                                      migrated_instance_id))
            now = timeutils.utcnow()
            if (now - start_time) > datetime.timedelta(minutes=5):
                # If machine is not up then try to start it again.
                # Message strings fixed: the old backslash continuations
                # embedded long whitespace runs inside the log text.
                try:
                    LOG.warning('Migrated VM(%s) is not started after 5 '
                                'minutes, restarting it' %
                                migrated_instance.id)
                    compute_service.start(self.cntx, migrated_instance.id)
                    break
                except Exception:
                    LOG.warning('Migrated VM(%s) is taking more than '
                                'expected time to start, proceeding with '
                                'migrate flow.' % migrated_instance.id)
                    break

        self.migrated_instance = migrated_instance
        return

    @autolog.log_method(Logger, 'PowerOnInstance.revert')
    def revert_with_log(self, *args, **kwargs):
        pass


class PowerOffvCenterVM(MigrateTask):
    """
       Power Off source VM in vCenter, remembering the original power
       state so revert can restore it.
    """
    def execute(self, context, vmid, vmname, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, vmid, vmname)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'PowerOffvCenterVM.execute')
    def execute_with_log(self, context, vmid, vmname):
        """Power off the source vCenter VM identified by instance UUID.

        :returns: the VM's power state before this task ran, so a later
            PowerOnvCenterVM task can restore it.
        :raises Exception: when the VM cannot be found in vCenter.
        """
        self.original_power_state = None
        self.vmid = vmid
        si = get_vcenter_service_instance()
        search_index = si.content.searchIndex
        # FindByUuid(datacenter=None, uuid, vmSearch=True, instanceUuid=True)
        vcenter_vm = search_index.FindByUuid(None, vmid, True, True)
        if vcenter_vm is None:
            raise Exception("vCenter VM %s not found" % vmname)

        current_power_state = vcenter_vm.runtime.powerState
        if current_power_state == "poweredOn":
            self.original_power_state = "poweredOn"
            # Simplified: no need for a one-element list comprehension.
            wait_for_tasks(si, [vcenter_vm.PowerOff()])

        return current_power_state

    @autolog.log_method(Logger, 'PowerOffvCenterVM.revert')
    def revert_with_log(self, *args, **kwargs):
        """Best-effort: power the VM back on if this task powered it off."""
        try:
            if self.original_power_state == "poweredOn":
                si = get_vcenter_service_instance()
                search_index = si.content.searchIndex
                vcenter_vm = search_index.FindByUuid(
                    None, self.vmid, True, True)
                if vcenter_vm is None:
                    return

                if vcenter_vm.runtime.powerState != self.original_power_state:
                    wait_for_tasks(si, [vcenter_vm.PowerOn()])
        except Exception as ex:
            LOG.exception(ex)


class PowerOnvCenterVM(MigrateTask):
    """
       Power On source VM in vCenter when its pre-migration state was
       poweredOn.
    """
    def execute(self, context, vmid, vmname, power_state, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, vmid, vmname, power_state)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'PowerOnvCenterVM.execute')
    def execute_with_log(self, context, vmid, vmname, power_state):
        """Restore the source vCenter VM's original power state.

        :param power_state: the power state recorded before migration;
            the VM is only started when this is "poweredOn" and the VM is
            not already in that state.
        :raises Exception: when the VM cannot be found in vCenter.
        """
        si = get_vcenter_service_instance()
        search_index = si.content.searchIndex
        # FindByUuid(datacenter=None, uuid, vmSearch=True, instanceUuid=True)
        vcenter_vm = search_index.FindByUuid(None, vmid, True, True)
        if vcenter_vm is None:
            raise Exception("vCenter VM %s not found" % vmname)

        current_power_state = vcenter_vm.runtime.powerState
        if power_state != current_power_state and power_state == "poweredOn":
            # Simplified: no need for a one-element list comprehension.
            wait_for_tasks(si, [vcenter_vm.PowerOn()])

        return

    @autolog.log_method(Logger, 'PowerOnvCenterVM.revert')
    def revert_with_log(self, *args, **kwargs):
        pass


class AssignFloatingIP(MigrateTask):
    """
       Assign floating IP address to migrated instance.
    """

    def execute(self, context, migrated_instance_id, migrated_nics,
                migrated_net_resources, cancel_file_path=None):
        # Honour a user-initiated cancel before doing any work.
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(
            context, migrated_instance_id,
            migrated_nics, migrated_net_resources)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'AssignFloatingIP.execute')
    def execute_with_log(self, context, migrated_instance_id, migrated_nics,
                         migrated_net_resources):
        """Re-associate recorded floating IPs with the migrated instance.

        For every (source net resource, target nic) pair that matches by
        port id or by fixed IP, find a currently unassociated floating IP
        in the project with the recorded address and point it at the new
        port.  All errors are logged and ignored (best effort).
        """
        self.cntx = amqp.RpcContext.from_dict(context)
        neutron_client = neutron.get_client(context, refresh_token=True)
        for mac, details in migrated_net_resources.items():
            for nic in migrated_nics:
                try:
                    # NOTE(review): since 'and' binds tighter than 'or',
                    # this condition groups as (port ids present and equal)
                    # OR (fixed-ip match AND floating_ip present) -- the
                    # port-id branch does not require floating_ip to be
                    # set.  Confirm that grouping is intentional.
                    if ((details.get('id', None) and nic.get('port-id', None)) and
                            (details.get('id', None) == nic.get('port-id', None))) or\
                            details.get('ip_address', None) == nic.get('v4-fixed-ip') and \
                            details.get('floating_ip', None) is not None:

                        if details.get('id', None) and nic.get(
                                'port-id', None):
                            # str({}) == "{}", a parseable empty JSON object
                            floating_ip_json = json.loads(
                                details.get('floating_ip', str({})))
                            floating_ip = floating_ip_json.get('addr', None)
                            fixed_ip = details['fixed_ips'][0]['ip_address']
                        else:
                            floating_ip = details.get('floating_ip', None)
                            fixed_ip = details.get('fixed_ip', None)

                        body = {"floatingip":
                                    {
                                     'fixed_ip_address': fixed_ip,
                                     'port_id': nic.get('port-id', None)
                                    }
                               }
                        # Only claim floating IPs that are currently not
                        # associated with any port or fixed IP.
                        floating_ips_list = neutron_client.list_floatingips(
                                                 project_id=context['project_id'])
                        for fp in floating_ips_list['floatingips']:
                            if fp.get('floating_ip_address', '') == floating_ip and\
                                fp.get('fixed_ip_address', None) is None and\
                                fp.get('port_id', None) is None:
                                neutron_client.update_floatingip(fp.get('id'), body=body)

                except Exception as ex:
                    LOG.exception(ex)
                    # we will ignore any exceptions during assigning floating
                    # ip address
                    pass
        return

    @autolog.log_method(Logger, 'AssignFloatingIP.revert')
    def revert_with_log(self, *args, **kwargs):
        pass


class AttachVolume(MigrateTask):
    """
       Attach a migrated Cinder volume to the migrated Nova instance.

       Waits for the volume to leave its transitional state, then attaches
       it at the device path derived from the vCenter disk label
       (e.g. 'Hard disk 2' -> /dev/vdb).  Revert detaches the volume on a
       best-effort basis.
    """

    def execute(self, context, migrated_instance_id,
                volume_id, devname, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, migrated_instance_id, volume_id,
                                     devname)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'AttachVolume.execute')
    def execute_with_log(self, context, migrated_instance_id, volume_id,
                         devname):
        self.migrated_volume = None
        self.migrated_instance_id = migrated_instance_id
        self.volume_id = volume_id
        self.db = db = WorkloadMgrDB().db
        self.cntx = amqp.RpcContext.from_dict(context)
        # refresh the token
        self.cntx = nova._get_tenant_context(self.cntx)
        self.compute_service = compute_service = nova.API(
            production=True)
        self.contego_service = contego_service = contego.API(
            production=True)
        self.volume_service = volume_service = cinder.API()
        self.migrated_volume = migrated_volume = volume_service.get(
                self.cntx, volume_id)
        start_time = timeutils.utcnow()
        # Poll until the volume settles; give up after 4 minutes.
        while migrated_volume['status'].lower() not in ['available', 'error']:
            LOG.debug('Waiting for volume %s to be available' % migrated_volume['id'])
            time.sleep(10)
            migrated_volume = volume_service.get(self.cntx, volume_id)
            now = timeutils.utcnow()
            if (now - start_time) > datetime.timedelta(minutes=4):
                raise exception.ErrorOccurred(
                    reason='Timeout waiting for the volume ' + volume_id + ' to be available')

        # BUGFIX: previously a volume that settled in 'error' state was
        # attached anyway; fail fast with a clear message instead.
        if migrated_volume['status'].lower() == 'error':
            raise exception.ErrorOccurred(
                reason='Volume ' + volume_id + ' is in error state and cannot be attached')

        LOG.debug('Attaching volume ' + volume_id)
        # Map the vCenter disk label to a Linux device name:
        # 'Hard disk 1' -> vda, 'Hard disk 2' -> vdb, ...
        devchr = chr(ord('a') + int(devname.split('Hard disk ')[1]) - 1)
        devpath = os.path.join("/dev", "vd" + devchr)
        compute_service.attach_volume(
            self.cntx, migrated_instance_id, volume_id, devpath)
        # Give nova some time to finish the attachment before the next task.
        time.sleep(15)

    @autolog.log_method(Logger, 'AttachVolume.revert')
    def revert_with_log(self, *args, **kwargs):
        # Best-effort detach of the volume attached by execute().
        try:
            if self.migrated_volume:
                compute_service = nova.API(production=True)
                compute_service.detach_volume(self.cntx, self.migrated_instance_id, self.volume_id)
        except BaseException as bex:
            LOG.exception(bex)


def PowerOffInstanceFlow(context, migration_id):
    """Build a linear flow that powers off the migrated instance."""
    cancel_key = 'cancel_file_path_' + migration_id
    poweroff_flow = lf.Flow("poweroffinstancelf")
    poweroff_flow.add(
        PowerOffInstance("PowerOffInstance",
                         rebind=dict(cancel_file_path=cancel_key)))
    return poweroff_flow


def PowerOnInstanceFlow(context, migration_id):
    """Build a linear flow that powers on the migrated instance."""
    cancel_key = 'cancel_file_path_' + migration_id
    poweron_flow = lf.Flow("poweroninstancelf")
    poweron_flow.add(
        PowerOnInstance("PowerOnInstance",
                        rebind=dict(cancel_file_path=cancel_key)))
    return poweron_flow

def PowerOffvCenterVMFlow(context, instance, migration_id):
    """Build a flow that powers off the source vCenter VM, recording its
    prior power state under 'power_state_<vm_id>'."""
    vm_id = instance['vm_id']
    flow = lf.Flow("poweroffvcentrvmlf")
    flow.add(PowerOffvCenterVM(
        "PowerOffvCenterVM" + vm_id,
        provides='power_state_' + vm_id,
        rebind=dict(cancel_file_path='cancel_file_path_' + migration_id)))
    return flow


def PowerOnvCenterVMFlow(context, instance, migration_id):
    """Build a flow restoring the source vCenter VM's recorded power state."""
    vm_id = instance['vm_id']
    rebind = dict(power_state='power_state_' + str(vm_id),
                  cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("poweronvcentervmlf")
    flow.add(PowerOnvCenterVM("PowerOnvCenterVM" + vm_id, rebind=rebind))
    return flow


def AttachVolumes(context, instance, migration_planobj, migration_id):
    """Build a flow that attaches every non-boot disk volume of *instance*.

    Disk resources identified as the boot disk ('Hard disk 1') are skipped;
    every other disk resource that already has a migrated cinder volume id
    in its metadata gets an AttachVolume task.
    """
    flow = lf.Flow("attachvolumeslf")
    db = WorkloadMgrDB().db
    migration_plan_vm_resources = db.migration_plan_vm_resources_get(
        context, instance['vm_id'], migration_planobj.id)

    bootdisk = set()
    for migration_plan_vm_resource in migration_plan_vm_resources:
        if migration_plan_vm_resource.resource_type != 'disk':
            continue
        # BUGFIX: ('Hard disk 1') is just a parenthesized string, so the
        # original test was a substring check; use a real tuple membership.
        if migration_plan_vm_resource.resource_name in ('Hard disk 1',):
            bootdisk.add(migration_plan_vm_resource.resource_name)

    # NOTE(review): with a single candidate boot-disk name this branch is
    # unreachable; kept for parity with the original logic.
    if len(bootdisk) == 2:
        bootdisk = set(['Hard disk 1'])

    for migration_plan_vm_resource in migration_plan_vm_resources:
        if migration_plan_vm_resource.resource_type != 'disk':
            continue
        if migration_plan_vm_resource.resource_name in bootdisk:
            continue
        # Only disks that already have a migrated cinder volume are attached.
        if db.get_metadata_value(migration_plan_vm_resource.metadata, 'volume_id'):
            res_id = str(migration_plan_vm_resource.id)
            flow.add(AttachVolume(
                "AttachVolume" + migration_plan_vm_resource.id,
                rebind=dict(volume_id='volume_id_' + res_id,
                            devname='devname_' + res_id,
                            cancel_file_path='cancel_file_path_' + migration_id)))
    return flow


def AssignFloatingIPFlow(context, migration_id):
    """Build a linear flow containing the AssignFloatingIP task."""
    cancel_key = 'cancel_file_path_' + migration_id
    floating_ip_flow = lf.Flow("assignfloatingiplf")
    floating_ip_flow.add(
        AssignFloatingIP("AssignFloatingIP",
                         rebind=dict(cancel_file_path=cancel_key)))
    return floating_ip_flow

def InitalSnapshotvCenterVMFlow(context, instance, migration_id):
    """Build a flow taking the first (full) snapshot of the vCenter VM."""
    vm_id = str(instance['vm_id'])
    provides = ('prev_snapshot_changeId_' + vm_id,
                'initial_snapshot_guestInfo_' + vm_id,
                'vcenter_vm_snapshot_name_' + vm_id,
                'snapshot_upload_tracking_file_' + vm_id)
    rebind = dict(
        progress_tracking_file_path='progress_tracking_file_path_' + vm_id,
        cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("initalsnapshotvcentrvmlf")
    flow.add(SnapshotvCenterVM("InitialSnapshotvCenterVM" + instance['vm_id'],
                               provides=provides, rebind=rebind))
    return flow

def SecondSnapshotvCenterVMFlow(context, instance, migration_id):
    """Build a flow taking the second (incremental) snapshot of the VM."""
    vm_id = str(instance['vm_id'])
    provides = ('prev_snapshot_changeId_' + vm_id,
                'second_snapshot_guestInfo_' + vm_id,
                'vcenter_vm_snapshot_name_' + vm_id,
                'snapshot_upload_tracking_file_' + vm_id)
    rebind = dict(
        progress_tracking_file_path='progress_tracking_file_path_' + vm_id,
        prev_snapshot_changeId='prev_snapshot_changeId_' + vm_id,
        cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("secondsnapshotvcentrvmlf")
    flow.add(SnapshotvCenterVM("SecondSnapshotvCenterVM" + instance['vm_id'],
                               provides=provides, rebind=rebind))
    return flow


def FinalSnapshotvCenterVMFlow(context, instance, migration_id):
    """Build a flow taking the final (incremental) snapshot of the VM."""
    vm_id = str(instance['vm_id'])
    provides = ('prev_snapshot_changeId_' + vm_id,
                'final_snapshot_guestInfo_' + vm_id,
                'vcenter_vm_snapshot_name_' + vm_id,
                'snapshot_upload_tracking_file_' + vm_id)
    rebind = dict(
        progress_tracking_file_path='progress_tracking_file_path_' + vm_id,
        prev_snapshot_changeId='prev_snapshot_changeId_' + vm_id,
        cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("finalsnapshotvcentrvmlf")
    flow.add(SnapshotvCenterVM("FinalSnapshotvCenterVM" + instance['vm_id'],
                               provides=provides, rebind=rebind))
    return flow

class SnapshotvCenterVM(MigrateTask):
    """
       Take a quiesced snapshot of a source vCenter VM.

       Returns a tuple of (per-disk change IDs, the guest-VM info structure
       contego needs for the upload, the snapshot name, the path of the
       snapshot upload tracking file).  When *prev_snapshot_changeId* is
       given, also writes per-disk changed-extent files (in 512-byte
       sectors) for an incremental transfer.
    """
    def execute(self, context, vmid, vmname, progress_tracking_file_path,
            prev_snapshot_changeId = None, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, vmid, vmname,
               progress_tracking_file_path, prev_snapshot_changeId)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'SnapshotvCenterVM.execute')
    def execute_with_log(self, context, vmid, vmname,
            progress_tracking_file_path, prev_snapshot_changeId):
        self.vmid = vmid
        si = get_vcenter_service_instance()
        search_index = si.content.searchIndex
        vcenter_vm = search_index.FindByUuid(None, vmid, True, True)
        if vcenter_vm is None:
            raise Exception("vCenter VM %s not found" % vmname)

        snapshot_name = "TempTrilioSnap_" + vmid
        description = "Trilio created snapshot for warm migration"
        dump_memory = False
        quiesce = True

        if vcenter_vm.snapshot:
            # Remove any stale snapshot that already exists with this name
            snap_obj = get_vcenter_snapshots_by_name_recursively(
                    vcenter_vm.snapshot.rootSnapshotList, snapshot_name)
            rem_tasks = []
            for s_obj in snap_obj:
                rem_tasks.append(s_obj.snapshot.RemoveSnapshot_Task(True))
            if rem_tasks:
                wait_for_tasks(si, rem_tasks)

        # Create the new snapshot.  Local renamed from 'task' so it does not
        # shadow the module-level 'task' import from taskflow.
        snap_task = vcenter_vm.CreateSnapshot(snapshot_name, description,
                                        dump_memory, quiesce)

        wait_for_tasks(si, [snap_task])
        snap_obj = get_vcenter_snapshots_by_name_recursively(
                vcenter_vm.snapshot.rootSnapshotList, snapshot_name)
        if snap_obj:
            snap_obj = snap_obj[0]
            snapshot_id = snap_obj.id
        else:
            raise Exception("Snapshot of vCenter VM %s could not be created" % vmname)

        # prepare contego parameter
        guest_vm_info = {
                vmname: {
                    "moref": vcenter_vm._moId,
                    "disks": {},
                    }
                }

        # Disk backing types that support incremental (changed-block) logic
        valid_virtual_backingfiles = [vim.vm.device.VirtualDisk.FlatVer2BackingInfo,
                vim.vm.device.VirtualDisk.SparseVer2BackingInfo,
                vim.vm.device.VirtualDisk.RawDiskMappingVer1BackingInfo,
                vim.vm.device.VirtualDisk.RawDiskVer2BackingInfo,
                ]
        progress_tracking_dir = os.path.dirname(
                progress_tracking_file_path)
        snapshot_upload_tracking_file = os.path.join(progress_tracking_dir,
                'snapshot_' + str(snapshot_id))

        _ret_snapshot_changeId = {}

        for virtual_dev in snap_obj.snapshot.config.hardware.device:
            if type(virtual_dev) != vim.vm.device.VirtualDisk:
                continue
            if type(virtual_dev.backing) not in valid_virtual_backingfiles:
                # BUGFIX: the original format string used bare '%' where
                # '%s' was intended and raised ValueError instead of this
                # message.
                raise Exception("virtual disk %s of VM %s does not support "
                        "incremental snapshot" % (
                            virtual_dev.backing.fileName, vmname))
            devname = virtual_dev.deviceInfo.label
            # Map 'Hard disk N' to a guest disk name: sda, sdb, ...
            diskname = "sd" + chr(ord('a') + int(devname.split(
                'Hard disk ')[1]) - 1)
            guest_vm_info[vmname]['disks'][diskname] = {}
            guest_vm_info[vmname]['disks'][diskname]['diskpath'] = \
                    virtual_dev.backing.fileName

            snapshot_changeId = virtual_dev.backing.changeId
            if not snapshot_changeId:
                raise Exception("Change ID could not be found for the "
                        "Snapshot %s of VM %s" % (snapshot_id, vmname))

            _ret_snapshot_changeId[devname] = snapshot_changeId

            if prev_snapshot_changeId and prev_snapshot_changeId.get(devname):
                # Incremental pass: query the disk areas changed since the
                # previous snapshot and store them (start/length in sectors)
                # in a file in the progress tracking directory.
                diskChangeInfo = vcenter_vm.QueryChangedDiskAreas(
                        snapshot=snap_obj.snapshot,
                        deviceKey=virtual_dev.key, startOffset=0,
                        changeId=prev_snapshot_changeId[devname])

                _, tmp_changeinfo_file = tempfile.mkstemp(
                        dir=progress_tracking_dir)

                total_length = 0
                with open(tmp_changeinfo_file, 'w') as extentfile:
                    # write the change info in terms of sectors by
                    # converting from bytes (default sector size 512 bytes)
                    for cA in diskChangeInfo.changedArea:
                        extentfile.write("sector:{} length:{}\n".format(
                            int(cA.start/512), int(cA.length/512)))
                        total_length += cA.length
                LOG.info("Total change length : {} bytes ({} KiB)".format(
                    total_length, total_length/1024))
                guest_vm_info[vmname]['disks'][diskname]['extentfile'] = \
                    tmp_changeinfo_file

        # The created snapshot is removed after uploading (or by revert)

        return (_ret_snapshot_changeId, guest_vm_info, snapshot_name,
                snapshot_upload_tracking_file)


    @autolog.log_method(Logger, 'SnapshotvCenterVM.revert')
    def revert_with_log(self, *args, **kwargs):
        # Best-effort cleanup: delete any snapshot this task created.
        try:
            si = get_vcenter_service_instance()
            search_index = si.content.searchIndex
            vcenter_vm = search_index.FindByUuid(None, self.vmid, True, True)
            if vcenter_vm is None or vcenter_vm.snapshot is None:
                return

            snapshot_name = "TempTrilioSnap_" + self.vmid

            snap_obj = get_vcenter_snapshots_by_name_recursively(
                    vcenter_vm.snapshot.rootSnapshotList, snapshot_name)
            rem_tasks = []
            for s_obj in snap_obj:
                rem_tasks.append(s_obj.snapshot.RemoveSnapshot_Task(True))
            if rem_tasks:
                wait_for_tasks(si, rem_tasks)
            #TODO: check for other cleanups
        except BaseException as bex:
            LOG.exception(bex)


def UploadInitialSnapshotvCenterVMFlow(context, instance, migration_plan_obj, migration_id):
    """Build a flow uploading the initial vCenter VM snapshot via contego."""
    vm_id = str(instance['vm_id'])
    rebind = dict(
        progress_tracking_file_path='snapshot_upload_tracking_file_' + vm_id,
        snapshot_guestInfo='initial_snapshot_guestInfo_' + vm_id,
        cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("uploadinitialsnapshotvcentrvmlf")
    flow.add(UploadSnapshotvCenterVM(
        "UploadInitialSnapshotvCenterVM_" + instance['vm_id'],
        rebind=rebind))
    return flow

def UploadSecondSnapshotvCenterVMFlow(context, instance, migration_plan_obj, migration_id):
    """Build a flow uploading the second (incremental) snapshot via contego."""
    vm_id = str(instance['vm_id'])
    rebind = dict(
        progress_tracking_file_path='snapshot_upload_tracking_file_' + vm_id,
        snapshot_guestInfo='second_snapshot_guestInfo_' + vm_id,
        cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("uploadsecondsnapshotvcentrvmlf")
    flow.add(UploadSnapshotvCenterVM(
        "UploadSecondSnapshotvCenterVM_" + instance['vm_id'],
        rebind=rebind,
        inject={'convert_only': False}))
    return flow

def UploadFinalSnapshotvCenterVMFlow(context, instance, migration_plan_obj, migration_id):
    """Build a flow uploading the final (incremental) snapshot via contego."""
    vm_id = str(instance['vm_id'])
    rebind = dict(
        progress_tracking_file_path='snapshot_upload_tracking_file_' + vm_id,
        snapshot_guestInfo='final_snapshot_guestInfo_' + vm_id,
        cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("uploadfinalsnapshotvcentrvmlf")
    flow.add(UploadSnapshotvCenterVM(
        "UploadFinalSnapshotvCenterVM_" + instance['vm_id'],
        rebind=rebind,
        inject={'convert_only': False}))
    return flow

def ConvertSnapshotvCenterVMFlow(context, instance, migration_plan_obj, migration_id):
    """Build a flow running the convert-only virt-v2v pass for the VM."""
    vm_id = str(instance['vm_id'])
    rebind = dict(
        progress_tracking_file_path='progress_tracking_file_path_' + vm_id,
        snapshot_guestInfo='final_snapshot_guestInfo_' + vm_id,
        prev_snapshot_name='vcenter_vm_snapshot_name_' + vm_id,
        cancel_file_path='cancel_file_path_' + migration_id)
    flow = lf.Flow("convertsnapshotvcentrvmlf")
    flow.add(UploadSnapshotvCenterVM(
        "ConvertSnapshotvCenterVM_" + instance['vm_id'],
        rebind=rebind,
        inject={'convert_only': True}))
    return flow


class UploadSnapshotvCenterVM(MigrateTask):
    """
       Drive the contego virt-v2v data transfer for one vCenter VM snapshot.

       Calls contego's populate_instance_with_virt_v2v, then polls the
       progress tracking file (or contego's async status API for swift
       based media) until the transfer reports 'Completed' or fails.
       After a convert-only pass, the now-obsolete vCenter snapshot
       *prev_snapshot_name* is removed.
    """
    def execute(self, context, migration_id, vmid, vmname,
            migrated_instance_id, progress_tracking_file_path,
            snapshot_guestInfo, convert_only=False, prev_snapshot_name= None, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, migration_id,
                                     vmid, vmname, migrated_instance_id,
                                     progress_tracking_file_path,
                                     snapshot_guestInfo,convert_only,
                                     prev_snapshot_name, cancel_file_path)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'UploadSnapshotvCenterVM.execute')
    def execute_with_log(self, context, migration_id, vmid, vmname,
            migrated_instance_id, progress_tracking_file_path,
            snapshot_guestInfo, convert_only, prev_snapshot_name, cancel_file_path):
        def _get_vCenter_thumbprint(vcenter_host):
            # SHA1 thumbprint of the vCenter SSL certificate as colon
            # separated hex pairs (aa:bb:...), as vSphere expects.
            # NOTE(review): ssl.wrap_socket is deprecated and removed in
            # Python 3.12; migrate to ssl.SSLContext().wrap_socket.
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            wrappedSocket = ssl.wrap_socket(sock)

            try:
                wrappedSocket.connect((vcenter_host, 443))
            except Exception as ex:
                LOG.exception(ex)
                raise Exception("Cannot retrieve vCenter thumbprint")
            else:
                der_cert = wrappedSocket.getpeercert(True)

                # SHA1 thumbprint, hex encoded
                thumb_sha1 = hashlib.sha1(der_cert).hexdigest()
                s = ""
                for i in range(0, 40, 2):
                    s += thumb_sha1[i:i+2] + ":"
                return s.strip(":")

        # Call into contego to copy the data from backend to volume
        compute_service = nova.API(production=True)
        db = WorkloadMgrDB().db

        # Get a new token, just to be safe
        cntx = amqp.RpcContext.from_dict(context)
        cntx = nova._get_tenant_context(cntx)
        migration_obj = db.migration_get(cntx, migration_id)
        migration_plan_obj = db.migration_plan_get(
            cntx, migration_obj.migration_plan_id)

        server_obj = compute_service.get_server(cntx, migrated_instance_id)
        if server_obj is None:
            raise Exception("Migrated instance %s not found" % migrated_instance_id)

        si = get_vcenter_service_instance()
        search_index = si.content.searchIndex
        vcenter_vm = search_index.FindByUuid(None, vmid, True, True)
        if vcenter_vm is None:
            raise Exception("vCenter VM %s not found" % vmname)

        # Walk up the inventory tree from the VM's host to its datacenter
        host_name = vcenter_vm.runtime.host.name
        parent = vcenter_vm.runtime.host.parent
        while parent and not isinstance(parent, vim.Datacenter):
            parent = parent.parent

        if not parent:
            raise Exception("Cannot fetch the datacenter name for VM %s" % vmname)

        datacenter_name = parent.name
        o = urlparse(CONF.vcenter_migration.vcenter_url)

        thumbprint = _get_vCenter_thumbprint(o.hostname)

        # We need to send vcenter credentials here
        vast_params = {
            'vcenter_url': CONF.vcenter_migration.vcenter_url,
            'user': CONF.vcenter_migration.vcenter_username,
            'password': CONF.vcenter_migration.vcenter_password,
            'disable_ssl_verification': CONF.vcenter_migration.vcenter_nossl,
            'thumbprint': thumbprint,
            'progress_tracking_file_path': progress_tracking_file_path,
            'vcenter_vm_name': vcenter_vm.summary.config.name,
            'vcenter_datacenter': datacenter_name,
            'vcenter_hostname': host_name,
            'server_obj': server_obj.to_dict(),
            'migration_type': 'warm',
            'guestvm_info': snapshot_guestInfo,
            'convert_only': convert_only
        }

        contego_service = contego.API(production=True)
        contego_service.populate_instance_with_virt_v2v(
            cntx, migrated_instance_id, vast_params)

        basestat = os.stat(progress_tracking_file_path)
        basetime = time.time()
        backup_endpoint = db.get_metadata_value(migration_plan_obj.metadata,
                                                'backup_media_target')
        backup_target = vault.get_backup_target(backup_endpoint)
        # Poll until the transfer completes, errors, or stalls.
        while True:
            try:
                self.check_migrate_cancellation(cancel_file_path)
                time.sleep(10)
                async_task_status = {}
                if progress_tracking_file_path:
                    try:
                        async_task_status['status'] = backup_target.read_progress_tracking_file(
                            progress_tracking_file_path
                            )
                    except Exception as ex:
                        LOG.exception(ex)

                    # if the modified timestamp of progress file hasn't
                    # changed for a while, throw an exception
                    progstat = os.stat(progress_tracking_file_path)

                    # if we don't see any update to file time for
                    # CONF.progress_tracking_update_interval seconds,
                    # contego is assumed dead: abort.
                    if progstat.st_mtime > basestat.st_mtime:
                        basestat = progstat
                        basetime = time.time()
                    elif time.time() - basetime > CONF.progress_tracking_update_interval:
                        raise Exception(
                            "No update to %s modified time for last %d minutes. "
                            "Contego may have errored. Aborting Operation" %
                            (progress_tracking_file_path, CONF.progress_tracking_update_interval / 60))
                else:
                    # For swift based backup media
                    # NOTE(review): 'progress_tracker_metadata' is not
                    # defined anywhere in this method (NameError if this
                    # branch runs); the tracker metadata needs to be wired
                    # in for swift media. TODO confirm intended source.
                    # BUGFIX: key was misspelled 'server_obj:'.
                    params = {'metadata': progress_tracker_metadata,
                              'server_obj': server_obj.to_dict()}
                    async_task_status = contego_service.vast_async_task_status(
                        cntx, migrated_instance_id, params)
                data_transfer_completed = False
                percentage = "0.0"
                if async_task_status and 'status' in async_task_status and \
                        len(async_task_status['status']):
                    # Parse virt-v2v output lines for progress and errors
                    for line in async_task_status['status']:
                        if 'percentage complete' in line and re.search(r'\d+\.\d+', line):
                            percentage = re.search(r'\d+\.\d+', line).group(0)
                        if 'Error' in line:
                            raise Exception(
                                "Data transfer failed - Contego Exception:" + line)
                        if 'Completed' in line:
                            data_transfer_completed = True
                            percentage = "100.0"
                            break

                if data_transfer_completed:
                    break
            except nova_unauthorized as ex:
                LOG.exception(ex)
                # recreate the token here
                cntx = nova._get_tenant_context(cntx)
            except Exception as ex:
                LOG.exception(ex)
                # BUGFIX: bare raise preserves the original traceback
                raise
        if convert_only and prev_snapshot_name:
            # If the conversion is done, old vcenter VM snapshots
            # are not required anymore
            # Remove the snapshot if already exists with same name
            snap_obj = get_vcenter_snapshots_by_name_recursively(
                    vcenter_vm.snapshot.rootSnapshotList, prev_snapshot_name)
            rem_tasks = []
            for s_obj in snap_obj:
                rem_tasks.append(s_obj.snapshot.RemoveSnapshot_Task(True))
            if rem_tasks:
                wait_for_tasks(si, rem_tasks)

    @autolog.log_method(Logger, 'UploadSnapshotvCenterVM.revert')
    def revert_with_log(self, *args, **kwargs):
        # Nothing to undo: a failed transfer leaves no state that this
        # task is responsible for cleaning up.
        pass


class PopulateInstanceWithVirtV2V(MigrateTask):
    """
       Run the contego virt-v2v data transfer for a cold migration.

       Calls contego's populate_instance_with_virt_v2v, then polls the
       progress tracking file (or contego's async status API for swift
       based media) until the transfer reports 'Completed' or fails.
    """
    def execute(self, context, migration_id, vmid, vmname,
            migrated_instance_id, progress_tracking_file_path, cancel_file_path=None):
        self.check_migrate_cancellation(cancel_file_path)
        return self.execute_with_log(context, migration_id,
                                     vmid, vmname, migrated_instance_id,
                                     progress_tracking_file_path, cancel_file_path)

    def revert(self, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'PopulateInstanceWithVirtV2V.execute')
    def execute_with_log(self, context, migration_id, vmid, vmname,
            migrated_instance_id, progress_tracking_file_path, cancel_file_path):

        def _get_vCenter_thumbprint(vcenter_host):
            # SHA1 thumbprint of the vCenter SSL certificate as colon
            # separated hex pairs (aa:bb:...), as vSphere expects.
            # NOTE(review): ssl.wrap_socket is deprecated and removed in
            # Python 3.12; migrate to ssl.SSLContext().wrap_socket.
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            wrappedSocket = ssl.wrap_socket(sock)

            try:
                wrappedSocket.connect((vcenter_host, 443))
            except Exception as ex:
                LOG.exception(ex)
                raise Exception("Cannot retrieve vCenter thumbprint")
            else:
                der_cert = wrappedSocket.getpeercert(True)

                # SHA1 thumbprint, hex encoded
                thumb_sha1 = hashlib.sha1(der_cert).hexdigest()
                s = ""
                for i in range(0, 40, 2):
                    s += thumb_sha1[i:i+2] + ":"
                return s.strip(":")

        # Call into contego to copy the data from backend to volume
        compute_service = nova.API(production=True)
        db = WorkloadMgrDB().db

        # Get a new token, just to be safe
        cntx = amqp.RpcContext.from_dict(context)
        cntx = nova._get_tenant_context(cntx)
        migration_obj = db.migration_get(cntx, migration_id)
        migration_plan_obj = db.migration_plan_get(
            cntx, migration_obj.migration_plan_id)

        server_obj = compute_service.get_server(cntx, migrated_instance_id)
        if server_obj is None:
            raise Exception("Migrated instance %s not found" % migrated_instance_id)

        si = get_vcenter_service_instance()
        search_index = si.content.searchIndex
        vcenter_vm = search_index.FindByUuid(None, vmid, True, True)
        if vcenter_vm is None:
            raise Exception("vCenter VM %s not found" % vmname)

        # Walk up the inventory tree from the VM's host to its datacenter
        host_name = vcenter_vm.runtime.host.name
        parent = vcenter_vm.runtime.host.parent
        while parent and not isinstance(parent, vim.Datacenter):
            parent = parent.parent

        if not parent:
            raise Exception("Cannot fetch the datacenter name for VM %s" % vmname)

        datacenter_name = parent.name
        o = urlparse(CONF.vcenter_migration.vcenter_url)

        thumbprint = _get_vCenter_thumbprint(o.hostname)

        # We need to send vcenter credentials here
        vast_params = {
            'vcenter_url': CONF.vcenter_migration.vcenter_url,
            'user': CONF.vcenter_migration.vcenter_username,
            'password': CONF.vcenter_migration.vcenter_password,
            'disable_ssl_verification': CONF.vcenter_migration.vcenter_nossl,
            'thumbprint': thumbprint,
            'progress_tracking_file_path': progress_tracking_file_path,
            'vcenter_vm_name': vcenter_vm.summary.config.name,
            'vcenter_datacenter': datacenter_name,
            'vcenter_hostname': host_name,
            'server_obj': server_obj.to_dict(),
        }

        contego_service = contego.API(production=True)
        contego_service.populate_instance_with_virt_v2v(
            cntx, migrated_instance_id, vast_params)

        basestat = os.stat(progress_tracking_file_path)
        basetime = time.time()
        backup_endpoint = db.get_metadata_value(migration_plan_obj.metadata,
                                                'backup_media_target')
        backup_target = vault.get_backup_target(backup_endpoint)
        # Poll until the transfer completes, errors, or stalls.
        while True:
            try:
                self.check_migrate_cancellation(cancel_file_path)
                time.sleep(10)
                async_task_status = {}
                if progress_tracking_file_path:
                    try:
                        async_task_status['status'] = backup_target.read_progress_tracking_file(
                            progress_tracking_file_path
                            )
                    except Exception as ex:
                        LOG.exception(ex)

                    # if the modified timestamp of progress file hasn't
                    # changed for a while, throw an exception
                    progstat = os.stat(progress_tracking_file_path)

                    # if we don't see any update to file time for
                    # CONF.progress_tracking_update_interval seconds,
                    # contego is assumed dead: abort.
                    if progstat.st_mtime > basestat.st_mtime:
                        basestat = progstat
                        basetime = time.time()
                    elif time.time() - basetime > CONF.progress_tracking_update_interval:
                        raise Exception(
                            "No update to %s modified time for last %d minutes. "
                            "Contego may have errored. Aborting Operation" %
                            (progress_tracking_file_path, CONF.progress_tracking_update_interval / 60))
                else:
                    # For swift based backup media
                    # NOTE(review): 'progress_tracker_metadata' is not
                    # defined anywhere in this method (NameError if this
                    # branch runs); the tracker metadata needs to be wired
                    # in for swift media. TODO confirm intended source.
                    # BUGFIX: key was misspelled 'server_obj:'.
                    params = {'metadata': progress_tracker_metadata,
                              'server_obj': server_obj.to_dict()}
                    async_task_status = contego_service.vast_async_task_status(
                        cntx, migrated_instance_id, params)
                data_transfer_completed = False
                percentage = "0.0"
                if async_task_status and 'status' in async_task_status and \
                        len(async_task_status['status']):
                    # Parse virt-v2v output lines for progress and errors
                    for line in async_task_status['status']:
                        if 'percentage complete' in line and re.search(r'\d+\.\d+', line):
                            percentage = re.search(r'\d+\.\d+', line).group(0)
                        if 'Error' in line:
                            raise Exception(
                                "Data transfer failed - Contego Exception:" + line)
                        if 'Completed' in line:
                            data_transfer_completed = True
                            percentage = "100.0"
                            break

                if data_transfer_completed:
                    break
            except nova_unauthorized as ex:
                LOG.exception(ex)
                # recreate the token here
                cntx = nova._get_tenant_context(cntx)
            except Exception as ex:
                LOG.exception(ex)
                # BUGFIX: bare raise preserves the original traceback
                raise

        # TODO: Fill other metadata information including time taken
        #migration_obj = db.migration_update(
            #cntx, migration_id, {
                #'time_taken': image_info.virtual_size})

    @autolog.log_method(Logger, 'PopulateInstanceWithVirtV2V.revert')
    def revert_with_log(self, *args, **kwargs):
        # Nothing to undo: a failed transfer leaves no state that this
        # task is responsible for cleaning up.
        pass


def PopulateInstancesWithVirtV2VFlow(context, instance, migration_plan_obj, migration_id):
    """Build a linear flow holding the virt-v2v population task for one VM.

    The task's ``progress_tracking_file_path`` and ``cancel_file_path``
    inputs are rebound to the per-VM / per-migration store keys so the
    single task class can be reused across VMs within one engine store.
    """
    rebind_map = {
        'progress_tracking_file_path':
            'progress_tracking_file_path_' + str(instance['vm_id']),
        'cancel_file_path': 'cancel_file_path_' + migration_id,
    }

    flow = lf.Flow("populateinstancewithvirtv2vlf")
    flow.add(PopulateInstanceWithVirtV2V(
        "PopulateInstanceWithVirtV2V" + instance['vm_id'],
        rebind=rebind_map))
    return flow


def migrate_vm(cntx, db, instance, migration, migrated_net_resources,
               migrated_security_groups, migrated_compute_flavor,
               migrated_nics, instance_options):
    """Migrate one vCenter VM to OpenStack by building and running a taskflow.

    Assembles a linear flow per VM: migrate volumes, create the instance,
    adjust security groups, attach volumes, then — depending on
    ``migration['migration_type']`` — either the warm-migration snapshot
    upload sequence or the cold-migration power-off + virt-v2v sequence,
    and finally power-on and floating-IP assignment. The flow is executed
    with a serial engine backed by an on-disk taskflow directory.

    :param cntx: request context; re-authenticated via nova before the run
    :param db: workloadmgr database API object
    :param instance: dict describing the source VM ('vm_id', 'vm_name', ...)
    :param migration: dict for the migration record ('id', 'migration_type')
    :param migrated_net_resources: mapping of source MAC address ->
        migrated network resource info
    :param migrated_security_groups: mapping vm_id -> {pit_id: migrated
        security group id}
    :param migrated_compute_flavor: flavor created for the target instance
    :param migrated_nics: NICs for the migrated instance
    :param instance_options: per-VM options (image source, volume types, ...)
    :returns: the migration-VM database record created for the new instance
    :raises Exception: if the flow result contains no migrated instance id
    """
    migration_obj = db.migration_get(cntx, migration['id'])
    migration_plan_obj = db.migration_plan_get(
        cntx, migration_obj.migration_plan_id)
    vm_obj = db.migration_plan_vm_get(cntx, instance['vm_id'],
                                      migration_obj.migration_plan_id)

    # Flatten the VM's metadata rows into a plain dict for easy lookups.
    vm_metadata = {}
    for meta in vm_obj.metadata:
        vm_metadata[meta.key] = meta.value

    power_off_vm_post_migrate = db.get_metadata_value(migration_obj.metadata,
                                          'power_off_vm_post_migrate') == '1'

    backup_endpoint = db.get_metadata_value(migration_plan_obj.metadata,
                                            'backup_media_target')

    backup_target = vault.get_backup_target(backup_endpoint)

    # The "cancelled" marker file lets an operator abort the data transfer;
    # tasks in the flow poll for it via 'cancel_file_path'.
    tracking_dir = backup_target.get_progress_tracker_directory({'migration_id': migration_obj.id})
    cancel_file = os.path.join(tracking_dir, "cancelled")

    # Fix: the previous concatenation embedded a literal '%s' in the
    # progress message instead of interpolating the plan id.
    msg = 'Creating VM %s from migration plan %s' % (instance['vm_id'],
                                                     migration_plan_obj.id)
    db.migration_update(cntx, migration_obj.id, {'progress_msg': msg})

    # Refresh the token before a potentially long-running flow.
    cntx = nova._get_tenant_context(cntx)

    context_dict = dict([('%s' % key, value)
                         for (key, value) in cntx.to_dict().items()])
    context_dict['conf'] = None  # RpcContext object looks for this during init

    migration_plan_vm_resources = db.migration_plan_vm_resources_get(
        cntx, instance['vm_id'], migration_plan_obj.id)

    migrated_security_group_ids = {}
    vm_id = instance['vm_id']
    if migrated_security_groups and vm_id in migrated_security_groups:
        for pit_id, migrated_security_group_id in migrated_security_groups[vm_id].items(
        ):
            migrated_security_group_ids[pit_id] = migrated_security_group_id

    progress_tracker_metadata = {
        'migration_id': migration_obj.id,
        'resource_id': instance['vm_id']}
    progress_tracking_file_path = backup_target.get_progress_tracker_path(
        progress_tracker_metadata)

    # remove items that cannot be jsoned
    migration_dict = dict(iter(migration.items()))
    image_id = instance_options.get("image_source") or None
    # The taskflow store: every key here becomes an injectable task input.
    store = {
        'connection': 'dir',
        'path': CONF.taskflow_path,  # save data to this directory
        'max_cache_size': CONF.taskflow_max_cache_size,  # keep up-to this much entries in memory
        'context': context_dict,
        'migration': migration_dict,
        'migration_id': migration['id'],
        'vmid': instance['vm_id'],
        'vmname': instance['vm_name'],
        'keyname': 'keyname' in instance and instance['keyname'] or None,
        'migration_plan_id': migration_plan_obj.id,
        'migration_type': migration['migration_type'],
        'migrated_net_resources': migrated_net_resources,
        'migrated_security_groups': migrated_security_group_ids,
        'migrated_compute_flavor_id': migrated_compute_flavor.id,
        'migrated_nics': migrated_nics,
        'image_id': image_id,
        'config_drive': vm_metadata.get('config_drive', False),
        'instance_options': instance_options,
        'ordered_interfaces': vm_metadata.get('trilio_ordered_interfaces', None),
        'progress_tracking_file_path_' + instance['vm_id']: progress_tracking_file_path,
        'cancel_file_path_' + migration['id']: cancel_file
    }
    # Expose each plan resource to the flow under per-resource store keys.
    for migration_plan_vm_resource in migration_plan_vm_resources:
        store[migration_plan_vm_resource.id] = migration_plan_vm_resource.id
        store['devname_' + migration_plan_vm_resource.id] = migration_plan_vm_resource.resource_name

        if migration_plan_vm_resource.resource_type == 'disk':

            volume_id = db.get_metadata_value(
                migration_plan_vm_resource.metadata, 'volume_id')
            if volume_id:
                volume_type = db.get_metadata_value(
                    migration_plan_vm_resource.metadata, 'volume_type') or "None"
                new_volume_type = get_new_volume_type(instance_options,
                                                      volume_id.lower(),
                                                      volume_type)
                store['volume_type_' +
                      migration_plan_vm_resource.id] = new_volume_type
            else:
                store['volume_type_' + migration_plan_vm_resource.id] = None
            user_data = db.get_metadata_value(
                migration_plan_vm_resource.metadata, 'user_data')
            store['userdata'] = None
            if user_data:
                store['userdata'] = user_data

        if migration_plan_vm_resource.resource_type == 'nic':
            vm_nic_migration_plan = db.migration_plan_vm_network_resource_get(
                cntx, migration_plan_vm_resource.id)
            nic_data = pickle.loads(bytes(vm_nic_migration_plan.pickle, 'utf-8'))
            # NOTE(review): mac_address is only bound when a 'nic' resource
            # exists; it is read later when recording the migrated VM. A VM
            # with no nic resources would raise NameError there — presumably
            # every plan VM has at least one NIC; confirm with the planner.
            mac_address = nic_data['mac_address']

    LOG.info(_('Processing disks'))
    _migrationvmflow = lf.Flow(instance['vm_id'] + "MigrationInstance")

    childflow = MigrateVolumes(cntx, instance, migration_plan_obj, migration['id'])
    if childflow:
        _migrationvmflow.add(childflow)

    # create nova from image id
    childflow = MigrateInstance(cntx, instance, migration_plan_obj, migration['id'])
    if childflow:
        _migrationvmflow.add(childflow)

    childflow = AdjustInstanceSecurityGroups(cntx, migration['id'])
    if childflow:
        _migrationvmflow.add(childflow)

    # power off the migrated instance until all volumes are attached
    childflow = PowerOffInstanceFlow(cntx, migration['id'])
    if childflow:
        _migrationvmflow.add(childflow)

    # attach migrated volumes to migrated instances
    childflow = AttachVolumes(cntx, instance, migration_plan_obj, migration['id'])
    if childflow:
        _migrationvmflow.add(childflow)

    # Warm migration: incremental snapshot/upload rounds on the source VM,
    # powering it off only for the final delta to minimize downtime.
    if migration['migration_type'] == "warm":
        childflow = InitalSnapshotvCenterVMFlow(cntx, instance, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        childflow = UploadInitialSnapshotvCenterVMFlow(cntx, instance, migration_plan_obj, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        childflow = SecondSnapshotvCenterVMFlow(cntx, instance, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        childflow = UploadSecondSnapshotvCenterVMFlow(cntx, instance, migration_plan_obj, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        childflow = PowerOffvCenterVMFlow(cntx, instance, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        childflow = FinalSnapshotvCenterVMFlow(cntx, instance, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        childflow = UploadFinalSnapshotvCenterVMFlow(cntx, instance, migration_plan_obj, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        if not power_off_vm_post_migrate:
            childflow = PowerOnvCenterVMFlow(cntx, instance, migration['id'])
            if childflow:
                _migrationvmflow.add(childflow)

        childflow = ConvertSnapshotvCenterVMFlow( cntx, instance, migration_plan_obj, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

    # Cold migration: power off the source first, then bulk-transfer data
    # via virt-v2v.
    if migration['migration_type'] == "cold":
        childflow = PowerOffvCenterVMFlow(cntx, instance, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        childflow = PopulateInstancesWithVirtV2VFlow(cntx, instance, migration_plan_obj, migration['id'])
        if childflow:
            _migrationvmflow.add(childflow)

        if not power_off_vm_post_migrate:
            childflow = PowerOnvCenterVMFlow(cntx, instance, migration['id'])
            if childflow:
                _migrationvmflow.add(childflow)

    # power on the migrated instance until all volumes are attached
    childflow = PowerOnInstanceFlow(cntx, migration['id'])
    if childflow:
        _migrationvmflow.add(childflow)

    # Assign floating IP address
    childflow = AssignFloatingIPFlow(cntx, migration['id'])
    if childflow:
        _migrationvmflow.add(childflow)

    # Run the flow with an on-disk backend scoped per-migration/per-VM;
    # always clean the scratch directory afterwards, even on failure.
    try:
        store["path"] = os.path.join(store["path"], 'migration_' + str(store['migration_id']), 'vm_' + instance['vm_id'])
        fileutils.ensure_tree(store["path"])
        result = engines.run(_migrationvmflow, engine='serial', engine_conf='serial',
                             backend={
                                 'connection': store['connection'],
                                 'path': store['path'],
                                 'max_cache_size': store['max_cache_size']
                             }, store=store)
    finally:
        fileutils.remove_tree(store["path"])

    if result and 'migrated_instance_id' in result:
        migrated_instance_id = result['migrated_instance_id']
        compute_service = nova.API(production=True)
        migrated_instance = compute_service.get_server_by_id(
            cntx, migrated_instance_id)

        migrated_vm_values = {
            'vm_id': migrated_instance_id,
            'vm_name': migrated_instance.name,
            'migration_id': migration['id'],
            'metadata': {
                'production': migrated_net_resources[mac_address]['production'],
                'instance_id': instance['vm_id'],
                'availability_zone': migrated_instance.__dict__.get('OS-EXT-AZ:availability_zone'),
            },
            'status': 'available'}
        migrated_vm = db.migration_vm_create(cntx, migrated_vm_values)

        LOG.debug(_("Migrated VM is created successfully"))

        db.migration_update(
            cntx, migration_obj.id, {
                'progress_msg': 'Created VM:' + migrated_vm['vm_id'], 'status': 'executing'})
        return migrated_vm
    else:
        raise Exception("Migrating VM instance failed")