# Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages
#
# Repository URL to install this package:
#
# Details
# workloadmgr / workloadmgr / db / imports / import_workloads.py
# Size: Mime:
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2015 TrilioData, Inc.
# All Rights Reserved.

import socket
import json
import os
import uuid
import ast
import importlib
from operator import itemgetter
import pickle as pickle
import shutil
import tempfile
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool as ProcessPool

from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr import workloads as workloadAPI
from workloadmgr.vault import vault
from workloadmgr import exception
from workloadmgr.openstack.common import log as logging
from workloadmgr.workloads import workload_utils
from workloadmgr.common import context as wlm_context
from workloadmgr.common import clients
from workloadmgr.db.sqlalchemy import models
from workloadmgr.db.sqlalchemy.session import get_session
from workloadmgr.common.workloadmgr_keystoneclient import KeystoneClient
from workloadmgr.compute import nova
from workloadmgr import utils

# Module logger and a database session obtained once, at import time.
LOG = logging.getLogger(__name__)
DBSession = get_session()

# Module-level state shared by the import helpers in this file.
# NOTE(review): `workloads` and `migration_plans` are not referenced in the
# part of the file reviewed here -- presumably used further down; confirm.
workloads = []
migration_plans = []
# workload id -> backup endpoint; filled in by get_workload_url().
workload_backup_endpoint = {}
# migration plan id -> backup endpoint; filled in by get_migration_plan_url().
migration_plan_backup_endpoint = {}
# workload id -> 'size' of a 'full' snapshot; filled in by get_json_files().
workload_backup_media_size = {}
# Backup target used to read objects; set to the first target that
# get_workload_url() / get_migration_plan_url() resolve.
vault_backend = None
# Project ids of this cloud; fetch_project_list() rebinds this to a set.
all_cloud_projects = []
# Progress bookkeeping used by get_json_files().
wl_import_count = 0
count = 0
percent_progress = 0

# One entry per "*_db" JSON file type handled by the workload import.
# Each entry holds:
#   file                 - name of the JSON file (also the key used in
#                          get_json_files()' db_files_map)
#   model_class          - name of the model class for the record
#   metadata_model_class - name of the model class for the record's metadata
#   getter_method        - name of the DB API getter for an existing record
#   getter_method_params - names of the record fields passed to the getter
#   getter_method_kwargs - extra keyword arguments for the getter
# NOTE(review): the code consuming this table is outside the reviewed part of
# the file -- presumably class names are resolved against
# workloadmgr.db.sqlalchemy.models and the getter is used to detect rows that
# already exist; confirm against the consumer.
import_map = [
    {'file': 'workload_db',
     'model_class': 'Workloads',
     'metadata_model_class': 'WorkloadMetadata',
     'getter_method': 'workload_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {'read_deleted': 'yes'}
     },
    {'file': 'workload_vms_db',
     'model_class': 'WorkloadVMs',
     'metadata_model_class': 'WorkloadVMMetadata',
     'getter_method': 'workload_vm_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {}
     },
    {'file': 'snapshot_db',
     'model_class': 'Snapshots',
     'metadata_model_class': 'SnapshotMetadata',
     'getter_method': 'snapshot_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {'read_deleted': 'yes'}
     },
    {'file': 'snapshot_vms_db',
     'model_class': 'SnapshotVMs',
     'metadata_model_class': 'SnapshotVMMetadata',
     'getter_method': 'snapshot_vm_get',
     'getter_method_params': ['vm_id', 'snapshot_id'],
     'getter_method_kwargs': {}
     },
    {'file': 'resources_db',
     'model_class': 'SnapshotVMResources',
     'metadata_model_class': 'SnapshotVMResourceMetadata',
     'getter_method': 'snapshot_vm_resource_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {}
     },
    {'file': 'disk_db',
     'model_class': 'VMDiskResourceSnaps',
     'metadata_model_class': 'VMDiskResourceSnapMetadata',
     'getter_method': 'vm_disk_resource_snaps_get',
     'getter_method_params': ['snapshot_vm_resource_id'],
     'getter_method_kwargs': {}
     },
    {'file': 'network_db',
     'model_class': 'VMNetworkResourceSnaps',
     'metadata_model_class': 'VMNetworkResourceSnapMetadata',
     'getter_method': 'vm_network_resource_snaps_get',
     'getter_method_params': ['vm_network_resource_snap_id'],
     'getter_method_kwargs': {}
     },
    {'file': 'security_group_db',
     'model_class': 'VMSecurityGroupRuleSnaps',
     'metadata_model_class': 'VMSecurityGroupRuleSnapMetadata',
     'getter_method': 'vm_security_group_rule_snaps_get',
     'getter_method_params': ['vm_security_group_snap_id'],
     'getter_method_kwargs': {}
     },
    {'file': 'config_workload_db',
     'model_class': 'ConfigWorkloads',
     'metadata_model_class': 'ConfigWorkloadMetadata',
     'getter_method': 'config_workload_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {}
     },
    {'file': 'network_topology_db',
     'model_class': 'SnapNetworkResources',
     'metadata_model_class': 'SnapNetworkResourceMetadata',
     'getter_method': 'snapshot_network_resource_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {}
     }, 
]

# Same layout as the workload import table, for the migration-plan "*_db"
# JSON files: file name, model / metadata-model class names, and the DB getter
# with its parameter names and extra keyword arguments.
# NOTE(review): the consumer of this table is outside the reviewed part of the
# file; confirm how the entries are resolved.
import_migration_plan_map = [
    {'file': 'migration_plan_db',
     'model_class': 'MigrationPlans',
     'metadata_model_class': 'MigrationPlanMetadata',
     'getter_method': 'migration_plan_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {'read_deleted': 'yes'}
     },
    {'file': 'migration_plan_vms_db',
     'model_class': 'MigrationPlanVMs',
     'metadata_model_class': 'MigrationPlanVMMetadata',
     'getter_method': 'migration_plan_vm_get',
     'getter_method_params': ['id'],
     'getter_method_kwargs': {}
     },
]


def fetch_project_list(cntx):
    """
    Refresh the module-level ``all_cloud_projects`` cache.

    Asks Keystone (via ``get_project_list_for_import``) for the projects
    and stores the set of their ids in ``all_cloud_projects``.

    :param cntx: request context handed to the Keystone client.
    :raises Exception: any failure is logged and re-raised.
    """
    global all_cloud_projects
    try:
        project_refs = KeystoneClient(cntx).client.get_project_list_for_import(cntx)
        all_cloud_projects = set(project.id for project in project_refs)
    except Exception as ex:
        LOG.exception(ex)
        raise ex


def project_id_exists(cntx, project_id):
    """
    Tell whether ``project_id`` is one of the ids cached in
    ``all_cloud_projects`` (see fetch_project_list).

    :param cntx: request context (not used by the lookup itself).
    :param project_id: project id to look for.
    :returns: True when the id is cached, False otherwise.
    :raises Exception: a failing lookup is logged and re-raised.
    """
    try:
        return project_id in all_cloud_projects
    except Exception as ex:
        LOG.exception(ex)
        raise ex


def check_tenant(cntx, object_path, upgrade):
    """
    Check whether the tenant owning a stored object (workload or
    migration plan) exists in the current cloud.

    The object is read through ``vault_backend`` and parsed as JSON; its
    ``project_id`` (falling back to ``tenant_id``) is looked up with
    ``project_id_exists``.

    :param cntx: request context passed on to ``project_id_exists``.
    :param object_path: path of the object's JSON db file.
    :param upgrade: accepted for interface compatibility; not used here.
    :returns: True when the tenant exists. False when the object is empty,
              cannot be read or parsed, or its tenant is unknown. Every
              failure is logged and never raised, so callers can rely on
              plain truthiness.
    """
    try:
        object_data = vault_backend.get_object(object_path)
        if object_data is None or len(object_data) == 0:
            return False

        object_values = json.loads(object_data)
        tenant_id = object_values.get(
            'project_id', object_values.get('tenant_id', None))
        if project_id_exists(cntx, tenant_id):
            return True

        # Report the mismatch directly instead of raising an exception that
        # this function would immediately swallow again. ``.get('id')`` keeps
        # a missing id from hiding the real message behind a KeyError.
        LOG.error(
            "Object %s tenant %s does not belong to this cloud" %
            (object_values.get('id'), tenant_id))
        return False
    except Exception as ex:
        LOG.exception(ex)
        return False


def get_context(values):
    """
    Build a ``RequestContext`` from a record's stored values.

    ``project_id`` is preferred over ``tenant_id`` and is used for both the
    project and tenant fields; ``user_id`` is mandatory and ``auth_token``
    optional.

    :param values: mapping holding the stored record fields.
    :returns: the new context, or None when building it fails (the failure
              is logged, not raised).
    """
    try:
        project = values.get('project_id', values.get('tenant_id', None))
        token = values.get('auth_token')
        return wlm_context.RequestContext(
            user_id=values['user_id'],
            auth_token=token,
            project_id=project,
            tenant_id=project)
    except Exception as ex:
        LOG.exception(ex)


def _adjust_values(cntx, new_version, values, upgrade):
    """
    Normalise a record read from a backup before it is written to the DB.

    The mapping is modified in place and also returned:

    * ``version`` is set to ``new_version``;
    * unless ``upgrade`` is true, ``user_id`` / ``project_id`` are replaced
      with the ones of ``cntx``;
    * ``metadata``, when present, ends up as a plain ``{key: value}`` dict;
    * ``host``, when present, is replaced with the local host name.

    :param cntx: context providing ``user_id`` and ``project_id``.
    :param new_version: version string stamped on the record.
    :param values: record mapping to adjust.
    :param upgrade: when true the stored owner ids are kept.
    :returns: the same ``values`` mapping.
    """
    values['version'] = new_version
    if not upgrade:
        values['user_id'] = cntx.user_id
        values['project_id'] = cntx.project_id
    if 'metadata' in values:
        stored_metadata = values['metadata']
        if isinstance(stored_metadata, dict):
            # Already collapsed by the caller (import_policy does this);
            # iterating a dict would yield its keys and fail on meta['key'].
            values['metadata'] = dict(stored_metadata)
        else:
            # List of {'key': ..., 'value': ...} rows; None means no rows.
            values['metadata'] = {
                meta['key']: meta['value'] for meta in stored_metadata or []}
    if 'host' in values:
        values['host'] = socket.gethostname()
    return values


def import_allowed_quotas(cntx, source_btt=None):
    """
    Load the allowed-quota records from the backup target and create them
    in the database using a cloud-admin context.

    :param cntx: request context used to derive the admin context.
    :param source_btt: accepted but not used by this function.
    :returns: None; failures are logged and not raised.
    """
    try:
        wlm_db = WorkloadMgrDB().db
        quota_target, quota_path = vault.get_allowed_quota_backup_target()
        raw_quotas = quota_target.get_object(quota_path)
        quota_records = json.loads(raw_quotas)
        cloud_admin_cntx = nova._get_tenant_context(cntx, cloud_admin=True)
        cloud_admin_cntx.is_admin = True
        wlm_db.create_allowed_quotas(cloud_admin_cntx, quota_records)
    except Exception as ex:
        LOG.exception(ex)


def import_settings(cntx, new_version, source_btt=None, upgrade=True):
    """
    Import the settings stored on the settings backup target into the DB.

    Every stored setting is normalised with ``_adjust_values`` and created
    through ``setting_create``. Settings of type ``trust_id`` are created
    only when no existing trust setting has the same name and project id.

    :param cntx: request context used for all DB calls.
    :param new_version: version stamped on each imported setting.
    :param source_btt: accepted but not used by this function.
    :param upgrade: forwarded to ``_adjust_values``.
    :returns: None; a failing setting is logged and skipped, and a failure
              while loading the settings is logged as well.
    """

    def _is_unknown_trust(existing, candidate):
        # True when no existing record matches the candidate's name/project.
        return not any(
            record.name == candidate.get('name', '') and
            record.project_id == candidate.get('project_id', '')
            for record in existing)

    try:
        wlm_db = WorkloadMgrDB().db
        settings_target, settings_path = vault.get_settings_backup_target()
        stored_settings = json.loads(settings_target.get_object(settings_path))
        trust_filters = {'type': 'trust_id', 'get_hidden': True}
        known_trusts = wlm_db.setting_get_all(cntx, **trust_filters)
        for entry in stored_settings:
            try:
                if 'key' in entry:
                    entry['name'] = entry['key']
                # When the import runs together with configuration, trust
                # creation has already updated the backend file, so an
                # existing trust must not be inserted a second time.
                create = (entry['type'] != 'trust_id' or
                          _is_unknown_trust(known_trusts, entry))
                entry = _adjust_values(cntx, new_version, entry, upgrade)
                if create:
                    wlm_db.setting_create(cntx, entry)
            except Exception as ex:
                LOG.exception(ex)
    except Exception as ex:
        LOG.exception(ex)


def import_policy(cntx, new_version, source_btt=None, upgrade=True):
    """
    Import the ``policy_*`` files found in the backup target's policy path.

    Policies whose id already exists in the DB are skipped. For the others
    the stored field values and metadata rows are collapsed into dicts, the
    record is normalised with ``_adjust_values`` and created, followed by
    its policy assignments.

    :param cntx: request context used for all DB calls.
    :param new_version: version stamped on each imported policy.
    :param source_btt: accepted but not used by this function.
    :param upgrade: forwarded to ``_adjust_values``.
    :returns: None; failures are logged and not raised.
    """

    def _collapse(records, key_field, value_field):
        # Turn a list of row dicts into {row[key_field]: row[value_field]}.
        return {record[key_field]: record[value_field] for record in records}

    assignment_fields = ('policy_id', 'project_id', 'project_name', 'policy_name')

    try:
        wlm_db = WorkloadMgrDB().db
        settings_target, _settings_path = vault.get_settings_backup_target()
        policy_dir = settings_target.get_policy_path()
        policy_files = []
        for entry in os.listdir(policy_dir):
            if os.path.isfile(os.path.join(policy_dir, entry)) and entry.startswith("policy_"):
                policy_files.append(entry)
        known_policy_ids = [known.id for known in wlm_db.policy_get_all(cntx)]

        for policy_file in policy_files:
            raw_policy = settings_target.get_object(
                os.path.join(policy_dir, policy_file))
            policy = json.loads(raw_policy)
            if policy['id'] in known_policy_ids:
                # Policy is already part of the DB, nothing to import.
                continue

            assignments = policy.pop('policy_assignments', [])
            raw_fields = policy.pop('field_values', [])
            policy['field_values'] = _collapse(raw_fields, 'policy_field_name', 'value')
            if policy['field_values'].get('retention_policy_type', None):
                # Imported from an older environment: convert the stored
                # fields to the new policy field layout.
                policy['field_values'] = workload_utils.convert_old_policy_to_new_policy_field(raw_fields)
            raw_metadata = policy.pop('metadata', [])
            policy['metadata'] = _collapse(raw_metadata, 'key', 'value')
            _adjust_values(cntx, new_version, policy, upgrade)

            try:
                wlm_db.policy_create(cntx, policy)
            except exception.DBError as ex:
                # Handles the race condition of one import process per
                # workload: two processes may try to create the same policy.
                LOG.error(ex)
                continue

            if len(assignments) > 0:
                for assignment in assignments:
                    assignment_values = {
                        field: assignment[field] for field in assignment_fields}
                    wlm_db.policy_assignment_create(cntx, assignment_values)
    except Exception as ex:
        LOG.exception(ex)


def update_backup_media_target(file_path, backup_endpoint):
    """
    Point the backup location recorded in a JSON db file at
    ``backup_endpoint``.

    Three layouts are handled, checked in this order:

    * a top-level ``backup_media_target`` value (config workload);
    * a top-level ``vault_storage_path`` (config backup), which is re-rooted
      under the mount path of ``backup_endpoint``;
    * a ``metadata`` list containing a ``backup_media_target`` row
      (workload and snapshot).

    The file is rewritten only when something actually changed, and a file
    without ``metadata`` no longer gets a ``"metadata": null`` entry added.

    :param file_path: path of the JSON db file; read through
                      ``vault_backend`` and written back in place.
    :param backup_endpoint: endpoint the file should reference.
    :returns: None; failures are logged and not raised.
    """
    try:
        file_data = vault_backend.get_object(file_path)
        if file_data is None or len(file_data) <= 0:
            return
        json_obj = json.loads(file_data)
        changed = False

        # This case is for config_workload
        if json_obj.get('backup_media_target', None):
            if backup_endpoint != json_obj.get('backup_media_target'):
                json_obj['backup_media_target'] = backup_endpoint
                changed = True
        # Check for config_backup
        elif json_obj.get('vault_storage_path', None):
            vault_storage_path = json_obj.get('vault_storage_path')
            mount_path = vault.get_backup_target(backup_endpoint).mount_path
            if not vault_storage_path.startswith(mount_path):
                # Keep the part after "<cloud_unique_id>/" and re-root it
                # under the mount path of the new endpoint.
                backup_path = vault_storage_path.split(
                    vault.CONF.cloud_unique_id + "/")[1]
                json_obj['vault_storage_path'] = os.path.join(
                    mount_path, vault.CONF.cloud_unique_id, backup_path)
                changed = True
        else:
            # Case for workload and snapshot
            for meta in json_obj.get('metadata', None) or []:
                if (meta['key'] == 'backup_media_target' and
                        backup_endpoint != meta['value']):
                    meta['value'] = backup_endpoint
                    changed = True
                    break

        if not changed:
            # Nothing to update: leave the file on the backup target alone.
            return

        with open(file_path, 'w') as outfile:
            json.dump(json_obj, outfile)

    except Exception as ex:
        LOG.exception(ex)


def get_workload_url(context, workload_ids, source_btt, target_btt, upgrade):
    """
    Collect the workload directories to import from the source backup
    target.

    The endpoint is taken from
    ``source_btt['backup_target_types']['filesystem_export']``. Each
    workload found there is validated and its tenant checked; accepted
    workloads are recorded in ``workload_backup_endpoint`` and, for NFS
    storage, their workload/snapshot db files are pointed at the endpoint.
    When no workload ids are requested the config workload directory is
    added as well, provided the DB does not already have a config workload.

    :param context: request context.
    :param workload_ids: ids to import; empty or None means every workload
                         found on the endpoint.
    :param source_btt: mapping describing the source backup target type.
    :param target_btt: accepted but not used by this function.
    :param upgrade: forwarded to ``check_tenant``.
    :returns: tuple ``(workload_paths, failed_workload_ids)``. Requested ids
              that were not found on the endpoint are reported as failed.
    """
    global vault_backend

    workload_url_iterate = []
    workload_ids_to_import = list(workload_ids) if workload_ids else []
    failed_workloads = []

    def add_config_workload(context, config_workload_path, backup_endpoint):
        try:
            # If config_workload is not in the database then only import it.
            WorkloadMgrDB().db.config_workload_get(context)
        except exception.ConfigWorkloadNotFound:
            workload_url_iterate.append(config_workload_path)
            # Updating backup media and adding config_workload for import
            config_workload_db = os.path.join(
                config_workload_path, "config_workload_db")
            if os.path.exists(config_workload_db):
                update_backup_media_target(config_workload_db, backup_endpoint)

    def add_workload(context, workload_id, workload, backup_endpoint, upgrade):
        # Before adding the workload check whether workload is valid or not
        if vault.validate_workload(workload) is False:
            failed_workloads.append(workload_id)
            LOG.error(
                "Workload %s doesn't contains required database files," %
                workload_id)
            return

        # Check whether workload tenant exist in current cloud or not
        object_path = os.path.join(workload, "workload_db")
        if not check_tenant(context, object_path, upgrade):
            failed_workloads.append(workload_id)
            return

        # update workload_backend_endpoint map
        workload_backup_endpoint[workload_id] = backup_endpoint
        workload_url_iterate.append(workload)

        if vault.CONF.vault_storage_type == 'nfs':
            # Update backup media target
            update_backup_media_target(object_path, backup_endpoint)
            for item in os.listdir(workload):
                snapshot_db = os.path.join(workload, item, "snapshot_db")
                if os.path.exists(snapshot_db):
                    update_backup_media_target(snapshot_db, backup_endpoint)

    for backup_endpoint in [source_btt['backup_target_types']['filesystem_export']]:
        try:
            backup_target = vault.get_backup_target(backup_endpoint)
            if vault_backend is None:
                vault_backend = backup_target

            # importing config backup only when user has not specified any
            # workload id. ``not workload_ids`` also covers None, which the
            # rest of this function already tolerates.
            if not workload_ids:
                config_workload_path = os.path.join(
                    backup_target.mount_path,
                    vault.CONF.cloud_unique_id,
                    'config_workload')
                if os.path.exists(config_workload_path):
                    add_config_workload(
                        context, config_workload_path, backup_endpoint)

            for workload in backup_target.get_workloads(context):
                workload_id = os.path.split(
                    workload)[1].replace('workload_', '')

                if workload_ids:
                    # Only workloads that were asked for (and not handled
                    # yet) are added to the iterate list.
                    if workload_id not in workload_ids_to_import:
                        continue
                    workload_ids_to_import.remove(workload_id)

                add_workload(
                    context,
                    workload_id,
                    workload,
                    backup_endpoint,
                    upgrade)

        except Exception as ex:
            LOG.exception(ex)

    # Whatever is still pending was requested but never found.
    failed_workloads.extend(workload_ids_to_import)

    return (workload_url_iterate, failed_workloads)


def update_migration_plan_metadata(migration_plan_values):
    """
    Record the plan's backup endpoint under ``backup_media_target``.

    When ``'backup_media_target'`` is not contained in
    ``migration_plan_values['metadata']``, the endpoint stored for the
    plan's id in ``migration_plan_backup_endpoint`` is written to
    ``metadata[0]['backup_media_target']``.

    NOTE(review): for a list of ``{'key': ..., 'value': ...}`` rows the
    containment test is always false and the endpoint is added as an extra
    field of the first row -- confirm this is the intended layout.

    :param migration_plan_values: plan record; updated in place.
    :returns: the same mapping, or None when the update fails (the failure
              is logged, not raised).
    """
    try:
        plan_metadata = migration_plan_values['metadata']
        if 'backup_media_target' in plan_metadata:
            return migration_plan_values

        endpoint = migration_plan_backup_endpoint[migration_plan_values['id']]
        plan_metadata[0]['backup_media_target'] = endpoint
        return migration_plan_values
    except Exception as ex:
        LOG.exception(ex)


def get_migration_plan_url(context, migration_plan_ids, source_backup_endpoints, target_btt, upgrade):
    '''
    Iterate over all NFS backups mounted for list of migration_plans available.

    Walks every backup target known to the DB (restricted to
    source_backup_endpoints when given), lists its migration plans and adds
    the ones that pass validation and the tenant check.

    :param context: request context.
    :param migration_plan_ids: ids to import; empty/None means all plans.
    :param source_backup_endpoints: optional collection of endpoints to scan.
    :param target_btt: name of the target backup target type; when empty the
                       default backup target type is used.
    :param upgrade: forwarded to check_tenant.
    :returns: tuple (migration_plan_paths, failed_migration_plan_ids);
              requested ids that were never found are reported as failed.
    '''
    migration_plan_url_iterate = []
    migration_plan_ids_to_import = list(migration_plan_ids) if migration_plan_ids else []
    failed_migration_plans = []

    def add_migration_plan(context, migration_plan_id, migration_plan, backup_endpoint, upgrade):
        # Before adding the migration plan check whether migration plan is valid or not
        if vault.validate_migration_plan(migration_plan) is False:
            failed_migration_plans.append(migration_plan_id)
            LOG.error(
                "Migration Plan %s doesn't contains required database files," %
                migration_plan_id)
            return

        # Check whether migration plan tenant exist in current cloud or not
        object_path = os.path.join(migration_plan, "migration_plan_db")
        if check_tenant(context, object_path, upgrade):
            # update the migration_plan_backup_endpoint map
            migration_plan_backup_endpoint[migration_plan_id] = backup_endpoint
            migration_plan_url_iterate.append(migration_plan)

            # we only support  NFS as backend for migration
            # NOTE(review): this assert runs after the plan was already
            # recorded above, is stripped under "python -O", and a failure is
            # caught by the caller's generic "except Exception" which then
            # abandons the rest of that backup target -- confirm intent.
            assert vault.CONF.vault_storage_type == 'nfs'
            # Update backup media target
            update_backup_media_target(
                os.path.join(migration_plan, "migration_plan_db"),
                backup_endpoint)
            for item in os.listdir(migration_plan):
                migration_db = os.path.join(migration_plan, item, "migration_db")
                if os.path.exists(migration_db):
                    update_backup_media_target(migration_db, backup_endpoint)
        else:
            failed_migration_plans.append(migration_plan_id)

    db = WorkloadMgrDB().db
    use_default_target_btt = False
    if not target_btt:
        # if target-btt is not given then use default BTT as target-btt
        target_btt_ref = db.get_default_backup_target_type(context)
        target_btt = target_btt_ref[0].name
        default_target_btt = target_btt
        use_default_target_btt = True
    # Remember the starting target so a failed per-plan lookup can fall back
    # to it. NOTE(review): default_target_bt_ref is not read anywhere in this
    # function.
    default_target_btt = target_btt
    target_bt_ref = db.get_backup_target_by_btt(context, target_btt)
    default_target_bt_ref = target_bt_ref
    target_backup_endpoint = target_bt_ref.filesystem_export.strip()
    default_target_backup_endpoint = target_backup_endpoint
    for existing_backup_target in db.backup_target_get_all(context):
        backup_endpoint = existing_backup_target.filesystem_export
        backup_target = None
        global vault_backend
        try:
            # Skip targets the caller did not ask for.
            if source_backup_endpoints and backup_endpoint not in source_backup_endpoints:
                continue
            backup_target = vault.get_backup_target(backup_endpoint)
            if vault_backend is None:
                vault_backend = backup_target

            migration_plan_url = backup_target.get_migration_plans(context)

            for migration_plan in migration_plan_url:
                if use_default_target_btt:
                    # No explicit target: try to reuse the endpoint recorded
                    # in the plan's own 'backup_media_target' metadata row.
                    # NOTE(review): target_btt / target_backup_endpoint are
                    # not reset per plan, so a plan without that metadata row
                    # keeps whatever the previous plan selected -- confirm.
                    # NOTE(review): the file is read through vault_backend
                    # (the first target resolved), not backup_target.
                    mp_db_file_data = vault_backend.get_object(os.path.join(migration_plan, 'migration_plan_db'))
                    mp_json_obj = json.loads(mp_db_file_data)
                    for mp_metadata in mp_json_obj['metadata']:
                        if mp_metadata['key'] == 'backup_media_target':
                            # check if we have BTT belongs to the original backup endpoint
                            try:
                                target_bt_ref = db.get_backup_target_by_backend_endpoint(context, mp_metadata['value'])
                                target_btt_ref = db.backup_target_type_get_all_by_backup_target(context, target_bt_ref.id)
                                target_btt = target_btt_ref[0].name
                                target_backup_endpoint = target_bt_ref.filesystem_export.strip()
                            except Exception as ex:
                                # Lookup failed: fall back to the default target.
                                LOG.exception(ex)
                                target_btt = default_target_btt
                                target_backup_endpoint = default_target_backup_endpoint

                # The directory is named "migration_plan_<id>".
                migration_plan_id = os.path.split(
                    migration_plan)[1].replace('migration_plan_', '')

                if migration_plan_ids and len(migration_plan_ids) > 0:
                    # If migration_plan found in given migration_plan id's then add to
                    # iterate list
                    if migration_plan_id in migration_plan_ids_to_import:
                        migration_plan_ids_to_import.remove(migration_plan_id)
                        add_migration_plan(
                            context, migration_plan_id, migration_plan,
                            target_backup_endpoint, upgrade)
                elif source_backup_endpoints:
                    if backup_endpoint in source_backup_endpoints:
                        add_migration_plan(
                            context, migration_plan_id, migration_plan,
                            target_backup_endpoint, upgrade)
                else:
                    add_migration_plan(
                        context, migration_plan_id, migration_plan,
                        target_backup_endpoint, upgrade)
        except Exception as ex:
            # A failing backup target is logged and skipped.
            LOG.exception(ex)

        finally:
            pass

    # Requested ids that were never found on any scanned target.
    if len(migration_plan_ids_to_import) > 0:
        failed_migration_plans.extend(migration_plan_ids_to_import)

    return (migration_plan_url_iterate, failed_migration_plans)


def get_json_files(context, jobid, workload_ids, db_dir, upgrade, source_btt, target_btt, importing_wl_job_data_id, **kwargs):
    """
    Read the JSON db files of a workload from the source backup target and
    append their parsed content, pickled, to per-resource files in
    ``db_dir`` (one file per resource name, e.g. ``snapshot_db``).

    Without ``kwargs`` the workload is located with ``get_workload_url``,
    the paths of its db files are derived from its ``snapshot_*``
    directories, and every resource type is then processed. With ``kwargs``
    (``resource_type``, ``resource_path``, ``workload_id``) only the given
    resource is processed.

    NOTE(review): the normal mode indexes ``workload_ids[0]`` and
    ``workload_url_iterate[0]``, i.e. it appears to expect exactly one
    workload per call -- confirm against the caller.
    NOTE(review): in kwargs mode the ``resources_db`` entry is a 2-tuple
    while ``process_files`` reads a fourth element, so that job fails inside
    the pool without being reported -- confirm the intended tuple shape.
    NOTE(review): ``workload_db`` is converted using
    ``workload_backup_media_size``, which is filled while ``snapshot_db`` is
    processed on another thread; nothing orders the two.

    :param context: request context.
    :param jobid: when truthy, progress/messages are written to the import
                  job record.
    :param workload_ids: ids of the workloads to import.
    :param db_dir: directory receiving the pickled per-resource files.
    :param upgrade: forwarded to ``get_workload_url``.
    :param source_btt: mapping describing the source backup target type.
    :param target_btt: forwarded to ``get_workload_url``.
    :param importing_wl_job_data_id: id of the import job record to update.
    :returns: list of workload ids that could not be imported.
    :raises Exception: any failure is logged and re-raised.
    """
    import threading
    from workloadmgr.db.workloadmgrdb import WorkloadMgrDB

    backup_target_path = source_btt['backup_target_types']['filesystem_export_mount_path']
    source_btt_endpoint = source_btt['backup_target_types']['filesystem_export']
    # Resource types that bypass the generic processing path (see the end
    # of process_files).
    allowed_resource = ['network_db', 'security_group_db']
    # Resource name -> paths of all JSON files found for that resource.
    db_files_map = OrderedDict(
        [
            ('snapshot_db', []),
            ('workload_db', []),
            ('workload_vms_db', []),
            ('snapshot_vms_db', []),
            ('resources_db', []),
            ('network_db', []),
            ('disk_db', []),
            ('security_group_db', []),
            ('config_workload_db', []),
            ('network_topology_db', []),
        ])
    # Guards the check-then-append on the shared path lists.
    paths_lock = threading.Lock()
    # Serialises appends to the pickle files in db_dir.
    lock = threading.Lock()

    def multi_run_wrapper(args):
        return generate_path_for_import(*args)

    def add_workload_level_paths(workload_db_path, workload_vms_db_path):
        # Every snapshot of a workload reports the same two files; keep one.
        with paths_lock:
            if workload_db_path not in db_files_map['workload_db']:
                db_files_map['workload_db'].append(workload_db_path)
            if workload_vms_db_path not in db_files_map['workload_vms_db']:
                db_files_map['workload_vms_db'].append(workload_vms_db_path)

    def generate_path_for_import(workload_id, snapshot_id):
        workload_path = os.path.join(
            backup_target_path, 'workload_'+workload_id)
        add_workload_level_paths(
            os.path.join(workload_path, 'workload_db'),
            os.path.join(workload_path, 'workload_vms_db'))
        if not snapshot_id:
            return db_files_map

        snapshot_path = os.path.join(workload_path, snapshot_id)
        snapshot_resources_db_path = os.path.join(
            snapshot_path, 'resources_db')
        snapshot_network_path = os.path.join(snapshot_path, 'network')
        snapshot_security_group_path = os.path.join(
            snapshot_path, 'security_group')

        db_files_map['snapshot_db'].append(
            os.path.join(snapshot_path, 'snapshot_db'))
        db_files_map['snapshot_vms_db'].append(
            os.path.join(snapshot_path, 'snapshot_vms_db'))
        db_files_map['network_topology_db'].append(
            os.path.join(snapshot_path, 'network_topology_db'))
        db_files_map['resources_db'].append(snapshot_resources_db_path)

        resource_data = []
        try:
            with open(snapshot_resources_db_path) as f:
                resource_data = json.load(f)
        except Exception as ex:
            LOG.warning('skipping snapshot import as resource file is not present, snapshot is not proper.')

        # The resource list tells which per-resource db files must exist.
        for each_resource in resource_data:
            resource_type = each_resource.get('resource_type').lower()
            if resource_type == 'security_group':
                db_files_map['security_group_db'].append(os.path.join(
                    snapshot_security_group_path,
                    'vm_res_id_'+each_resource.get('id'),
                    'security_group_db'))
            if resource_type in ('network', 'nic', 'subnet'):
                db_files_map['network_db'].append(os.path.join(
                    snapshot_network_path,
                    'vm_res_id_'+each_resource.get('id'),
                    'network_db'))
            if resource_type == 'disk':
                db_files_map['disk_db'].append(os.path.join(
                    snapshot_path,
                    'vm_id_'+each_resource.get('vm_id'),
                    'vm_res_id_'+each_resource.get('id'),
                    'disk_db'))

        return db_files_map

    def count_resources(args):
        """ returns total count of allowed resources."""
        db, files = args[0], args[1]
        global count
        if db not in allowed_resource:
            count += len(files)

    def process_files(args):
        db = args[0]
        files = args[1]
        source_endpoint = args[3]
        backend = vault.get_backup_target(source_endpoint)

        def process(file_names):
            global wl_import_count
            global percent_progress
            try:
                db_json = []
                for file_name in file_names:
                    try:
                        file_data = backend.get_object(file_name)
                    except Exception as ex:
                        LOG.warning("File {0} doesn't exist.".format(file_name))
                        continue
                    if file_data is not None and len(file_data) > 0:
                        json_obj = json.loads(file_data)
                        if db == 'snapshot_db':
                            # Creating a map for each workload with workload_backup_media_size.
                            if json_obj['snapshot_type'] == 'full':
                                workload_backup_media_size[
                                    json_obj['workload_id']] = json_obj['size']

                            # add retention tags as '["hourly"]' if not there already
                            # update backup_target_type metadata
                            json_obj = workload_utils.\
                                convert_to_new_snapshot_meta(json_obj, source_btt['name'])
                        elif db == 'workload_db':
                            # In case of workload updating each object with
                            # "workload_backup_media_size" and "backup_media_target"
                            json_obj = workload_utils.convert_old_to_new_schedule(
                                json_obj, workload_backup_media_size)
                        db_json.append(json_obj)
                        # A zero count (nothing counted for this call) must
                        # not abort the chunk with a ZeroDivisionError.
                        if jobid and count:
                            wl_import_count = 1
                            percent_progress += (wl_import_count/count)*78
                            update_wl_res = {"progress": percent_progress}
                            WorkloadMgrDB().db.workload_import_update(context, importing_wl_job_data_id, update_wl_res)
                with lock:
                    # Close the handle so the appended data is flushed.
                    with open(os.path.join(db_dir, db), 'ab') as pickle_file:
                        pickle.dump(db_json, pickle_file)
            except Exception as ex:
                LOG.exception(ex)

        if db not in allowed_resource:
            if jobid:
                WorkloadMgrDB().db.workload_import_update(context, importing_wl_job_data_id, {'message': "Starting Importing {0} Resource.".format(db)})
            process(files)
            if jobid:
                WorkloadMgrDB().db.workload_import_update(context, importing_wl_job_data_id, {'message': "Completed Importing {0} Resource.".format(db)})
        elif db == 'network_db':
            process([files])

    try:
        failed_workloads = []
        if not kwargs:
            workload_url_iterate, failed_workloads = get_workload_url(
                context, workload_ids, source_btt, target_btt, upgrade)

            if len(failed_workloads) == len(workload_url_iterate) == 0:
                WorkloadMgrDB().db.workload_import_update(context, importing_wl_job_data_id, {'status': 'error', 'progress': 0})
                raise exception.WorkloadsNotFound()

            if len(workload_ids) > 0 and len(
                    failed_workloads) == len(workload_ids):
                raise Exception('Failed Workload.')

            workload_entries = os.listdir(workload_url_iterate[0])
            desired_file_list = [
                (workload_ids[0], file_name, )
                for file_name in workload_entries
                if file_name.startswith("snapshot_")]
            # desired_file_list is Empty, means No snapshots only workloads info.
            if not desired_file_list:
                # any(): at least one workload-level db file must be there
                # (bool() of the list was true for any non-empty directory).
                if any(each in ['workload_vms_db', 'workload_db']
                       for each in workload_entries):
                    desired_file_list = [(workload_ids[0], None)]
                else:
                    WorkloadMgrDB().db.workload_import_update(context,
                                 importing_wl_job_data_id,
                                 {'status': 'error', 'message': 'Empty Workload.'})
                    raise Exception('Empty Workload.')
        else:
            resource_path = kwargs.get('resource_path')
            resource_type = kwargs.get('resource_type')
            workload_id = kwargs.get('workload_id')
            desired_file_list = None
            if resource_type == 'resources_db':
                desired_file_list = [(resource_type, resource_path)]
            if resource_type == 'network_db':
                desired_file_list = [(resource_type, os.path.join(resource_path, file_name, 'network_db'), workload_id, source_btt_endpoint) for file_name in os.listdir(resource_path) if file_name.startswith("vm_res_id_")]

        with ThreadPoolExecutor(max_workers = 16) as executor:
            if not kwargs:
                # Stage 1: collect all paths. Wait for every job so that
                # counting and processing see complete lists; failures are
                # logged and otherwise ignored, as before.
                path_jobs = [executor.submit(multi_run_wrapper, item)
                             for item in desired_file_list]
                for job in path_jobs:
                    job_error = job.exception()
                    if job_error is not None:
                        LOG.error("Collecting import paths failed: %s" % job_error)

                data = [(k, v, workload_ids[0], source_btt_endpoint) for k, v in db_files_map.items()]
                # Stage 2: the total must be known before progress is
                # computed, so count in this thread.
                for item in data:
                    count_resources(item)
                # Stage 3: read and pickle every resource type in parallel;
                # leaving the "with" block waits for completion.
                executor.map(process_files, data)
            else:
                executor.map(process_files, desired_file_list)
        return failed_workloads

    except Exception as ex:
        LOG.exception(ex)
        raise ex


def get_migration_plan_json_files(context, migration_plan_ids, source_backup_endpoints, target_btt, db_dir, upgrade):
    '''
    Collect the migration plan JSON files found under the migration plan
    paths and write one pickled list of objects per resource type into
    ``db_dir`` (files ``migration_plan_db`` and ``migration_plan_vms_db``).

    Every ``migration_plan_db`` object gets its ``backup_target_type`` and
    ``backup_media_target`` metadata rewritten (or added) to point at the
    selected backup target.

    :param context: request context handed to the DB API and URL lookup.
    :param migration_plan_ids: ids of the migration plans to import.
    :param source_backup_endpoints: forwarded to ``get_migration_plan_url``.
    :param target_btt: name of the backup target type to assign. When it is
        falsy the default backup target type is used, and each plan whose
        recorded ``backup_media_target`` resolves to a known backup target
        keeps that target instead.
    :param db_dir: directory in which the pickled files are written.
    :param upgrade: forwarded to ``get_migration_plan_url``.
    :returns: the failed migration plans reported by
        ``get_migration_plan_url``.
    :raises exception.MigrationPlansNotFound: when neither usable nor failed
        migration plans were found.
    '''
    # Map to store the paths of all JSON files for each resource type
    db_files_map = OrderedDict(
        [
            ('migration_plan_db', []),
            ('migration_plan_vms_db', []),
        ])

    try:
        migration_plan_url_iterate, failed_migration_plans = get_migration_plan_url(
            context, migration_plan_ids, source_backup_endpoints, target_btt, upgrade)

        if len(failed_migration_plans) == len(migration_plan_url_iterate) == 0:
            raise exception.MigrationPlansNotFound()

        # Every requested plan failed: nothing left to collect.
        if migration_plan_ids and len(
                failed_migration_plans) == len(migration_plan_ids):
            return failed_migration_plans

        wlm_db = WorkloadMgrDB().db
        use_default_target_btt = False
        if not target_btt:
            # if target-btt is not given then use default BTT as target-btt
            target_btt_ref = wlm_db.get_default_backup_target_type(context)
            target_btt = target_btt_ref[0].name
            use_default_target_btt = True
        default_target_btt = target_btt
        default_target_bt_ref = wlm_db.get_backup_target_by_btt(context, target_btt)

        # Create list of all files related to a common resource
        # TODO:Find alternate for os.walk
        for migration_plan_path in migration_plan_url_iterate:
            for path, subdirs, files in os.walk(migration_plan_path):
                for name in files:
                    if name.endswith("migration_plan_db"):
                        db_files_map['migration_plan_db'].append(
                            os.path.join(path, name))
                    elif name.endswith("migration_plan_vms_db"):
                        db_files_map['migration_plan_vms_db'].append(
                            os.path.join(path, name))

        # Iterate over each file for a resource in all NFS mounts
        # and create a single db file for that.
        for db, files in db_files_map.items():
            db_json = []

            for file_name in files:
                file_data = vault_backend.get_object(file_name)
                if file_data is None or len(file_data) == 0:
                    continue
                json_obj = json.loads(file_data)

                if db == 'migration_plan_db':
                    # Start every plan from the default target so that a
                    # target resolved for a previous plan never leaks into
                    # a plan that has no 'backup_media_target' metadata.
                    plan_btt = default_target_btt
                    plan_bt_ref = default_target_bt_ref

                    if use_default_target_btt:
                        for mp_metadata in json_obj['metadata']:
                            if mp_metadata['key'] != 'backup_media_target':
                                continue
                            # check if we have BTT belongs to the original backup endpoint
                            try:
                                bt_ref = wlm_db.get_backup_target_by_backend_endpoint(context, mp_metadata['value'])
                                btt_refs = wlm_db.backup_target_type_get_all_by_backup_target(context, bt_ref.id)
                                plan_btt = btt_refs[0].name
                                plan_bt_ref = bt_ref
                            except Exception as ex:
                                # Unknown endpoint: fall back to the default target.
                                LOG.exception(ex)
                                plan_btt = default_target_btt
                                plan_bt_ref = default_target_bt_ref

                    # In case of migration plan updating each object with
                    # "backup_media_target" and "backup_target_type"
                    found_backup_media_target, found_backup_target_type = False, False
                    for mp_metadata in json_obj['metadata']:
                        if mp_metadata['key'] == 'backup_target_type':
                            mp_metadata['value'] = plan_btt
                            found_backup_target_type = True
                        if mp_metadata['key'] == 'backup_media_target':
                            mp_metadata['value'] = plan_bt_ref.filesystem_export
                            found_backup_media_target = True
                    if not found_backup_media_target:
                        json_obj["metadata"].append({"migration_plan_id": json_obj["id"], "key": "backup_media_target", "value": plan_bt_ref.filesystem_export, "id": str(uuid.uuid4())})
                    if not found_backup_target_type:
                        json_obj["metadata"].append({"migration_plan_id": json_obj["id"], "key": "backup_target_type", "value": plan_btt, 'id': str(uuid.uuid4())})
                    json_obj = update_migration_plan_metadata(json_obj)
                db_json.append(json_obj)

            # Close the file deterministically so the pickle is fully
            # flushed before the importer reads it back.
            with open(os.path.join(db_dir, db), 'wb') as db_file:
                pickle.dump(db_json, db_file)
        return failed_migration_plans

    except Exception as ex:
        LOG.exception(ex)
        raise


def import_resources(tenantcontext, jobid, workload_ids, resource_map,
                     new_version, db_dir, upgrade, importing_wl_job_data_id, **kwargs):
    '''
    Load the pickled objects of one resource type from ``db_dir`` and bulk
    insert/update them, together with their metadata, into the database.

    :param tenantcontext: context used for the DB lookups; for workload and
        snapshot objects it is replaced by ``get_context(resource)``.
    :param jobid: when truthy, the import job record is updated with a
        message after a successful insert.
    :param workload_ids: ids being imported (only used in the error log).
    :param resource_map: mapping with the keys ``file``, ``model_class``,
        ``metadata_model_class``, ``getter_method``,
        ``getter_method_params`` and ``getter_method_kwargs``.
    :param new_version: forwarded to ``_adjust_values``.
    :param db_dir: directory containing the pickled file
        ``resource_map['file']``.
    :param upgrade: forwarded to ``_adjust_values``.
    :param importing_wl_job_data_id: id of the import job record to update.
    :param kwargs: when non-empty the file is read as a sequence of
        consecutive pickles instead of one pickled list.
    :raises exception.WorkloadAlreadyExist: when ``workload_db`` contains a
        workload that is already present in the database. Every other error
        is logged and swallowed.
    '''
    resources_list = []  # Contains list of json objects need to insert
    resources_list_update = []  # Contains list of json objects need to update
    resources_metadata_list = []
    resources_metadata_list_update = []
    models_module = importlib.import_module('workloadmgr.db.sqlalchemy.models')
    file_name = resource_map['file']
    model_class = getattr(models_module, resource_map['model_class'])
    metadata_model_class = getattr(models_module, resource_map['metadata_model_class'])
    getter_method = resource_map['getter_method']
    getter_method_params = resource_map['getter_method_params']
    getter_method_kwargs = resource_map['getter_method_kwargs']

    db = WorkloadMgrDB().db
    get_resource_method = getattr(db, getter_method)

    def update_resource_list(cntxt, resource):
        '''
        Sort ``resource`` into the insert or the update list, depending on
        whether the getter finds it in the database.
        '''
        # if resource is workload then check the status of workload and
        # set it to available.
        if file_name in ['workload_db', 'config_workload_db']:
            if resource['status'] == 'locked':
                resource['status'] = 'available'

        # A snapshot that is not 'available' is recorded as failed.
        if file_name in ['snapshot_db']:
            if resource['status'] != 'available':
                resource['status'] = 'error'
                resource['error_msg'] = 'Upload was not completed successfully.'

        try:
            # Check if resource already in the database then update.
            if not isinstance(resource, dict):
                resource = resource[0]
            param_list = tuple(resource[param]
                               for param in getter_method_params)
            if get_resource_method(tenantcontext, *param_list, **getter_method_kwargs):
                for resource_metadata in resource.pop('metadata'):
                    resources_metadata_list_update.append(resource_metadata)
                resource = _adjust_values(tenantcontext, new_version, resource, upgrade)
                resources_list_update.append(resource)
            else:
                raise exception.NotFound()
        except Exception:
            # If resource not found then create new entry in database
            for resource_metadata in resource.pop('metadata'):
                resources_metadata_list.append(resource_metadata)
            resource = _adjust_values(
                tenantcontext, new_version, resource, upgrade)
            resources_list.append(resource)

    try:
        # Load file for resource containing all objects needed to import.
        # The context manager guarantees the handle is closed.
        resources_db_list = []
        with open(os.path.join(db_dir, file_name), 'rb') as db_file:
            if not kwargs:
                resources_db_list = pickle.load(db_file)
            else:
                # On-demand imports append one pickle per object; read
                # until the end of the file is reached.
                while True:
                    try:
                        resources_db_list.append(pickle.load(db_file))
                    except EOFError:
                        break

        for resources in resources_db_list:
            if resources is None:
                continue
            if isinstance(resources, list):
                for resource in resources:
                    # In case of workload/snapshot updating object values
                    # with their respective tenant id and user id using context
                    if file_name in ['workload_db', 'snapshot_db']:
                        tenantcontext = get_context(resource)
                    update_resource_list(tenantcontext, resource)
            else:
                if file_name in ['workload_db', 'snapshot_db']:
                    tenantcontext = get_context(resources)
                update_resource_list(tenantcontext, resources)

        # raise exception when workload already exists in DB
        if file_name == 'workload_db' and resources_list_update:
            raise exception.WorkloadAlreadyExist()

        try:
            if resources_list:
                # Dump list of objects into the database.
                DBSession.rollback()
                DBSession.bulk_insert_mappings(model_class, resources_list)
                DBSession.commit()
                DBSession.bulk_insert_mappings(metadata_model_class, resources_metadata_list)
                DBSession.commit()
                if jobid:
                    WorkloadMgrDB().db.workload_import_update(tenantcontext, importing_wl_job_data_id, {"message": file_name + ' DB Dump Done.'})

        except Exception as ex:
            DBSession.rollback()
            LOG.exception(ex)
            LOG.error("Importing new workloads {} failed".format(workload_ids))

        try:
            if resources_list_update:
                DBSession.rollback()
                DBSession.bulk_update_mappings(model_class, resources_list_update)
                DBSession.commit()
                DBSession.bulk_update_mappings(metadata_model_class, resources_metadata_list_update)
                DBSession.commit()

        except Exception as ex:
            DBSession.rollback()
            LOG.exception(ex)
            LOG.error("updating existing resources {} failed".format(resources_list_update))

        # if workloads/config_workload then check for job schedule, if it's
        # there then update it.
        # NOTE(review): this also runs for resources whose insert failed
        # above - confirm that is intended.
        if file_name in ['workload_db', 'config_workload_db']:
            for resource in resources_list:
                if file_name == 'workload_db':
                    workload = models.Workloads()
                    workload.update(resource)
                    workloads.append(workload)
                else:
                    workload = models.ConfigWorkloads()
                    workload.update(resource)

                raw_jobschedule = resource['jobschedule']
                if not raw_jobschedule:
                    continue
                # NOTE(review): the job schedule blob comes from the imported
                # data; pickle.loads is only safe if that source is trusted.
                jobschedule = pickle.loads(bytes(raw_jobschedule, 'utf-8'))
                # Check if job schedule is enable then add scheduler.
                if str(jobschedule['enabled']).lower() == 'true':
                    workload_api = workloadAPI.API()
                    workload_api.cron_rpcapi.workload_add_scheduler_job(
                        tenantcontext, jobschedule, workload)
    except exception.WorkloadAlreadyExist as ex:
        LOG.exception(ex)
        raise
    except Exception as ex:
        # Best effort: any other failure is logged and not propagated.
        LOG.exception(ex)

def import_workload(cntx, jobid, workload_ids, new_version, upgrade, source_btt=None, target_btt=None, importable_wl_job_data_id=None, importing_wl_job_data_id=None, **kwargs):
    '''
    Read all json files for all workloads from all available NFS mounts
    and perform bulk insert in the database.

    :param cntx: request context.
    :param jobid: when truthy, progress and final status are written to the
        import job record ``importing_wl_job_data_id``.
    :param workload_ids: ids of the workloads to import.
    :param new_version: forwarded to ``import_resources``.
    :param upgrade: forwarded to ``get_json_files``/``import_resources``.
    :param source_btt: mapping read as
        ``source_btt['backup_target_types']['filesystem_export']``.
    :param target_btt: mapping read the same way as ``source_btt``.
    :param importable_wl_job_data_id: optional id of the importable-workload
        record whose status is updated alongside the import job.
    :param importing_wl_job_data_id: id of the import job record.
    :param kwargs: on-demand import options; ``resource_type`` restricts the
        import to the matching ``import_map`` entry.
    :returns: ``{'workloads': {'imported_workloads': [...],
        'failed_workloads': [...]}}``.
    '''
    global percent_progress

    # Bound before the try block so the cleanup in ``finally`` never hits
    # an unbound name when creating the directory itself fails.
    db_dir = None
    try:
        # Create temporary folder to store JSON files.
        db_dir = tempfile.mkdtemp()

        del workloads[:]

        # Fetch list of all existing projects. If kwargs is present it means
        # we need to load some resources on demand; in that case we can skip
        # fetching all projects.
        if not kwargs:
            try:
                fetch_project_list(cntx)
            except Exception:
                # Retry once with a context rebuilt from the current one.
                cntx = get_context(cntx.__dict__)
                fetch_project_list(cntx)

        DBSession.autocommit = False
        try:
            failed_workloads = get_json_files(cntx, jobid, workload_ids, db_dir, upgrade, source_btt, target_btt, importing_wl_job_data_id, **kwargs)
            for resource_map in import_map:
                requested_type = kwargs.get('resource_type')
                if requested_type == resource_map['file']:
                    import_resources(cntx, jobid, workload_ids,
                                     resource_map, new_version, db_dir,
                                     upgrade, importing_wl_job_data_id, **kwargs)
                    break
                elif not requested_type and (
                        resource_map['file'] in ('workload_db', 'workload_vms_db')
                        or (source_btt['backup_target_types']['filesystem_export'] ==
                            target_btt['backup_target_types']['filesystem_export'])):
                    # if user changes BT of workload then we skip importing of snapshots
                    import_resources(cntx, jobid, workload_ids, resource_map, new_version, db_dir, upgrade, importing_wl_job_data_id, **kwargs)
                if jobid:
                    percent_progress += (1/len(import_map)) * 22
                    WorkloadMgrDB().db.workload_import_update(cntx, importing_wl_job_data_id, {'progress': percent_progress})
        finally:
            # Restore autocommit even when the import fails, so the shared
            # session is not left in manual-commit mode.
            DBSession.autocommit = True

        if jobid:
            if importable_wl_job_data_id:
                # making sure currently processing workload is imported or not
                wl_db_obj = WorkloadMgrDB().db.workload_get(cntx, workload_ids[0])
                wl_btt = [w_meta['value'] for w_meta in wl_db_obj.metadata if w_meta['key'] == 'backup_target_types']
                importable_wl_job_data_ref = WorkloadMgrDB().db.importable_workload_get(cntx, importable_wl_job_data_id)
                if wl_btt and wl_btt[0] in json.loads(importable_wl_job_data_ref.data)['backup_target_types']:
                    WorkloadMgrDB().db.workload_import_update(cntx, importing_wl_job_data_id, {"status": 'imported'})
                    WorkloadMgrDB().db.importable_workloads_update(cntx, importable_wl_job_data_id, {'status': 'imported'})
                else:
                    WorkloadMgrDB().db.workload_import_update(cntx, importing_wl_job_data_id, {"status": 'skipped', "message": 'Workload {0} Already Present.'.format(workload_ids[0])})
                    WorkloadMgrDB().db.importable_workloads_update(cntx, importable_wl_job_data_id, {'status': 'skipped'})
            else:
                WorkloadMgrDB().db.workload_import_update(cntx, importing_wl_job_data_id, {"status": 'imported'})

        return {'workloads': {'imported_workloads': workloads,
                              'failed_workloads': failed_workloads}}
    except Exception as ex:
        LOG.exception(ex)
        raise
    finally:
        # Remove temporary folder
        if db_dir and os.path.exists(db_dir):
            shutil.rmtree(db_dir, ignore_errors=True)


def import_migration_plan_resources(tenantcontext, resource_map,
                                    new_version, db_dir, upgrade):
    '''
    Load the pickled objects of one migration plan resource type from
    ``db_dir`` and bulk insert/update them, together with their metadata,
    into the database.

    :param tenantcontext: context used for the DB lookups; for
        ``migration_plan_db`` objects it is replaced by
        ``get_context(resource)``.
    :param resource_map: mapping with the keys ``file``, ``model_class``,
        ``metadata_model_class``, ``getter_method``,
        ``getter_method_params`` and ``getter_method_kwargs``.
    :param new_version: forwarded to ``_adjust_values``.
    :param db_dir: directory containing the pickled file
        ``resource_map['file']``.
    :param upgrade: forwarded to ``_adjust_values``.

    Errors are logged and swallowed; nothing is raised to the caller.
    '''
    resources_list = []  # Contains list of json objects need to insert
    resources_list_update = []  # Contains list of json objects need to update
    resources_metadata_list = []
    resources_metadata_list_update = []
    models_module = importlib.import_module('workloadmgr.db.sqlalchemy.models')
    file_name = resource_map['file']
    model_class = getattr(models_module, resource_map['model_class'])
    metadata_model_class = getattr(models_module, resource_map['metadata_model_class'])
    getter_method = resource_map['getter_method']
    getter_method_params = resource_map['getter_method_params']
    getter_method_kwargs = resource_map['getter_method_kwargs']

    db = WorkloadMgrDB().db
    get_resource_method = getattr(db, getter_method)

    def update_resource_list(cntxt, resource):
        '''
        Sort ``resource`` into the insert or the update list, depending on
        whether the getter finds it in the database.
        '''
        # if resource is a migration plan then check its status and
        # set it to available.
        if file_name == 'migration_plan_db':
            if resource['status'] == 'locked':
                resource['status'] = 'available'

        try:
            # Check if resource already in the database then update.
            param_list = tuple(resource[param]
                               for param in getter_method_params)
            if get_resource_method(tenantcontext, *param_list, **getter_method_kwargs):
                for resource_metadata in resource.pop('metadata'):
                    resources_metadata_list_update.append(resource_metadata)
                resource = _adjust_values(tenantcontext, new_version, resource, upgrade)
                resources_list_update.append(resource)
            else:
                raise exception.NotFound()
        except Exception:
            # If resource not found then create new entry in database
            for resource_metadata in resource.pop('metadata'):
                resources_metadata_list.append(resource_metadata)
            resource = _adjust_values(
                tenantcontext, new_version, resource, upgrade)
            resources_list.append(resource)

    try:
        # Load file for resource containing all objects needed to import.
        # The context manager guarantees the handle is closed.
        with open(os.path.join(db_dir, file_name), 'rb') as db_file:
            resources_db_list = pickle.load(db_file)

        for resources in resources_db_list:
            if resources is None:
                continue
            if isinstance(resources, list):
                for resource in resources:
                    # In case of migration plan updating object values
                    # with their respective tenant id and user id using context
                    if file_name in ['migration_plan_db']:
                        tenantcontext = get_context(resource)
                    update_resource_list(tenantcontext, resource)
            else:
                if file_name in ['migration_plan_db']:
                    tenantcontext = get_context(resources)
                update_resource_list(tenantcontext, resources)

        try:
            # Dump list of objects into the database.
            DBSession.rollback()
            DBSession.bulk_insert_mappings(model_class, resources_list)
            DBSession.commit()
            DBSession.bulk_insert_mappings(metadata_model_class, resources_metadata_list)
            DBSession.commit()
        except Exception as ex:
            DBSession.rollback()
            LOG.exception(ex)
            LOG.error("Importing new migration plan failed")

        try:
            # Update the objects that already exist in the database and
            # upload the refreshed entries.
            DBSession.rollback()
            DBSession.bulk_update_mappings(model_class, resources_list_update)
            DBSession.commit()
            DBSession.bulk_update_mappings(metadata_model_class, resources_metadata_list_update)
            DBSession.commit()
            for resource in resources_list_update:
                workload_utils.upload_migration_plan_db_entry(tenantcontext, resource['id'])
        except Exception as ex:
            DBSession.rollback()
            LOG.exception(ex)
            LOG.error("Importing existing migration plan failed")

        # NOTE(review): this also runs for resources whose insert failed
        # above - confirm that is intended.
        for resource in resources_list:
            migration_plan = models.MigrationPlans()
            migration_plan.update(resource)
            migration_plans.append(migration_plan)
            workload_utils.upload_migration_plan_db_entry(tenantcontext, resource['id'])

    except Exception as ex:
        # Best effort: failures are logged and not propagated.
        LOG.exception(ex)


def import_migration_plan(cntx, migration_plan_ids, source_backup_endpoints, target_btt, new_version, upgrade=True):
    '''
    Read all json files for all migration_plans from all available NFS mounts
    and perform bulk insert in the database.

    :param cntx: request context.
    :param migration_plan_ids: ids of the migration plans to import.
    :param source_backup_endpoints: forwarded to
        ``get_migration_plan_json_files``.
    :param target_btt: forwarded to ``get_migration_plan_json_files``.
    :param new_version: forwarded to ``import_migration_plan_resources``.
    :param upgrade: forwarded to both helpers.
    :returns: ``{'migration_plans': {'imported_migration_plans': [...],
        'failed_migration_plans': [...]}}``.
    '''
    # Bound before the try block so the cleanup in ``finally`` never hits
    # an unbound name when creating the directory itself fails.
    db_dir = None
    try:
        # Create temporary folder to store JSON files.
        db_dir = tempfile.mkdtemp()

        del migration_plans[:]

        # Fetch list of all existing projects
        fetch_project_list(cntx)

        DBSession.autocommit = False
        try:
            failed_migration_plans = get_migration_plan_json_files(cntx, migration_plan_ids, source_backup_endpoints, target_btt, db_dir, upgrade)
            for resource_map in import_migration_plan_map:
                import_migration_plan_resources(cntx, resource_map, new_version, db_dir, upgrade)
        finally:
            # Restore autocommit even when the import fails, so the shared
            # session is not left in manual-commit mode.
            DBSession.autocommit = True
        return {'migration_plans': {'imported_migration_plans': migration_plans,
                                    'failed_migration_plans': failed_migration_plans}}
    except Exception as ex:
        LOG.exception(ex)
        raise
    finally:
        # Remove temporary folder
        if db_dir and os.path.exists(db_dir):
            shutil.rmtree(db_dir, ignore_errors=True)