Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
workloadmgr / workloadmgr / workflows / migration_plan_workflow.py
Size: Mime:
import abc
import six
import contextlib
import os
import pickle as pickle
import random
import sys
import time

import datetime
import paramiko
import uuid

from pyVmomi import vim

from taskflow import engines
from taskflow.utils import misc
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import graph_flow as gf
from taskflow import task
from taskflow import flow
from taskflow.listeners import timing

from oslo_messaging._drivers import amqp

from workloadmgr import autolog
from workloadmgr.openstack.common.gettextutils import _
from workloadmgr.openstack.common import log as logging
from workloadmgr import exception as wlm_exceptions
from workloadmgr.openstack.common import fileutils
from workloadmgr.utils import get_vcenter_service_instance

from . import vmtasks
from . import workflow

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# autolog wrapper around LOG; handed to the @autolog.log_method decorators
# used throughout this module.
Logger = autolog.Logger(LOG)


@autolog.log_method(Logger, 'migration_plan_workflow.vm_flavors')
def vms_flavor(context, db, vms, migration_plan):
    """Record per-VM details and a flavor resource for each VM in the plan.

    For every entry in ``vms`` the VM is looked up in vCenter by instance
    UUID, its migration-plan VM row is updated with the vCenter name and
    guest/host metadata, and a ``flavor`` resource row (vCPUs, RAM, boot
    disk size) is created.

    :param context: request context passed through to the db calls
    :param db: database API object
    :param vms: iterable of mappings, each carrying a ``'vm_id'`` key
    :param migration_plan: mapping with at least an ``'id'`` key
    :raises wlm_exceptions.ErrorOccurred: if a VM is not found in vCenter
    """
    si = get_vcenter_service_instance()
    search_index = si.content.searchIndex

    def get_folder_path(vcenter_vm):
        """Join the names of the VM and all its ancestors, root first."""
        pstack = [vcenter_vm.name]
        parent = vcenter_vm.parent
        while parent:
            pstack.append(parent.name)
            parent = parent.parent
        pstack.reverse()
        # pstack always holds at least the VM's own name, so no empty case.
        return os.path.join(*pstack)

    for vm in vms:
        vm_inst = search_index.FindByUuid(None, vm['vm_id'], True, True)
        if vm_inst is None:
            # wlm_exceptions is a module; raise a concrete exception class.
            raise wlm_exceptions.ErrorOccurred(
                "VM %s is not found on vCenter" % vm['vm_id'])

        # Sanity check that the lookup returned the VM that was asked for.
        assert vm_inst.config.instanceUuid == vm['vm_id']
        boot_disk_size = 10  # Default, used when "Hard disk 1" is not found

        db_vm = db.migration_plan_vm_get_by_id(context, vm['vm_id'])[0]
        vc_vm = {}
        vc_vm['vm-name'] = vm_inst.summary.config.name
        vc_vm['metadata'] = {
            'tools_running': vm_inst.summary.guest.toolsStatus,
            'esx_host': vm_inst.runtime.host.name,
            'power_state': vm_inst.runtime.powerState,
            'guest_family': vm_inst.guest.guestFamily,
            'guest_fullname': vm_inst.guest.guestFullName,
            'boot_options': "UEFI" if vm_inst.config.firmware == "efi" else "BIOS",
            'vcenter_vm_path': get_folder_path(vm_inst),
        }

        values = {'migration_plan_id': migration_plan['id'],
                  'vm_id': vm['vm_id'],
                  'vm_name': vc_vm['vm-name'],
                  'status': 'available',
                  'metadata': vc_vm.get('metadata', {})}
        db.migration_plan_vms_update(
            context, db_vm.id, values)

        # The disk labelled "Hard disk 1" is treated as the boot disk.
        for dev in vm_inst.config.hardware.device:
            if not isinstance(dev, vim.vm.device.VirtualDisk):
                continue
            if dev.deviceInfo.label == "Hard disk 1":
                # capacityInKB -> GB
                boot_disk_size = dev.capacityInKB / 1024/1024
                break

        flavor_name = "%s-flavor" % vm_inst.config.name
        metadata = {'name': flavor_name,
                    'vcpus': vm_inst.summary.config.numCpu,
                    'ram': vm_inst.summary.config.memorySizeMB,
                    'disk': boot_disk_size,
                    'ephemeral': 0,
                    'swap': 0,
                    'id': flavor_name,
                    }
        migration_plan_vm_resource_values = {
            'id': str(uuid.uuid4()),
            'vm_id': vm['vm_id'],
            'migration_plan_id': migration_plan['id'],
            'resource_type': 'flavor',
            'resource_name': flavor_name,
            'metadata': metadata,
            'status': 'available'
        }

        db.migration_plan_vm_resource_create(
            context, migration_plan_vm_resource_values)


def _get_vm_nics(vm):
    """Return a list of dicts describing the supported NICs of ``vm``.

    Only devices whose backing is a standard-network or a distributed
    virtual port backing are reported. Each dict carries the device label,
    MAC address, vswitch, portgroup, VLAN id and IP pool name; values that
    could not be determined are reported as ``'NA'``. ``vswitch_uuid`` and
    ``ippool_uuid`` are always ``None``.
    """
    collected = []
    supported_nic_types = [
        vim.vm.device.VirtualEthernetCard.NetworkBackingInfo,
        vim.vm.device.VirtualEthernetCard.DistributedVirtualPortBackingInfo,
    ]
    service_instance = get_vcenter_service_instance()

    for device in vm.config.hardware.device:
        backing = device.backing
        if type(backing) not in supported_nic_types:
            continue

        pg_name = vlan = switch_name = pool_name = None
        vswitch_uuid = None
        ippool_uuid = None

        if not hasattr(backing, 'port'):
            # Standard vSwitch: match the network name against the
            # portgroups configured on the VM's host.
            pg_name = device.backing.network.name
            pool_name = device.backing.network.summary.ipPoolName
            vm_host = vm.runtime.host  # not used further; read kept as-is

            for host_pg in vm.runtime.host.config.network.portgroup:
                if pg_name in host_pg.key:
                    vlan = str(host_pg.spec.vlanId)
                    switch_name = str(host_pg.spec.vswitchName)
                    break
        else:
            # Distributed virtual switch: resolve the switch by UUID, then
            # the portgroup by key.
            pg_key = device.backing.port.portgroupKey
            switch_uuid = device.backing.port.switchUuid
            try:
                dvs = service_instance.content.dvSwitchManager.QueryDvsByUuid(
                    switch_uuid)
            except Exception:
                pg_name = "** Error: DVS not found **"
                vlan = "NA"
                switch_name = "NA"
            else:
                dv_portgroup = dvs.LookupDvPortGroup(pg_key)
                pg_name = dv_portgroup.config.name
                vlan = str(dv_portgroup.config.defaultPortConfig.vlan.vlanId)
                switch_name = str(dvs.name)

        # Replace anything still unknown with the 'NA' placeholder.
        pg_name = 'NA' if pg_name is None else pg_name
        vlan = 'NA' if vlan is None else vlan
        switch_name = 'NA' if switch_name is None else switch_name
        if pool_name in (None, ""):
            pool_name = 'NA'

        nic_info = {
            'label': device.deviceInfo.label,
            'macaddress': device.macAddress,
            'vswitch': switch_name,
            'vswitch_uuid': vswitch_uuid,
            'portgroup': pg_name,
            'vlan_id': vlan,
            'ippool_name': pool_name,
            'ippool_uuid': ippool_uuid,
        }
        collected.append(nic_info)

    return collected


@autolog.log_method(Logger, 'migration_plan_workflow.vms_networks')
def vms_networks(context, db, vms, migration_plan):
    """Record NIC and network resources for every VM in the migration plan.

    For each VM, every supported NIC gets a ``nic`` resource row plus a
    network-resource row holding a pickled description of the NIC. After
    all VMs are processed, one ``network`` resource is created per distinct
    portgroup / vswitch name seen.

    :param context: request context passed through to the db calls
    :param db: database API object
    :param vms: iterable of mappings, each carrying a ``'vm_id'`` key
    :param migration_plan: mapping with at least an ``'id'`` key
    :raises wlm_exceptions.ErrorOccurred: if a VM is not found in vCenter
    """
    def get_nic_ip(vm, mac):
        """Return the guest-reported IP addresses of the NIC with ``mac``."""
        for net in vm.guest.net:
            if mac != getattr(net, 'macAddress', None):
                continue
            # ipAddress may be absent or None; always return a plain list.
            return list(getattr(net, 'ipAddress', None) or [])
        return []

    si = get_vcenter_service_instance()
    search_index = si.content.searchIndex
    unique_networks = set()
    unique_subnets = set()
    for vm in vms:
        vm_inst = search_index.FindByUuid(None, vm['vm_id'], True, True)
        if vm_inst is None:
            # wlm_exceptions is a module; raise a concrete exception class.
            raise wlm_exceptions.ErrorOccurred(
                "VM %s is not found on vCenter" % vm['vm_id'])
        nics = _get_vm_nics(vm_inst)
        for nic in nics:
            unique_networks.add(nic['portgroup'])
            unique_networks.add(nic['vswitch'])
            metadata = {
                'network_type': 'port_group'  if nic['portgroup']  else nic['vswitch'],
            }
            migration_plan_vm_resource_values = {
                'id': str(uuid.uuid4()),
                'vm_id': vm['vm_id'],
                'migration_plan_id': migration_plan['id'],
                'resource_type': 'nic',
                'resource_name': nic['macaddress'],
                'metadata': metadata,
                'status': 'available'}
            vm_resource = db.migration_plan_vm_resource_create(
                context, migration_plan_vm_resource_values)

            # ip_address, mac_address, subnet_id, network_id
            vm_network_resource_metadata = nic
            ip_addresses = get_nic_ip(vm_inst, nic['macaddress'])
            # NOTE(review): 'port_data' is a fixed literal, not derived from
            # the VM -- looks like placeholder data; confirm.
            nic_data = { 'ip_address': ip_addresses,
                         'mac_address': nic['macaddress'],
                         'network_id': str(nic['vswitch_uuid']),
                         'network_name': nic['portgroup'] if nic['portgroup'] else nic['vswitch'],
                         'network_type': 'vmware',
                         'port_data': '{"port": {"id": '
                                      '"7816813e-ff36-4256-9406-d6a0a17726fa", '
                                      '"name": "", "network_id": '
                                      '"17390bff-a6d9-46c8-8726-ecd146d37906", '
                                      '"tenant_id": '
                                      '"8820f5632c9f4876822623f1cba2919f", '
                                      '"mac_address": "fa:16:3e:c9:40:75", '
                                      '"admin_state_up": true, "status": '
                                      '"ACTIVE", "device_id": '
                                      '"b2bc4699-3a53-4436-8f41-aaa570efabd1", '
                                      '"device_owner": "compute:nova", '
                                      '"fixed_ips": [{"subnet_id": '
                                      '"c2137387-7c9f-468b-8e71-f870385e95ab", '
                                      '"ip_address": "10.10.1.24"}], '
                                      '"allowed_address_pairs": [], '
                                      '"extra_dhcp_opts": [], "security_groups": '
                                      '["8f41bce4-edb0-41f3-bc2c-963e928efeda"], '
                                      '"description": "", "binding:vnic_type": '
                                      '"normal", "port_security_enabled": true, '
                                      '"qos_policy_id": null, '
                                      '"qos_network_policy_id": null, "dns_name": '
                                      '"test", "dns_assignment": [{"ip_address": '
                                      '"10.10.1.24", "hostname": "test", "fqdn": '
                                      '"test.openstack.internal."}], '
                                      '"dns_domain": "", "tags": [], '
                                      '"created_at": "2023-04-28T10:28:14Z", '
                                      '"updated_at": "2023-04-28T10:28:19Z", '
                                      '"revision_number": 4, "project_id": '
                                      '"8820f5632c9f4876822623f1cba2919f"}}',
                      'subnet_id': nic['ippool_uuid'],
                      'subnet_name': nic['ippool_name']}
            # NOTE(review): a protocol-0 pickle can contain Latin-1 bytes for
            # non-ASCII text, which would not decode as UTF-8. Confirm what
            # the reader expects before changing the encoding.
            vm_network_resource_values = {
                    'migration_plan_vm_network_resource_id': vm_resource.id,
                    'pickle': str(pickle.dumps(nic_data, 0), encoding='utf-8'),
                    'metadata': vm_network_resource_metadata,
                    'status': 'available'}

            db.migration_plan_vm_network_resource_create(
                    context, vm_network_resource_values)

    # NOTE(review): unique_subnets is never populated above, so this loop
    # currently does nothing. It also relies on `vm` left over from the loop
    # above -- confirm the intended vm_id before enabling subnet discovery.
    for subnet in unique_subnets:
        vm_resource_values = {'id': str(uuid.uuid4()),
                              'vm_id': vm['vm_id'],
                              'migration_plan_id': migration_plan['id'],
                              'resource_type': 'subnet',
                              'resource_name': subnet['name'],
                              'metadata': {},
                              'status': 'available'}
        vm_resource = db.migration_plan_vm_resource_create(
                context, vm_resource_values)

        # create an entry in the vm_network_resource_snaps table
        vm_network_resource_metadata = {}
        vm_network_resource_values = {
                'migration_plan_vm_network_resource_id': vm_resource.id,
                'pickle': str(pickle.dumps(subnet, 0), encoding='utf-8'),
                'metadata': vm_network_resource_metadata,
                'status': 'available'}

        db.migration_plan_vm_network_resource_create(
                context, vm_network_resource_values)

    # Plan-wide network resources: the plan id is stored in the vm_id field.
    for network in unique_networks:
        vm_resource_values = {'id': str(uuid.uuid4()),
                              'vm_id': migration_plan['id'],
                              'migration_plan_id': migration_plan['id'],
                              'resource_type': 'network',
                              'resource_name': network,
                              'metadata': {},
                              'status': 'available'}
        vm_resource = db.migration_plan_vm_resource_create(
                context, vm_resource_values)

        # create an entry in the vm_network_resource_snaps table
        vm_network_resource_metadata = {}
        vm_network_resource_values = {
                'migration_plan_vm_network_resource_id': vm_resource.id,
                'pickle': str(pickle.dumps(network, 0), encoding='utf-8'),
                'metadata': vm_network_resource_metadata,
                'status': 'available'}

        db.migration_plan_vm_network_resource_create(
                context, vm_network_resource_values)


@autolog.log_method(Logger, 'migration_plan_workflow.vms_security_groups')
def vms_security_groups(cntx, db, vms, migration_plan):
    """Create a 'default' security group resource and its rules for each VM.

    Every VM gets one ``security_group`` resource row and one rule row per
    entry of the fixed rule list below; each rule is stored pickled.

    NOTE(review): the group id, name and rules are fixed literals rather
    than values discovered from the source platform -- presumably
    placeholder data; confirm.

    :param cntx: request context passed through to the db calls
    :param db: database API object
    :param vms: iterable of mappings, each carrying a ``'vm_id'`` key
    :param migration_plan: mapping with at least an ``'id'`` key
    """
    # The rule set does not depend on the VM, so build it once up front
    # instead of once per VM.
    rules = [
                { 'created_at': '2022-07-06T07:02:56Z',
                  'description': None,
                  'direction': 'ingress',
                  'ethertype': 'IPv4',
                  'id': '23047d15-2e9c-441b-bd8b-98eb717bbb5a',
                  'port_range_max': None,
                  'port_range_min': None,
                  'project_id': '8820f5632c9f4876822623f1cba2919f',
                  'protocol': None,
                  'remote_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'remote_ip_prefix': None,
                  'revision_number': 0,
                  'security_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'tags': [],
                  'tenant_id': '8820f5632c9f4876822623f1cba2919f',
                  'updated_at': '2022-07-06T07:02:56Z'
                },
                { 'created_at': '2022-07-07T12:00:54Z',
                  'description': '',
                  'direction': 'egress',
                  'ethertype': 'IPv4',
                  'id': '991ef285-b999-453d-afa7-ebafa174d384',
                  'port_range_max': None,
                  'port_range_min': None,
                  'project_id': '8820f5632c9f4876822623f1cba2919f',
                  'protocol': 'icmp',
                  'remote_group_id': None,
                  'remote_ip_prefix': '0.0.0.0/0',
                  'revision_number': 0,
                  'security_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'tags': [],
                  'tenant_id': '8820f5632c9f4876822623f1cba2919f',
                  'updated_at': '2022-07-07T12:00:54Z'
                },
                { 'created_at': '2022-07-06T07:02:56Z',
                  'description': None,
                  'direction': 'egress',
                  'ethertype': 'IPv6',
                  'id': 'a1dc0d9b-865c-4f14-8cea-5a58727b8c82',
                  'port_range_max': None,
                  'port_range_min': None,
                  'project_id': '8820f5632c9f4876822623f1cba2919f',
                  'protocol': None,
                  'remote_group_id': None,
                  'remote_ip_prefix': None,
                  'revision_number': 0,
                  'security_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'tags': [],
                  'tenant_id': '8820f5632c9f4876822623f1cba2919f',
                  'updated_at': '2022-07-06T07:02:56Z',
                },
                { 'created_at': '2022-07-07T12:00:53Z',
                  'description': '',
                  'direction': 'ingress',
                  'ethertype': 'IPv4',
                  'id': 'b2c82530-a5b5-49a3-bba9-8af91cd6840e',
                  'port_range_max': None,
                  'port_range_min': None,
                  'project_id': '8820f5632c9f4876822623f1cba2919f',
                  'protocol': 'icmp',
                  'remote_group_id': None,
                  'remote_ip_prefix': '0.0.0.0/0',
                  'revision_number': 0,
                  'security_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'tags': [],
                  'tenant_id': '8820f5632c9f4876822623f1cba2919f',
                  'updated_at': '2022-07-07T12:00:53Z'
                },
                { 'created_at': '2022-07-06T07:02:56Z',
                  'description': None,
                  'direction': 'egress',
                  'ethertype': 'IPv4',
                  'id': '19b7eb9f-6048-485b-8ef2-c72ecfc691d6',
                  'port_range_max': None,
                  'port_range_min': None,
                  'project_id': '8820f5632c9f4876822623f1cba2919f',
                  'protocol': None,
                  'remote_group_id': None,
                  'remote_ip_prefix': None,
                  'revision_number': 0,
                  'security_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'tags': [],
                  'tenant_id': '8820f5632c9f4876822623f1cba2919f',
                  'updated_at': '2022-07-06T07:02:56Z'
                },
                { 'created_at': '2022-07-06T07:02:56Z',
                  'description': None,
                  'direction': 'ingress',
                  'ethertype': 'IPv6',
                  'id': '14e27aa5-3d67-40ac-8989-1d7c95a9adf2',
                  'port_range_max': None,
                  'port_range_min': None,
                  'project_id': '8820f5632c9f4876822623f1cba2919f',
                  'protocol': None,
                  'remote_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'remote_ip_prefix': None,
                  'revision_number': 0,
                  'security_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'tags': [],
                  'tenant_id': '8820f5632c9f4876822623f1cba2919f',
                  'updated_at': '2022-07-06T07:02:56Z'
                },
                { 'created_at': '2022-07-07T12:00:50Z',
                  'description': '',
                  'direction': 'egress',
                  'ethertype': 'IPv4',
                  'id': '6d1741fa-416f-402a-b8de-f08352cb054f',
                  'port_range_max': None,
                  'port_range_min': None,
                  'project_id': '8820f5632c9f4876822623f1cba2919f',
                  'protocol': 'tcp',
                  'remote_group_id': None,
                  'remote_ip_prefix': '0.0.0.0/0',
                  'revision_number': 0,
                  'security_group_id': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
                  'tags': [],
                  'tenant_id': '8820f5632c9f4876822623f1cba2919f',
                  'updated_at': '2022-07-07T12:00:50Z'
                },
         ]

    for vm in vms:
        vm_security_group_values = {
            'id': str( uuid.uuid4()),
            'vm_id': vm['vm_id'],
            'migration_plan_id': migration_plan['id'],
            'resource_type': 'security_group',
            'status': 'available',
            'resource_name': '8f41bce4-edb0-41f3-bc2c-963e928efeda',
            'metadata': {
                'security_group_type': 'neutron',
                'name': 'default',
                'description': 'Default security group',
                'vm_id': vm['vm_id'],
                'vm_attached': '1',
            },
        }

        vm_security_group = db.migration_plan_vm_resource_create(
                cntx, vm_security_group_values)

        # A fresh metadata dict per VM, shared by that VM's rule rows.
        vm_security_group_rule_metadata = {
                    'security_group_type': 'neutron', }
        for security_group_rule in rules:
            vm_security_group_rule_values = {
                'id': str(uuid.uuid4()),
                'migration_plan_vm_security_group_id': vm_security_group.id,
                'pickle': str(pickle.dumps(security_group_rule, 0), encoding='ISO-8859-1'),
                'metadata': vm_security_group_rule_metadata,
                'status': 'available',}

            db.migration_plan_vm_security_group_rule_create(
                    cntx, vm_security_group_rule_values)


@autolog.log_method(Logger, 'migration_plan_workflow.vms_virtual_disks')
def vms_virtual_disks(cntx, db, vms, migration_plan):
    """Record a disk resource for every virtual disk of each VM in the plan.

    Each ``VirtualDisk`` device of a VM yields one ``disk`` resource row
    (volume id, name, size, datastore, mount point, file path) and one disk
    resource row marked as the top of the chain with format ``vmdk``.

    :param cntx: request context passed through to the db calls
    :param db: database API object
    :param vms: iterable of mappings, each carrying a ``'vm_id'`` key
    :param migration_plan: mapping with at least an ``'id'`` key
    :raises wlm_exceptions.ErrorOccurred: if a VM is not found in vCenter
    """
    si = get_vcenter_service_instance()
    search_index = si.content.searchIndex
    for vm in vms:
        vm_inst = search_index.FindByUuid(None, vm['vm_id'], True, True)
        if vm_inst is None:
            # wlm_exceptions is a module; raise a concrete exception class.
            raise wlm_exceptions.ErrorOccurred(
                "VM %s is not found on vCenter" % vm['vm_id'])

        # Counts virtual disks only; used to index vm_inst.guest.disk.
        idx = 0
        for d in vm_inst.config.hardware.device:
            if not isinstance(d, vim.vm.device.VirtualDisk):
                continue

            # Walk the backing chain up to its root (the backing with no
            # parent); its uuid, datastore and fileName describe the volume.
            parent = d.backing
            while parent.parent:
                parent = parent.parent

            # capacityInKB -> GB
            size_gb = d.capacityInKB/1024/1024

            vm_resource_metadata = {}
            vm_resource_metadata['volume_id'] = parent.uuid
            vm_resource_metadata['volume_name'] = d.deviceInfo.label

            vm_resource_metadata['volume_description'] = "VMware VMDK Disk"
            vm_resource_metadata['volume_size'] = size_gb
            vm_resource_metadata['volume_type'] = parent.datastore.name
            try:
                # Best effort: assumes the idx-th guest disk entry belongs
                # to the idx-th virtual disk -- TODO confirm.
                vm_resource_metadata['volume_mountpoint'] = vm_inst.guest.disk[idx].diskPath
            except Exception:
                # Guest info missing or shorter than the device list.
                vm_resource_metadata['volume_mountpoint'] = "NA"
            vm_resource_metadata['availability_zone'] = ''
            vm_resource_metadata['disk_path'] = parent.fileName
            idx += 1
            vm_resource_values = {
                'id': str( uuid.uuid4()),
                'vm_id': vm['vm_id'],
                'migration_plan_id': migration_plan['id'],
                'resource_type': 'disk',
                'resource_name': d.deviceInfo.label,
                'metadata': vm_resource_metadata,
                'status': 'available'
            }
            vm_resource = db.migration_plan_vm_resource_create(
                cntx, vm_resource_values)

            vm_disk_resource_id = str(uuid.uuid4())
            vm_disk_resource_metadata = {}  # Dictionary to hold the metadata
            vm_disk_resource_metadata['disk_format'] = 'vmdk'
            vm_disk_resource_values = {
                'id': vm_disk_resource_id,
                'migration_plan_vm_resource_id': vm_resource.id,
                'vm_disk_resource_backing_id': None,
                'metadata': vm_disk_resource_metadata,
                'top': True,
                'size': size_gb,
                'status': 'available'
            }

            db.migration_plan_vm_disk_resource_create(
                    cntx, vm_disk_resource_values)


class VMsNetworks(task.Task):
    """Task that records the network resources of the plan's VMs."""

    def execute(self, context, vms, migration_plan):
        """Delegate to the logged implementation."""
        return self.execute_with_log(context, vms, migration_plan)

    def revert(self, context, vms, migration_plan, result, flow_failures):
        """Delegate to the logged (no-op) revert."""
        return self.revert_with_log(context)

    @autolog.log_method(Logger, 'VMsNetworks.execute')
    def execute_with_log(self, context, vms, migration_plan):
        """Rebuild the RPC context from its dict form and run vms_networks."""
        # Snapshot the networking configuration of VMs
        database = vmtasks.WorkloadMgrDB().db
        rpc_context = amqp.RpcContext.from_dict(context)

        # Return value is ignored; presumably this raises when the plan was
        # cancelled -- confirm against the db API.
        plan_id = migration_plan['id']
        database.migration_plan_get_metadata_cancel_flag(rpc_context, plan_id)

        return vms_networks(rpc_context, database, vms, migration_plan)

    @autolog.log_method(Logger, 'VMsNetworks.revert')
    def revert_with_log(self, *args, **kwargs):
        """Nothing to undo."""
        pass


class VMsFlavors(task.Task):
    """Task that records VM details and flavor resources for the plan's VMs."""

    def execute(self, context, vms, migration_plan, source_platform="vmware"):
        """Delegate to the logged implementation."""
        return self.execute_with_log(
            context, vms, migration_plan, source_platform=source_platform)

    def revert(self, context, vms, migration_plan,
               result, flow_failures, source_platform="vmware"):
        """Delegate to the logged (no-op) revert."""
        return self.revert_with_log(context)

    @autolog.log_method(Logger, 'VMsFlavors.execute')
    def execute_with_log(self, context, vms, migration_plan, source_platform="vmware"):
        """Rebuild the RPC context and run vms_flavor.

        ``source_platform`` is accepted but not used here.
        """
        database = vmtasks.WorkloadMgrDB().db
        rpc_context = amqp.RpcContext.from_dict(context)

        # Return value is ignored; presumably this raises when the plan was
        # cancelled -- confirm against the db API.
        plan_id = migration_plan['id']
        database.migration_plan_get_metadata_cancel_flag(rpc_context, plan_id)

        return vms_flavor(rpc_context, database, vms, migration_plan)

    @autolog.log_method(Logger, 'VMsFlavors.revert')
    def revert_with_log(self, *args, **kwargs):
        """Nothing to undo."""
        pass


class VMsSecurityGroups(task.Task):
    """Task that records security-group resources for the plan's VMs."""

    def execute(self, context, vms, migration_plan, source_platform="vmware"):
        """Delegate to the logged implementation."""
        return self.execute_with_log(
            context, vms, migration_plan, source_platform=source_platform)

    def revert(self, context, vms, migration_plan, result, flow_failures):
        """Delegate to the logged (no-op) revert."""
        return self.revert_with_log(context)

    @autolog.log_method(Logger, 'VMsSecurityGroups.execute')
    def execute_with_log(self, context, vms, migration_plan, source_platform="vmware"):
        """Rebuild the RPC context and run vms_security_groups.

        ``source_platform`` is accepted but not used here.
        """
        database = vmtasks.WorkloadMgrDB().db
        rpc_context = amqp.RpcContext.from_dict(context)

        # Return value is ignored; presumably this raises when the plan was
        # cancelled -- confirm against the db API.
        plan_id = migration_plan['id']
        database.migration_plan_get_metadata_cancel_flag(rpc_context, plan_id)

        return vms_security_groups(rpc_context, database, vms, migration_plan)

    @autolog.log_method(Logger, 'VMsSecurityGroups.revert')
    def revert_with_log(self, *args, **kwargs):
        """Nothing to undo."""
        pass


class VMsVirtualDisks(task.Task):
    """Task that records virtual-disk resources for the plan's VMs."""

    def execute(self, context, vms, migration_plan, source_platform="vmware"):
        """Delegate to the logged implementation."""
        return self.execute_with_log(
            context, vms, migration_plan, source_platform=source_platform)

    def revert(self, context, vms, migration_plan, result, flow_failures):
        """Delegate to the logged (no-op) revert."""
        return self.revert_with_log(context)

    # Log label names this class; it previously carried the
    # VMsSecurityGroups label, which misattributed the log lines.
    @autolog.log_method(Logger, 'VMsVirtualDisks.execute')
    def execute_with_log(self, context, vms, migration_plan, source_platform="vmware"):
        """Rebuild the RPC context and run vms_virtual_disks.

        ``source_platform`` is accepted but not used here.
        """
        db = vmtasks.WorkloadMgrDB().db
        cntx = amqp.RpcContext.from_dict(context)

        # Return value is ignored; presumably this raises when the plan was
        # cancelled -- confirm against the db API.
        db.migration_plan_get_metadata_cancel_flag(cntx, migration_plan['id'])

        return vms_virtual_disks(cntx, db, vms, migration_plan)

    @autolog.log_method(Logger, 'VMsVirtualDisks.revert')
    def revert_with_log(self, *args, **kwargs):
        """Nothing to undo."""
        pass


@six.add_metaclass(abc.ABCMeta)
class MigrationPlanWorkFlow(workflow.Workflow):

    """
    This is the migration plan workflow that discovers all resources of the
    VMs in vCenter that need to be migrated to OpenStack
    """
    def __init__(self, name, store):
        """Remember the workflow name and the shared store mapping.

        :param name: human-readable workflow name
        :param store: mapping read by this class; the keys used here are
            'context', 'migration_plan', 'migration_plan_id', 'path',
            'connection' and 'max_cache_size'. 'vms' is added by initflow().
        """
        super(MigrationPlanWorkFlow, self).__init__(name)
        self._name = name
        self._store = store
        # Set by initflow(); None until then.
        self._vmsmetadata = None

    def initflow(self):
        """Validate the plan's VMs against vCenter and build the task flow.

        Populates ``self._store['vms']`` with one description per VM and
        assembles a linear flow of the network, flavor, security-group and
        virtual-disk discovery tasks.

        :raises wlm_exceptions.ErrorOccurred: if the plan has no VMs or a VM
            is not found in vCenter
        """
        cntx = amqp.RpcContext.from_dict(self._store['context'])
        db = vmtasks.WorkloadMgrDB().db
        vms = db.migration_plan_vms_get(cntx, self._store['migration_plan']['id'])
        if not vms:
            raise wlm_exceptions.ErrorOccurred(
                "Failed to initialize the migration plan workflow: "
                "vmware vms cannot be None")

        # make sure all vms are valid
        self._store['vms'] = []
        si = get_vcenter_service_instance()
        search_index = si.content.searchIndex
        for vm in vms:
            vm_inst = search_index.FindByUuid(None, vm['vm_id'], True, True)
            if vm_inst is None:
                raise wlm_exceptions.ErrorOccurred("VM %s is not found on vCenter" % vm['vm_id'])

            # NOTE(review): this reads vm.name, while vms_flavor() in this
            # module writes the row's name under 'vm_name' -- confirm the
            # db model exposes a `name` attribute.
            self._store['vms'].append({
                'vm_id': vm.vm_id,
                'vm_name': vm.name,
                'vm_metadata': vm.metadata,
                'vm_flavor_id': "flavor1",
                'hostname': vm.name,
                'vm_power_state': vm_inst.runtime.powerState,
                'hypervisor_hostname': None,
                'hypervisor_type': "ESXi",
                'availability_zone': None,
            })
        # Discover VM networks
        _vmsmetadata = lf.Flow(self.name + "#VMsMetadata")
        _vmsmetadata.add(VMsNetworks(self._name + "#VMsNetwork"))

        # flavors of VMs
        _vmsmetadata.add(VMsFlavors(self._name + "#VMsFlavors"))

        # security groups of VMs
        _vmsmetadata.add(
            VMsSecurityGroups(self._name + "#VMsSecurityGroups"))

        # VMs virtual disks
        _vmsmetadata.add(
            VMsVirtualDisks(self._name + "#VMsVirtualDisks"))

        # Keep a reference so the migration_plan_metadata property works.
        self._vmsmetadata = _vmsmetadata

        self._flow = lf.Flow(self.name)

        self._flow.add(_vmsmetadata)

    @property
    def name(self):
        """A non-unique name for this workflow (human readable)."""
        return self._name

    @property
    def vmwarevms(self):
        """Returns the VM descriptions stored by initflow()."""
        return self._store['vms']

    @property
    def migration_plan_metadata(self):
        """Returns the metadata-discovery sub-flow (None before initflow())."""
        return self._vmsmetadata

    def __str__(self):
        # Module-qualified class name, built with the standard library
        # because no `reflection` helper is imported in this module.
        cls = type(self)
        class_name = "%s.%s" % (cls.__module__, cls.__qualname__)
        lines = ["%s: %s" % (class_name, self.name)]
        # NOTE(review): this checks for an attribute literally named 'len';
        # '__len__' may have been intended -- left unchanged.
        if hasattr(self, 'len'):
            lines.append("%s" % (len(self)))
        return "; ".join(lines)

    def details(self):
        """Provides the workflow details in json format."""
        # workflow details based on the
        # current topology, number of VMs etc
        def recurseflow(item):
            if isinstance(item, task.Task):
                taskdetails = {
                    'name': item._name.split("_")[0],
                    'type': 'Task'}
                taskdetails['input'] = []
                # A task name of the form "<name>_<vm_id>" is reported with
                # the VM's display name as its input.
                if len(item._name.split('_')) == 2:
                    nodename = item._name.split("_")[1]
                    for n in nodes['instances']:
                        if n['vm_id'] == nodename:
                            nodename = n['vm_name']
                    taskdetails['input'] = [['vm', nodename]]
                return taskdetails

            flowdetails = {}
            flowdetails['name'] = str(item).split("==")[0]
            flowdetails['type'] = str(item).split('.')[2]
            flowdetails['children'] = []
            for it in item:
                flowdetails['children'].append(recurseflow(it))

            return flowdetails

        # discover() is not defined in this class; presumably it is
        # provided by workflow.Workflow -- confirm.
        nodes = self.discover()
        workflow = recurseflow(self._flow)
        return dict(workflow=workflow)

    def execute(self):
        """Run the flow with a parallel engine in a per-plan work directory.

        The directory is created under the configured store path and is
        removed again when the run finishes, whether it succeeds or fails.
        """
        # Compute the per-plan directory before entering the try block: if
        # this step fails, the cleanup below must not run against (and
        # delete) the parent directory.
        work_dir = os.path.join(
            self._store["path"],
            'migration_plan_' + str(self._store['migration_plan_id']))
        self._store["path"] = work_dir
        try:
            fileutils.ensure_tree(work_dir)
            exec_engine = engines.load(
                self._flow,
                engine='parallel',
                engine_conf='parallel',
                backend={
                    'connection': self._store['connection'],
                    'path': work_dir,  # save data to this directory
                    'max_cache_size': self._store['max_cache_size'],  # keep up-to this much entries in memory
                },
                store=self._store)
            with timing.PrintingDurationListener(exec_engine, LOG.info):
                exec_engine.run()
        finally:
            fileutils.remove_tree(work_dir)