# Repository URL to install this package: (scraped package-page header)
# Version: 4.1.94.1.dev5
# NOTE(review): the lines above were non-Python residue from a package
# listing page; converted to comments so the module parses.
import os
import uuid
from queue import Queue
import pickle as pickle
import json
import shutil
import math
import re
import time
import datetime
import subprocess
from subprocess import check_output
from oslo_config import cfg
from oslo_messaging._drivers import amqp
from taskflow import engines
from taskflow import task
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow.patterns import linear_flow as lf
from novaclient.exceptions import Unauthorized as nova_unauthorized
from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr.virt import driver
from workloadmgr.virt import qemuimages
from workloadmgr.virt import power_state
from workloadmgr.virt import driver
from workloadmgr.openstack.common import log as logging
from workloadmgr.openstack.common import fileutils
from workloadmgr.openstack.common import jsonutils
from workloadmgr.openstack.common import timeutils
from workloadmgr.image import glance
from workloadmgr.volume import cinder
from workloadmgr.compute import nova
from workloadmgr.datamover import contego
from workloadmgr.network import neutron
from workloadmgr.workloads import workload_utils
from workloadmgr.workflows.vmtasks import FreezeVM, ThawVM
from workloadmgr.virt.libvirt.restore_vm_flow import PowerOnInstance
from workloadmgr.vault import vault
from workloadmgr import utils
from workloadmgr import flags
from workloadmgr import autolog
from workloadmgr import exception
LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)
FLAGS = flags.FLAGS
CONF = cfg.CONF
class CopyBackupImageToVolume(task.Task):
    """Copy backup data from the backup media onto a restored SAN volume.

    SAN volumes (iscsi and fibre-channel) cannot be populated through the
    glance/cinder image path, so the contego data mover on the compute node
    copies the backup image data directly onto the volume attached to the
    restored instance.  After initiating the copy, this task polls either a
    progress-tracking file on the backup target (NFS-style media) or
    contego's async task status API (swift-based media) until the transfer
    completes or errors out, recording incremental progress on the restore
    record as it goes.
    """

    def execute(self, context, restored_instance_id,
                restore_id, restore_type,
                restored_file_path,
                progress_tracking_file_path,
                image_overlay_file_path,
                volume_id=None, volume_type=None,
                image_id=None, image_type=None):
        return self.execute_with_log(context, restored_instance_id,
                                     volume_id, volume_type,
                                     image_id, image_type,
                                     restore_id, restore_type,
                                     restored_file_path,
                                     image_overlay_file_path,
                                     progress_tracking_file_path)

    def revert(self, result, flow_failures, *args, **kwargs):
        return self.revert_with_log(*args, **kwargs)

    @autolog.log_method(Logger, 'CopyBackupImageToVolume.execute')
    def execute_with_log(self, context, restored_instance_id,
                         volume_id, volume_type,
                         image_id, image_type,
                         restore_id, restore_type,
                         restored_file_path,
                         image_overlay_file_path,
                         progress_tracking_file_path):
        # Call into contego to copy the data from backend to volume
        compute_service = nova.API(production=True)
        contego_service = contego.API(production=True)
        db = WorkloadMgrDB().db

        # Get a new token, just to be safe
        cntx = amqp.RpcContext.from_dict(context)
        cntx = nova._get_tenant_context(cntx)

        server_obj = compute_service.get_server(cntx, restored_instance_id)
        vast_params = {'volume_id': volume_id,
                       'volume_type': volume_type,
                       'image_id': image_id,
                       'image_type': image_type,
                       'backup_image_file_path': restored_file_path,
                       'image_overlay_file_path': image_overlay_file_path,
                       'progress_tracking_file_path': progress_tracking_file_path,
                       'server_obj': server_obj.to_dict()}
        contego_service.copy_backup_image_to_volume(
            cntx, restored_instance_id, vast_params)

        image_info = qemuimages.qemu_img_info(
            restored_file_path or image_overlay_file_path)
        prev_copied_size_incremental = 0
        basestat = os.stat(progress_tracking_file_path)

        restore_obj = db.restore_get(cntx, restore_id)
        snapshot_obj = db.snapshot_get(cntx, restore_obj.snapshot_id)
        workload_obj = db.workload_get(cntx, snapshot_obj.workload_id)
        backup_endpoint = db.get_metadata_value(workload_obj.metadata,
                                                'backup_media_target')
        backup_target = vault.get_backup_target(backup_endpoint)
        basetime = time.time()
        while True:
            try:
                time.sleep(10)
                async_task_status = {}
                if progress_tracking_file_path:
                    try:
                        async_task_status['status'] = \
                            backup_target.read_progress_tracking_file(
                                progress_tracking_file_path)
                    except Exception as ex:
                        LOG.exception(ex)

                    # If the progress file's modified timestamp has not
                    # advanced for CONF.progress_tracking_update_interval
                    # seconds, assume contego died and abort the operation.
                    progstat = os.stat(progress_tracking_file_path)
                    if progstat.st_mtime > basestat.st_mtime:
                        basestat = progstat
                        basetime = time.time()
                    elif time.time() - basetime > \
                            CONF.progress_tracking_update_interval:
                        raise Exception(
                            "No update to %s modified time for last %d minutes. "
                            "Contego may have errored. Aborting Operation" %
                            (progress_tracking_file_path,
                             CONF.progress_tracking_update_interval / 60))
                else:
                    # For swift based backup media
                    # NOTE(review): 'progress_tracker_metadata' is not defined
                    # anywhere in this method, so this branch raises NameError
                    # today; it is left as-is because the snapshot/resource ids
                    # needed to build it are not available here — confirm with
                    # the swift restore path.
                    params = {'metadata': progress_tracker_metadata,
                              # BUGFIX: key used to be 'server_obj:' (stray
                              # trailing colon inside the key string).
                              'server_obj': server_obj.to_dict()}
                    async_task_status = contego_service.vast_async_task_status(
                        cntx, restored_instance_id, params)

                data_transfer_completed = False
                percentage = "0.0"
                if async_task_status and 'status' in async_task_status and \
                        len(async_task_status['status']):
                    for line in async_task_status['status']:
                        if 'percentage complete' in line:
                            percentage = re.search(r'\d+\.\d+', line).group(0)
                        if 'Error' in line:
                            raise Exception(
                                "Data transfer failed - Contego Exception:" +
                                line)
                        if 'Completed' in line:
                            data_transfer_completed = True
                            percentage = "100.0"
                            break
                # Record only the delta since the last poll; the restore
                # record accumulates 'uploaded_size_incremental' values.
                copied_size_incremental = int(float(percentage) *
                                              image_info.virtual_size / 100)
                restore_obj = db.restore_update(
                    cntx, restore_id,
                    {'uploaded_size_incremental':
                         copied_size_incremental - prev_copied_size_incremental})
                prev_copied_size_incremental = copied_size_incremental
                if data_transfer_completed:
                    break
            except nova_unauthorized as ex:
                LOG.exception(ex)
                # recreate the token here
                cntx = nova._get_tenant_context(cntx)
            except Exception as ex:
                LOG.exception(ex)
                # BUGFIX: bare 'raise' preserves the original traceback;
                # 'raise ex' re-raised from here and lost it.
                raise

        # NOTE(review): this reports the full virtual size on top of the
        # incremental updates accumulated above — confirm the double
        # counting is intended by the progress-reporting consumers.
        restore_obj = db.restore_update(
            cntx, restore_id,
            {'uploaded_size_incremental': image_info.virtual_size})

    @autolog.log_method(Logger, 'CopyBackupImageToVolume.revert')
    def revert_with_log(self, *args, **kwargs):
        pass
class PrepareBackupImage(task.Task):
    """Resolve the on-disk backup image(s) for one snapshotted disk.

    Looks up the top of the disk's backup chain on the backup target and
    returns a triple of (restore_file_path, image_overlay_file_path,
    image_virtual_size).  For a boot disk backed by a glance image the
    bottom of the qcow2 backing chain is returned as the restore file and
    the chain top as the overlay; for every other disk the chain top is
    the restore file and the overlay is 'not-applicable'.
    """

    def execute(self, context, restore_id, vm_resource_id, volume_type):
        return self.execute_with_log(
            context, restore_id, vm_resource_id, volume_type)

    def revert(self, context, restore_id, vm_resource_id, volume_type, result, flow_failures):
        return self.revert_with_log(context)

    @autolog.log_method(Logger, 'PrepareBackupImage.execute')
    def execute_with_log(self, context, restore_id,
                         vm_resource_id, volume_type):
        db = WorkloadMgrDB().db
        self.cntx = amqp.RpcContext.from_dict(context)

        restore_rec = db.restore_get(self.cntx, restore_id)
        snap_rec = db.snapshot_get(self.cntx, restore_rec.snapshot_id)
        workload_rec = db.workload_get(self.cntx, snap_rec.workload_id)
        target = vault.get_backup_target(
            db.get_metadata_value(workload_rec.metadata,
                                  'backup_media_target'))

        vm_resource = db.snapshot_vm_resource_get(self.cntx, vm_resource_id)
        chain_top = db.vm_disk_resource_snap_get_top(
            self.cntx, vm_resource.id)
        chain_top_path = os.path.join(
            target.mount_path, chain_top.vault_url.strip(os.sep))
        info = qemuimages.qemu_img_info(chain_top_path)

        is_glance_boot_disk = (
            vm_resource.resource_name in ('vda', 'sda') and
            db.get_metadata_value(vm_resource.metadata,
                                  'image_id') is not None)
        if is_glance_boot_disk:
            # Walk down to the bottom of the backing chain; that image is
            # what gets uploaded to glance, with the chain top as overlay.
            while info.backing_file:
                info = qemuimages.qemu_img_info(info.backing_file)
            return info.image, chain_top_path, info.virtual_size

        return chain_top_path, 'not-applicable', info.virtual_size

    @autolog.log_method(Logger, 'PrepareBackupImage.revert')
    def revert_with_log(self, *args, **kwargs):
        pass
def CopyBackupImagesToVolumes(context, instance, snapshot_obj, restore_id,
                              volumes_to_restore, restore_boot_disk):
    """Build a linear flow that copies backup data onto restored disks.

    One CopyBackupImageToVolume task is added per disk resource whose
    cinder volume is in ``volumes_to_restore`` (volume variant), and per
    disk resource backed by a glance image when ``restore_boot_disk`` is
    set (image variant).  The rebind names follow the per-resource store
    keys populated by restore_vm_data.

    NOTE(review): if a disk ever matched both conditions, both tasks would
    share the name "CopyBackupImageToVolume<resource id>" — confirm the
    conditions are mutually exclusive in practice.
    """
    flow = lf.Flow("copybackupimagestovolumeslf")
    db = WorkloadMgrDB().db
    snapshot_vm_resources = db.snapshot_vm_resources_get(
        context, instance['vm_id'], snapshot_obj.id)
    for snapshot_vm_resource in snapshot_vm_resources:
        if snapshot_vm_resource.resource_type != 'disk':
            continue
        resource_id = str(snapshot_vm_resource.id)
        volume_id = db.get_metadata_value(
            snapshot_vm_resource.metadata, 'volume_id')
        image_id = db.get_metadata_value(
            snapshot_vm_resource.metadata, 'image_id')
        if volume_id and volume_id in volumes_to_restore:
            flow.add(CopyBackupImageToVolume(
                "CopyBackupImageToVolume" + snapshot_vm_resource.id,
                rebind=dict(
                    volume_id='volume_id_' + resource_id,
                    volume_type='volume_type_' + resource_id,
                    restored_file_path='restore_file_path_' + resource_id,
                    progress_tracking_file_path=(
                        'progress_tracking_file_path_' + resource_id),
                    image_overlay_file_path=(
                        'image_overlay_file_path_' + resource_id),
                )))
        # BUGFIX: previously re-queried the metadata for 'image_id' even
        # though it was already fetched above; use the cached value.
        if image_id and restore_boot_disk:
            flow.add(CopyBackupImageToVolume(
                "CopyBackupImageToVolume" + snapshot_vm_resource.id,
                rebind=dict(
                    image_id='image_id_' + resource_id,
                    image_type='image_type_' + resource_id,
                    restored_file_path='restore_file_path_' + resource_id,
                    progress_tracking_file_path=(
                        'progress_tracking_file_path_' + resource_id),
                    image_overlay_file_path=(
                        'image_overlay_file_path_' + resource_id),
                )))
    return flow
class CinderSnapshot(task.Task):
    """Take a cinder snapshot of a volume before an in-place restore
    overwrites its data, so the pre-restore state remains recoverable.
    """

    def execute(self, context, restore_id, vm_resource_id, volume_id):
        return self.execute_with_log(
            context, restore_id, vm_resource_id, volume_id)

    def revert(self, context, restore_id, vm_resource_id, volume_id, result, flow_failures):
        return self.revert_with_log(context)

    # BUGFIX: the autolog labels previously read 'CreateSnapshot.execute' /
    # 'CreateSnapshot.revert', mislabelling these methods in the logs;
    # they now match the class name.
    @autolog.log_method(Logger, 'CinderSnapshot.execute')
    def execute_with_log(self, context, restore_id, vm_resource_id, volume_id):
        """Create a forced cinder snapshot of ``volume_id``.

        Raises:
            exception.VolumeNotFound: if cinder does not return the volume.
        """
        db = WorkloadMgrDB().db
        self.cntx = amqp.RpcContext.from_dict(context)
        # Fetched for their existence/side effects in the db layer; the
        # objects themselves are not used further here.
        restore_obj = db.restore_get(self.cntx, restore_id)
        snapshot_obj = db.snapshot_get(self.cntx, restore_obj.snapshot_id)

        volume_service = cinder.API()
        volume = volume_service.get(self.cntx, volume_id)
        if not volume:
            raise exception.VolumeNotFound(volume_id=volume_id)
        # Forced snapshot: the volume may still be attached to the VM.
        volume_service.create_snapshot_force(
            self.cntx,
            volume,
            "TriliVault-Inplace_Snapshot",
            "Snapshot created as a result of inplace restore '%s'" %
            restore_id)

    @autolog.log_method(Logger, 'CinderSnapshot.revert')
    def revert_with_log(self, *args, **kwargs):
        pass
def LinearCinderSnapshots(
        context,
        instance,
        instance_options,
        snapshotobj,
        restore_id,
        volumes_to_restore):
    """Build a linear flow with one CinderSnapshot task per disk whose
    cinder volume is selected for restore.

    ``instance_options`` and ``restore_id`` are accepted for signature
    parity with the sibling flow builders but are not used here.
    """
    flow = lf.Flow("createsnapshotslf")
    db = WorkloadMgrDB().db
    snapshot_vm_resources = db.snapshot_vm_resources_get(
        context, instance['vm_id'], snapshotobj.id)
    for snapshot_vm_resource in snapshot_vm_resources:
        if snapshot_vm_resource.resource_type != 'disk':
            continue
        volume_id = db.get_metadata_value(
            snapshot_vm_resource.metadata, 'volume_id')
        # BUGFIX: removed a dead 'image_id' metadata lookup whose result
        # was never used in this function.
        if volume_id and volume_id in volumes_to_restore:
            flow.add(
                CinderSnapshot(
                    "CinderSnapshot" + snapshot_vm_resource.id,
                    rebind=dict(
                        vm_resource_id=snapshot_vm_resource.id,
                        volume_id='volume_id_' + snapshot_vm_resource.id)))
    return flow
def LinearPrepareBackupImages(
        context,
        instance,
        instance_options,
        snapshotobj,
        restore_id,
        volumes_to_restore,
        restore_boot_disk):
    """Build a linear flow of PrepareBackupImage tasks for every disk that
    will be restored, either because its cinder volume is selected or
    because it is a glance-backed boot disk and ``restore_boot_disk`` is
    set.  Each task provides the restore file path, overlay path and
    virtual size under per-resource store keys.
    """
    prepare_flow = lf.Flow("processbackupimageslf")
    db = WorkloadMgrDB().db
    resources = db.snapshot_vm_resources_get(
        context, instance['vm_id'], snapshotobj.id)
    for resource in resources:
        if resource.resource_type != 'disk':
            continue
        rid = str(resource.id)
        volume_id = db.get_metadata_value(resource.metadata, 'volume_id')
        image_id = db.get_metadata_value(resource.metadata, 'image_id')
        restoring_volume = bool(volume_id) and volume_id in volumes_to_restore
        restoring_boot = bool(image_id) and restore_boot_disk
        if restoring_volume or restoring_boot:
            prepare_flow.add(PrepareBackupImage(
                "PrepareBackupImage" + resource.id,
                rebind=dict(vm_resource_id=resource.id,
                            volume_type='volume_type_' + resource.id),
                provides=('restore_file_path_' + rid,
                          'image_overlay_file_path_' + rid,
                          'image_virtual_size_' + rid)))
    return prepare_flow
def PowerOnInstanceFlow(context):
    """Return a linear flow holding the single PowerOnInstance task."""
    power_on_flow = lf.Flow("poweroninstancelf")
    power_on_flow.add(PowerOnInstance("PowerOnInstance"))
    return power_on_flow
def FreezeNThawFlow(context):
    """Return a linear flow that freezes and then thaws the restored VM.

    Both tasks rebind every expected input ('instance', 'snapshot',
    'source_platform', 'restored_instance_id') to the single
    'restored_instance_id' store key, exactly as the original wiring did.
    """
    shared_rebind = {
        'instance': 'restored_instance_id',
        'snapshot': 'restored_instance_id',
        'source_platform': 'restored_instance_id',
        'restored_instance_id': 'restored_instance_id',
    }
    freeze_thaw_flow = lf.Flow("freezenthawlf")
    # Hand each task its own copy so neither can mutate the other's map.
    freeze_thaw_flow.add(FreezeVM("FreezeVM", rebind=dict(shared_rebind)))
    freeze_thaw_flow.add(ThawVM("ThawVM", rebind=dict(shared_rebind)))
    return freeze_thaw_flow
def restore_vm_data(cntx, db, instance, restore, instance_options):
    """Restore one VM's disk data in place from a workload snapshot.

    Builds and runs a serial taskflow: prepare backup images, take cinder
    safety snapshots, copy backup data onto the volumes, power the
    instance back on, then freeze/thaw.  On success, records and returns
    the restored-VM db row; on failure raises Exception.

    :param cntx: request context (refreshed below before running the flow)
    :param db: workloadmgr database API
    :param instance: dict with at least 'vm_id' and 'vm_name'
    :param restore: restore record as a dict (must include 'id',
        'restore_type', 'created_at', 'updated_at')
    :param instance_options: per-instance restore options; 'vdisks'
        selects which cinder volumes to restore, 'restore_boot_disk'
        selects the glance-backed boot disk
    """
    restore_obj = db.restore_get(cntx, restore['id'])
    snapshot_obj = db.snapshot_get(cntx, restore_obj.snapshot_id)
    workload_obj = db.workload_get(cntx, snapshot_obj.workload_id)
    backup_endpoint = db.get_metadata_value(workload_obj.metadata,
                                            'backup_media_target')
    backup_target = vault.get_backup_target(backup_endpoint)
    msg = 'Replaces VM "' + instance['vm_id'] + \
        '" data from snapshot ' + snapshot_obj.id
    db.restore_update(cntx, restore_obj.id, {'progress_msg': msg})
    # refresh the token so we are attempting each VM restore with a new token
    cntx = nova._get_tenant_context(cntx)
    # Flatten the context to a plain dict so taskflow can serialize it into
    # the store and tasks can rebuild an RpcContext from it.
    context_dict = dict([('%s' % key, value)
                         for (key, value) in cntx.to_dict().items()])
    context_dict['conf'] = None  # RpcContext object looks for this during init
    snapshot_vm_resources = db.snapshot_vm_resources_get(
        cntx, instance['vm_id'], snapshot_obj.id)
    volumes_to_restore = []
    for vdisk in instance_options.get('vdisks', []):
        volumes_to_restore.append(vdisk['id'])
    volumes_to_restore = set(volumes_to_restore)
    restore_boot_disk = instance_options.get('restore_boot_disk', False)
    # remove items that cannot be jsoned
    restore_dict = dict(iter(restore.items()))
    restore_dict.pop('created_at')
    restore_dict.pop('updated_at')
    # Taskflow store: shared inputs plus per-resource keys suffixed with
    # the snapshot_vm_resource id, matching the rebind names used by the
    # flow-builder helpers above.
    store = {
        'connection': FLAGS.sql_connection,
        'context': context_dict,
        'restore': restore_dict,
        'restore_id': restore['id'],
        'vmid': instance['vm_id'],
        'restored_instance_id': instance['vm_id'],
        'vmname': instance['vm_name'],
        'keyname': 'keyname' in instance and instance['keyname'] or None,
        'snapshot_id': snapshot_obj.id,
        'restore_type': restore['restore_type'],
        'instance_options': instance_options,
    }
    for snapshot_vm_resource in snapshot_vm_resources:
        store[snapshot_vm_resource.id] = snapshot_vm_resource.id
        store['devname_' + snapshot_vm_resource.id] = snapshot_vm_resource.resource_name
        if snapshot_vm_resource.resource_type != 'disk':
            continue
        volume_id = db.get_metadata_value(
            snapshot_vm_resource.metadata, 'volume_id')
        image_id = db.get_metadata_value(
            snapshot_vm_resource.metadata, 'image_id')
        volume_type = db.get_metadata_value(
            snapshot_vm_resource.metadata, 'volume_type')
        progress_tracker_metadata = {'snapshot_id': snapshot_obj.id,
                                     'resource_id': snapshot_vm_resource.id}
        progress_tracking_file_path = backup_target.get_progress_tracker_path(
            progress_tracker_metadata)
        if volume_id in volumes_to_restore:
            store['progress_tracking_file_path_' +
                  snapshot_vm_resource.id] = progress_tracking_file_path
            store['volume_id_' + snapshot_vm_resource.id] = volume_id
            store['volume_type_' + snapshot_vm_resource.id] = volume_type
        if image_id and restore_boot_disk:
            store['progress_tracking_file_path_' +
                  snapshot_vm_resource.id] = progress_tracking_file_path
            store['image_id_' + snapshot_vm_resource.id] = image_id
            store['image_type_' + snapshot_vm_resource.id] = 'qcow2'
            store['volume_type_' + snapshot_vm_resource.id] = None
    # NOTE(review): '_' (gettext) is not imported in this file's visible
    # import block — presumably installed into builtins elsewhere; confirm.
    LOG.info(_('Processing disks'))
    _restorevmflow = lf.Flow(instance['vm_id'] + "RestoreInstance")
    childflow = LinearPrepareBackupImages(cntx, instance, instance_options,
                                          snapshot_obj, restore['id'],
                                          volumes_to_restore,
                                          restore_boot_disk)
    if childflow:
        _restorevmflow.add(childflow)
    childflow = LinearCinderSnapshots(cntx, instance, instance_options,
                                      snapshot_obj, restore['id'],
                                      volumes_to_restore)
    if childflow:
        _restorevmflow.add(childflow)
    # copy data if the volumes are iscsi volumes
    childflow = CopyBackupImagesToVolumes(cntx, instance, snapshot_obj,
                                          restore['id'], volumes_to_restore,
                                          restore_boot_disk)
    if childflow:
        _restorevmflow.add(childflow)
    # power on the restored instance until all volumes are attached
    childflow = PowerOnInstanceFlow(cntx)
    if childflow:
        _restorevmflow.add(childflow)
    childflow = FreezeNThawFlow(cntx)
    if childflow:
        _restorevmflow.add(childflow)
    # Run the whole flow serially, persisting state via the sql backend.
    result = engines.run(_restorevmflow, engine='serial', engine_conf='serial', backend={
        'connection': store['connection']}, store=store)
    if result and 'restored_instance_id' in result:
        restored_instance_id = result['restored_instance_id']
        compute_service = nova.API(production=True)
        restored_instance = compute_service.get_server_by_id(
            cntx, restored_instance_id)
        restored_vm_values = {'vm_id': restored_instance_id,
                              'vm_name': restored_instance.name,
                              'restore_id': restore['id'],
                              'metadata': {'instance_id': instance['vm_id']},
                              'status': 'available'
                              }
        restored_vm = db.restored_vm_create(cntx, restored_vm_values)
        LOG.debug(_("VM Data Restore Completed"))
        db.restore_update(cntx, restore_obj.id,
                          {'progress_msg': 'Restored VM "' + instance['vm_id'] +
                           '" data from snapshot ' + snapshot_obj.id,
                           'status': 'executing'
                           })
        db.restore_update(cntx, restore_obj.id,
                          {'progress_msg': 'Restored VM:' + restored_vm['vm_id'],
                           'status': 'executing'})
        return restored_vm
    else:
        raise Exception("Restoring VM data failed")