# Copyright 2014 TrilioData Inc.
# All Rights Reserved.
import base64
import json
import shutil
import time
import subprocess
import tempfile
import sys
from threading import Thread
from urllib.parse import urlparse, quote
import concurrent.futures
try:
from eventlet import sleep
except ImportError:
from time import sleep
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty
import os
import libvirt
import libvirt_qemu
from collections import defaultdict
from eventlet import greenthread
import defusedxml.ElementTree as etree
from oslo_concurrency import processutils
from xml.etree.ElementTree import Element, SubElement, tostring
import os_brick.initiator.linuxscsi as os_brick_linuxscsi
import re
try:
from oslo_config import cfg
except ImportError:
from oslo.config import cfg
try:
from oslo_log import log as logging
except ImportError:
from nova.openstack.common import log as logging
try:
from oslo_serialization import jsonutils
except ImportError:
from nova.openstack.common import jsonutils
from nova import exception
from contego.exception import wrap_exception
from nova.virt.libvirt import driver as nova_driver
from nova.virt.libvirt import config as vconfig
import nova.virt.libvirt.utils as libvirt_utils
LOG = logging.getLogger(__name__)
try:
    from nova import volume as cinder
except BaseException as ex:
    LOG.warning(ex)
try:
    from nova.volume import cinder as cinder1
except BaseException as ex:
    LOG.warning(ex)
from contego.nova.extension.driver import qemuimages
from contego.nova.extension.driver.diskfilesdrive import DiskfilesDriveBuilder
from contego.nova.extension.driver.backends.backend import Backend
from contego.nova.extension.driver.backends.nbd import NbdMount
from contego import utils
from . import vault
from . import loopingcall
contego_driver_opts = [
cfg.IntOpt(
"qemu_agent_ping_timeout",
default=300,
help="The number of seconds to wait to qemu agent to be up and running.",
)
]
CONF = cfg.CONF
CONF.register_opts(contego_driver_opts)
class ChunkedFile(object):
"""
We send this back to the as
something that can iterate over a large file
"""
CHUNKSIZE = 65536
def __init__(self, filepath):
self.filepath = filepath
self.fp = open(self.filepath, "rb")
def __iter__(self):
"""Return an iterator over the image file"""
try:
if self.fp:
while True:
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
if chunk:
yield chunk
else:
break
finally:
self.close()
def close(self):
"""Close the internal file pointer"""
if self.fp:
self.fp.close()
self.fp = None
def cooperative_iter(iterable):
    """
    Return an iterator which reschedules (sleep(0)) after each
    iteration. This can prevent eventlet thread starvation.
    :param iterable: an iterator to wrap
    """
    try:
        for chunk in iterable:
sleep(0)
yield chunk
except Exception as err:
msg = ("Error: cooperative_iter exception %s") % err
LOG.error(msg)
raise
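# LibvirtDriver extends Nova's libvirt driver with the TrilioData "vast"
# snapshot operations (prepare, freeze/thaw, create, transfer, commit,
# finalize, reset) plus helpers for mounting snapshot images into a file
# recovery manager (FRM) instance and for virt-v2v based migrations.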
class LibvirtDriver(nova_driver.LibvirtDriver):
def __init__(self, virt_driver, read_only=False):
super(LibvirtDriver, self).__init__(virt_driver)
self.virt_driver = virt_driver
@wrap_exception()
def vast_prepare(self, context, instance_uuid, instance_ref, params):
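        """Split the instance's disks into file, rbd-boot and cinder
        categories and call prepare_snapshot on the matching backend
        driver for each non-empty category.
        """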
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
(
file_disk_devices,
rbd_disk_devices,
cinder_disk_devices,
) = self._get_device_categories(
context, instance_uuid, instance_ref, params
)
if len(rbd_disk_devices):
backend_driver = Backend().get(
"rbdboot",
path=rbd_disk_devices[0].find("source").get("name"),
virt_driver=self.virt_driver,
)
backend_driver.prepare_snapshot(
rbd_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
if len(file_disk_devices) > 0:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
backend_driver.prepare_snapshot(
file_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
if len(cinder_disk_devices):
backend_driver = Backend().get("cinder", virt_driver=self.virt_driver)
backend_driver.prepare_snapshot(
cinder_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
except Exception as ex:
LOG.exception(ex)
raise
@wrap_exception()
def vast_freeze(self, context, instance_uuid, instance_ref, params):
self._wait_for_guest_agent(context, instance_uuid)
return self._quiesce(context, instance_uuid, True)
@wrap_exception()
def vast_thaw(self, context, instance_uuid, instance_ref, params):
return self._quiesce(context, instance_uuid, False)
@wrap_exception()
def vast_instance(self, context, instance_uuid, instance_ref, params):
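        """Create snapshots for every disk category of the instance and
        return the collected per-disk info as {"disks_info": [...]}.
        """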
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disks_info = []
(
file_disk_devices,
rbd_disk_devices,
cinder_disk_devices,
) = self._get_device_categories(
context, instance_uuid, instance_ref, params
)
if len(rbd_disk_devices) > 0:
backend_driver = Backend().get(
"rbdboot",
path=rbd_disk_devices[0].find("source").get("name"),
virt_driver=self.virt_driver,
)
rbd_disks_info = backend_driver.create_snapshot(
rbd_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(rbd_disks_info)
if len(file_disk_devices) > 0:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
file_disks_info = backend_driver.create_snapshot(
file_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(file_disks_info)
if len(cinder_disk_devices) > 0:
backend_driver = Backend().get("cinder", virt_driver=self.virt_driver)
cinder_disks_info = backend_driver.create_snapshot(
cinder_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(cinder_disks_info)
except Exception as ex:
LOG.exception(ex)
if hasattr(ex, "response"):
payload = json.loads(ex.response.text)
if "overLimit" in payload:
msg = "Quota Exceeded - " + payload["overLimit"]["message"]
raise exception.QuotaError(msg)
raise
return {"disks_info": disks_info}
@wrap_exception()
def vast_get_info(self, context, instance_uuid, instance_ref, params):
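        """Refresh the snapshot info of each disk listed in
        params["disks_info"] via its backend driver.
        """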
try:
updated_disks_info = []
if params and "disks_info" in params:
disks_info = params["disks_info"]
for disk_info in disks_info:
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
updated_disks_info.append(
backend_driver.update_snapshot_info(disk_info)
)
except Exception as ex:
LOG.exception(ex)
raise
return {"disks_info": updated_disks_info}
@wrap_exception()
def vast_data_transfer(self, context, instance_uuid, instance_ref, params):
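        """Upload the snapshot data of a single disk (params["disk_info"])
        to the backup target through its backend driver.
        """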
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disk_info = params["disk_info"]
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
backend_driver.upload_snapshot(
disk_info,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
context=context,
params=params,
)
except Exception as ex:
LOG.exception(ex)
if hasattr(ex, "response"):
payload = json.loads(ex.response.text)
if "overLimit" in payload:
msg = "Quota Exceeded - " + payload["overLimit"]["message"]
raise exception.QuotaError(msg)
raise
return {"result": "success"}
@wrap_exception()
def vast_commit_image(self, context, instance_uuid, instance_ref, params):
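        """Commit each qcow2 image in params["commit_image_list"] into its
        backing file, growing the base image first when needed, then move
        the merged base into the image's place.
        """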
commit_image_list = params["commit_image_list"]
secret_uuid = params['metadata'].get('secret_uuid')
for commit_images in commit_image_list:
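            # commit_images is a flat [path, backing_path, ...] list;
            # zip(*[iter(commit_images)] * 2) walks it pairwise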
for path, backing_path in zip(*[iter(commit_images)] * 2):
try:
vault_path = path
backing_vault_path = backing_path
image_info = qemuimages.qemu_img_info(vault_path)
image_backing_info = qemuimages.qemu_img_info(backing_vault_path)
# crosscheck if backing file of the image exists
if not image_info.backing_file:
continue
# increase the size of the base image
if image_backing_info.virtual_size < image_info.virtual_size:
qemuimages.resize_image(
backing_vault_path, image_info.virtual_size
)
qemuimages.commit_qcow2(vault_path, secret_uuid=secret_uuid, run_as_root=False)
# moving/renaming backing_vault_path to vault_path
# as vault_path is now the new Full snapshot image
shutil.move(backing_vault_path, vault_path)
except Exception as ex:
LOG.exception(ex)
raise
return {"result": "success"}
@wrap_exception()
def vast_check_prev_snapshot(self, context, instance_uuid, instance_ref, params):
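        """Ask the backend driver whether the previous snapshot of
        params["disk_info"] still exists and return its status.
        """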
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disk_info = params["disk_info"]
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
status = backend_driver.check_prev_snapshot(
disk_info,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
context=context,
params=params,
)
except Exception as ex:
LOG.exception(ex)
raise
return status
@wrap_exception()
def vast_async_task_status(self, context, instance_uuid, instance_ref, params):
try:
result = vault.get_async_task_status(context, params["metadata"])
except Exception as ex:
LOG.exception(ex)
raise
return result
@wrap_exception()
def vast_finalize(self, context, instance_uuid, instance_ref, params):
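        """Delete the snapshots listed in params["disks_info"]; failures
        to delete individual snapshots are logged but not re-raised.
        """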
instance_name = self._get_instance_name_by_uuid(instance_uuid)
if "disks_info" in params:
disks_info = params["disks_info"]
for disk_info in disks_info:
try:
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
backend_driver.delete_snapshot(
disk_info,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
except Exception as ex:
LOG.debug(("Cannot delete snapshot %s"), disk_info["path"])
LOG.exception(ex)
else:
LOG.warning(
(
"Cannot delete snapshot. No Disk Information for instance: %s, UUID %s, ref %s"
),
instance_name,
instance_uuid,
instance_ref,
)
return
@wrap_exception()
def vast_reset(self, context, instance_uuid, instance_ref, params):
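        """Clean up snapshot state left behind for the instance. Without
        an instance_ref, at least the rbd boot device is reset; otherwise
        every disk category is reset via its backend driver.
        """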
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disks_info = []
if not instance_ref:
# at the least cleanup boot device
backend_driver = Backend().get(
"rbdboot",
path=os.path.join(
CONF.libvirt.images_rbd_pool, instance_uuid + "_disk"
),
virt_driver=self.virt_driver,
)
                rbd_disks_info = backend_driver.reset_snapshot(
                    [],
                    context=context,
                    instance_uuid=instance_uuid,
                    instance_name=instance_name,
                    instance_ref=instance_ref,
                    params=params,
                )
                disks_info.extend(rbd_disks_info)
else:
(
file_disk_devices,
rbd_disk_devices,
cinder_disk_devices,
) = self._get_device_categories(
context, instance_uuid, instance_ref, params
)
if len(rbd_disk_devices) > 0:
backend_driver = Backend().get(
"rbdboot",
path=rbd_disk_devices[0].find("source").get("name"),
virt_driver=self.virt_driver,
)
rbd_disks_info = backend_driver.reset_snapshot(
rbd_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(rbd_disks_info)
if len(file_disk_devices) > 0:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
file_disks_info = backend_driver.reset_snapshot(
file_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(file_disks_info)
if len(cinder_disk_devices) > 0:
backend_driver = Backend().get(
"cinder", virt_driver=self.virt_driver
)
cinder_disks_info = backend_driver.reset_snapshot(
cinder_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(cinder_disks_info)
except Exception as ex:
LOG.exception(ex)
raise
return {"disks_info": disks_info}
@wrap_exception()
def map_snapshot_files(self, context, instance_uuid, instance_ref, params):
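        """Attach the snapshot disk images of a backup job to the file
        recovery manager (FRM) instance: a qcow2 overlay is created per
        image, exposed through an nbd device, hot-plugged into the guest,
        and the disk-to-device mapping plus filemanager scripts are
        written into the guest through the qemu guest agent.
        """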
class open_guest_file:
def __init__(self, path, mode, timeout=60):
self.path = path
self.mode = mode
self.timeout = timeout
def __enter__(self):
                # open the guest file at self.path with the requested mode
command = {
"execute": "guest-file-open",
"arguments": {"path": self.path, "mode": self.mode},
}
command = json.dumps(command)
status = "Reading " + self.path
LOG.debug(("%s in %s") % (status, domain_name))
ret = libvirt_qemu.qemuAgentCommand(virt_dom, command, self.timeout, 0)
result = jsonutils.loads(ret)
if result.get("error", None):
msg = (
"Error from qemu-guest-agent while " + status + " of "
"%(instance_name)s: %(error)s"
) % {"instance_name": domain_name, "error": ret}
LOG.debug(msg)
raise Exception("File not found")
self.file_handle = result["return"]
return self
def __exit__(self, type, value, traceback):
try:
command = {
"execute": "guest-file-close",
"arguments": {"handle": self.file_handle},
}
command = json.dumps(command)
ret = libvirt_qemu.qemuAgentCommand(
virt_dom, command, self.timeout, 0
)
                except BaseException as bex:
                    LOG.warning(bex)
def write(self, data):
command = {
"execute": "guest-file-write",
"arguments": {"handle": self.file_handle, "buf-b64": data},
}
command = json.dumps(command)
ret = libvirt_qemu.qemuAgentCommand(virt_dom, command, self.timeout, 0)
result = jsonutils.loads(ret)["return"]
return result
def _get_power_state(virt_dom):
dom_info = virt_dom.info()
state = nova_driver.LIBVIRT_POWER_STATE[dom_info[0]]
return state
def _is_guest_linux():
            # Determine whether the guest OS is Linux or Windows. Since the
            # qemu guest agent has no command for this, we try to read the
            # /etc/os-release file: if it exists we conclude the OS is
            # Linux, otherwise Windows.
try:
with open_guest_file("/etc/os-release", "r") as f:
return True
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while reading /etc/os-release from "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {"instance_name": domain_name, "error_code": error_code, "ex": ex, }
LOG.debug(msg)
return False
def _copy_diskdrives_fileto_guest(diskfiles):
            # This function writes /mnt/tvault-mounts/metadata/diskfiles;
            # the file defines how backup images map to each VM in the
            # backup job.
try:
# open the /mnt/tvault-mounts/metadata/diskfiles for writing
with open_guest_file("/mnt/tvault-mounts/metadata/diskfiles", "w+") as f:
                    # encode the diskfiles data into base64
                    diskfiles_b64 = base64.b64encode(
                        json.dumps(diskfiles).encode("utf-8")
                    ).decode("utf-8")
result = f.write(diskfiles_b64)
assert result["eof"] is False
if result["count"] < len(json.dumps(diskfiles)):
msg = (
"the amount of data written to /mnt/tvault-mounts/metadata/diskfiles is less than "
"len of diskfiles. Expected %s, actual %s"
) % (str(len(diskfiles)), str(result["count"]))
raise exception.NovaException(msg)
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while writing /mnt/tvault-mounts/metadata/diskfiles of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {"instance_name": domain_name, "error_code": error_code, "ex": ex, }
LOG.info(msg)
raise
def _copy_filemanager_scripts_to_guest():
try:
scripts_path = os.path.join(
os.path.dirname(__file__), "filemanager_scripts"
)
contego_version = "#2.5.0"
n = "fm_scripts_version.py"
with open(os.path.join(scripts_path, n), "r") as lf:
contego_version = lf.read().strip()
try:
with open_guest_file(
os.path.join("/mnt/tvault-mounts/metadata", contego_version), "r"
) as f:
return
                except BaseException as bex:
                    LOG.warning(bex)
for n in os.listdir(scripts_path):
                    # include only .py files; .pyc files create problems
                    # during encoding
if n.endswith(".py"):
# open the /mnt/tvault-mounts/metadata'n' for writing
with open(os.path.join(scripts_path, n), "r") as lf:
script = lf.read()
with open_guest_file(
os.path.join("/mnt/tvault-mounts/metadata", n), "w+"
) as f:
# encode the diskfiles data into base64
scripts_b64 = base64.b64encode(script.encode("utf-8"))
result = f.write(scripts_b64.decode("utf-8"))
assert result["eof"] is False
if result["count"] < len(script):
msg = (
"the amount of data written to %s is less than "
"len of local script. Expected %s, actual %s"
) % (
os.path.join("/mnt/tvault-mounts/metadata", n),
str(len(script)),
str(result["count"]),
)
raise exception.NovaException(msg)
with open_guest_file(
os.path.join("/mnt/tvault-mounts/metadata", contego_version), "w+"
) as f:
pass
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while writing /mnt/tvault-mounts/metadata/%(n)s of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {
"n": n,
"instance_name": domain_name,
"error_code": error_code,
"ex": ex,
}
LOG.info(msg)
raise
def _ensure_overlay_directory():
utils.ensure_tree(
os.path.join(CONF.instances_path, "contego_mount_overlays")
)
def _create_overlay_file(disk, secret_uuid):
_ensure_overlay_directory()
overlay_filename = os.path.split(disk)[1] + ".overlay"
abs_overlay_filename = os.path.join(
CONF.instances_path, "contego_mount_overlays", overlay_filename
)
try:
os.remove(abs_overlay_filename)
            except BaseException as bex:
                LOG.warning(bex)
info = qemuimages.qemu_img_info(disk)
if secret_uuid:
qemuimages.create_cow_image(disk, abs_overlay_filename, size=info.virtual_size,payload=secret_uuid)
else:
qemuimages.create_cow_image(disk, abs_overlay_filename, info.virtual_size)
return abs_overlay_filename
def _get_guest_agent_script_path():
            # Determine the guest agent hook script path from known
            # locations; in some distributions it is "/etc/qemu-ga/",
            # in others "/etc/qemu/".
guest_agent_script_path = None
try:
with open_guest_file("/etc/qemu-ga/fsfreeze-hook", 'r') as fil:
return "/etc/qemu-ga/fsfreeze-hook"
            except Exception as ex:
                LOG.warning(ex)
try:
with open_guest_file("/etc/qemu/fsfreeze-hook", 'r') as fil:
return "/etc/qemu/fsfreeze-hook"
            except Exception as ex:
                LOG.warning(ex)
if guest_agent_script_path is None:
raise Exception('Not able to detect qemu guest agent hook path')
def _generate_device_xml(virt_dom, disknum, hostbus, overlay_file, secret_uuid=None, partition=None):
diskelement = Element("disk")
diskelement.set("type", "block")
diskelement.set("device", "disk")
driver = SubElement(diskelement, "driver")
driver.set("name", "qemu")
driver.set("type", "raw")
source = SubElement(diskelement, "source")
if partition:
source.set("dev", "/dev/" + partition)
target = SubElement(diskelement, "target")
target.set("dev", disknum)
target.set("bus", hostbus)
devxml = etree.tostring(diskelement).decode()
status = virt_dom.attachDeviceFlags(
devxml,
libvirt.VIR_DOMAIN_AFFECT_CONFIG
| libvirt.VIR_DOMAIN_AFFECT_LIVE,
)
return status
def _update_fsfreeze_script_on_guest():
try:
guest_script_path = _get_guest_agent_script_path()
scripts_path = os.path.join(
os.path.dirname(__file__), "filemanager_scripts"
)
script_path = os.path.join(scripts_path, 'fsfreeze.sh')
with open(script_path, 'r') as script_fil:
script_data = script_fil.read()
with open_guest_file(guest_script_path, "w+") as guest_file:
script_data_b64 = base64.b64encode(script_data.encode("utf-8"))
result = guest_file.write(script_data_b64.decode("utf-8"))
assert result["eof"] is False
if result["count"] < len(script_data):
msg = (
"the amount of data written to %s is less than "
"len of local script. Expected %s, actual %s"
) % (
guest_script_path,
str(len(script_data)),
str(result["count"]),
)
raise exception.NovaException(msg)
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while writing to "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {
"instance_name": domain_name,
"error_code": error_code,
"ex": ex,
}
LOG.info(msg)
raise
def _write_mount_metadata(mount_metadata, instance_uuid):
overlay_file_dir = os.path.join(
CONF.instances_path, "contego_mount_overlays"
)
if os.path.isdir(overlay_file_dir):
frm_metadata_path = os.path.join(
overlay_file_dir, "frm_{}".format(instance_uuid)
)
with open(frm_metadata_path, "w") as fh:
fh.write(json.dumps(mount_metadata))
try:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
pervmdisks = params["diskfiles"]
domain_name = instance_ref['OS-EXT-SRV-ATTR:instance_name']
virt_dom = self.virt_driver._conn.lookupByName(domain_name)
if not virt_dom:
raise Exception("Cannot find virt_dom")
assert virt_dom.UUIDString() == instance_uuid
# Wait for guest agent to be pingable
success = self._wait_for_guest_agent(context, instance_uuid)
_update_fsfreeze_script_on_guest()
try:
# Freeze to make sure everything is clean and update FsFreeze hooks
self._quiesce(context, instance_uuid, True)
sleep(20)
# Thaw VM
self._quiesce(context, instance_uuid, False)
except Exception as ex:
LOG.exception(ex)
xml = virt_dom.XMLDesc(0)
doc = etree.fromstring(xml)
# detach all disks
disk_found = False
disks = doc.findall("devices/disk")
for disk in disks:
if disk.get("device") != 'disk':
continue
target = disk.find("target")
disk_prefix = target.get("dev")[:2]
                # `ide` always comes from a directly attached physical
                # device, so we validate only that category. Possible
                # values are ide, scsi, virtio, xen, usb, sata, or sd.
                # TODO: identify the rest of the supported hostbus options.
hostbus = target.get("bus")
if hostbus == "ide":
continue
disk_found = True
break
if not disk_found:
raise Exception("Unable to mount, valid disk not found")
diskstr = []
for i in range(0, 26):
diskstr.append(disk_prefix + chr(ord("a") + i))
for j in range(0, 20):
for i in range(0, 26):
diskstr.append(disk_prefix + chr(ord("a") + j) + chr(ord("a") + i))
disk_targets = doc.findall("devices/disk/target")
for d in disk_targets:
try:
diskstr.remove(d.get("dev"))
                except BaseException as ex:
                    LOG.warning(ex)
# Wait for guest agent to be pingable
success = self._wait_for_guest_agent(context, instance_uuid)
            if not success:
                msg = (
                    "Error: Waiting for guest agent timed out "
                    "for instance %s"
                ) % domain_name
                raise exception.NovaException(msg)
linux_guest = _is_guest_linux()
snapshot_id = params.get("snapshot_id")
mount_metadata = defaultdict(dict)
# add any new disk images here
try:
directories = set()
for vm, disks in pervmdisks.items():
for disk in disks["vault_path"]:
directories.add(os.path.dirname(disk))
backend_driver.configure_security_profile(
instance_uuid, list(directories)
)
disksmetadata = {}
diskstr.reverse()
disknum = diskstr.pop()
vdxp = re.compile("_vd[a-z]")
for vm, disks in pervmdisks.items():
if vm not in disksmetadata:
disksmetadata[vm] = {"vm_name": disks["vm_name"]}
for disk in disks["vault_path"]:
secret_uuid = params.get('metadata').get('secret_uuid')
overlay_file = _create_overlay_file(disk, secret_uuid)
unused_nbd_dev = None
info = qemuimages.qemu_img_info(overlay_file)
if not secret_uuid:
secret_uuid = ''
                        if utils.parse_encrypted_image_backing_file(info.backing_file) != disk:
                            msg = 'Backing file of overlay file does not match the target disk.'
                            LOG.error(msg)
                            raise exception.NovaException(msg)
nbd_obj = NbdMount(overlay_file, None)
unused_nbd_dev = nbd_obj._find_unused(nbd_obj._detect_nbd_devices())
status = nbd_obj._create_nbd_device_as_fdisk(unused_nbd_dev, secret_uuid, overlay_file)
if status:
LOG.info('successfully created nbd device with overlay file.')
status = _generate_device_xml(virt_dom, disknum, hostbus, overlay_file, secret_uuid,
unused_nbd_dev)
LOG.info('Status of Libvirt attaching Disk to FRM: {0}'.format(status))
time.sleep(5)
else:
                            # the nbd device failed to connect, so perform
                            # a cleanup and raise an error
if "metadata" not in mount_metadata[snapshot_id]:
mount_metadata[snapshot_id] = {"metadata": {}}
mount_metadata[snapshot_id]["metadata"].update(
{overlay_file: unused_nbd_dev}
)
if mount_metadata:
_write_mount_metadata(mount_metadata, instance_uuid)
self._frm_mount_cleanup(instance_uuid, snapshot_id)
raise Exception("Failed to connect nbd device: {} and overlayfile: {}".format(
unused_nbd_dev, overlay_file)
)
if "metadata" not in mount_metadata[snapshot_id]:
mount_metadata[snapshot_id] = {"metadata": {}}
mount_metadata[snapshot_id]["metadata"].update(
{overlay_file: unused_nbd_dev}
)
disksmetadata[vm][disk] = disknum
disknum = diskstr.pop()
if mount_metadata:
mount_metadata[snapshot_id].update({"mount_status": True})
_write_mount_metadata(mount_metadata, instance_uuid)
# create metadata associated with all the attached disk
if linux_guest:
_copy_diskdrives_fileto_guest(disksmetadata)
_copy_filemanager_scripts_to_guest()
except Exception as err:
msg = ("Error: Cannot map snapshot volume images - %s %s") % (
disk,
err,
)
LOG.error(msg)
raise err
finally:
try:
backend_driver.reset_security_profile(
instance_uuid, list(directories)
)
except Exception as err2:
msg = ("Error: resetting app armor profile %s") % err
LOG.error(msg)
raise err2
try:
# Unfreeze to make sure everything is clean
self._quiesce(context, instance_uuid, False)
except BaseException:
sleep(60)
sleep(30)
try:
self._quiesce(
context,
instance_uuid,
True,
timeout=libvirt_qemu.VIR_DOMAIN_QEMU_AGENT_COMMAND_BLOCK,
)
except BaseException:
                msg = (
                    "Failed to call the fsfreeze freeze hook. Continuing..."
                )
                LOG.error(msg)
sleep(10)
try:
self._quiesce(context, instance_uuid, False)
except BaseException:
msg = "Error calling fsfreeze thaw hook successfully. Continuing..."
LOG.error(msg)
pass
except Exception as ex:
LOG.exception(ex)
raise
return
def _ping_guest_agent(self, context, instance_uuid):
instance_name = self._get_instance_name_by_uuid(instance_uuid)
command = '{"execute":"guest-ping"}'
status = "pinging guest agent"
LOG.debug(("pinging guest agent in %s"), instance_name)
try:
domain = self.virt_driver._conn.lookupByName(instance_name)
ret = libvirt_qemu.qemuAgentCommand(domain, command, 60, 0)
        except Exception as ex:
            # only libvirtError exposes get_error_code()
            error_code = ex.get_error_code() if hasattr(ex, "get_error_code") else -1
            if error_code == 74:
# the guest agent is not configured
return
raise
result = jsonutils.loads(ret)
if result.get("error", None):
msg = (
"Error from qemu-guest-agent while " + status + " of "
"%(instance_name)s: %(error)s"
) % {"instance_name": instance_name, "error": ret}
raise exception.NovaException(msg)
def _wait_for_guest_agent(self, context, instance_uuid):
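        """Poll guest-ping every 5 seconds until the qemu guest agent
        responds or qemu_agent_ping_timeout (at least 300s) elapses;
        return True on success, False on timeout.
        """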
instance_name = self._get_instance_name_by_uuid(instance_uuid)
def _wait_for_ping():
"""Called at an interval until the VM is running again."""
try:
self._ping_guest_agent(context, instance_uuid)
raise loopingcall.LoopingCallDone()
except loopingcall.LoopingCallDone:
raise
except libvirt.libvirtError as ex:
error_code = ex.get_error_code()
if error_code == 86:
msg = "Still waiting for guest agent to be up and running"
LOG.debug(msg)
else:
msg = (
"Error from libvirt while pinging guest agent of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {
"instance_name": instance_name,
"error_code": error_code,
"ex": ex,
}
LOG.error(msg)
LOG.exception(ex)
raise
try:
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_ping)
timer.start(
interval=5, max_duration=max(CONF.qemu_agent_ping_timeout, 300)
).wait()
return True
except Exception:
return False
def _quiesce(self, context, instance_uuid, quiesce, timeout=60):
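        """Freeze (quiesce=True) or thaw the guest filesystems through
        the qemu guest agent; agent errors are logged as warnings rather
        than raised.
        """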
instance_name = self._get_instance_name_by_uuid(instance_uuid)
if quiesce:
command = '{"execute": "guest-fsfreeze-freeze"}'
status = "freezing guest filesystems"
LOG.debug(("freezing guest filesystems of %s"), instance_name)
else:
command = '{"execute": "guest-fsfreeze-thaw"}'
status = "thawing guest filesystems"
LOG.debug(("Thawing guest filesystems of %s"), instance_name)
try:
domain = self.virt_driver._conn.lookupByName(instance_name)
ret = libvirt_qemu.qemuAgentCommand(domain, command, timeout, 0)
        except Exception as ex:
            error_code = ex.get_error_code() if hasattr(ex, "get_error_code") else -1
msg = (
"Error from libvirt while " + status + " of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {"instance_name": instance_name, "error_code": error_code, "ex": ex, }
LOG.warning(msg)
return
result = jsonutils.loads(ret)
if result.get("error", None):
msg = (
"Error from qemu-guest-agent while " + status + " of "
"%(instance_name)s: %(error)s"
) % {"instance_name": instance_name, "error": ret}
LOG.warning(msg)
def _get_device_categories(self, context, instance_uuid, instance_ref, params):
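        """Parse the domain XML and split its disk devices into file, rbd
        and cinder-backed categories, skipping config-drive, swap and
        ephemeral disks.
        """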
instance_name = self._get_instance_name_by_uuid(instance_uuid)
domain = self.virt_driver._conn.lookupByName(instance_name)
domain_xml = domain.XMLDesc(0)
doc = etree.fromstring(domain_xml)
lun_devices = doc.findall("devices/disk[@device='lun']")
if lun_devices and len(lun_devices) > 1:
msg = "LUN devices are not supported"
raise exception.NovaException(msg)
disk_devices = doc.findall("devices/disk[@device='disk']")
        if not disk_devices:
msg = "Did not find any disks attached to the instance"
raise exception.NovaException(msg)
file_disk_devices = []
rbd_disk_devices = []
cinder_disk_devices = []
for device in disk_devices:
disk_type = device.get("type")
if device.find("serial") is not None:
cinder_volume_id = device.find("serial").text
try:
_volume_api = cinder.API()
except BaseException:
_volume_api = cinder1.API()
try:
volume = _volume_api.get(context, cinder_volume_id)
if volume:
cinder_disk_devices.append(device)
else:
msg = (
"Unknown disk type %s mapped to the " "instance"
) % disk_type
raise exception.NovaException(msg)
except exception.CinderConnectionFailed as ex:
LOG.exception(ex)
raise ex
except Exception as ex:
LOG.exception(ex)
msg = ("Unknown disk type %s mapped to the instance") % disk_type
raise exception.NovaException(msg)
elif disk_type == "volume" and device.find("serial") is None:
source = device.find("source")
if "name" in list(source.keys()):
name = source.get("name")
if (
name.endswith("config")
or name.endswith("swap")
or "disk.eph" in name
):
continue
backend = device.find("source").get("pool")
if backend == "rbd" or backend == "ceph":
rbd_disk_devices.append(device)
elif disk_type == "network" and device.find("serial") is None:
source = device.find("source")
if "name" in list(source.keys()):
name = source.get("name")
if (
name.endswith("config")
or name.endswith("swap")
or "disk.eph" in name
):
continue
backend = device.find("source").get("protocol")
if backend == "rbd" or backend == "ceph":
rbd_disk_devices.append(device)
elif disk_type == "file" and device.find("serial") is None:
# we only support boot disks booting off of glance
# image on local disk
# Adding check for "Force config drive" and neglecting it.
source = device.find("source")
if "file" in list(source.keys()):
file = source.get("file")
if (
file.endswith("config")
or file.endswith("swap")
or "disk.eph" in file
):
continue
file_disk_devices.append(device)
else:
# block device
msg = "Unknown disk type %s mapped to the instance" % disk_type
raise exception.NovaException(msg)
return (file_disk_devices, rbd_disk_devices, cinder_disk_devices)
def _get_instance_name_by_uuid(self, instance_uuid):
for name in self.virt_driver.list_instances():
iuuid = self.virt_driver._conn.lookupByName(name).UUIDString()
if iuuid == instance_uuid:
return name
return None
def _frm_mount_cleanup(self, frminstance_id, snapshot_id):
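        """Disconnect the nbd devices recorded for a snapshot in the FRM
        metadata file and delete their overlay files, retrying once before
        reporting failures; return True when everything was cleaned up.
        """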
def _cleanup(metadata):
failed_devices = {}
for overlay_file, nbd_device in metadata.items():
if nbd_device:
nbd_obj = NbdMount(overlay_file, None)
res = nbd_obj._disconnect_nbd_devices([nbd_device])
if not res:
LOG.error('Failed to disconnect nbd device: {0}'.format(nbd_device))
# if nbd device is not disconnected, do not delete overlay file
failed_devices.update({overlay_file: nbd_device})
# if nbd device is disconnected, delete overlay file
elif os.path.exists(overlay_file):
os.remove(overlay_file)
# no nbd device is associated with overlay file, delete the file
elif os.path.exists(overlay_file):
os.remove(overlay_file)
return failed_devices
is_cleaned = True
frm_metadata_path = os.path.join(
CONF.instances_path, "contego_mount_overlays",
"frm_{}".format(frminstance_id)
)
if not os.path.exists(frm_metadata_path):
LOG.error("The FRM metadata file is not present, skipping a nbd device cleanup")
return False
frm_metadata = {}
with open(frm_metadata_path, "r") as fh:
frm_metadata = json.loads(fh.read())
metadata = frm_metadata.get(snapshot_id, {}).get("metadata", {})
failed_devices = {}
for i in range(2):
failed_devices = _cleanup(metadata)
if failed_devices:
metadata = failed_devices
sleep(20)
continue
else:
break
clean_metadata_file = True
if failed_devices:
nbd_obj = NbdMount(None, None)
for overlay_file, nbd_device in failed_devices.items():
free_device = nbd_obj._find_unused([nbd_device])
if not free_device:
clean_metadata_file = False
is_cleaned = False
LOG.error('Failed to disconnect nbd device: {} and overlay file: {}'.format(
nbd_device, overlay_file))
# if nbd device cleanup is successful then remove the overlay file
elif os.path.exists(overlay_file):
os.remove(overlay_file)
if clean_metadata_file:
frm_metadata.pop(snapshot_id, None)
if not frm_metadata:
frm_metadata_path = os.path.join(
CONF.instances_path, "contego_mount_overlays",
"frm_{}".format(frminstance_id))
if os.path.exists(frm_metadata_path):
os.remove(frm_metadata_path)
return is_cleaned
@wrap_exception()
def copy_backup_image_to_volume(self, context, instance_uuid, instance_ref, params):
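        """Locate the guest disk matching params (by volume serial or by
        boot device) and have the corresponding backend driver copy the
        backup image onto it.
        """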
try:
domain_name = instance_ref['OS-EXT-SRV-ATTR:instance_name']
virt_dom = self.virt_driver._conn.lookupByName(domain_name)
if not virt_dom:
raise Exception("Cannot find virt_dom")
assert virt_dom.UUIDString() == instance_uuid
xml = virt_dom.XMLDesc(0)
xml_doc = etree.fromstring(xml)
device_info = vconfig.LibvirtConfigGuest()
device_info.parse_dom(xml_doc)
for guest_disk in device_info.devices:
if guest_disk.root_name != "disk":
continue
if guest_disk.target_dev is None:
continue
source_types = ("block", "lvm", "file")
if (
guest_disk.serial is not None
and guest_disk.serial == params["volume_id"]
): # noqa
if guest_disk.source_type == "network":
backend = guest_disk.source_protocol
elif guest_disk.source_type == "volume":
backend = guest_disk.source_pool
elif (
guest_disk.source_type in source_types
and guest_disk.source_device == "disk"
and guest_disk.serial
):
backend = "cinder"
path = guest_disk.source_name or guest_disk.source_path
backend_driver = Backend().get(
backend, path=path, virt_driver=self.virt_driver
)
backend_driver.copy_backup_image_to_volume(
context, instance_uuid, instance_ref["name"], params=params,
)
break
if (
guest_disk.target_dev in ("vda", "sda")
and guest_disk.source_type == "file"
and guest_disk.driver_format == "qcow2"
and guest_disk.serial is None
and params["image_id"] is not None
and params["image_overlay_file_path"] is not None
): # noqa
params["path"] = guest_disk.source_name or guest_disk.source_path
backend_driver = Backend().get(
"qcow2", virt_driver=self.virt_driver
)
backend_driver.copy_backup_image_to_volume(
context, instance_uuid, instance_ref["name"], params=params,
)
break
elif (
guest_disk.target_dev in ("vda", "sda")
and guest_disk.source_type == "network"
and guest_disk.source_protocol == "rbd"
and guest_disk.source_device == "disk"
and guest_disk.driver_format == "raw"
and guest_disk.serial is None
and params["volume_id"] is None
and params["image_id"] is not None
and params["image_overlay_file_path"] is not None
):
params["path"] = guest_disk.source_name or guest_disk.source_path
backend_driver = Backend().get(
"rbdboot", path=params["path"], virt_driver=self.virt_driver,
)
backend_driver.copy_backup_image_to_volume(
context, instance_uuid, instance_ref["name"], params=params,
)
break
except Exception as ex:
LOG.exception(ex)
raise
return {"result": "success"}
@wrap_exception()
def vast_disk_check(self, context, params):
disk_check = {'status': False}
try:
for disk in params['vm_disks']:
disk_info = qemuimages.qemu_img_info(disk)
if disk_info.file_format != 'qcow2':
continue
status = qemuimages.qemu_integrity_check(disk, params)
if status is False:
LOG.exception("Disk Integrity check failed for disk: %s" % (disk))
return disk_check
if status is None:
disk_check['status'] = None
return disk_check
disk_check['status'] = True
return disk_check
except Exception as ex:
LOG.exception(ex)
return disk_check
@wrap_exception()
def vast_clean_nbd_devices(self, context, instance_uuid, params):
if params.get("snapshot_id"):
return {"result": self._frm_mount_cleanup(instance_uuid, params["snapshot_id"])}
return {"result": True}
@wrap_exception()
def populate_instance_with_virt_v2v(self, context, instance_uuid,
instance_ref, params):
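        """Populate the instance's disks from a vCenter VM using virt-v2v.
        Cold migrations run a full virt-v2v conversion; warm/dryrun
        migrations either convert the boot disk in place (convert_only)
        or stream disk data from an nbdkit vddk server, applying
        incremental extents with dd when an extent file is provided.
        """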
def list_devices():
devices = []
for dev in os.listdir('/dev'):
if dev.startswith('nbd'):
# see if it's a partition
if len(dev.split('p')) == 1:
devices.append(dev)
return devices
def find_available():
devices = []
busy = []
for x in os.popen("cat /proc/partitions |\
grep nbd | \
awk '{print $4}'").readlines():
busy.append(x.strip())
for d in list_devices():
if d not in busy:
devices.append('/dev/%s' %d)
return devices
def run_dd_command(dd_command):
try:
subprocess.run(dd_command, shell=False, check=True)
except subprocess.CalledProcessError as e:
LOG.error(f"Error running command: {dd_command}\nError: {e}")
raise
def parse_line(line):
# Split the line into tokens using space as a delimiter
tokens = line.split()
# Extract sector and length from the tokens
sector = int(tokens[0].split(":")[1])
length = int(tokens[1].split(":")[1])
return sector, length
def read_extentfile(file_path):
data = []
with open(file_path, 'r') as file:
for line in file:
# Ignore empty lines or lines that do not match the expected format
if line.strip() and line.startswith("sector:") and "length:" in line:
sector, length = parse_line(line)
data.append((sector, length))
return data
def _enqueue_output(out, queue):
line = out.read(17)
while line:
queue.put(line)
line = out.read(17)
out.close()
def _create_softlinks(v2v_tempdir, vmname, devices):
a_z = range(ord('a'), ord('z')+1)
_ret = {}
for idx, d in enumerate(devices):
diskname = "sd%s" % (chr(a_z[idx]))
devpath = os.path.join(v2v_tempdir, "%s-%s" % (
vmname, diskname))
os.symlink(d, devpath)
_ret[diskname] = devpath
return _ret
        def _execute_cmd(cmd, tracking_file, env=None, retry=False):
max_tries = 3 if retry else 1
ret_exception = None
while max_tries > 0:
ret_exception = None
queue = Queue()
read_thread = None
                my_env = os.environ.copy()
                my_env.update(env or {})
try:
with subprocess.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
close_fds=True, shell=False, env=my_env) as process:
read_thread = Thread(
target=_enqueue_output, args=(process.stdout, queue))
read_thread.daemon = True # thread dies with the program
read_thread.start()
_update_tracking_file(tracking_file, process, queue, cmd)
read_thread.join()
break
except Exception as ex:
ret_exception = ex
LOG.exception(ex)
max_tries -= 1
if read_thread:
read_thread.join()
            if ret_exception:
                raise ret_exception
def _update_tracking_file(tracking_file, process, update_queue,
cmd):
""" Utility method that updates a tracking file with
information dury an image copy.
Args:
tracking_file (str): Path to the tracking file to update.
process (process): Process handle of the sub process.
update_queue (queue): Queue of file copy percentages
from process.
cmd (str): Comand the process is executing. Used for logging.
"""
percentage = 0.0
process_status = None
tracking_dir = os.path.dirname(tracking_file)
cancel_file = os.path.join(tracking_dir, "cancelled")
            COPY_DISK_RE = r'^.*Copying disk (\d+)/(\d+)'
            DISK_PROGRESS_RE = r'^..\s*(\d+)% \[.*\]'
            FINISHED_RE = r'^\[[ .0-9]*\] Finishing off'
cd = re.compile(COPY_DISK_RE)
dp = re.compile(DISK_PROGRESS_RE)
fd = re.compile(FINISHED_RE)
# Keep updating the progress tracking file while the
# process is still running and there are items the queue.
while process_status is None:
time.sleep(10)
process_status = process.poll()
try:
try:
output = update_queue.get(timeout=10).decode('utf-8')
except Empty:
if os.path.exists(cancel_file):
try:
process.kill()
except Exception as ex:
LOG.exception(ex)
pass
process_status = process.poll()
continue
except Exception as ex:
LOG.exception(ex)
else:
line = output
m = cd.match(line)
utils.update_progress(tracking_file, "In Progress\n")
                        if m:
                            _msg = "COPY_DISK found: %s" % m.group()
                            LOG.debug(_msg)
                            utils.update_progress(tracking_file, _msg+"\n")
                        m = dp.match(line)
                        if m:
                            _msg = "DISK_PROGRESS found: %s" % m.group()
                            LOG.debug(_msg)
                            utils.update_progress(tracking_file, _msg+"\n")
                        m = fd.match(line)
                        if m:
                            _msg = "FINISHED found: %s" % m.group()
                            LOG.debug(_msg)
                            utils.update_progress(tracking_file, _msg+"\n")
if os.path.exists(cancel_file):
try:
process.kill()
except Exception as ex:
LOG.exception(ex)
pass
utils.update_progress(tracking_file,
"Cancelling\n")
process_status = process.poll()
except Exception as ex:
LOG.exception(ex)
pass
process.stdin.close()
process.stdout.close()
_returncode = process.returncode # pylint: disable=E1101
if _returncode:
LOG.info(("Result was %d" % _returncode))
if _returncode == -9:
msg = "User initiated cancel request"
else:
msg = "Execution error %(exit_code)d. cmd %(cmd)s" % {
"exit_code": _returncode,
"cmd": utils.sanitize_message(cmd)}
utils.update_progress(tracking_file, msg+"\n")
raise Exception(msg)
utils.update_progress(tracking_file, "Finishing off\n")
        # initialize names referenced in the finally block so an early
        # failure does not raise NameError during cleanup
        devpaths = []
        rbdvolumes = []
        v2v_tempdir = None
        next_nbd_dev = None
        tracking_file = params.get('progress_tracking_file_path')
        try:
            v2v_base_tempdir = '/tmp/trilio_v2v'
            vddk_lib_path = '/opt/vmware-vix-disklib-distrib'
            vddk_lib64_path = os.path.join(vddk_lib_path, "lib64")
domain_name = self._get_instance_name_by_uuid(instance_uuid)
virt_dom = self.virt_driver._conn.lookupByName(domain_name)
if not virt_dom:
raise Exception("Cannot find virt_dom")
assert virt_dom.UUIDString() == instance_uuid
vmname = params['vcenter_vm_name']
os.makedirs(v2v_base_tempdir, exist_ok=True)
v2v_tempdir = tempfile.mkdtemp(prefix=vmname, dir=v2v_base_tempdir)
virt_v2v_exe = 'sudo VIRT_V2V_TMPDIR=%s LIBGUESTFS_BACKEND=direct'\
' /usr/bin/nova-rootwrap /etc/nova/rootwrap.conf'\
' virt-v2v' % (v2v_tempdir)
nbdkit_cmd = 'nbdkit --newstyle --readonly '\
'--filter retry '\
'--filter exitlast '\
'vddk '
migration_type = params.get('migration_type', 'cold')
guestvm_info = params.get('guestvm_info', {})
convert_only = params.get('convert_only', False)
vcenter_user = params['user'] # original username before quote
params['user'] = quote(params['user'])
userpassword = params['password']
o = urlparse(params['vcenter_url'])
thumbprint = params['thumbprint']
# Currently there is no way to provide ssl certificate
# when the ssl verification is enabled.
# no_verify = 1 if params['disable_ssl_verification'] else 0
no_verify = 1
vcenter_url = "vpx://%s@%s/%s/%s?no_verify=%d" % \
(params['user'], o.hostname,
params['vcenter_datacenter'],
params['vcenter_hostname'],
no_verify)
tracking_file = params['progress_tracking_file_path']
# create password file
fd, pwpath = tempfile.mkstemp(dir=v2v_tempdir, text=True)
os.write(fd, bytes(userpassword, 'utf-8'))
os.close(fd)
xml = virt_dom.XMLDesc(0)
xml_doc = etree.fromstring(xml)
device_info = vconfig.LibvirtConfigGuest()
device_info.parse_dom(xml_doc)
supported_source_types = ('file', 'block', 'lvm', 'network')
next_nbd_dev = None
for disk in device_info.devices:
if disk.root_name != "disk":
continue
if disk.target_dev is None:
continue
if disk.source_type not in supported_source_types:
raise Exception("Cannot yet support disk " \
"type %s" % disk.source_type)
elif disk.source_type == 'network':
if disk.source_protocol == 'rbd':
rbdvolumes.append(disk.source_name or disk.source_path)
else:
raise Exception("Cannot yet support disk " \
"type %s with protocol %s" % (
disk.source_type, disk.source_protocol))
elif disk.source_type == 'block':
disk_path = disk.source_name or disk.source_path
try:
scsi = os_brick_linuxscsi.LinuxSCSI(None)
scsi_wwn = scsi.get_scsi_wwn(disk_path)
if scsi_wwn:
disk_path = os.path.join('/dev', 'mapper',
scsi_wwn)
                    except Exception:
                        pass
devpaths.append(disk_path)
else:
devpaths.append(disk.source_name or disk.source_path)
for r in rbdvolumes:
backend_driver = Backend().get(
"rbdboot",
path=rbdvolumes[0],
virt_driver=self.virt_driver,
)
args = [r]
kwargs = {}
key_user, key_file, cephdir, out, err = \
backend_driver.rbd_keyring_search_and_execute(
"map", *args, **kwargs)
devpaths.append(out.strip())
disk_dev_map = _create_softlinks(v2v_tempdir, vmname, devpaths)
            if migration_type in ('warm', 'dryrun'):
# Convert the boot disk without copying the data
if convert_only:
for disk, dev in disk_dev_map.items():
# if not boot disk(first disk), then skip
if disk != 'sda':
continue
cmd = tuple(virt_v2v_exe.split())
# Add below options if trying to debug
# cmd = cmd + ("-x", "-v")
cmd = cmd + ("-i", "disk",
"--no-copy", "-o", "null",
"--debug-overlays", dev)
_execute_cmd(cmd, tracking_file,
env={'VIRT_V2V_TMPDIR': v2v_tempdir,
'LIBGUESTFS_BACKEND': 'direct'},
retry=True)
                        # committing the overlay image
overlay_filepath = "%s.qcow2" % (dev)
commit_cmd = ("sudo", "/usr/bin/nova-rootwrap",
"/etc/nova/rootwrap.conf")
commit_cmd += ("qemu-img", "commit", "-p", "-f",
"qcow2", overlay_filepath)
utils.update_progress(tracking_file,
"Committing overlay\n")
_execute_cmd(commit_cmd, tracking_file)
utils.update_progress(tracking_file,
"Commit finished\n")
else:
for disk, dev in disk_dev_map.items():
# create nbdkit socket file path
nbdkit_sock_file = os.path.join(tempfile.mkdtemp(
dir=v2v_tempdir), 'nbdkit.sock')
guest_disk_path = guestvm_info[vmname]['disks'][
disk]['diskpath']
vm_moref = guestvm_info[vmname]['moref']
extentfile = guestvm_info[vmname]['disks'][
disk].get('extentfile', None)
# Create an NBD server locally
source_file = 'nbd+unix:///?socket=%s'%(
nbdkit_sock_file)
target_file = dev
cmd = tuple(nbdkit_cmd.split())
cmd = cmd + ("-U", nbdkit_sock_file,
"server=%s" % o.hostname,
"user=%s" % vcenter_user,
"vm=%s" % ('moref='+vm_moref),
"file=%s" % guest_disk_path,
"libdir=%s" % vddk_lib64_path,
"thumbprint=%s" % thumbprint,
"password=%s" % userpassword)
LOG.info('Executing command: {}'.format(
utils.sanitize_message(cmd)))
_execute_cmd(cmd, tracking_file)
LOG.info('NBD server running..')
if not extentfile:
convert_cmd = ("sudo", "/usr/bin/nova-rootwrap",
"/etc/nova/rootwrap.conf")
convert_cmd += ("qemu-img", "convert", "-p",
source_file, target_file)
utils.update_progress(tracking_file,
"Uploading disk\n")
LOG.info('Executing command: {}'.format(
utils.sanitize_message(convert_cmd)))
_execute_cmd(convert_cmd, tracking_file)
utils.update_progress(tracking_file,
"Upload finished\n")
else:
# connect the NBD and mount to local dev
nbd_devices = find_available()
if len(nbd_devices) == 0:
msg_ = "Could not find available NBD"\
" devices for mapping\n"
LOG.exception(msg_)
raise Exception(msg_)
next_nbd_dev = nbd_devices[0]
nbd_cmd = "sudo /usr/bin/nova-rootwrap "\
"/etc/nova/rootwrap.conf "\
"qemu-nbd -c %s %s -r" %(
next_nbd_dev, source_file)
nbd_cmd = tuple(nbd_cmd.split())
utils.update_progress(tracking_file,
"Connecting NBD device\n")
LOG.info('Executing command: {}'.format(
utils.sanitize_message(nbd_cmd)))
_execute_cmd(nbd_cmd, tracking_file)
utils.update_progress(tracking_file,
"Connected to NBD device\n")
# copying the incremental bits to the volume
changearea = read_extentfile(extentfile)
max_workers = 16
# Create a ThreadPool with the specified number of workers
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers) as executor:
futures = []
                                for start_sector, sector_count in changearea:
                                    # extents are (sector, length) pairs in
                                    # 512-byte sectors, hence bs=512
                                    dd_cmd = "dd if=%s of=%s conv=notrunc "\
                                             "bs=512 seek=%s skip=%s "\
                                             "count=%s" % (
                                                 next_nbd_dev,
                                                 target_file,
                                                 start_sector,
                                                 start_sector,
                                                 sector_count)
                                    dd_cmd = dd_cmd.split()
# Submit dd commands as tasks to the ThreadPool
futures.append(executor.submit(
run_dd_command, dd_cmd))
# Wait for all tasks to complete
concurrent.futures.wait(futures)
LOG.info("All dd commands completed.")
LOG.info("Disconnecting NBD device %s" % str(
next_nbd_dev))
nbd_cmd = "sudo /usr/bin/nova-rootwrap "\
"/etc/nova/rootwrap.conf "\
"qemu-nbd -d %s" % next_nbd_dev
nbd_cmd = tuple(nbd_cmd.split())
                            utils.update_progress(tracking_file,
                                                  "Disconnecting NBD device\n")
LOG.info('Executing command: {}'.format(
utils.sanitize_message(nbd_cmd)))
_execute_cmd(nbd_cmd, tracking_file)
                            utils.update_progress(tracking_file,
                                                  "Disconnected NBD device\n")
next_nbd_dev = None
else:
LOG.debug(CONF.rootwrap_config)
cmd = tuple(virt_v2v_exe.split())
# Add below options if trying to debug
# cmd = cmd + ("-x", "-v")
cmd = cmd + ("-i", "libvirt",
"-ic", vcenter_url, "-o", "local", "-os",
v2v_tempdir, "-ip", pwpath,)
if os.path.exists(vddk_lib_path):
cmd += ("-it", "vddk",
"-io", "vddk-libdir=%s" % vddk_lib_path,
"-io", "vddk-thumbprint=%s" % thumbprint,)
cmd += ("--", vmname,)
LOG.info('Executing command: {}'.format(utils.sanitize_message(cmd)))
_execute_cmd(cmd, tracking_file, env={
'LIBGUESTFS_BACKEND': 'direct'})
except Exception as ex:
LOG.exception(ex)
raise
        finally:
            if v2v_tempdir:
                shutil.rmtree(v2v_tempdir, ignore_errors=True)
# disconnect NBD
if next_nbd_dev:
LOG.info("Disconnecting NBD device %s" % str(next_nbd_dev))
nbd_cmd = "sudo /usr/bin/nova-rootwrap "\
"/etc/nova/rootwrap.conf "\
"qemu-nbd -d %s" % next_nbd_dev
nbd_cmd = tuple(nbd_cmd.split())
                utils.update_progress(tracking_file,
                                      "Disconnecting NBD device\n")
LOG.info('Executing command: {}'.format(
utils.sanitize_message(nbd_cmd)))
_execute_cmd(nbd_cmd, tracking_file)
                utils.update_progress(tracking_file,
                                      "Disconnected NBD device\n")
LOG.info("Unmapping %s" % str(rbdvolumes))
for r in rbdvolumes:
backend_driver = Backend().get(
"rbdboot",
path=rbdvolumes[0],
virt_driver=self.virt_driver,
)
args = [r]
kwargs = {}
key_user, key_file, cephdir, out, err = \
backend_driver.rbd_keyring_search_and_execute(
"unmap", *args, **kwargs)
return {"result": "success"}