# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 TrilioData, Inc.
# All Rights Reserved.
"""
Job scheduler that manages WorkloadMgr.
**Related Flags**
:workloads_topic: What :mod:`rpc` topic to listen to (default: `workloadmgr-workloads`).
:workloads_manager: The module name of a class derived from
:class:`manager.Manager` (default:
:class:`workloadmgr.workload.manager.Manager`).
"""
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import *
from datetime import datetime, timedelta
import time
import uuid
import ast
import pickle
import json
from threading import Lock
import sys
import subprocess
import importlib
import shutil
from workloadmgr.db.sqlalchemy import models
from subprocess import check_output
import smtplib
import socket
import os
# Import the email modules
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from oslo_config import cfg
from taskflow.patterns import linear_flow as lf
from taskflow import engines
from workloadmgr.common import context as wlm_context
from workloadmgr import flags
from workloadmgr import manager
from workloadmgr import mountutils
from workloadmgr.virt import driver
from workloadmgr.virt import virtapi
from workloadmgr.openstack.common import excutils
from workloadmgr.openstack.common import importutils
from workloadmgr.openstack.common import log as logging
from workloadmgr.openstack.common import jsonutils
from workloadmgr.compute import nova
from workloadmgr.network import neutron
from workloadmgr.volume import cinder
from workloadmgr.keymanager import barbican
from workloadmgr.vault import vault
from workloadmgr import utils
from workloadmgr.utils import get_vcenter_service_instance
from workloadmgr.common.workloadmgr_keystoneclient import KeystoneClient
import workloadmgr.workflows
from workloadmgr.workflows import vmtasks_openstack
from workloadmgr.workflows import vmtasks_vcloud
from workloadmgr.workflows import vmtasks
from workloadmgr.workflows import migration_plan_workflow
import workloadmgr.workflows.migrationworkflow as migrationflow
from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr import exception as wlm_exceptions
from workloadmgr.openstack.common import timeutils
from taskflow.exceptions import WrappedFailure
from workloadmgr.workloads import workload_utils
from workloadmgr.openstack.common import fileutils
from workloadmgr import autolog
from workloadmgr import settings
from keystoneauth1.exceptions.http import Unauthorized as KsUnauthorized
LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)
workloads_manager_opts = [
cfg.StrOpt('mountdir',
default='/var/triliovault/tvault-mounts',
help='Root directory where all snapshots are mounted.'),
cfg.BoolOpt('pause_vm_before_snapshot',
default=False,
help='Pause the VM before making libvirt snapshot calls'),
]
filesearch_opt_group = cfg.OptGroup(name='filesearch')
filesearch_opts = [
cfg.IntOpt('process_timeout', default=300),
]
scheduler_config = {'standalone': 'True'}
FLAGS = flags.FLAGS
FLAGS.register_opts(workloads_manager_opts)
FLAGS.register_group(filesearch_opt_group)
FLAGS.register_opts(filesearch_opts, filesearch_opt_group)
CONF = cfg.CONF
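# Resolve a dotted class path such as 'workloadmgr.workflows.serialworkflow.SerialWorkflow'
# to the class object.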
def workflow_lookup_class(class_name):
parts = class_name.split('.')
module = ".".join(parts[:-1])
workflow_class = __import__(module)
for comp in parts[1:]:
workflow_class = getattr(workflow_class, comp)
return workflow_class
@autolog.log_method(logger=Logger)
def get_workflow_class(context, workload_type_id, restore=False):
# TODO(giri): implement a driver model for the workload types
if workload_type_id:
workload_type = WorkloadMgrDB().db.workload_type_get(context, workload_type_id)
if(workload_type.display_name == 'Serial'):
if restore:
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.RestoreWorkflow'
else:
workflow_class_name = 'workloadmgr.workflows.serialworkflow.SerialWorkflow'
elif(workload_type.display_name == 'Parallel'):
if restore:
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.RestoreWorkflow'
else:
""" This is for backward compatibility as type has been removed."""
workflow_class_name = 'workloadmgr.workflows.serialworkflow.SerialWorkflow'
elif(workload_type.display_name == 'MongoDB'):
if restore:
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.RestoreWorkflow'
else:
workflow_class_name = 'workloadmgr.workflows.mongodbflow.MongoDBWorkflow'
elif(workload_type.display_name == 'Hadoop'):
if restore:
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.RestoreWorkflow'
else:
workflow_class_name = 'workloadmgr.workflows.hadoopworkflow.HadoopWorkflow'
elif(workload_type.display_name == 'Cassandra'):
if restore:
workflow_class_name = 'workloadmgr.workflows.cassandraworkflow.CassandraRestore'
else:
workflow_class_name = 'workloadmgr.workflows.cassandraworkflow.CassandraWorkflow'
elif(workload_type.display_name == 'Composite'):
if restore:
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.RestoreWorkflow'
else:
workflow_class_name = 'workloadmgr.workflows.compositeworkflow.CompositeWorkflow'
else:
kwargs = {'workload_type_id': workload_type_id}
raise wlm_exceptions.WorkloadTypeNotFound(**kwargs)
return workflow_lookup_class(workflow_class_name)
workloadlock = Lock()
def synchronized(lock):
'''Synchronization decorator.'''
def wrap(f):
def new_function(*args, **kw):
lock.acquire()
try:
return f(*args, **kw)
finally:
lock.release()
return new_function
return wrap
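# Lightweight helper that exposes the keys of a dict as object attributes.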
class objectview(object):
def __init__(self, d):
self.__dict__ = d
class WorkloadMgrManager(manager.SchedulerDependentManager):
"""Manages WorkloadMgr """
RPC_API_VERSION = '2.0'
@autolog.log_method(logger=Logger)
def __init__(self, service_name=None, *args, **kwargs):
self.az = FLAGS.storage_availability_zone
self.pool = ThreadPoolExecutor(max_workers=5)
super(
WorkloadMgrManager,
self).__init__(
service_name='workloadscheduler',
*args,
**kwargs)
@autolog.log_method(logger=Logger)
def init_host(self):
"""
Do any initialization that needs to be run if this is a standalone service.
"""
ctxt = wlm_context.get_admin_context()
LOG.info(_("Cleaning up incomplete operations"))
try:
self.db.snapshot_mark_incomplete_as_error(ctxt, self.host)
self.db.restore_mark_incomplete_as_error(ctxt, self.host)
self.db.workloads_mark_deleting_as_error(ctxt, self.host)
self.db.unlock_workloads_for_host(ctxt, self.host)
self.db.migration_mark_incomplete_as_error(ctxt, self.host)
self.db.migration_plans_mark_deleting_as_error(ctxt, self.host)
self.db.unlock_migration_plans_for_host(ctxt, self.host)
kwargs = {'host': self.host, 'status': 'completed'}
list_search = self.db.file_search_get_all(ctxt, **kwargs)
for search in list_search:
self.db.file_search_update(
ctxt, search.id, {
'status': 'error', 'error_msg': 'Search did not finish successfully'})
except Exception as ex:
LOG.debug(ex)
@manager.periodic_task
def file_search_delete(self, context):
try:
kwargs = {'host': self.host, 'time_in_minutes': 24 * 60}
list_search = self.db.file_search_get_all(context, **kwargs)
if len(list_search) > 0:
for search in list_search:
self.db.file_search_delete(context, search.id)
except Exception as ex:
LOG.exception(ex)
@autolog.log_method(logger=Logger)
def _get_snapshot_size_of_vm(self, context, snapshot_vm):
"""
calculate the restore data size
"""
instance_size = 0
snapshot_vm_resources = self.db.snapshot_vm_resources_get(
context, snapshot_vm.vm_id, snapshot_vm.snapshot_id)
for snapshot_vm_resource in snapshot_vm_resources:
if snapshot_vm_resource.resource_type != 'disk':
continue
vm_disk_resource_snap = self.db.vm_disk_resource_snap_get_top(
context, snapshot_vm_resource.id)
instance_size = instance_size + vm_disk_resource_snap.size
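# Walk the chain of backing snapshots and add their sizes as well.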
while vm_disk_resource_snap.vm_disk_resource_snap_backing_id is not None:
vm_disk_resource_snap_backing = self.db.vm_disk_resource_snap_get(
context, vm_disk_resource_snap.vm_disk_resource_snap_backing_id)
instance_size = instance_size + vm_disk_resource_snap_backing.size
vm_disk_resource_snap = vm_disk_resource_snap_backing
return instance_size
@autolog.log_method(logger=Logger)
def _get_metadata_value(self, vm_network_resource_snap, key):
for metadata in vm_network_resource_snap.metadata:
if metadata['key'] == key:
return metadata['value']
@autolog.log_method(logger=Logger)
def workload_type_discover_instances(self, context, workload_type_id,
metadata, workload_id=None):
"""
Discover instances of a workload_type
"""
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
store = {
'context': context_dict, # context dictionary
'source_platform': 'openstack',
'workload_id': workload_id,
}
for key in metadata:
store[key] = str(metadata[key])
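# Instantiate the workflow class registered for this workload type and run its discovery phase.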
workflow_class = get_workflow_class(context, workload_type_id)
workflow = workflow_class("discover_instances", store)
instances = workflow.discover()
return instances
@autolog.log_method(logger=Logger)
def workload_type_topology(self, context, workload_type_id,
metadata, workload_id=None):
"""
Topology of a workload_type
"""
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
store = {
'context': context_dict, # context dictionary
'source_platform': 'openstack',
'workload_id': workload_id,
}
for key in metadata:
store[key] = str(metadata[key])
workflow_class = get_workflow_class(context, workload_type_id)
workflow = workflow_class("workload_topology", store)
topology = workflow.topology()
return topology
@autolog.log_method(logger=Logger)
def workload_discover_instances(self, context, workload_id):
"""
Discover instances of workload
"""
workload = self.db.workload_get(context, workload_id)
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
store = {
'context': context_dict, # context dictionary
'source_platform': 'openstack',
'workload_id': workload_id,
}
for meta in workload.metadata:
if meta.key == 'preferredgroup':
continue
store[meta.key] = meta.value
workflow_class = get_workflow_class(context, workload.workload_type_id)
workflow = workflow_class("discover_instances", store)
instances = workflow.discover()
compute_service = nova.API(production=True)
for vm in self.db.workload_vms_get(context, workload.id):
try:
self.db.workload_vms_delete(context, vm.vm_id, workload.id)
compute_service.delete_meta(context, vm.vm_id,
["workload_id", "workload_name"])
except nova.nova_exception.NotFound as ex:
LOG.info('Instance ID:{} does not exist. Error: {}'.format(vm.vm_id, ex))
if instances and 'instances' in instances:
for instance in instances['instances']:
values = {'workload_id': workload.id,
'vm_id': instance['vm_id'],
'metadata': instance['vm_metadata'],
'vm_name': instance['vm_name']}
vm = self.db.workload_vms_create(context, values)
compute_service.set_meta_item(context, vm.vm_id,
"workload_id", workload.id)
compute_service.set_meta_item(
context, vm.vm_id, "workload_name", workload.display_name)
if instances and 'topology' in instances:
workload_metadata = {'topology': json.dumps(instances['topology'])}
self.db.workload_update(context,
workload_id,
{'metadata': workload_metadata})
return instances
@autolog.log_method(logger=Logger)
def workload_get_topology(self, context, workload_id):
"""
Return workload topology
"""
try:
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
workload = self.db.workload_get(context, workload_id)
store = {
'context': context_dict, # context dictionary
'workload_id': workload_id, # workload_id
'source_platform': workload.source_platform,
}
for kvpair in workload.metadata:
store[kvpair['key']] = kvpair['value']
workflow_class = get_workflow_class(
context, workload.workload_type_id)
workflow = workflow_class("workload_topology", store)
topology = workflow.topology()
return topology
except Exception as err:
with excutils.save_and_reraise_exception():
msg = _("Error getting workload topology %(workload_id)s with failure: %(exception)s") % {
'workload_id': workload_id, 'exception': err, }
LOG.error(msg)
LOG.exception(err)
pass
@autolog.log_method(logger=Logger)
def workload_get_workflow_details(self, context, workload_id):
"""
Return workload workflow
"""
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
workload = self.db.workload_get(context, workload_id)
store = {
'context': context_dict, # context dictionary
'workload_id': workload_id, # workload_id
'source_platform': workload.source_platform
}
for kvpair in workload.metadata:
store[kvpair['key']] = kvpair['value']
workflow_class = get_workflow_class(context, workload.workload_type_id)
workflow = workflow_class("workload_workflow_details", store)
workflow.initflow()
details = workflow.details()
return details
@autolog.log_method(Logger, 'WorkloadMgrManager.workload_create')
def workload_create(self, context, workload_id):
"""
Create a scheduled workload in the workload scheduler
"""
try:
workload = self.db.workload_get(context, workload_id)
vms = self.db.workload_vms_get(context, workload_id)
compute_service = nova.API(production=True)
volume_service = cinder.API()
workload_backup_media_size = 0
for vm in vms:
compute_service.set_meta_item(context, vm.vm_id,
"workload_id", workload_id)
compute_service.set_meta_item(
context, vm.vm_id, "workload_name", workload['display_name'])
instance = compute_service.get_server_by_id(
context, vm.vm_id, admin=False)
flavor = compute_service.get_flavor_by_id(
context, instance.flavor['id'])
workload_backup_media_size += flavor.disk
for volume in getattr(
instance, 'os-extended-volumes:volumes_attached'):
vol_obj = volume_service.get(
context, volume['id'], no_translate=True)
workload_backup_media_size += vol_obj.size
# calculate approximate size of backup storage needed for this backup job
# TODO: Handle number of snapshots by days
jobschedule = pickle.loads(bytes(workload.jobschedule, 'utf-8'))
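# Estimate how many full and incremental snapshots will be retained, based on the
# retention policy and the full-backup interval.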
if jobschedule['retention_policy_type'] == 'Number of Snapshots to Keep':
incrs = int(jobschedule['retention_policy_value'])
else:
jobsperday = int(jobschedule['interval'].split("hr")[0])
incrs = int(jobschedule['retention_policy_value']) * jobsperday
if int(jobschedule['fullbackup_interval']) == -1:
fulls = 1
elif int(jobschedule['fullbackup_interval']) == 0:
fulls = incrs
incrs = 0
else:
fulls = incrs // int(jobschedule['fullbackup_interval'])
incrs = incrs - fulls
workload_approx_backup_size = \
(fulls * workload_backup_media_size * CONF.workload_full_backup_factor +
incrs * workload_backup_media_size * CONF.workload_incr_backup_factor) / 100
backup_endpoint = \
vault.get_nfs_share_for_workload_by_free_overcommit(
context,
workload)
workload_metadata = {
'workload_approx_backup_size': workload_approx_backup_size,
'backup_media_target': backup_endpoint}
# Create swift container for the workload
json_wl = jsonutils.dumps(workload)
json_wl_vms = jsonutils.dumps(vms)
self.db.workload_update(context,
workload_id,
{
'host': self.host,
'status': 'available',
'availability_zone': self.az,
'metadata': workload_metadata
})
workload_utils.upload_workload_db_entry(context, workload_id)
except Exception as err:
with excutils.save_and_reraise_exception():
self.db.workload_update(context, workload_id,
{'status': 'error',
'error_msg': str(err)})
@autolog.log_method(logger=Logger)
def file_search(self, context, search_id):
"""
File search
"""
try:
self.db.file_search_update(
context, search_id, {
'host': self.host, 'status': 'searching'})
search = self.db.file_search_get(context, search_id)
vm_found = self.db.workload_vm_get_by_id(
context, search.vm_id, read_deleted='yes', workloads_filter='deleted')
if len(vm_found) == 0:
# Check in snapshot vms
vm_found = self.db.snapshot_vm_get(context, search.vm_id, None)
if vm_found is None:
msg = _('The given vm_id does not exist for this tenant')
raise wlm_exceptions.InvalidState(reason=msg)
snapshot = self.db.snapshot_get(context, vm_found.snapshot_id)
workload_id = snapshot.workload_id
else:
workload_id = vm_found[0].workload_id
workload_obj = self.db.workload_get(context, workload_id)
backup_endpoint = self.db.get_metadata_value(workload_obj.metadata,
'backup_media_target')
backup_target = vault.get_backup_target(backup_endpoint)
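# Determine which snapshots to search: explicit snapshot ids, a start/end window,
# a date range, or all available snapshots of the workload.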
if search.snapshot_ids != '' and len(search.snapshot_ids) > 1:
filtered_snapshots = search.snapshot_ids.split(',')
search_list_snapshots = []
for filtered_snapshot in filtered_snapshots:
if filtered_snapshot in search_list_snapshots:
continue
filter_snapshot = self.db.snapshot_get(
context, filtered_snapshot)
if filter_snapshot.workload_id != workload_id:
msg = _('Invalid snapshot_ids provided')
raise wlm_exceptions.InvalidState(reason=msg)
search_list_snapshots.append(filtered_snapshot)
elif search.end != 0 or search.start != 0:
kwargs = {
'workload_id': workload_id,
'get_all': False,
'start': search.start,
'end': search.end,
'status': 'available'}
search_list_snapshots = self.db.snapshot_get_all(
context, **kwargs)
elif search.date_from != '':
kwargs = {
'workload_id': workload_id,
'get_all': False,
'date_from': search.date_from,
'date_to': search.date_to,
'status': 'available'}
search_list_snapshots = self.db.snapshot_get_all(
context, **kwargs)
else:
kwargs = {
'workload_id': workload_id,
'get_all': False,
'status': 'available'}
search_list_snapshots = self.db.snapshot_get_all(
context, **kwargs)
guestfs_input = []
if len(search_list_snapshots) == 0:
self.db.file_search_update(
context, search_id, {
'status': 'error', 'error_msg': 'There are no valid snapshots available for this search'})
return
errored_snapshots = []
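# Collect the on-disk paths of each snapshot's top-level disk images; snapshots with
# missing images are skipped and reported.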
for search_list_snapshot in search_list_snapshots:
search_list_snapshot_id = search_list_snapshot
if not isinstance(search_list_snapshot, str):
search_list_snapshot_id = search_list_snapshot.id
snapshot_vm_resources = self.db.snapshot_vm_resources_get(
context, search.vm_id, search_list_snapshot_id)
if len(snapshot_vm_resources) == 0:
continue
guestfs_input_str = []
proceed_path = True
for snapshot_vm_resource in snapshot_vm_resources:
if snapshot_vm_resource.resource_type != 'disk':
continue
vm_disk_resource_snap = self.db.vm_disk_resource_snap_get_top(
context, snapshot_vm_resource.id)
resource_snap_path = os.path.join(
backup_target.mount_path,
vm_disk_resource_snap.vault_url.strip(
os.sep))
if os.path.exists(resource_snap_path):
guestfs_input_str.append(resource_snap_path)
else:
proceed_path = False
errored_snapshots.append(search_list_snapshot_id)
if proceed_path:
guestfs_input.extend(guestfs_input_str)
err_out = {}
if len(errored_snapshots):
LOG.error(
"The following snapshots could not be searched because their disk images are missing: {}".format(
errored_snapshots))
for snapshot_id in errored_snapshots:
err_out.update({
snapshot_id: [{
"error: The path is not present on disk": []
}]
})
try:
secret_uuid = ''
if workload_obj.encryption:
barbican_service = barbican.API()
secret_uuid = barbican_service.get_payload_from_secret_href(context, workload_obj.secret_uuid)
if not secret_uuid:
secret_uuid = ''
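# Run the filesearch.py helper in a separate process, passing the disk image paths
# and the search pattern.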
out = subprocess.check_output([sys.executable, os.path.dirname(
__file__) + os.path.sep + "filesearch.py",
"--log_dir", CONF.log_dir,
"--rootwrap_config", CONF.rootwrap_config,
"--process_timeout", str(CONF.filesearch.process_timeout),
"--secret", secret_uuid, "--sec_id=sec0",
"--raw", "--pattern",
search.filepath] + guestfs_input)
out = ast.literal_eval(out.decode('utf-8'))
out = jsonutils.values_to_str(out)
if err_out:
out.append(err_out)
out = str(out)
except Exception as err:
try:
LOG.info(err)
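# Restart libvirt and retry the file search once before giving up.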
command = ['sudo', 'service', 'libvirt-bin', 'restart']
subprocess.call(command, shell=False)
out = subprocess.check_output([sys.executable,
os.path.dirname( __file__) + os.path.sep + \
"filesearch.py", "--log_dir", CONF.log_dir,
"--rootwrap_config", CONF.rootwrap_config,
"--process_timeout",
str(CONF.filesearch.process_timeout), "--secret",
secret_uuid, "--sec_id=sec0", "--raw", "--pattern",
search.filepath] + guestfs_input)
out = ast.literal_eval(out.decode('utf-8'))
out = jsonutils.values_to_str(out)
if err_out:
out.append(err_out)
out = str(out)
except Exception as err:
msg = _('Error searching files. Please contact your administrator.')
raise wlm_exceptions.InvalidState(reason=msg)
self.db.file_search_update(
context, search_id, {
'status': 'completed', 'json_resp': out})
except Exception as err:
self.db.file_search_update(
context, search_id, {
'status': 'error', 'error_msg': str(err)})
LOG.debug(err)
#@synchronized(workloadlock)
@autolog.log_method(logger=Logger)
def workload_snapshot(self, context, snapshot_id):
"""
Take a snapshot of the workload
"""
try:
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
context = nova._get_tenant_context(context)
snapshot = self.db.snapshot_update(
context,
snapshot_id,
{
'host': self.host,
'progress_percent': 0,
'progress_msg': 'Snapshot of workload is starting',
'status': 'starting'})
workload = self.db.workload_get(context, snapshot.workload_id)
backup_endpoint = self.db.get_metadata_value(workload.metadata,
'backup_media_target')
backup_target = vault.get_backup_target(backup_endpoint)
try:
backup_target.purge_snapshot_from_staging_area(
context, {'workload_id': workload.id, 'snapshot_id': snapshot.id})
except Exception as ex:
LOG.exception(ex)
# Upload snapshot metadata to the vault
#workload_utils.upload_snapshot_db_entry(context, snapshot_id)
pause_at_snapshot = CONF.pause_vm_before_snapshot
for metadata in workload.metadata:
for key in metadata:
if key == 'pause_at_snapshot':
pause_at_snapshot = bool(int(metadata[key]))
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
snapshot_dict = dict(snapshot.items())
snapshot_dict.pop('created_at')
snapshot_dict.pop('updated_at')
store = {
'connection': 'dir',
"path": CONF.taskflow_path, # save data to this directory
"max_cache_size": CONF.taskflow_max_cache_size, # keep up-to this much entries in memory
'context': context_dict, # context dictionary
'snapshot': snapshot_dict,
'workload_id': snapshot.workload_id, # workload_id
'source_platform': workload.source_platform,
'pause_at_snapshot': pause_at_snapshot,
}
snapshot_metadata = {}
for kvpair in workload.metadata:
store[kvpair['key']] = str(kvpair['value'])
snapshot_metadata[kvpair['key']] = str(kvpair['value'])
store['topology'] = json.dumps("")
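# Pick the workflow implementation: a per-VM serial workflow when configured,
# otherwise the workflow registered for this workload type.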
if CONF.serial_vm_backup:
workflow_class = workflow_lookup_class('workloadmgr.workflows.vmserialworkflow.VmSerialWorkflow')
else:
workflow_class = get_workflow_class(
context, workload.workload_type_id)
workflow = workflow_class(workload.display_name, store)
self.db.snapshot_update(context,
snapshot_id,
{'progress_percent': 0,
'progress_msg': 'Initializing Snapshot Workflow',
'status': 'executing'})
workflow.initflow()
workflow.execute()
self.db.snapshot_type_time_size_update(context, snapshot_id)
# Update vms of the workload
hostnames = []
if 'instances' in workflow._store and workflow._store['instances']:
compute_service = nova.API(production=True)
for vm in self.db.workload_vms_get(context, workload.id):
self.db.workload_vms_delete(context, vm.vm_id, workload.id)
try:
compute_service.delete_meta(
context, vm.vm_id, [
"workload_id", 'workload_name'])
except nova.nova_exception.NotFound as ex:
LOG.info('Instance ID:{} does not exist. Error: {}'.format(vm.vm_id, ex))
except Exception as ex:
LOG.exception(ex)
try:
context = nova._get_tenant_context(context)
compute_service.delete_meta(
context, vm.vm_id, [
"workload_id", 'workload_name'])
except nova.nova_exception.NotFound as ex:
LOG.info('Instance ID:{} does not exist. Error: {}'.format(vm.vm_id, ex))
except Exception as ex:
LOG.exception(ex)
raise wlm_exceptions.ErrorOccurred(
reason=str(ex) %
(ex.kwargs if hasattr(
ex, 'kwargs') else {}))
for instance in workflow._store['instances']:
values = {'workload_id': workload.id,
'status': 'available',
'vm_id': instance['vm_id'],
'metadata': instance['vm_metadata'],
'vm_name': instance['vm_name']}
vm = self.db.workload_vms_create(context, values)
compute_service.set_meta_item(context, vm.vm_id,
"workload_id", workload.id)
compute_service.set_meta_item(
context, vm.vm_id, "workload_name", workload.display_name)
for inst in workflow._store['instances']:
hostnames.append(inst['hostname'])
if 'root_partition_type' not in inst:
inst['root_partition_type'] = "Linux"
self.db.snapshot_vm_update(
context,
inst['vm_id'],
snapshot.id,
{
'metadata': {
'root_partition_type': inst['root_partition_type'],
'availability_zone': inst['availability_zone'],
'vm_metadata': json.dumps(
inst['vm_metadata'])}})
workload_metadata = {
'hostnames': json.dumps(hostnames),
'topology': json.dumps(
workflow._store['topology'])}
self.db.workload_update(context,
snapshot.workload_id,
{'metadata': workload_metadata})
snapshot_metadata['topology'] = json.dumps(
workflow._store['topology'])
self.db.snapshot_update(context,
snapshot_id,
{'metadata': snapshot_metadata})
# Upload snapshot metadata to the vault
workload_utils.upload_snapshot_db_entry(
context, snapshot_id, snapshot_status='available')
# upload the data to object store... this function will check if
# the object store is configured
backup_target.upload_snapshot_metatdata_to_object_store(
context, {
'workload_id': workload.id, 'workload_name': workload.display_name, 'snapshot_id': snapshot.id})
self.db.snapshot_update(context,
snapshot_id,
{'progress_percent': 100,
'progress_msg': 'Snapshot of workload is complete',
'finished_at': timeutils.utcnow(),
'status': 'available',
'metadata': snapshot_metadata})
except WrappedFailure as ex:
LOG.exception(ex)
flag = self.db.snapshot_get_metadata_cancel_flag(
context, snapshot_id, 1)
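# A cancel flag of '1' means the user cancelled the snapshot, so it is recorded
# as 'cancelled' rather than 'error'.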
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
for vm in self.db.workload_vms_get(context, workload.id):
self.db.snapshot_vm_update(
context, vm.vm_id, snapshot_id, {
'status': status, })
else:
msg = _("Failed creating workload snapshot with following error(s):")
if hasattr(ex, '_causes'):
for cause in ex._causes:
if cause._exception_str not in msg:
msg = msg + ' ' + cause._exception_str
LOG.error(msg)
status = 'error'
self.db.snapshot_update(context,
snapshot_id,
{'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'status': status
})
try:
self.db.snapshot_type_time_size_update(context, snapshot_id)
except Exception as ex:
LOG.exception(ex)
except Exception as ex:
LOG.exception(ex)
flag = self.db.snapshot_get_metadata_cancel_flag(
context, snapshot_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
for vm in self.db.workload_vms_get(context, workload.id):
self.db.snapshot_vm_update(
context, vm.vm_id, snapshot_id, {
'status': status, })
else:
if hasattr(ex, 'code') and ex.code == 401:
if hasattr(
context,
'tenant') and context.tenant != '' and context.tenant is not None:
tenant = context.tenant
else:
tenant = context.project_id
msg = _(
"Failed creating workload snapshot: Make sure the trustee role " +
CONF.trustee_role +
" is assigned to tenant " +
tenant)
else:
msg = _("Failed creating workload snapshot: %(exception)s") % {
'exception': ex}
LOG.error(msg)
status = 'error'
self.db.snapshot_update(context,
snapshot_id,
{'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'status': status
})
try:
self.db.snapshot_type_time_size_update(context, snapshot_id)
except Exception as ex:
LOG.exception(ex)
finally:
try:
backup_target.purge_snapshot_from_staging_area(
context, {'workload_id': workload.id, 'snapshot_id': snapshot.id})
except Exception as ex:
LOG.exception(ex)
try:
snapshot = self.db.snapshot_get(context, snapshot_id)
self.db.workload_update(
context, snapshot.workload_id, {
'status': 'available'})
except Exception as ex:
LOG.exception(ex)
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
try:
snapshot = self.db.snapshot_get(context, snapshot_id)
if settings.get_settings(context).get('smtp_email_enable') == 'yes' or \
settings.get_settings(context).get('smtp_email_enable') == '1':
self.send_email(context, snapshot, 'snapshot')
except Exception as ex:
LOG.exception(ex)
@autolog.log_method(logger=Logger)
def workload_reset(self, context, workload_id, status_update=True):
"""
Reset an existing workload
"""
try:
workload = self.db.workload_get(context, workload_id)
vms = self.db.workload_vms_get(context, workload.id)
# get the recent snapshot
if workload.source_platform == 'openstack':
virtdriver = driver.load_compute_driver(
None, 'libvirt.LibvirtDriver')
for vm in vms:
virtdriver.reset_vm(context, workload_id, vm.vm_id)
except Exception as ex:
LOG.exception(ex)
msg = _("Failed to reset: %(exception)s") % {'exception': ex}
LOG.error(msg)
finally:
if status_update is True:
    self.db.workload_update(
        context, workload_id, {'status': 'available'})
return
def workload_import(self, context, workload_id, jobid, upgrade):
try:
wl_status = None
try:
wl_status = self.db.workload_get(context, workload_id)
except Exception as ex:
pass
if wl_status:
raise wlm_exceptions.WorkloadAlreadyExist()
self.db.workload_import_update(context, jobid, workload_id, {'status': 'importing', 'progress': 0, 'updated_at': None})
module_name = 'workloadmgr.db.imports.import_workloads'
import_workload_module = importlib.import_module(module_name)
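# Import settings, quotas and workload policies before importing the workload definition itself.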
import_workload_module.import_settings(context, models.DB_VERSION)
import_workload_module.import_allowed_quotas(context)
# Import Workload policies
import_workload_module.import_policy(context, models.DB_VERSION)
workloads = import_workload_module.import_workload(context, jobid, workload_id, models.DB_VERSION, upgrade)
self.db.workload_import_update(context, jobid, workload_id, {'status': 'completed'})
except wlm_exceptions.WorkloadsNotFound as ex:
LOG.exception("Exception while importing Workload: {0}".format(workload_id))
self.db.workload_import_update(context, jobid, workload_id, {'status': 'error', 'message': 'Workload {0} Not Found'.format(workload_id)})
LOG.exception(ex)
except wlm_exceptions.WorkloadAlreadyExist as ex:
self.db.workload_import_update(context, jobid, workload_id, {'status': 'skipping', 'message': 'Workload {0} Already Present.'.format(workload_id)})
except Exception as ex:
self.db.workload_import_update(context, jobid, workload_id, {'status': 'error', 'message': ex})
LOG.exception("Exception while importing Job-id: {0}, Workload: {1}".format(jobid, workload_id))
LOG.exception(ex)
raise ex
finally:
wls = self.db.workload_import_get(context, jobid)
status = []
for each_wl in wls:
status.append(each_wl.get('status'))
if "in-progress" in status or "created" in status or "importing" in status:
self.db.import_job_update(context, jobid, {'status': 'In-Progress'})
else:
self.db.import_job_update(context, jobid, {'status': 'Completed'})
LOG.info('Import Workloads Completed')
@autolog.log_method(logger=Logger)
def workload_delete(self, context, workload_id):
"""
Delete an existing workload
"""
workload = self.db.workload_get(context, workload_id)
snapshots = self.db.snapshot_get_all_by_project_workload(
context, context.project_id, workload.id)
if len(snapshots) > 0:
msg = _(
'This workload contains snapshots. Please delete all snapshots and try again.')
raise wlm_exceptions.InvalidState(reason=msg)
LOG.info(_('Deleting the data of workload %s %s %s') %
(workload.display_name, workload.id,
workload.created_at.strftime("%d-%m-%Y %H:%M:%S")))
backup_endpoint = self.db.get_metadata_value(workload.metadata,
'backup_media_target')
if backup_endpoint is not None:
backup_target = vault.get_backup_target(backup_endpoint)
if backup_target is not None:
backup_target.workload_delete(
context, {
'workload_id': workload.id, 'workload_name': workload.display_name, })
self.workload_reset(context, workload_id)
compute_service = nova.API(production=True)
workload_vms = self.db.workload_vms_get(context, workload.id)
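# Remove the workload_id/workload_name metadata from each member instance and
# drop the DB association.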
for vm in workload_vms:
try:
compute_service.delete_meta(context, vm.vm_id,
["workload_id", 'workload_name'])
self.db.workload_vms_delete(context, vm.vm_id, workload.id)
except nova.nova_exception.NotFound as ex:
LOG.info('Instance ID:{} does not exist. Error: {}'.format(vm.vm_id, ex))
self.db.workload_vms_delete(context, vm.vm_id, workload.id)
except Exception as ex:
LOG.exception(ex)
err_msg = 'Delete operation failed, ' + str(ex)
if type(ex) == KsUnauthorized:
err_msg = err_msg + '\nTry creating wlm trust for user %s ' \
'and retry this operation' %(context.user_id)
self.db.workload_update(
context, workload_id, {
'error_msg': err_msg,
'status': 'error'})
raise wlm_exceptions.ErrorOccurred(
reason=str(ex) %
(ex.kwargs if hasattr(
ex, 'kwargs') else {}))
self.db.workload_delete(context, workload.id)
# Update the secret metadata only after the workload is confirmed deleted from the DB.
# Defensive approach with some drawbacks.
# NOTE: the workload may be deleted even if clearing the secret metadata fails.
# TODO: Revisit how to handle that condition.
if workload.encryption:
barbican_service = barbican.API()
meta = json.loads(barbican_service.get_secret_metadata(
context,
workload.secret_uuid))
metadata = meta.get('metadata')
try:
if metadata.get('workload_id') == workload.id:
body = { "metadata": {} }
barbican_service.update_secret_metadata(context,
workload.secret_uuid, body)
except Exception as ex:
LOG.error('Could not clear metadata of secret uuid: {0}'.format(workload.secret_uuid))
raise ex
else:
LOG.info('Successfully reset metadata of secret uuid: {0}'.format(workload.secret_uuid))
@autolog.log_method(logger=Logger)
def _validate_restore_options(self, context, restore, options):
snapshot_id = restore.snapshot_id
snapshotvms = self.db.snapshot_vms_get(context, restore.snapshot_id)
if options.get('type', "") != "openstack":
msg = _("'type' field in options is not set to 'openstack'")
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
if 'openstack' not in options:
msg = _("'openstack' field is not in options")
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
# If instances is not available should we restore entire snapshot?
if 'instances' not in options['openstack']:
msg = _("'instances' field is not in found "
"in options['instances']")
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
if options.get("restore_type", None) in ('selective'):
return
compute_service = nova.API(production=True)
volume_service = cinder.API()
flavors = compute_service.get_flavors(context)
for inst in options['openstack']['instances']:
if inst['include'] is False:
continue
vm_id = inst.get('id', None)
if not vm_id:
msg = _("'instances' contain an element that does "
"not include 'id' field")
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
try:
nova_inst = compute_service.get_server_by_id(
context, vm_id, admin=False)
if not nova_inst:
msg = _("instance '%s' in nova is not found" % vm_id)
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
except Exception as ex:
LOG.exception(ex)
msg = _("instance '%s' in nova is not found" % vm_id)
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
# get attached references
attached_devices = getattr(
nova_inst, 'os-extended-volumes:volumes_attached')
attached_devices = set([v['id'] for v in attached_devices])
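# Build a map of volume id -> size as recorded in the snapshot, to compare against
# the current cinder volumes.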
snapshot_vm_resources = self.db.snapshot_vm_resources_get(
context, vm_id, snapshot_id)
vol_snaps = {}
image_id = None
for res_snap in snapshot_vm_resources:
if res_snap.resource_type != 'disk':
continue
vol_id = self._get_metadata_value(res_snap, 'volume_id')
if not image_id:
image_id = self._get_metadata_value(res_snap, 'image_id')
vol_size = self._get_metadata_value(
res_snap, 'volume_size') or "-1"
vol_size = int(vol_size)
if vol_id:
vol_snaps[vol_id] = {'size': vol_size}
if image_id and image_id != nova_inst.image['id']:
msg = _("instance '%s' image id is different than the "
"backup image id %s" % (vm_id, nova_inst.image['id']))
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
for vdisk in inst.get('vdisks', []):
# make sure that vdisk exists in cinder and
# is attached to the instance
if vdisk.get('id', None) not in attached_devices:
msg = _("'vdisks' contain an element that does "
"not include 'id' field")
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
try:
vol_obj = volume_service.get(
context, vdisk.get(
'id', None), no_translate=True)
if not vol_obj:
raise wlm_exceptions.InvalidRestoreOptions(
reason="Given disk: %s not found for restore" %(str(vdisk.get('id', None))))
except Exception as ex:
LOG.exception(ex)
msg = _("'%s' is not a valid cinder volume" %
vdisk.get('id'))
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
if vol_obj.size != vol_snaps[vol_obj.id]['size']:
msg = _("'%s' current volume size %d does not match with "
"backup volume size %d" %
(vdisk.get('id'), vol_obj.size,
vol_snaps[vol_obj.id]['size']))
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
return
@autolog.log_method(logger=Logger)
def _oneclick_restore_options(self, context, restore, options):
snapshot_id = restore.snapshot_id
snapshotvms = self.db.snapshot_vms_get(context, restore.snapshot_id)
if options['type'] == "openstack":
options['openstack']['instances'] = []
for inst in snapshotvms:
optionsinst = {
'name': inst.vm_name,
'id': inst.vm_id,
'availability_zone': self.db.get_metadata_value(
inst.metadata,
'availability_zone'),
}
options['openstack']['instances'].append(optionsinst)
return options
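# Non-openstack (VMware) snapshots: rebuild the per-instance restore options from
# the snapshot metadata.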
options['vmware']['instances'] = []
for inst in snapshotvms:
optionsinst = {
'name': inst.vm_name, 'id': inst.vm_id,
'power': {'state': 'on', 'sequence': 1},
}
snapshot_vm_resources = self.db.snapshot_vm_resources_get(
context, inst.vm_id, snapshot_id)
for snapshot_vm_resource in snapshot_vm_resources:
""" flavor """
if snapshot_vm_resource.resource_type == 'flavor':
vm_flavor = snapshot_vm_resource
optionsinst['flavor'] = {
'vcpus': self.db.get_metadata_value(
vm_flavor.metadata, 'vcpus'), 'ram': self.db.get_metadata_value(
vm_flavor.metadata, 'ram'), 'disk': self.db.get_metadata_value(
vm_flavor.metadata, 'disk'), 'ephemeral': self.db.get_metadata_value(
vm_flavor.metadata, 'ephemeral')}
instmeta = inst.metadata
for meta in inst.metadata:
if meta.key not in ['cluster', 'parent', 'networks',
'resourcepool', 'vdisks', 'datastores',
'vmxpath']:
continue
metavalue = json.loads(meta.value)
if meta.key == 'cluster' and metavalue:
optionsinst['computeresource'] = {
'moid': metavalue[0]['value'], 'name': metavalue[0]['name']}
elif meta.key == 'parent' and metavalue:
optionsinst['vmfolder'] = {
'moid': metavalue['value'], 'name': metavalue['name']}
elif meta.key == 'networks':
optionsinst['networks'] = []
for net in metavalue:
optionsinst['networks'].append(
{
'mac_address': net['macAddress'],
'network_moid': net['value'],
'network_name': net['name'],
'new_network_moid': net['value'],
'new_network_name': net['name']})
elif meta.key == 'resourcepool':
optionsinst['resourcepool'] = {
'moid': metavalue['value'], 'name': metavalue['name']}
elif meta.key == 'vdisks':
optionsinst['vdisks'] = metavalue
elif meta.key == 'vmxpath':
optionsinst['vmxpath'] = metavalue
elif meta.key == 'datastores':
optionsinst['datastores'] = []
for ds in metavalue:
optionsinst['datastores'].append({'moid': ds['value'],
'name': ds['name']})
options['vmware']['instances'].append(optionsinst)
return options
@autolog.log_method(logger=Logger)
def network_topology_restore(self, context, restore_id):
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
restore = None
snapshot = None
workload = None
try:
restore = self.db.restore_get(context, restore_id)
snapshot = self.db.snapshot_get(context, restore.snapshot_id)
workload = self.db.workload_get(context, snapshot.workload_id)
except Exception as ex:
LOG.exception(ex)
raise ex
try:
context = nova._get_tenant_context(context)
restore = self.db.restore_update(
context,
restore_id,
{
'host': self.host,
'target_platform': 'openstack',
'progress_percent': 0,
'progress_msg': 'Restore from snapshot is executing',
'status': 'executing'})
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.NetworkTopologyRestoreWorkflow'
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
store = {
'connection': 'dir',
"path": CONF.taskflow_path, # save data to this directory
"max_cache_size": CONF.taskflow_max_cache_size, # keep up-to this much entries in memory
'context': context_dict, # context dictionary
'restore': jsonutils.to_primitive(restore), # restore dictionary
'target_platform': 'openstack',
}
workflow_class = workflow_lookup_class(workflow_class_name)
workflow = workflow_class(restore.display_name, store)
workflow.initflow()
workflow.execute()
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': 'Network topology restore is complete',
'finished_at': timeutils.utcnow(),
'time_taken': int(
(timeutils.utcnow() - restore.created_at).total_seconds()),
'status': 'available'})
except WrappedFailure as ex:
flag = self.db.restore_get_metadata_cancel_flag(
context, restore_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
else:
status = 'error'
LOG.exception(ex)
msg = _("Network topology restore failed with following error(s):")
if hasattr(ex, '_causes'):
for cause in ex._causes:
if cause._exception_str not in msg:
msg = msg + ' ' + cause._exception_str
LOG.error(msg)
time_taken = 0
if 'restore' in locals() or 'restore' in globals():
if restore:
time_taken = int(
(timeutils.utcnow() - restore.created_at).total_seconds())
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'time_taken': time_taken,
'status': status})
except Exception as ex:
flag = self.db.restore_get_metadata_cancel_flag(
context, restore_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
else:
status = 'error'
LOG.exception(ex)
msg = _("Network topology restore failed with: %(exception)s") % {
'exception': ex}
LOG.error(msg)
time_taken = 0
if 'restore' in locals() or 'restore' in globals():
if restore:
time_taken = int(
(timeutils.utcnow() - restore.created_at).total_seconds())
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'time_taken': time_taken,
'metadata': {
'data_transfer_time': 0,
'object_store_transfer_time': 0,
},
'status': status})
finally:
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
try:
self.db.snapshot_update(
context, restore.snapshot_id, {'status': 'available'}
)
self.db.workload_update(
context, workload.id, {'status': 'available'}
)
if settings.get_settings(context).get('smtp_email_enable') == 'yes' or settings.get_settings(
context).get('smtp_email_enable') == '1':
self.send_email(context, restore, 'restore')
except Exception as ex:
LOG.exception(ex)
@autolog.log_method(logger=Logger)
def security_groups_restore(self, context, restore_id):
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
restore = None
snapshot = None
workload = None
try:
restore = self.db.restore_get(context, restore_id)
snapshot = self.db.snapshot_get(context, restore.snapshot_id)
workload = self.db.workload_get(context, snapshot.workload_id)
except Exception as ex:
LOG.exception(ex)
raise ex
try:
context = nova._get_tenant_context(context)
restore = self.db.restore_update(
context,
restore_id,
{
'host': self.host,
'target_platform': 'openstack',
'progress_percent': 0,
'progress_msg': 'Security Groups Restore from snapshot is executing',
'status': 'executing'})
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.SecurityGroupsRestoreWorkflow'
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
store = {
'connection': 'dir',
"path": CONF.taskflow_path, # save data to this directory
"max_cache_size": CONF.taskflow_max_cache_size, # keep up-to this much entries in memory
'context': context_dict, # context dictionary
'restore': jsonutils.to_primitive(restore), # restore dictionary
'target_platform': 'openstack',
}
workflow_class = workflow_lookup_class(workflow_class_name)
workflow = workflow_class(restore.display_name, store)
workflow.initflow()
workflow.execute()
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': 'Security Groups restore is complete',
'finished_at': timeutils.utcnow(),
'time_taken': int(
(timeutils.utcnow() - restore.created_at).total_seconds()),
'status': 'available'})
except WrappedFailure as ex:
flag = self.db.restore_get_metadata_cancel_flag(
context, restore_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
else:
status = 'error'
LOG.exception(ex)
msg = _("Security Groups restore failed with following error(s):")
if hasattr(ex, '_causes'):
for cause in ex._causes:
if cause._exception_str not in msg:
msg = msg + ' ' + cause._exception_str
LOG.error(msg)
time_taken = 0
if 'restore' in locals() or 'restore' in globals():
if restore:
time_taken = int(
(timeutils.utcnow() - restore.created_at).total_seconds())
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'time_taken': time_taken,
'status': status})
except Exception as ex:
flag = self.db.restore_get_metadata_cancel_flag(
context, restore_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
else:
status = 'error'
LOG.exception(ex)
msg = _("Security Groups restore failed with: %(exception)s") % {
'exception': ex}
LOG.error(msg)
time_taken = 0
if 'restore' in locals() or 'restore' in globals():
if restore:
time_taken = int(
(timeutils.utcnow() - restore.created_at).total_seconds())
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'time_taken': time_taken,
'metadata': {
'data_transfer_time': 0,
'object_store_transfer_time': 0,
},
'status': status})
finally:
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
try:
self.db.snapshot_update(
context, restore.snapshot_id, {'status': 'available'}
)
self.db.workload_update(
context, workload.id, {'status': 'available'}
)
if settings.get_settings(context).get('smtp_email_enable') == 'yes' or settings.get_settings(
context).get('smtp_email_enable') == '1':
self.send_email(context, restore, 'restore')
except Exception as ex:
LOG.exception(ex)
#@synchronized(workloadlock)
@autolog.log_method(logger=Logger)
def snapshot_restore(self, context, restore_id):
"""
Restore VMs and all its LUNs from a snapshot
"""
restore_type = 'restore'
restore_user_selected_value = 'Selective Restore'
try:
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
restore = self.db.restore_get(context, restore_id)
snapshot = self.db.snapshot_get(context, restore.snapshot_id)
workload = self.db.workload_get(context, snapshot.workload_id)
backup_endpoint = self.db.get_metadata_value(workload.metadata,
'backup_media_target')
backup_target = vault.get_backup_target(backup_endpoint)
try:
backup_target.purge_snapshot_from_staging_area(
context, {'workload_id': workload.id, 'snapshot_id': snapshot.id})
except Exception as ex:
LOG.exception(ex)
context = nova._get_tenant_context(context)
target_platform = 'openstack'
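# Restore options are stored pickled on the restore record; unpack them to determine
# the target platform and restore type.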
if hasattr(restore, 'pickle'):
options = pickle.loads(bytes(restore['pickle'], 'utf-8'))
if options and 'type' in options:
target_platform = options['type']
if target_platform != 'openstack':
msg = _("'type' field in restore options must be 'openstack'")
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
if not options:
options = {}
if options.get('oneclickrestore', False):
rtype = 'oneclick'
else:
rtype = 'selective'
rtype = options.get('restore_type', rtype)
if rtype not in ('selective', 'oneclick', 'inplace'):
msg = _("'restore_type' field in restore options must be "
"'selective' or 'inplace' or 'oneclick'")
raise wlm_exceptions.InvalidRestoreOptions(reason=msg)
restore_type = restore.restore_type
if restore_type == 'test':
restore = self.db.restore_update(
context,
restore_id,
{
'host': self.host,
'target_platform': target_platform,
'progress_percent': 0,
'progress_msg': 'Create testbubble from snapshot is starting',
'status': 'starting'})
else:
restore = self.db.restore_update(
context,
restore_id,
{
'host': self.host,
'target_platform': target_platform,
'progress_percent': 0,
'progress_msg': 'Restore from snapshot is starting',
'status': 'starting'})
values = {'status': 'executing'}
workflow_class_name = 'workloadmgr.workflows.restoreworkflow.RestoreWorkflow'
if rtype == 'oneclick':
restore_user_selected_value = 'Oneclick Restore'
# Override the target platform for clients that did not
# specify one for oneclick restore
if workload.source_platform != target_platform:
target_platform = workload.source_platform
# Fill the restore options from the snapshot instances metadata
options = self._oneclick_restore_options(
context, restore, options)
values['pickle'] = str(pickle.dumps(options, 0), 'utf-8')
compute_service = nova.API(production=True)
for vm in self.db.snapshot_vms_get(
context, restore.snapshot_id):
instance_options = utils.get_instance_restore_options(
options, vm.vm_id, target_platform)
if instance_options and instance_options.get(
'include', True) == False:
continue
else:
instance = compute_service.get_server_by_id(
context, vm.vm_id, admin=False)
if instance:
msg = _(
'Original instance ' + vm.vm_name + ' is still present. '
'Please delete this instance and try again.')
raise wlm_exceptions.InvalidState(reason=msg)
elif rtype == 'inplace':
workflow_class_name = 'workloadmgr.workflows.inplacerestoreworkflow.InplaceRestoreWorkflow'
self._validate_restore_options(context, restore, options)
self.workload_reset(
context,
snapshot.workload_id,
status_update=False)
elif rtype == 'selective':
self._validate_restore_options(context, restore, options)
restore = self.db.restore_update(context, restore.id, values)
restore_size = vmtasks_openstack.get_restore_data_size(
context, self.db, dict(restore))
if restore_type == 'test':
self.db.restore_update(
context, restore_id, {
'size': restore_size})
else:
if target_platform == 'openstack':
restore_size = vmtasks_openstack.get_restore_data_size(
context, self.db, dict(restore))
restore = self.db.restore_update(
context, restore_id, {'size': (restore_size)})
else:
restore_size = vmtasks_vcloud.get_restore_data_size(
context, self.db, dict(iter(restore.items())))
restore = self.db.restore_update(
context, restore_id, {'size': (restore_size)})
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
store = {
'connection': 'dir',
"path": CONF.taskflow_path, # save data to this directory
"max_cache_size": CONF.taskflow_max_cache_size, # keep up-to this much entries in memory
'context': context_dict, # context dictionary
'restore': jsonutils.to_primitive(restore), # restore dictionary
'target_platform': target_platform,
}
workflow_class = workflow_lookup_class(workflow_class_name)
workflow = workflow_class(restore.display_name, store)
workflow.initflow()
workflow.execute()
compute_service = nova.API(production=True)
restore_data_transfer_time = 0
restore_object_store_transfer_time = 0
workload_vms = self.db.workload_vms_get(context, workload.id)
if target_platform == 'openstack':
workload_def_updated = False
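# For each restored VM, decide whether it should become the workload member
# ('production') and, if so, update the workload definition and nova metadata.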
for restored_vm in self.db.restored_vms_get(
context, restore_id):
instance = compute_service.get_server_by_id(
context, restored_vm.vm_id, admin=False)
if instance is None:
pass
else:
instance_id = self.db.get_metadata_value(
restored_vm.metadata, 'instance_id', None)
if rtype == 'selective':
# During a selective restore, update the workload definition only
# when the workload member doesn't exist.
workload_vms = self.db.restored_instance_get(context, instance_id)
production = None
for workload_vm in workload_vms:
if workload_vm.workload_id == workload.id:
production = compute_service.get_server_by_id(
context, workload_vm.vm_id, admin=False)
if production is not None:
break
else:
production = compute_service.get_server_by_id(
context, instance_id, admin=False)
if production is None:
production = True
else:
production = False
if production:
workload_metadata = {}
if instance_id is not None:
restored_ids, snap_ins = self.get_metadata_value_by_chain(
workload.metadata, instance_id, None)
workload_metadata[instance_id] = restored_vm.vm_id
if restored_ids is None:
self.db.workload_vms_delete(
context, instance_id, workload.id)
else:
for ins in snap_ins:
workload_metadata[ins] = restored_vm.vm_id
for restored_id in restored_ids:
self.db.workload_vms_delete(
context, restored_id, workload.id)
try:
result = compute_service.delete_meta(
context, restore_id, ["workload_id", "workload_name"])
except nova.nova_exception.NotFound as ex:
LOG.info('Restore ID:{} does not exist. Error: {}'.format(restore_id, ex))
except Exception as ex:
LOG.exception(ex)
raise wlm_exceptions.ErrorOccurred(
reason=str(ex) %
(ex.kwargs if hasattr(
ex, 'kwargs') else {}))
self.db.workload_update(
context, workload.id, {
'metadata': workload_metadata, })
self.db.restored_vm_update(
context, restored_vm.vm_id, restore_id, {
'metadata': instance.metadata})
values = {'workload_id': workload.id,
'vm_id': restored_vm.vm_id,
'metadata': instance.metadata,
'vm_name': instance.name,
'status': 'available'}
vm = self.db.workload_vms_create(context, values)
workload_def_updated = True
compute_service.set_meta_item(
context, vm.vm_id, "workload_id", workload.id)
compute_service.set_meta_item(
context, vm.vm_id, "workload_name", workload.display_name)
restore_data_transfer_time += int(
self.db.get_metadata_value(
restored_vm.metadata, 'data_transfer_time', '0'))
restore_object_store_transfer_time += int(self.db.get_metadata_value(
restored_vm.metadata, 'object_store_transfer_time', '0'))
if workload_def_updated:
workload_utils.upload_workload_db_entry(
context, workload.id)
if restore_type == 'test':
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': 'Create testbubble from snapshot is complete',
'status': 'available'})
else:
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': 'Restore from snapshot is complete',
'finished_at': timeutils.utcnow(),
'time_taken': int(
(timeutils.utcnow() - restore.created_at).total_seconds()),
'metadata': {
'data_transfer_time': restore_data_transfer_time,
'object_store_transfer_time': restore_object_store_transfer_time,
'restore_user_selected_value': restore_user_selected_value,
},
'status': 'available'})
except WrappedFailure as ex:
flag = self.db.restore_get_metadata_cancel_flag(
context, restore_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
else:
status = 'error'
LOG.exception(ex)
msg = _("Failed restoring snapshot with following error(s):")
if hasattr(ex, '_causes'):
for cause in ex._causes:
if cause._exception_str not in msg:
msg = msg + ' ' + cause._exception_str
LOG.error(msg)
time_taken = 0
if 'restore' in locals() or 'restore' in globals():
if restore:
time_taken = int(
(timeutils.utcnow() - restore.created_at).total_seconds())
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'time_taken': time_taken,
'metadata': {
'data_transfer_time': 0,
'object_store_transfer_time': 0,
'restore_user_selected_value': restore_user_selected_value,
},
'status': status})
except Exception as ex:
flag = self.db.restore_get_metadata_cancel_flag(
context, restore_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
else:
status = 'error'
LOG.exception(ex)
if restore_type == 'test':
msg = _("Failed creating test bubble: %(exception)s") % {
'exception': ex}
else:
msg = _("Failed restoring snapshot: %(exception)s") % {
'exception': ex}
LOG.error(msg)
time_taken = 0
if 'restore' in locals() or 'restore' in globals():
if restore:
time_taken = int(
(timeutils.utcnow() - restore.created_at).total_seconds())
self.db.restore_update(
context,
restore_id,
{
'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'time_taken': time_taken,
'metadata': {
'data_transfer_time': 0,
'object_store_transfer_time': 0,
'restore_user_selected_value': restore_user_selected_value,
},
'status': status})
finally:
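# Best-effort cleanup: purge any snapshot data staged for this restore,
# trigger a GC cycle, release the snapshot/workload locks and optionally
# notify the administrator by email.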
try:
backup_target.purge_snapshot_from_staging_area(
context, {'workload_id': workload.id, 'snapshot_id': snapshot.id})
except Exception as ex:
LOG.exception(ex)
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
try:
restore = self.db.restore_get(context, restore_id)
self.db.snapshot_update(
context, restore.snapshot_id, {
'status': 'available'})
self.db.workload_update(
context, workload.id, {
'status': 'available'})
if settings.get_settings(context).get('smtp_email_enable') in ('yes', '1'):
self.send_email(context, restore, 'restore')
except Exception as ex:
LOG.exception(ex)
@autolog.log_method(logger=Logger)
def snapshot_delete(self, context, snapshot_id):
"""
Delete an existing snapshot
"""
def execute(context, snapshot_id):
snapshot = self.db.snapshot_get(
context, snapshot_id, read_deleted='yes')
workload_utils.snapshot_delete(context, snapshot_id)
# unlock the workload
self.db.workload_update(
context, snapshot.workload_id, {
'status': 'available'})
self.pool.submit(execute, context, snapshot_id)
@autolog.log_method(logger=Logger)
def snapshot_mount(self, context, snapshot_id, mount_vm_id):
"""
Mount an existing snapshot
"""
def _prepare_snapshot_for_mount(cntx, db, snapshot_id):
pervmdisks = {}
snapshot_obj = db.snapshot_get(cntx, snapshot_id)
workload_obj = self.db.workload_get(
context, snapshot_obj.workload_id)
snapshotvms = self.db.snapshot_vms_get(context, snapshot_id)
for vm in snapshotvms:
pervmdisks[vm.vm_id] = {'vm_name': vm.vm_name,
'vault_path': []}
if FLAGS.vault_storage_type.lower() not in ("nfs", "local", "swift-s", "s3"):
context_dict = dict([('%s' % key, value) for (
key, value) in cntx.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
#restore, rebase, commit & upload
LOG.info(_('Processing disks'))
_preparevmflow = lf.Flow(snapshot_id + "DownloadInstance")
store = {
'connection': 'dir',
"path": CONF.taskflow_path, # save data to this directory
"max_cache_size": CONF.taskflow_max_cache_size, # keep up-to this much entries in memory
'context': context_dict,
'snapshot_id': snapshot_id,
'mount_id': str(uuid.uuid4()),
}
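# Build one LinearPrepareBackupImages sub-flow per snapshotted VM and seed
# the taskflow store with each resource id and device name.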
for instance in snapshotvms:
snapshot_vm_resources = db.snapshot_vm_resources_get(
cntx, instance['vm_id'], snapshot_obj.id)
for snapshot_vm_resource in snapshot_vm_resources:
store[snapshot_vm_resource.id] = snapshot_vm_resource.id
store['devname_' +
snapshot_vm_resource.id] = snapshot_vm_resource.resource_name
childflow = vmtasks.LinearPrepareBackupImages(
cntx, instance, snapshot_obj)
if childflow:
_preparevmflow.add(childflow)
try:
store["path"] = os.path.join(store["path"], 'snapmount_' + (store['mount_id']))
fileutils.ensure_tree(store["path"])
# execute the workflow
result = engines.run(
_preparevmflow, engine='serial', engine_conf='serial', backend={
'connection': store['connection'], "path": store["path"],
"max_cache_size": store["max_cache_size"]}, store=store)
finally:
fileutils.remove_tree(store["path"])
snapshot_vm_resources = db.snapshot_vm_resources_get(
cntx, instance['vm_id'], snapshot_obj.id)
snapshot_vm_resources = self.db.snapshot_resources_get(
context, snapshot_id)
for snapshot_vm_resource in snapshot_vm_resources:
if snapshot_vm_resource.resource_type == 'disk':
if snapshot_vm_resource.vm_id not in pervmdisks:
# entries for known VMs were created above with their real names;
# fall back to the vm_id as the display name for anything else
pervmdisks[snapshot_vm_resource.vm_id] = {
'vm_name': snapshot_vm_resource.vm_id, 'vault_path': []}
if 'restore_file_path_' + snapshot_vm_resource.id in result:
path = result['restore_file_path_' +
snapshot_vm_resource.id]
# append to the same 'vault_path' list used by the NFS/local branch
pervmdisks[snapshot_vm_resource.vm_id]['vault_path'].append(path)
else:
backup_endpoint = self.db.get_metadata_value(
workload_obj.metadata, 'backup_media_target')
backup_target = vault.get_backup_target(backup_endpoint)
snapshot_vm_resources = self.db.snapshot_resources_get(
context, snapshot_id)
for snapshot_vm_resource in snapshot_vm_resources:
if snapshot_vm_resource.resource_type == 'disk':
vm_disk_resource_snap = self.db.vm_disk_resource_snap_get_top(
context, snapshot_vm_resource.id)
vault_path = os.path.join(
backup_target.mount_path,
vm_disk_resource_snap.vault_url.lstrip(
os.sep))
pervmdisks[snapshot_vm_resource.vm_id]['vault_path'].append(
vault_path)
return pervmdisks
try:
devpaths = {}
logicalobjects = {}
snapshot_metadata = {}
snapshot = self.db.snapshot_get(context, snapshot_id)
workload = self.db.workload_get(context, snapshot.workload_id)
pervmdisks = _prepare_snapshot_for_mount(
context, self.db, snapshot_id)
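# OpenStack snapshots are attached to the designated recovery-manager VM via
# the libvirt driver; VMware snapshots are mounted on this node and exposed
# through a local file manager web server.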
if workload.source_platform == 'openstack':
virtdriver = driver.load_compute_driver(
None, 'libvirt.LibvirtDriver')
fminstance = virtdriver.snapshot_mount(
context, self.db, snapshot, pervmdisks, mount_vm_id=mount_vm_id)
urls = []
for netname, addresses in fminstance.addresses.items():
for addr in addresses:
if 'addr' in addr:
urls.append("http://" + addr['addr'])
self.db.snapshot_update(context, snapshot['id'],
{'status': 'mounted',
'metadata': {
'mount_vm_id': mount_vm_id,
'mounturl': json.dumps(urls),
'mount_error': "",
}
})
# Add metadata to recovery manager vm
try:
compute_service = nova.API(production=True)
compute_service.set_meta_item(
context, mount_vm_id, "mounted_snapshot_id", snapshot['id'])
compute_service.set_meta_item(
context,
mount_vm_id,
"mounted_snapshot__url",
"/project/workloads/snapshots/%s/detail" %
snapshot['id'])
except BaseException:
pass
return {"urls": urls}
elif workload.source_platform == 'vmware':
head, tail = os.path.split(FLAGS.mountdir + '/')
fileutils.ensure_tree(head)
virtdriver = driver.load_compute_driver(
None, 'vmwareapi.VMwareVCDriver')
for vmid, diskfiles in pervmdisks.items():
# the goal is to mount as many artifacts as possible from
# snapshot
devpaths[vmid] = virtdriver.snapshot_mount(
context, snapshot, diskfiles)
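# Read the partition tables of the attached disks, discover LVM volumes and
# partitions, and mount whatever can be mounted under FLAGS.mountdir;
# failures only skip the affected VM so as much data as possible is exposed.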
try:
partitions = {}
for diskpath, mountpath in devpaths[vmid].items():
partitions[mountpath] = mountutils.read_partition_table(
mountpath)
logicalobjects[vmid] = mountutils.discover_lvs_and_partitions(
devpaths[vmid], partitions)
mountutils.mount_logicalobjects(
FLAGS.mountdir, snapshot_id, vmid, logicalobjects[vmid])
except Exception as ex:
if vmid in logicalobjects:
for vg in logicalobjects[vmid]['vgs']:
mountutils.deactivatevgs(vg['LVM2_VG_NAME'])
logicalobjects.pop(vmid)
LOG.exception(ex)
snapshot_metadata['devpaths'] = json.dumps(devpaths)
snapshot_metadata['logicalobjects'] = json.dumps(
logicalobjects)
snapshot_metadata['fsmanagerpid'] = -1
self.db.snapshot_update(context, snapshot['id'],
{'metadata': snapshot_metadata})
# TODO: Spin up php webserver
try:
snapshot_metadata = {}
snapshot_metadata['fsmanagerpid'] = \
mountutils.start_filemanager_server(FLAGS.mountdir)
snapshot_metadata['mounturl'] = "http://" + [ip for ip in socket.gethostbyname_ex(
socket.gethostname())[2] if not ip.startswith("127.")][:1][0] + ":8888"
self.db.snapshot_update(
context, snapshot['id'], {
'status': 'mounted', 'metadata': snapshot_metadata})
except Exception as ex:
LOG.error(_("Could not start file manager server"))
LOG.exception(ex)
raise
return "http://" + [ip for ip in socket.gethostbyname_ex(
socket.gethostname())[2] if not ip.startswith("127.")][:1][0] + ":8888"
except Exception as ex:
self.db.snapshot_update(context, snapshot['id'],
{'status': 'available',
'metadata': {
'mount_error': str(ex),
}})
try:
self.snapshot_dismount(context, snapshot['id'])
except BaseException:
pass
LOG.exception(ex)
raise
@autolog.log_method(logger=Logger)
def snapshot_dismount(self, context, snapshot_id):
"""
Dismount an existing snapshot
"""
snapshot = self.db.snapshot_get(
context, snapshot_id, read_deleted='yes')
workload = self.db.workload_get(
context, snapshot.workload_id, read_deleted='yes')
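# OpenStack: detach the mounted disks from the recovery manager VM and clear
# its metadata. VMware: unmount logical objects, deactivate volume groups and
# dismount the device paths recorded in the snapshot metadata.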
if workload.source_platform == 'openstack':
mount_vm_id = self.db.get_metadata_value(
snapshot.metadata, 'mount_vm_id')
if mount_vm_id is None:
msg = _(
"Could not find recovery manager vm id in the snapshot metadata")
LOG.error(msg)
raise wlm_exceptions.Invalid(reason=msg)
virtdriver = driver.load_compute_driver(
None, 'libvirt.LibvirtDriver')
virtdriver.snapshot_dismount(context, snapshot, None, mount_vm_id)
self.db.snapshot_update(context, snapshot_id,
{'status': 'available', 'metadata': {}})
# Remove the mount metadata from the recovery manager vm
try:
compute_service = nova.API(production=True)
compute_service.delete_meta(context, mount_vm_id,
["mounted_snapshot_id", "mounted_snapshot__url"])
except nova.nova_exception.NotFound as ex:
LOG.info('Instance ID:{} does not exist. Error: {}'.format(mount_vm_id, ex))
except BaseException:
pass
elif workload.source_platform == 'vmware':
virtdriver = driver.load_compute_driver(
None, 'vmwareapi.VMwareVCDriver')
devpaths_json = self.db.get_metadata_value(
snapshot.metadata, 'devpaths')
if devpaths_json:
devpaths = json.loads(devpaths_json)
else:
devpaths = {}
fspid = self.db.get_metadata_value(
snapshot.metadata, 'fsmanagerpid')
if (fspid and int(fspid) != -1):
mountutils.stop_filemanager_server(FLAGS.mountdir, fspid)
logicalobjects_json = self.db.get_metadata_value(
snapshot.metadata, 'logicalobjects')
if logicalobjects_json:
logicalobjects = json.loads(logicalobjects_json)
else:
logicalobjects = {}
for vmid, objects in logicalobjects.items():
try:
mountutils.umount_logicalobjects(
FLAGS.mountdir, snapshot_id, vmid, objects)
except Exception as ex:
# always cleanup as much as possible
LOG.exception(ex)
pass
vgs = objects['vgs']
for vg in vgs:
try:
mountutils.deactivatevgs(vg['LVM2_VG_NAME'])
except Exception as ex:
# always cleanup as much as possible
LOG.exception(ex)
pass
for vmid, paths in devpaths.items():
try:
virtdriver.snapshot_dismount(context, snapshot, paths)
except Exception as ex:
# always cleanup as much as possible
LOG.exception(ex)
pass
if FLAGS.vault_storage_type.lower() not in ("nfs", "local", "swift-s", "s3"):
for vmid, paths in devpaths.items():
try:
os.remove(list(paths.keys())[0])
except BaseException:
pass
parent = os.path.dirname(list(paths.keys())[0])
parent = os.path.dirname(parent)
shutil.rmtree(parent)
snapshot_metadata = {}
snapshot_metadata['devpaths'] = ""
snapshot_metadata['logicalobjects'] = ""
snapshot_metadata['mounturl'] = ""
snapshot_metadata['fsmanagerpid'] = -1
self.db.snapshot_update(
context, snapshot_id, {
'status': 'available', 'metadata': snapshot_metadata})
@autolog.log_method(logger=Logger)
def restore_delete(self, context, restore_id):
"""
Delete an existing restore
"""
self.db.restore_delete(context, restore_id)
@autolog.log_method(logger=Logger)
def get_metadata_value_by_chain(self, metadata, key, default=None):
list_of_ids = []
list_of_snap_ins = []
while True:
key1 = self.db.get_metadata_value(metadata, key, default=None)
if key1 is None:
break
list_of_snap_ins.append(key)
list_of_ids.append(key1)
key = key1
for reverse_id in list_of_ids:
ins_id = self.get_metadata_value(metadata, reverse_id, False)
if ins_id is not None:
if ins_id not in list_of_snap_ins:
list_of_snap_ins.append(ins_id)
ins_reverse_id = self.db.get_metadata_value(metadata, reverse_id)
if ins_reverse_id is None:
list_of_snap_ins.append(reverse_id)
if len(list_of_ids) == 0:
return default, list_of_snap_ins
return list_of_ids, list_of_snap_ins
@autolog.log_method(logger=Logger)
def get_metadata_value(self, metadata, value, default=None):
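# Note: unlike db.get_metadata_value, this is a reverse lookup -- it returns
# the metadata *key* whose value matches the supplied value.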
for kvpair in metadata:
if kvpair['value'] == value:
return kvpair['key']
return default
@autolog.log_method(logger=Logger)
def send_email(self, context, object, type):
"""
Send a success email to the administrator when a snapshot/restore
completes, or an error email when it fails
"""
try:
wlm_templates_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'templates')
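# Email bodies are rendered from the HTML templates in this directory by
# substituting placeholder tokens (e.g. 'workload.display_name') with
# str.replace below.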
keystone_client = KeystoneClient(context)
if type == 'snapshot':
workload = self.db.workload_get(context, object.workload_id)
snapshotvms = self.db.snapshot_vms_get(context, object.id)
elif type == 'restore':
snapshot = self.db.snapshot_get(context, object.snapshot_id)
workload = self.db.workload_get(context, snapshot.workload_id)
snapshotvms = self.db.snapshot_vms_get(
context, object.snapshot_id)
try:
user = keystone_client.get_user_to_get_email_address(context)
if user.email is None or user.email == '':
user.email = settings.get_settings(
context, get_smtp_settings=True).get('smtp_default_recipient')
except BaseException:
o = {'name': 'admin', 'email': settings.get_settings(
context, get_smtp_settings=True).get('smtp_default_recipient')}
user = objectview(o)
pass
with open(os.path.join(wlm_templates_path, 'vms.html'), 'r') as content_file:
vms_html = content_file.read()
if object.display_description is None:
object.display_description = str()
for inst in snapshotvms:
size_converted = utils.sizeof_fmt(inst.size)
vms_html += """\
<tr style="height: 20px">
<td style="padding-left: 5px; font-size:12px; color:black; border: 1px solid #999;">
""" + inst.vm_name + """
</td><td style="padding-left: 5px; font-size:12px; color:black; border: 1px solid #999; ">
""" + str(size_converted) + """ or """ + str(inst.size) + """ bytes </td></tr>
"""
if type == 'snapshot':
subject = workload.display_name + ' Snapshot finished successfully'
size_snap_converted = utils.sizeof_fmt(object.size)
minutes = object.time_taken // 60
seconds = object.time_taken % 60
time_unit = str(minutes) + ' Minutes and ' + \
str(seconds) + ' Seconds'
with open(os.path.join(wlm_templates_path, 'snapshot_success.html'), 'r') as content_file:
html = content_file.read()
html = html.replace(
'workload.display_name',
workload.display_name)
html = html.replace('object.display_name', object.display_name)
html = html.replace(
'object.snapshot_type',
object.snapshot_type)
html = html.replace('size_snap_kb', str(size_snap_converted))
html = html.replace('object.size', str(object.size))
html = html.replace('time_unit', str(time_unit))
html = html.replace('object.host', object.host)
html = html.replace(
'object.display_description',
object.display_description)
html = html.replace(
'object.created_at', str(
object.created_at))
html = html.replace('vms_html', vms_html)
if object.status == 'error':
subject = workload.display_name + ' Snapshot failed'
with open(os.path.join(wlm_templates_path, 'snapshot_error.html'), 'r') as content_file:
html = content_file.read()
html = html.replace(
'workload.display_name', workload.display_name)
html = html.replace(
'object.display_name', object.display_name)
html = html.replace(
'size_snap_kb', str(size_snap_converted))
html = html.replace('object.size', str(object.size))
html = html.replace('object.error_msg', object.error_msg)
html = html.replace('object.host', object.host)
html = html.replace(
'object.display_description',
object.display_description)
html = html.replace('vms_html', vms_html)
elif type == 'restore':
subject = workload.display_name + ' Restored successfully'
size_snap_converted = utils.sizeof_fmt(object.size)
minutes = object.time_taken // 60
seconds = object.time_taken % 60
time_unit = str(minutes) + ' Minutes and ' + \
str(seconds) + ' Seconds'
with open(os.path.join(wlm_templates_path, 'restore_success.html'), 'r') as content_file:
html = content_file.read()
html = html.replace(
'workload.display_name',
workload.display_name)
html = html.replace('object.display_name', object.display_name)
#html = html.replace('object.restore_type',object.restore_type)
html = html.replace('size_snap_kb', str(size_snap_converted))
html = html.replace('object.size', str(object.size))
html = html.replace('time_unit', str(time_unit))
html = html.replace('object.host', object.host)
html = html.replace(
'object.display_description',
object.display_description)
html = html.replace(
'object.created_at', str(
object.created_at))
html = html.replace('vms_html', vms_html)
if object.status == 'error':
subject = workload.display_name + ' Restore failed'
with open(os.path.join(wlm_templates_path, 'restore_error.html'), 'r') as content_file:
html = content_file.read()
html = html.replace(
'workload.display_name', workload.display_name)
html = html.replace(
'object.display_name', object.display_name)
html = html.replace(
'size_snap_kb', str(size_snap_converted))
html = html.replace('object.size', str(object.size))
html = html.replace('object.error_msg', object.error_msg)
html = html.replace('object.host', object.host)
html = html.replace(
'object.display_description',
object.display_description)
html = html.replace('vms_html', vms_html)
msg = MIMEMultipart('alternative')
msg['To'] = user.email
#msg['From'] = 'admin@'+ socket.getfqdn()+'.vsphere'
msg['From'] = settings.get_settings(
context, get_smtp_settings=True).get('smtp_default_sender')
msg['Subject'] = subject
part2 = MIMEText(html, 'html')
msg.attach(part2)
smtp_settings = settings.get_settings(context, get_smtp_settings=True)
s = smtplib.SMTP(
smtp_settings.get('smtp_server_name'), int(smtp_settings.get('smtp_port')),
timeout=int(smtp_settings.get('smtp_timeout')))
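# When a remote SMTP server is configured, authenticate over STARTTLS using
# the stored (encrypted) credentials; a missing password skips the login.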
if settings.get_settings(context).get(
'smtp_server_name') != 'localhost':
try:
coded_pass = smtp_settings.get('smtp_server_password')
decrypted_pass = utils.decrypt_password(coded_pass, utils.tvault_key_file_name) if coded_pass else ""
if decrypted_pass:
s.ehlo()
s.starttls()
s.ehlo()
s.login(
str(smtp_settings.get('smtp_server_username')),
str(decrypted_pass),
)
except smtplib.SMTPException as ex:
LOG.exception(ex)
s.sendmail(msg['From'], msg['To'], msg.as_string())
s.quit()
except smtplib.SMTPException as ex:
LOG.exception(ex)
s.quit()
except Exception as ex:
LOG.exception(ex)
pass
@autolog.log_method(logger=Logger)
def schedule_send_email(self, context, object_id, object_type):
"""
Schedules success email to administrator if snapshot/restore done
else error email
"""
try:
mail_object = None
if object_type == 'snapshot':
mail_object = self.db.snapshot_get(context, object_id)
elif object_type == 'restore':
mail_object = self.db.restore_get(context, object_id)
if not mail_object:
LOG.error('Invalid object type for sending email. Please use snapshot/restore')
return
if settings.get_settings(context).get('smtp_email_enable') in ('yes', '1', 'true'):
context = nova._get_tenant_context(context, cloud_admin=True)
self.send_email(context, mail_object, object_type)
except Exception as ex:
LOG.exception(ex)
self.db.config_workload_update(context, {'status': 'available'})
msg = _("Failed creating config backup: %(exception)s") % {
'exception': ex}
time_taken = 0
if backup:
time_taken = int(
(timeutils.utcnow() - backup.created_at).total_seconds())
backup = self.db.config_backup_update(
context,
backup_id,
{
'progress_msg': 'Configuration backup failed',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'status': 'error',
'time_taken': time_taken,
})
workload_utils.upload_config_backup_db_entry(context, backup_id)
@autolog.log_method(Logger, 'WorkloadMgrManager.migration_plan_create')
def migration_plan_create(self, context, migration_plan_id):
try:
migration_plan = self.db.migration_plan_get(context, migration_plan_id)
vms = self.db.migration_plan_vms_get(context, migration_plan_id)
backup_endpoint = list(vault.triliovault_backup_targets.keys())[0]
migration_plan_size = 0
si = get_vcenter_service_instance()
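# Estimate the plan size by summing each VM's uncommitted storage reported by
# vCenter, converted from bytes to GB.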
for vm in vms:
search_index = si.content.searchIndex
vcenter_vm = search_index.FindByUuid(None, vm.vm_id, True, True)
migration_plan_size += vcenter_vm.summary.storage.uncommitted/1024/1024/1024
migration_plan_metadata = {
'migration_plan_approx_size': migration_plan_size,
'backup_media_target': backup_endpoint}
json_wl = jsonutils.dumps(migration_plan)
json_wl_vms = jsonutils.dumps(vms)
self.db.migration_plan_update(
context,
migration_plan_id,
{
'host': self.host,
'status': 'available',
'availability_zone': self.az,
'metadata': migration_plan_metadata
})
workload_utils.upload_migration_plan_db_entry(context, migration_plan_id)
except Exception as err:
with excutils.save_and_reraise_exception():
self.db.migration_plan_update(
context, migration_plan_id,
{'status': 'error',
'error_msg': str(err)
})
@autolog.log_method(logger=Logger)
def migration_plan_delete(self, context, migration_plan_id):
migration_plan = self.db.migration_plan_get(context, migration_plan_id)
migrations = self.db.migration_get_all_by_project_migration_plan(
context, context.project_id, migration_plan.id)
if len(migrations) > 0:
msg = _(
'This migration plan contains migrations. '
'Please delete all migrations and try again.')
raise wlm_exceptions.InvalidState(reason=msg)
LOG.info(_('Deleting the data of migration plan %s %s %s') %
(migration_plan.display_name, migration_plan.id,
migration_plan.created_at.strftime("%d-%m-%Y %H:%M:%S")))
backup_endpoint = self.db.get_metadata_value(migration_plan.metadata,
'backup_media_target')
if backup_endpoint is not None:
backup_target = vault.get_backup_target(backup_endpoint)
if backup_target is not None:
backup_target.migration_plan_delete(
context, {
'migration_plan_id': migration_plan.id,
'migration_plan_name': migration_plan.display_name, })
migration_plan_vms = self.db.migration_plan_vms_get(context, migration_plan.id)
for vm in migration_plan_vms:
self.db.migration_plan_vms_delete(context, vm.vm_id, migration_plan.id)
self.db.migration_plan_delete(context, migration_plan.id)
@autolog.log_method(logger=Logger)
def migration_plan_discovervms(self, context, migration_plan_id):
try:
context = nova._get_tenant_context(context)
migration_plan = self.db.migration_plan_update(
context,
migration_plan_id,
{
'host': self.host,
'progress_percent': 0,
'progress_msg': 'Migration plan discovervms task running',
'status': 'discovering'
})
migration_plan = self.db.migration_plan_get(context, migration_plan_id)
backup_endpoint = self.db.get_metadata_value(migration_plan.metadata,
'backup_media_target')
backup_target = vault.get_backup_target(backup_endpoint)
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
migration_plan_dict = dict(migration_plan.items())
migration_plan_dict.pop('created_at')
migration_plan_dict.pop('updated_at')
store = {
'connection': 'dir',
'path': CONF.taskflow_path, # save data to this directory
'max_cache_size': CONF.taskflow_max_cache_size, # keep up to this many entries in memory
'context': context_dict, # context dictionary
'migration_plan': migration_plan_dict,
'migration_plan_id': migration_plan_id,
'source_platform': migration_plan.source_platform,
}
migration_plan_metadata = {}
for kvpair in migration_plan.metadata:
store[kvpair['key']] = str(kvpair['value'])
migration_plan_metadata[kvpair['key']] = str(kvpair['value'])
workflow_class = migration_plan_workflow.MigrationPlanWorkFlow
workflow = workflow_class(migration_plan.display_name, store)
# Clean up existing resources
vms = self.db.migration_plan_vms_get(context, migration_plan_id)
for vm in vms:
vm_resources = self.db.migration_plan_vm_resources_get(
context, vm['vm_id'], migration_plan_id)
for res in vm_resources:
if res.resource_type == 'nic':
netresources = self.db.migration_plan_vm_network_resources_get(
context, res.id)
for netres in netresources:
self.db.migration_plan_vm_network_resource_delete(
context, netres.migration_plan_vm_network_resource_id)
elif res.resource_type == 'security_group':
secresources = self.db.migration_plan_vm_security_group_rules_get(
context, res.id)
for secres in secresources:
self.db.migration_plan_vm_security_group_rule_delete(
context, secres.id)
elif res.resource_type == 'disk':
diskresources = self.db.migration_plan_vm_disk_resources_get(
context, res.id)
for diskres in diskresources:
self.db.migration_plan_vm_disk_resource_delete(
context, diskres.id)
self.db.migration_plan_vm_resource_delete(context, res.id)
vm_resources = self.db.migration_plan_vm_resources_get(
context, migration_plan_id, migration_plan_id)
for res in vm_resources:
if res.resource_type in ['network', 'subnet']:
netresources = self.db.migration_plan_vm_network_resources_get(
context, res.id)
for netres in netresources:
self.db.migration_plan_vm_network_resource_delete(
context, netres.migration_plan_vm_network_resource_id)
self.db.migration_plan_vm_resource_delete(context, res.id)
self.db.migration_plan_update(
context,
migration_plan_id,
{'progress_msg': 'Initializing discovervms workflow',
'status': 'discovering'})
workflow.initflow()
workflow.execute()
# Upload snapshot metadata to the vault
#workload_utils.upload_snapshot_db_entry(
#context, snapshot_id, snapshot_status='available')
self.db.migration_plan_update(context,
migration_plan_id,
{'progress_percent': 100,
'progress_msg': 'DiscoverVMs of migration plan is complete',
'status': 'available',
})
except WrappedFailure as ex:
LOG.exception(ex)
msg = str(ex)
self.db.migration_plan_update(context,
migration_plan_id,
{'progress_percent': 100,
'progress_msg': '',
'warning_msg': msg,
'error_msg': 'DiscoverVMs failed with error: {}'.format(msg),
'status': 'error'
})
except Exception as ex:
LOG.exception(ex)
msg = str(ex)
self.db.migration_plan_update(context,
migration_plan_id,
{'progress_percent': 100,
'progress_msg': '',
'warning_msg': msg,
'error_msg': 'DiscoverVMs failed with error: {}'.format(msg),
'status': 'error'
})
@autolog.log_method(logger=Logger)
def migration_create(self, context, migration_id):
try:
try:
import gc
gc.collect()
except Exception as ex:
LOG.exception(ex)
context = nova._get_tenant_context(context)
migration = self.db.migration_update(
context,
migration_id,
{
'host': self.host,
'progress_percent': 0,
'progress_msg': 'Migration plan is executing',
'status': 'starting'})
migration_plan = self.db.migration_plan_get(
context,
migration.migration_plan_id)
backup_endpoint = self.db.get_metadata_value(migration_plan.metadata,
'backup_media_target')
backup_target = vault.get_backup_target(backup_endpoint)
# Upload snapshot metadata to the vault
workload_utils.upload_migration_db_entry(context, migration_id)
context_dict = dict([('%s' % key, value)
for (key, value) in context.to_dict().items()])
# RpcContext object looks for this during init
context_dict['conf'] = None
migration_dict = dict(migration.items())
migration_dict.pop('created_at')
migration_dict.pop('updated_at')
store = {
'connection': 'dir',
'path': CONF.taskflow_path, # save data to this directory
'max_cache_size': CONF.taskflow_max_cache_size, # keep up to this many entries in memory
'context': context_dict, # context dictionary
'migration': migration_dict,
'migration_plan_id': migration.migration_plan_id, # Migration plan id
'migration_id': migration_id,
'source_platform': migration_plan.source_platform,
'target_platform': "openstack",
}
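# Copy every migration plan metadata entry into the taskflow store and keep a
# copy to stamp onto the migration record once the workflow completes.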
migration_metadata = {}
for kvpair in migration_plan.metadata:
store[kvpair['key']] = str(kvpair['value'])
migration_metadata[kvpair['key']] = str(kvpair['value'])
workflow_class = migrationflow.MigrationWorkflow
workflow = workflow_class(migration_plan.display_name, store)
self.db.migration_update(context,
migration_id,
{'progress_percent': 0,
'progress_msg': 'Initializing Migration Workflow',
'status': 'executing'})
workflow.initflow()
workflow.execute()
self.db.migration_type_time_size_update(context, migration_id)
self.db.migration_update(context,
migration_id,
{'metadata': migration_metadata})
# Upload migration metadata to the vault
workload_utils.upload_migration_db_entry(
context, migration_id, migration_status='available')
# upload the data to object store... this function will check if
# the object store is configured
backup_target.upload_migration_metatdata_to_object_store(
context, {
'migration_plan_id': migration_plan.id,
'migration_plan_name': migration_plan.display_name,
'migration_id': migration.id})
self.db.migration_update(context,
migration_id,
{'progress_percent': 100,
'progress_msg': 'Invocation of migration plan is complete',
'finished_at': timeutils.utcnow(),
'status': 'available',
'metadata': migration_metadata})
except WrappedFailure as ex:
LOG.exception(ex)
flag = self.db.migration_get_metadata_cancel_flag(
context, migration_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
for vm in self.db.migration_vms_get(context, migration_id):
self.db.migration_vm_update(
context, vm.vm_id, migration_id, {
'status': status, })
else:
msg = _("Failed creating migration from a migration plan with following error(s):")
if hasattr(ex, '_causes'):
for cause in ex._causes:
if cause._exception_str not in msg:
msg = msg + ' ' + cause._exception_str
LOG.error(msg)
status = 'error'
migration = self.db.migration_get(context, migration_id)
if migration.error_msg:
msg = migration.error_msg + '\n' + msg
self.db.migration_update(context,
migration_id,
{'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'status': status
})
try:
self.db.migration_type_time_size_update(context, migration_id)
except Exception as ex:
LOG.exception(ex)
except Exception as ex:
LOG.exception(ex)
flag = self.db.migration_get_metadata_cancel_flag(
context, migration_id, 1)
if flag == '1':
msg = _("%(exception)s") % {'exception': ex}
status = 'cancelled'
for vm in self.db.migration_vms_get(context, migration_id):
self.db.migration_vm_update(
context, vm.vm_id, migration_id, {
'status': status, })
else:
if hasattr(ex, 'code') and ex.code == 401:
if hasattr(
context,
'tenant') and context.tenant != '' and context.tenant is not None:
tenant = context.tenant
else:
tenant = context.project_id
msg = _(
"Failed invoking migration plan: Make sure trustee role " +
CONF.trustee_role +
" assigned to tenant " +
tenant)
else:
msg = _("Failed invoking migration plan: %(exception)s") % {
'exception': ex}
LOG.error(msg)
status = 'error'
migration = self.db.migration_get(context, migration_id)
if migration.error_msg:
msg = migration.error_msg + '\n' + msg
self.db.migration_update(context,
migration_id,
{'progress_percent': 100,
'progress_msg': '',
'error_msg': msg,
'finished_at': timeutils.utcnow(),
'status': status
})
try:
self.db.migration_type_time_size_update(context, migration_id)
except Exception as ex:
LOG.exception(ex)
finally:
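# Whatever the outcome, release the migration plan back to 'available' so new
# migrations can be scheduled from it.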
try:
migration = self.db.migration_get(context, migration_id)
self.db.migration_plan_update(
context, migration.migration_plan_id, {
'status': 'available'})
except Exception as ex:
LOG.exception(ex)
@autolog.log_method(logger=Logger)
def migration_delete(self, context, migration_id):
def execute(context, migration_id):
#workload_utils.migration_delete(context, migration_id)
migration = self.db.migration_get(
context, migration_id, read_deleted='yes')
self.db.migration_plan_update(
context, migration.migration_plan_id, {
'status': 'available'})
self.db.migration_delete(context, migration_id)
execute(context, migration_id)
#self.pool.submit(execute, context, migration_id, task_id)