# python3-tvault-contego 4.2.64.9
# /usr/lib/python3/dist-packages/contego/nova/extension/driver/libvirtdriver.py
# Copyright 2014 TrilioData Inc.
# All Rights Reserved.
import base64
import json
import shutil
import time
try:
from eventlet import sleep
except ImportError:
from time import sleep
import os
import libvirt
import libvirt_qemu
from collections import defaultdict
from eventlet import greenthread
import defusedxml.ElementTree as etree
from oslo_concurrency import processutils
from xml.etree.ElementTree import Element, SubElement, tostring
import re
try:
from oslo_config import cfg
except ImportError:
from oslo.config import cfg
try:
from oslo_log import log as logging
except ImportError:
from nova.openstack.common import log as logging
try:
from oslo_serialization import jsonutils
except ImportError:
from nova.openstack.common import jsonutils
from nova import exception
from contego.exception import wrap_exception
from nova.virt.libvirt import driver as nova_driver
from nova.virt.libvirt import config as vconfig
import nova.virt.libvirt.utils as libvirt_utils
LOG = logging.getLogger(__name__)
try:
from nova import volume as cinder
except BaseException as ex:
LOG.warn(ex)
pass
try:
from nova.volume import cinder as cinder1
except BaseException as ex:
LOG.warn(ex)
pass
from contego.nova.extension.driver import qemuimages
from contego.nova.extension.driver.diskfilesdrive import DiskfilesDriveBuilder
from contego.nova.extension.driver.backends.backend import Backend
from contego.nova.extension.driver.backends.nbd import NbdMount
from contego import utils
from . import vault
from . import loopingcall
import subprocess
contego_driver_opts = [
cfg.IntOpt(
"qemu_agent_ping_timeout",
default=300,
help="The number of seconds to wait to qemu agent to be up and running.",
)
]
CONF = cfg.CONF
CONF.register_opts(contego_driver_opts)
class ChunkedFile(object):
"""
We send this back to the caller as
something that can iterate over a large file
"""
CHUNKSIZE = 65536
def __init__(self, filepath):
self.filepath = filepath
self.fp = open(self.filepath, "rb")
def __iter__(self):
"""Return an iterator over the image file"""
try:
if self.fp:
while True:
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
if chunk:
yield chunk
else:
break
finally:
self.close()
def close(self):
"""Close the internal file pointer"""
if self.fp:
self.fp.close()
self.fp = None
def cooperative_iter(iter):
"""
Return an iterator which schedules after each
iteration. This can prevent eventlet thread starvation.
:param iter: an iterator to wrap
"""
try:
for chunk in iter:
sleep(0)
yield chunk
except Exception as err:
msg = ("Error: cooperative_iter exception %s") % err
LOG.error(msg)
raise
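# Contego extension of nova's libvirt driver: implements the VAST snapshot
# workflow (prepare/freeze/instance/data transfer/finalize/reset), qemu guest
# agent interaction and file-recovery-manager (FRM) mount handling.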
class LibvirtDriver(nova_driver.LibvirtDriver):
def __init__(self, virt_driver, read_only=False):
super(LibvirtDriver, self).__init__(virt_driver)
self.virt_driver = virt_driver
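# Ask each backend (rbd boot, file, cinder) to prepare snapshots for the
# instance's disks.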
@wrap_exception()
def vast_prepare(self, context, instance_uuid, instance_ref, params):
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
(
file_disk_devices,
rbd_disk_devices,
cinder_disk_devices,
) = self._get_device_categories(
context, instance_uuid, instance_ref, params
)
if len(rbd_disk_devices):
backend_driver = Backend().get(
"rbdboot",
path=rbd_disk_devices[0].find("source").get("name"),
virt_driver=self.virt_driver,
)
backend_driver.prepare_snapshot(
rbd_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
if len(file_disk_devices) > 0:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
backend_driver.prepare_snapshot(
file_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
if len(cinder_disk_devices):
backend_driver = Backend().get("cinder", virt_driver=self.virt_driver)
backend_driver.prepare_snapshot(
cinder_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
except Exception as ex:
LOG.exception(ex)
raise
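# Freeze guest filesystems through the qemu guest agent once it responds.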
@wrap_exception()
def vast_freeze(self, context, instance_uuid, instance_ref, params):
self._wait_for_guest_agent(context, instance_uuid)
return self._quiesce(context, instance_uuid, True)
@wrap_exception()
def vast_thaw(self, context, instance_uuid, instance_ref, params):
return self._quiesce(context, instance_uuid, False)
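# Create snapshots on every backend and collect their disk_info records.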
@wrap_exception()
def vast_instance(self, context, instance_uuid, instance_ref, params):
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disks_info = []
(
file_disk_devices,
rbd_disk_devices,
cinder_disk_devices,
) = self._get_device_categories(
context, instance_uuid, instance_ref, params
)
if len(rbd_disk_devices) > 0:
backend_driver = Backend().get(
"rbdboot",
path=rbd_disk_devices[0].find("source").get("name"),
virt_driver=self.virt_driver,
)
rbd_disks_info = backend_driver.create_snapshot(
rbd_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(rbd_disks_info)
if len(file_disk_devices) > 0:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
file_disks_info = backend_driver.create_snapshot(
file_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(file_disks_info)
if len(cinder_disk_devices) > 0:
backend_driver = Backend().get("cinder", virt_driver=self.virt_driver)
cinder_disks_info = backend_driver.create_snapshot(
cinder_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(cinder_disks_info)
except Exception as ex:
LOG.exception(ex)
if hasattr(ex, "response"):
payload = json.loads(ex.response.text)
if "overLimit" in payload:
msg = "Quota Exceeded - " + payload["overLimit"]["message"]
raise exception.QuotaError(msg)
raise
return {"disks_info": disks_info}
@wrap_exception()
def vast_get_info(self, context, instance_uuid, instance_ref, params):
try:
updated_disks_info = []
if params and "disks_info" in params:
disks_info = params["disks_info"]
for disk_info in disks_info:
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
updated_disks_info.append(
backend_driver.update_snapshot_info(disk_info)
)
except Exception as ex:
LOG.exception(ex)
raise
return {"disks_info": updated_disks_info}
@wrap_exception()
def vast_data_transfer(self, context, instance_uuid, instance_ref, params):
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disk_info = params["disk_info"]
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
backend_driver.upload_snapshot(
disk_info,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
context=context,
params=params,
)
except Exception as ex:
LOG.exception(ex)
if hasattr(ex, "response"):
payload = json.loads(ex.response.text)
if "overLimit" in payload:
msg = "Quota Exceeded - " + payload["overLimit"]["message"]
raise exception.QuotaError(msg)
raise
return {"result": "success"}
@wrap_exception()
def vast_commit_image(self, context, instance_uuid, instance_ref, params):
commit_image_list = params["commit_image_list"]
secret_uuid = params['metadata'].get('secret_uuid')
for commit_images in commit_image_list:
for path, backing_path in zip(*[iter(commit_images)] * 2):
try:
vault_path = path
backing_vault_path = backing_path
image_info = qemuimages.qemu_img_info(vault_path)
image_backing_info = qemuimages.qemu_img_info(backing_vault_path)
# crosscheck if backing file of the image exists
if not image_info.backing_file:
continue
# increase the size of the base image
if image_backing_info.virtual_size < image_info.virtual_size:
qemuimages.resize_image(
backing_vault_path, image_info.virtual_size
)
qemuimages.commit_qcow2(vault_path, secret_uuid=secret_uuid, run_as_root=False)
except Exception as ex:
LOG.exception(ex)
raise
return {"result": "success"}
@wrap_exception()
def vast_check_prev_snapshot(self, context, instance_uuid, instance_ref, params):
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disk_info = params["disk_info"]
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
status = backend_driver.check_prev_snapshot(
disk_info,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
context=context,
params=params,
)
except Exception as ex:
LOG.exception(ex)
raise
return status
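# Report the status of an asynchronous upload task tracked by the vault.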
@wrap_exception()
def vast_async_task_status(self, context, instance_uuid, instance_ref, params):
try:
result = vault.get_async_task_status(context, params["metadata"])
except Exception as ex:
LOG.exception(ex)
raise
return result
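# Delete the snapshots listed in params["disks_info"]; failures are logged
# per disk and do not abort the cleanup.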
@wrap_exception()
def vast_finalize(self, context, instance_uuid, instance_ref, params):
instance_name = self._get_instance_name_by_uuid(instance_uuid)
if "disks_info" in params:
disks_info = params["disks_info"]
for disk_info in disks_info:
try:
backend_driver = Backend().get(
disk_info["backend"],
path=disk_info["path"],
virt_driver=self.virt_driver,
)
backend_driver.delete_snapshot(
disk_info,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
except Exception as ex:
LOG.debug(("Cannot delete snapshot %s"), disk_info["path"])
LOG.exception(ex)
else:
LOG.warning(
(
"Cannot delete snapshot. No Disk Information for instance: %s, UUID %s, ref %s"
),
instance_name,
instance_uuid,
instance_ref,
)
return
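# Remove leftover snapshots, falling back to cleaning only the rbd boot disk
# when no instance_ref is available.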
@wrap_exception()
def vast_reset(self, context, instance_uuid, instance_ref, params):
try:
instance_name = self._get_instance_name_by_uuid(instance_uuid)
disks_info = []
if not instance_ref:
# at the least cleanup boot device
backend_driver = Backend().get(
"rbdboot",
path=os.path.join(
CONF.libvirt.images_rbd_pool, instance_uuid + "_disk"
),
virt_driver=self.virt_driver,
)
rbd_disks_info = backend_driver.reset_snapshot(
[],
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
else:
(
file_disk_devices,
rbd_disk_devices,
cinder_disk_devices,
) = self._get_device_categories(
context, instance_uuid, instance_ref, params
)
if len(rbd_disk_devices) > 0:
backend_driver = Backend().get(
"rbdboot",
path=rbd_disk_devices[0].find("source").get("name"),
virt_driver=self.virt_driver,
)
rbd_disks_info = backend_driver.reset_snapshot(
rbd_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(rbd_disks_info)
if len(file_disk_devices) > 0:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
file_disks_info = backend_driver.reset_snapshot(
file_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(file_disks_info)
if len(cinder_disk_devices) > 0:
backend_driver = Backend().get(
"cinder", virt_driver=self.virt_driver
)
cinder_disks_info = backend_driver.reset_snapshot(
cinder_disk_devices,
context=context,
instance_uuid=instance_uuid,
instance_name=instance_name,
instance_ref=instance_ref,
params=params,
)
disks_info.extend(cinder_disks_info)
except Exception as ex:
LOG.exception(ex)
raise
return {"disks_info": disks_info}
@wrap_exception()
def map_snapshot_files(self, context, instance_uuid, instance_ref, params):
class open_guest_file:
def __init__(self, path, mode, timeout=60):
self.path = path
self.mode = mode
self.timeout = timeout
def __enter__(self):
# open the requested guest file via the qemu guest agent
command = {
"execute": "guest-file-open",
"arguments": {"path": self.path, "mode": self.mode},
}
command = json.dumps(command)
status = "Reading " + self.path
LOG.debug(("%s in %s") % (status, domain_name))
ret = libvirt_qemu.qemuAgentCommand(virt_dom, command, self.timeout, 0)
result = jsonutils.loads(ret)
if result.get("error", None):
msg = (
"Error from qemu-guest-agent while " + status + " of "
"%(instance_name)s: %(error)s"
) % {"instance_name": domain_name, "error": ret}
LOG.debug(msg)
raise Exception("File not found")
self.file_handle = result["return"]
return self
def __exit__(self, type, value, traceback):
try:
command = {
"execute": "guest-file-close",
"arguments": {"handle": self.file_handle},
}
command = json.dumps(command)
ret = libvirt_qemu.qemuAgentCommand(
virt_dom, command, self.timeout, 0
)
except BaseException as bex:
LOG.warn(bex)
pass
def write(self, data):
command = {
"execute": "guest-file-write",
"arguments": {"handle": self.file_handle, "buf-b64": data},
}
command = json.dumps(command)
ret = libvirt_qemu.qemuAgentCommand(virt_dom, command, self.timeout, 0)
result = jsonutils.loads(ret)["return"]
return result
def _get_power_state(virt_dom):
dom_info = virt_dom.info()
state = nova_driver.LIBVIRT_POWER_STATE[dom_info[0]]
return state
def _is_guest_linux():
# Determine whether the guest OS is Linux or Windows. The qemu guest
# agent has no command for this, so we try to read /etc/os-release:
# if the file exists we assume Linux, otherwise Windows.
try:
with open_guest_file("/etc/os-release", "r") as f:
return True
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while reading /etc/os-release from "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {"instance_name": domain_name, "error_code": error_code, "ex": ex, }
LOG.debug(msg)
return False
def _copy_diskdrives_fileto_guest(diskfiles):
# Write /mnt/tvault-mounts/metadata/diskfiles on the guest; this file
# defines how backup images map to each VM in the backup job.
try:
# open the /mnt/tvault-mounts/metadata/diskfiles for writing
with open_guest_file("/mnt/tvault-mounts/metadata/diskfiles", "w+") as f:
# encode the diskfiles data into base64
diskfiles_b64 = base64.b64encode(
json.dumps(diskfiles).encode("utf-8")
).decode("utf-8")
result = f.write(diskfiles_b64)
assert result["eof"] is False
if result["count"] < len(json.dumps(diskfiles)):
msg = (
"the amount of data written to /mnt/tvault-mounts/metadata/diskfiles is less than "
"the length of diskfiles. Expected %s, actual %s"
) % (str(len(json.dumps(diskfiles))), str(result["count"]))
raise exception.NovaException(msg)
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while writing /mnt/tvault-mounts/metadata/diskfiles of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {"instance_name": domain_name, "error_code": error_code, "ex": ex, }
LOG.info(msg)
raise
def _copy_filemanager_scripts_to_guest():
try:
scripts_path = os.path.join(
os.path.dirname(__file__), "filemanager_scripts"
)
contego_version = "#2.5.0"
n = "fm_scripts_version.py"
with open(os.path.join(scripts_path, n), "r") as lf:
contego_version = lf.read().strip()
try:
with open_guest_file(
os.path.join("/mnt/tvault-mounts/metadata", contego_version), "r"
) as f:
return
except BaseException as bex:
LOG.warn(bex)
pass
for n in os.listdir(scripts_path):
# include only .py files; .pyc files would cause problems
# during encoding
if n.endswith(".py"):
# open /mnt/tvault-mounts/metadata/<n> on the guest for writing
with open(os.path.join(scripts_path, n), "r") as lf:
script = lf.read()
with open_guest_file(
os.path.join("/mnt/tvault-mounts/metadata", n), "w+"
) as f:
# encode the script data into base64
scripts_b64 = base64.b64encode(script.encode("utf-8"))
result = f.write(scripts_b64.decode("utf-8"))
assert result["eof"] is False
if result["count"] < len(script):
msg = (
"the amount of data written to %s is less than "
"len of local script. Expected %s, actual %s"
) % (
os.path.join("/mnt/tvault-mounts/metadata", n),
str(len(script)),
str(result["count"]),
)
raise exception.NovaException(msg)
with open_guest_file(
os.path.join("/mnt/tvault-mounts/metadata", contego_version), "w+"
) as f:
pass
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while writing /mnt/tvault-mounts/metadata/%(n)s of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {
"n": n,
"instance_name": domain_name,
"error_code": error_code,
"ex": ex,
}
LOG.info(msg)
raise
def _ensure_overlay_directory():
utils.ensure_tree(
os.path.join(CONF.instances_path, "contego_mount_overlays")
)
def _create_overlay_file(disk, secret_uuid):
_ensure_overlay_directory()
overlay_filename = os.path.split(disk)[1] + ".overlay"
abs_overlay_filename = os.path.join(
CONF.instances_path, "contego_mount_overlays", overlay_filename
)
try:
os.remove(abs_overlay_filename)
except BaseException as bex:
LOG.warn(bex)
pass
info = qemuimages.qemu_img_info(disk)
if secret_uuid:
qemuimages.create_cow_image(disk, abs_overlay_filename, size=info.virtual_size,payload=secret_uuid)
else:
qemuimages.create_cow_image(disk, abs_overlay_filename, info.virtual_size)
return abs_overlay_filename
def _get_guest_agent_script_path():
# Determine the qemu guest agent fsfreeze-hook path from known locations.
# In some distributions this path is "/etc/qemu-ga/" or "/etc/qemu/".
guest_agent_script_path = None
try:
with open_guest_file("/etc/qemu-ga/fsfreeze-hook", 'r') as fil:
return "/etc/qemu-ga/fsfreeze-hook"
except Exception as ex:
LOG.warn(ex)
pass
try:
with open_guest_file("/etc/qemu/fsfreeze-hook", 'r') as fil:
return "/etc/qemu/fsfreeze-hook"
except Exception as ex:
LOG.warn(ex)
pass
if guest_agent_script_path is None:
raise Exception('Not able to detect qemu guest agent hook path')
def _generate_device_xml(virt_dom, disknum, hostbus, overlay_file, secret_uuid=None, partition=None):
diskelement = Element("disk")
diskelement.set("type", "block" if secret_uuid else "file")
diskelement.set("device", "disk")
driver = SubElement(diskelement, "driver")
driver.set("name", "qemu")
driver.set("type", "raw" if secret_uuid else "qcow2")
source = SubElement(diskelement, "source")
if secret_uuid and partition:
source.set("dev", "/dev/"+partition)
else:
source.set("file", overlay_file)
# SubElement(diskelement, "readonly")
target = SubElement(diskelement, "target")
target.set("dev", disknum)
target.set("bus", hostbus)
devxml = etree.tostring(diskelement).decode()
status = virt_dom.attachDeviceFlags(
devxml,
libvirt.VIR_DOMAIN_AFFECT_CONFIG
| libvirt.VIR_DOMAIN_AFFECT_LIVE,
)
return status
def _update_fsfreeze_script_on_guest():
try:
guest_script_path = _get_guest_agent_script_path()
scripts_path = os.path.join(
os.path.dirname(__file__), "filemanager_scripts"
)
script_path = os.path.join(scripts_path, 'fsfreeze.sh')
with open(script_path, 'r') as script_fil:
script_data = script_fil.read()
with open_guest_file(guest_script_path, "w+") as guest_file:
script_data_b64 = base64.b64encode(script_data.encode("utf-8"))
result = guest_file.write(script_data_b64.decode("utf-8"))
assert result["eof"] is False
if result["count"] < len(script_data):
msg = (
"the amount of data written to %s is less than "
"len of local script. Expected %s, actual %s"
) % (
guest_script_path,
str(len(script_data)),
str(result["count"]),
)
raise exception.NovaException(msg)
except Exception as ex:
error_code = -1
if hasattr(ex, "get_error_code"):
error_code = ex.get_error_code()
msg = (
"Error from libvirt while writing to "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {
"instance_name": domain_name,
"error_code": error_code,
"ex": ex,
}
LOG.info(msg)
raise
def _write_mount_metadata(mount_metadata, instance_uuid):
overlay_file_dir = os.path.join(
CONF.instances_path, "contego_mount_overlays"
)
if os.path.isdir(overlay_file_dir):
frm_metadata_path = os.path.join(
overlay_file_dir, "frm_{}".format(instance_uuid)
)
with open(frm_metadata_path, "w") as fh:
fh.write(json.dumps(mount_metadata))
try:
backend_driver = Backend().get("file", virt_driver=self.virt_driver)
pervmdisks = params["diskfiles"]
domain_name = instance_ref['OS-EXT-SRV-ATTR:instance_name']
virt_dom = self.virt_driver._conn.lookupByName(domain_name)
if not virt_dom:
raise Exception("Cannot find virt_dom")
assert virt_dom.UUIDString() == instance_uuid
# Wait for guest agent to be pingable
success = self._wait_for_guest_agent(context, instance_uuid)
_update_fsfreeze_script_on_guest()
try:
# Freeze to make sure everything is clean and update FsFreeze hooks
self._quiesce(context, instance_uuid, True)
sleep(20)
# Thaw VM
self._quiesce(context, instance_uuid, False)
except Exception as ex:
LOG.exception(ex)
xml = virt_dom.XMLDesc(0)
doc = etree.fromstring(xml)
# build the list of free target device names, excluding those already attached
disks = doc.findall("devices/disk/target")
disk_prefix = disks[0].get("dev")[:2]
hostbus = disks[0].get("bus")
diskstr = []
for i in range(0, 26):
diskstr.append(disk_prefix + chr(ord("a") + i))
for j in range(0, 20):
for i in range(0, 26):
diskstr.append(disk_prefix + chr(ord("a") + j) + chr(ord("a") + i))
for d in disks:
try:
diskstr.remove(d.get("dev"))
except BaseException as ex:
LOG.warn(ex)
pass
# Wait for guest agent to be pingable
success = self._wait_for_guest_agent(context, instance_uuid)
if not success:
msg = (
"Error: Waiting for guest agent timed out for instance %s"
% domain_name
)
raise exception.NovaException(msg)
linux_guest = _is_guest_linux()
snapshot_id = params.get("snapshot_id")
mount_metadata = defaultdict(dict)
# add any new disk images here
try:
directories = set()
for vm, disks in pervmdisks.items():
for disk in disks["vault_path"]:
directories.add(os.path.dirname(disk))
backend_driver.configure_security_profile(
instance_uuid, list(directories)
)
disksmetadata = {}
diskstr.reverse()
disknum = diskstr.pop()
vdxp = re.compile("_vd[a-z]")
for vm, disks in pervmdisks.items():
if vm not in disksmetadata:
disksmetadata[vm] = {"vm_name": disks["vm_name"]}
for disk in disks["vault_path"]:
secret_uuid = params.get('metadata').get('secret_uuid')
overlay_file = _create_overlay_file(disk, secret_uuid)
unused_nbd_dev = None
info = qemuimages.qemu_img_info(overlay_file)
if secret_uuid:
if utils.parse_encrypted_image_backing_file(info.backing_file) != disk:
msg = 'Backing file of the overlay does not match the target disk.'
LOG.error(msg)
raise exception.NovaException(msg)
nbd_obj = NbdMount(overlay_file, None)
unused_nbd_dev = nbd_obj._find_unused(nbd_obj._detect_nbd_devices())
status = nbd_obj._create_nbd_device_as_fdisk(unused_nbd_dev, secret_uuid, overlay_file)
if status:
LOG.info('successfully created nbd device with overlay file.')
status = _generate_device_xml(virt_dom, disknum, hostbus, overlay_file, secret_uuid,
unused_nbd_dev)
LOG.info('Status of Libvirt attaching Disk to FRM: {0}'.format(status))
time.sleep(5)
else:
# if the nbd device failed to connect, perform a
# cleanup and raise an error
if "metadata" not in mount_metadata[snapshot_id]:
mount_metadata[snapshot_id] = {"metadata": {}}
mount_metadata[snapshot_id]["metadata"].update(
{overlay_file: unused_nbd_dev}
)
if mount_metadata:
_write_mount_metadata(mount_metadata, instance_uuid)
self._frm_mount_cleanup(instance_uuid, snapshot_id)
raise Exception("Failed to connect nbd device: {} and overlayfile: {}".format(
unused_nbd_dev, overlay_file)
)
else:
if info.backing_file != disk:
msg = 'Backing file of the overlay does not match the target disk.'
LOG.error(msg)
raise exception.NovaException(msg)
devxml = _generate_device_xml(virt_dom, disknum, hostbus, overlay_file)
if "metadata" not in mount_metadata[snapshot_id]:
mount_metadata[snapshot_id] = {"metadata": {}}
mount_metadata[snapshot_id]["metadata"].update(
{overlay_file: unused_nbd_dev}
)
disksmetadata[vm][disk] = disknum
disknum = diskstr.pop()
if mount_metadata:
mount_metadata[snapshot_id].update({"mount_status": True})
_write_mount_metadata(mount_metadata, instance_uuid)
# create metadata associated with all the attached disks
if linux_guest:
_copy_diskdrives_fileto_guest(disksmetadata)
_copy_filemanager_scripts_to_guest()
except Exception as err:
msg = ("Error: Cannot map snapshot volume images - %s %s") % (
disk,
err,
)
LOG.error(msg)
raise err
finally:
try:
backend_driver.reset_security_profile(
instance_uuid, list(directories)
)
except Exception as err2:
msg = ("Error: resetting app armor profile %s") % err
LOG.error(msg)
raise err2
try:
# Unfreeze to make sure everything is clean
self._quiesce(context, instance_uuid, False)
except BaseException:
sleep(60)
sleep(30)
try:
self._quiesce(
context,
instance_uuid,
True,
timeout=libvirt_qemu.VIR_DOMAIN_QEMU_AGENT_COMMAND_BLOCK,
)
except BaseException:
msg = (
"Error calling the fsfreeze freeze hook. Continuing..."
)
LOG.error(msg)
sleep(10)
try:
self._quiesce(context, instance_uuid, False)
except BaseException:
msg = "Error calling fsfreeze thaw hook successfully. Continuing..."
LOG.error(msg)
pass
except Exception as ex:
LOG.exception(ex)
raise
return
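# Ping the qemu guest agent; returns quietly if the agent is not configured
# (libvirt error code 74).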
def _ping_guest_agent(self, context, instance_uuid):
instance_name = self._get_instance_name_by_uuid(instance_uuid)
command = '{"execute":"guest-ping"}'
status = "pinging guest agent"
LOG.debug(("pinging guest agent in %s"), instance_name)
try:
domain = self.virt_driver._conn.lookupByName(instance_name)
ret = libvirt_qemu.qemuAgentCommand(domain, command, 60, 0)
except Exception as ex:
error_code = ex.get_error_code() if hasattr(ex, "get_error_code") else -1
if error_code == 74:
# the guest agent is not configured
return
raise
result = jsonutils.loads(ret)
if result.get("error", None):
msg = (
"Error from qemu-guest-agent while " + status + " of "
"%(instance_name)s: %(error)s"
) % {"instance_name": instance_name, "error": ret}
raise exception.NovaException(msg)
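# Poll guest-ping every 5 seconds until the agent answers or
# qemu_agent_ping_timeout expires.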
def _wait_for_guest_agent(self, context, instance_uuid):
instance_name = self._get_instance_name_by_uuid(instance_uuid)
def _wait_for_ping():
"""Called at an interval until the VM is running again."""
try:
self._ping_guest_agent(context, instance_uuid)
raise loopingcall.LoopingCallDone()
except loopingcall.LoopingCallDone:
raise
except libvirt.libvirtError as ex:
error_code = ex.get_error_code()
if error_code == 86:
msg = "Still waiting for guest agent to be up and running"
LOG.debug(msg)
else:
msg = (
"Error from libvirt while pinging guest agent of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {
"instance_name": instance_name,
"error_code": error_code,
"ex": ex,
}
LOG.error(msg)
LOG.exception(ex)
raise
try:
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_ping)
timer.start(
interval=5, max_duration=max(CONF.qemu_agent_ping_timeout, 300)
).wait()
return True
except Exception:
return False
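# Freeze or thaw guest filesystems via guest-fsfreeze-freeze/thaw; errors are
# logged as warnings instead of raised.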
def _quiesce(self, context, instance_uuid, quiesce, timeout=60):
instance_name = self._get_instance_name_by_uuid(instance_uuid)
if quiesce:
command = '{"execute": "guest-fsfreeze-freeze"}'
status = "freezing guest filesystems"
LOG.debug(("freezing guest filesystems of %s"), instance_name)
else:
command = '{"execute": "guest-fsfreeze-thaw"}'
status = "thawing guest filesystems"
LOG.debug(("Thawing guest filesystems of %s"), instance_name)
try:
domain = self.virt_driver._conn.lookupByName(instance_name)
ret = libvirt_qemu.qemuAgentCommand(domain, command, timeout, 0)
except Exception as ex:
error_code = ex.get_error_code() if hasattr(ex, "get_error_code") else -1
msg = (
"Error from libvirt while " + status + " of "
"%(instance_name)s: [Error Code %(error_code)s] %(ex)s"
) % {"instance_name": instance_name, "error_code": error_code, "ex": ex, }
LOG.warning(msg)
return
result = jsonutils.loads(ret)
if result.get("error", None):
msg = (
"Error from qemu-guest-agent while " + status + " of "
"%(instance_name)s: %(error)s"
) % {"instance_name": instance_name, "error": ret}
LOG.warning(msg)
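# Parse the domain XML and bucket attached disks into file-backed, rbd-backed
# and cinder-backed devices, skipping config drives, swap and ephemeral disks.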
def _get_device_categories(self, context, instance_uuid, instance_ref, params):
instance_name = self._get_instance_name_by_uuid(instance_uuid)
domain = self.virt_driver._conn.lookupByName(instance_name)
domain_xml = domain.XMLDesc(0)
doc = etree.fromstring(domain_xml)
lun_devices = doc.findall("devices/disk[@device='lun']")
if lun_devices and len(lun_devices) > 1:
msg = "LUN devices are not supported"
raise exception.NovaException(msg)
disk_devices = doc.findall("devices/disk[@device='disk']")
if not disk_devices or len(disk_devices) <= 0:
msg = "Did not find any disks attached to the instance"
raise exception.NovaException(msg)
file_disk_devices = []
rbd_disk_devices = []
cinder_disk_devices = []
for device in disk_devices:
disk_type = device.get("type")
if device.find("serial") is not None:
cinder_volume_id = device.find("serial").text
try:
_volume_api = cinder.API()
except BaseException:
_volume_api = cinder1.API()
try:
volume = _volume_api.get(context, cinder_volume_id)
if volume:
cinder_disk_devices.append(device)
else:
msg = (
"Unknown disk type %s mapped to the " "instance"
) % disk_type
raise exception.NovaException(msg)
except exception.CinderConnectionFailed as ex:
LOG.exception(ex)
raise ex
except Exception as ex:
LOG.exception(ex)
msg = ("Unknown disk type %s mapped to the instance") % disk_type
raise exception.NovaException(msg)
elif disk_type == "volume" and device.find("serial") is None:
source = device.find("source")
if "name" in list(source.keys()):
name = source.get("name")
if (
name.endswith("config")
or name.endswith("swap")
or "disk.eph" in name
):
continue
backend = device.find("source").get("pool")
if backend == "rbd" or backend == "ceph":
rbd_disk_devices.append(device)
elif disk_type == "network" and device.find("serial") is None:
source = device.find("source")
if "name" in list(source.keys()):
name = source.get("name")
if (
name.endswith("config")
or name.endswith("swap")
or "disk.eph" in name
):
continue
backend = device.find("source").get("protocol")
if backend == "rbd" or backend == "ceph":
rbd_disk_devices.append(device)
elif disk_type == "file" and device.find("serial") is None:
# we only support boot disks booting off a glance
# image on local disk; also check for "Force config drive"
# files and skip them.
source = device.find("source")
if "file" in list(source.keys()):
file = source.get("file")
if (
file.endswith("config")
or file.endswith("swap")
or "disk.eph" in file
):
continue
file_disk_devices.append(device)
else:
# block device
msg = "Unknown disk type %s mapped to the instance" % disk_type
raise exception.NovaException(msg)
return (file_disk_devices, rbd_disk_devices, cinder_disk_devices)
def _get_instance_name_by_uuid(self, instance_uuid):
for name in self.virt_driver.list_instances():
iuuid = self.virt_driver._conn.lookupByName(name).UUIDString()
if iuuid == instance_uuid:
return name
return None
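# Disconnect nbd devices and delete overlay files recorded in the FRM mount
# metadata for the given snapshot; retries once before giving up.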
def _frm_mount_cleanup(self, frminstance_id, snapshot_id):
def _cleanup(metadata):
failed_devices = {}
for overlay_file, nbd_device in metadata.items():
if nbd_device:
nbd_obj = NbdMount(overlay_file, None)
res = nbd_obj._disconnect_nbd_devices([nbd_device])
if not res:
LOG.error('Failed to disconnect nbd device: {0}'.format(nbd_device))
# if nbd device is not disconnected, do not delete overlay file
failed_devices.update({overlay_file: nbd_device})
# if nbd device is disconnected, delete overlay file
elif os.path.exists(overlay_file):
os.remove(overlay_file)
# no nbd device is associated with overlay file, delete the file
elif os.path.exists(overlay_file):
os.remove(overlay_file)
return failed_devices
is_cleaned = True
frm_metadata_path = os.path.join(
CONF.instances_path, "contego_mount_overlays",
"frm_{}".format(frminstance_id)
)
if not os.path.exists(frm_metadata_path):
LOG.error("The FRM metadata file is not present, skipping a nbd device cleanup")
return False
frm_metadata = {}
with open(frm_metadata_path, "r") as fh:
frm_metadata = json.loads(fh.read())
metadata = frm_metadata.get(snapshot_id, {}).get("metadata", {})
failed_devices = {}
for i in range(2):
failed_devices = _cleanup(metadata)
if failed_devices:
metadata = failed_devices
sleep(20)
continue
else:
break
clean_metadata_file = True
if failed_devices:
nbd_obj = NbdMount(None, None)
for overlay_file, nbd_device in failed_devices.items():
free_device = nbd_obj._find_unused([nbd_device])
if not free_device:
clean_metadata_file = False
is_cleaned = False
LOG.error('Failed to disconnect nbd device: {} and overlay file: {}'.format(
nbd_device, overlay_file))
# if nbd device cleanup is successful then remove the overlay file
elif os.path.exists(overlay_file):
os.remove(overlay_file)
if clean_metadata_file:
frm_metadata.pop(snapshot_id, None)
if not frm_metadata:
frm_metadata_path = os.path.join(
CONF.instances_path, "contego_mount_overlays",
"frm_{}".format(frminstance_id))
if os.path.exists(frm_metadata_path):
os.remove(frm_metadata_path)
return is_cleaned
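# Find the guest disk matching the requested volume or boot image and delegate
# the restore copy to the matching backend driver.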
@wrap_exception()
def copy_backup_image_to_volume(self, context, instance_uuid, instance_ref, params):
try:
domain_name = instance_ref['OS-EXT-SRV-ATTR:instance_name']
virt_dom = self.virt_driver._conn.lookupByName(domain_name)
if not virt_dom:
raise Exception("Cannot find virt_dom")
assert virt_dom.UUIDString() == instance_uuid
xml = virt_dom.XMLDesc(0)
xml_doc = etree.fromstring(xml)
device_info = vconfig.LibvirtConfigGuest()
device_info.parse_dom(xml_doc)
for guest_disk in device_info.devices:
if guest_disk.root_name != "disk":
continue
if guest_disk.target_dev is None:
continue
source_types = ("block", "lvm", "file")
if (
guest_disk.serial is not None
and guest_disk.serial == params["volume_id"]
): # noqa
if guest_disk.source_type == "network":
backend = guest_disk.source_protocol
elif guest_disk.source_type == "volume":
backend = guest_disk.source_pool
elif (
guest_disk.source_type in source_types
and guest_disk.source_device == "disk"
and guest_disk.serial
):
backend = "cinder"
path = guest_disk.source_name or guest_disk.source_path
backend_driver = Backend().get(
backend, path=path, virt_driver=self.virt_driver
)
backend_driver.copy_backup_image_to_volume(
context, instance_uuid, instance_ref["name"], params=params,
)
break
if (
guest_disk.target_dev in ("vda", "sda")
and guest_disk.source_type == "file"
and guest_disk.driver_format == "qcow2"
and guest_disk.serial is None
and params["image_id"] is not None
and params["image_overlay_file_path"] is not None
): # noqa
params["path"] = guest_disk.source_name or guest_disk.source_path
backend_driver = Backend().get(
"qcow2", virt_driver=self.virt_driver
)
backend_driver.copy_backup_image_to_volume(
context, instance_uuid, instance_ref["name"], params=params,
)
break
elif (
guest_disk.target_dev in ("vda", "sda")
and guest_disk.source_type == "network"
and guest_disk.source_protocol == "rbd"
and guest_disk.source_device == "disk"
and guest_disk.driver_format == "raw"
and guest_disk.serial is None
and params["volume_id"] is None
and params["image_id"] is not None
and params["image_overlay_file_path"] is not None
):
params["path"] = guest_disk.source_name or guest_disk.source_path
backend_driver = Backend().get(
"rbdboot", path=params["path"], virt_driver=self.virt_driver,
)
backend_driver.copy_backup_image_to_volume(
context, instance_uuid, instance_ref["name"], params=params,
)
break
except Exception as ex:
LOG.exception(ex)
raise
return {"result": "success"}
@wrap_exception()
def vast_disk_check(self, context, params):
disk_check = {'status': False}
try:
for disk in params['vm_disks']:
disk_info = qemuimages.qemu_img_info(disk)
if disk_info.file_format != 'qcow2':
continue
status = qemuimages.qemu_integrity_check(disk, params)
if status is False:
LOG.exception("Disk Integrity check failed for disk: %s" % (disk))
return disk_check
if status is None:
disk_check['status'] = None
return disk_check
disk_check['status'] = True
return disk_check
except Exception as ex:
LOG.exception(ex)
return disk_check
@wrap_exception()
def vast_clean_nbd_devices(self, context, instance_uuid, params):
if params.get("snapshot_id"):
return {"result": self._frm_mount_cleanup(instance_uuid, params["snapshot_id"])}
return {"result": True}