Repository URL to install this package:
|
Version:
4.1.94.1.dev3 ▾
|
python3-tvault-contego
/
usr
/
lib
/
python3
/
dist-packages
/
contego
/
nova
/
extension
/
driver
/
backends
/
backend.py
|
|---|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2015 TrilioData, Inc.
# All Rights Reserved.
import os
import re
import sys
import subprocess
import time
from threading import Thread
from tempfile import mkstemp
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty
try:
from oslo_log import log as logging
except ImportError:
from nova.openstack.common import log as logging
try:
from oslo_config import cfg
except ImportError:
from oslo.config import cfg
from contego import utils as contego_utils
from contego.nova.extension.driver import qemuimages
backend_opts = [
cfg.StrOpt(
"override_block",
default="cinder",
help="by default block disk_type is mapped to cinder"
" It can be override to lvm by this option",
),
cfg.StrOpt(
"contego_staging_dir",
default=None,
help="by default uses CONF.instances_path"
" It can be override depending on user configuration",
),
]
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.register_opts(backend_opts, "backends")
def enqueue_output(out, queue):
line = out.read(17)
while line:
line = out.read(17)
queue.put(line)
out.close()
class Backend(object):
def __init__(self):
self.BACKEND = {
"file": "contego.nova.extension.driver."
"backends.qcow2_backend.QCOW2Backend",
"qcow2": "contego.nova.extension.driver."
"backends.qcow2_backend.QCOW2Backend",
"cinder": "contego.nova.extension.driver."
"backends.cinder_backend.CinderBackend",
"rbd": "contego.nova.extension.driver.backends." "rdb_backend.RBDBackend",
"ceph": "contego.nova.extension.driver.backends." "rdb_backend.RBDBackend",
"rbdboot": "contego.nova.extension.driver.backends."
"rbdboot_backend.RBDBootBackend",
"cephboot": "contego.nova.extension.driver.backends."
"rbdboot_backend.RBDBootBackend",
"lvm": "contego.nova.extension.driver.backends."
"cinder_backend.LVMBackend",
}
def get(self, backend, **kwargs):
if backend in ("block", "iscsi", "cinder"):
backend = CONF.backends.override_block
backend_class_name = self.BACKEND.get(backend)
if not backend:
raise RuntimeError("Unknown backend type=%s" % backend)
parts = backend_class_name.split(".")
module = ".".join(parts[:-1])
backend_class = __import__(module)
for comp in parts[1:]:
backend_class = getattr(backend_class, comp)
return backend_class(**kwargs)
def configure_security_profile(self, instance_uuid, additional_files=None):
pass
def reset_security_profile(self, instance_uuid, additional_files=None):
pass
def create_snapshot(self, devices, **kwargs):
raise NotImplementedError("create_snapshot() method must be implemented")
def delete_snapshot(self, disk_info, **kwargs):
raise NotImplementedError("delete_snapshot() method must be implemented")
def check_prev_snapshot(self, disk_info, **kwargs):
raise NotImplementedError("check_prev_snapshot() method must be implemented")
def update_snapshot_info(self, disk_info, **kwargs):
raise NotImplementedError("update_snapshot_info() method must be implemented")
def upload_snapshot(self, disk_info, **kwargs):
raise NotImplementedError("update_snapshot() method must be implemented")
def _get_temp_file_path(self, remove=False):
"""Makes tmpfile under CONF.instances_path."""
dirpath = CONF.backends.contego_staging_dir or CONF.instances_path
fd, tmp_file = mkstemp(dir=dirpath)
os.close(fd)
if remove:
os.remove(tmp_file)
return tmp_file
def get_distro_version(self):
kargs = {}
with open("/etc/os-release", "r") as f:
for line in f:
if line.strip().startswith("#"):
continue
value = line.split("=")
if len(value) == 2:
kargs[value[0]] = str(value[1].strip("\n").strip('"'))
return (
kargs["ID"],
kargs.get("ID_LIKE", kargs["ID"]),
kargs["VERSION_ID"],
)
def get_qemu_img_path(self):
# We need to go with more specific to least specific until we find
# the qemu-img that works for the Linux distro
specific, likes, version = self.get_distro_version()
virtenv_path, binary = os.path.split("/home/tvault/.virtenv/bin/python")
virtenv_path = os.path.join(virtenv_path, "qemu-img")
for distro in specific.split() + likes.split():
qemu_path = os.path.join(virtenv_path, distro, version, "qemu-img")
if os.path.exists(qemu_path):
return qemu_path
qemu_path = os.path.join(
virtenv_path, distro, version.split(".")[0], "qemu-img"
)
if os.path.exists(qemu_path):
return qemu_path
qemu_path = os.path.join(virtenv_path, distro, "qemu-img")
if os.path.exists(qemu_path):
return qemu_path
def update_tracking_file(
self,
tracking_file,
process,
update_queue,
volume_path,
backup_image_file_path,
cmd,
retry=False,
):
""" Utility method that updates a tracking file with information dury an image
copy.
Args:
tracking_file (str): Path to the tracking file to update.
process (process): Process handle of the sub process.
update_queue (queue): Queue of file copy percentages from process.
volume_path (str): Volume path used for logging.
backup_image_file_path (str): Backup image path used for logging.
cmd (str): Comand the process is executing. Used for logging.
"""
percentage = 0.0
process_status = None
tracking_dir = os.path.dirname(tracking_file)
cancel_file = os.path.join(tracking_dir, "cancelled")
# Keep updating the progress tracking file while the
# process is still running and there are items the queue.
while process_status is None or not update_queue.empty():
process_status = process.poll()
try:
try:
output = update_queue.get(timeout=120).decode('utf-8')
except Empty:
if os.path.exists(cancel_file):
try:
process.kill()
except Exception as ex:
LOG.exception(ex)
pass
process_status = process.poll()
contego_utils.touch_file(tracking_file)
continue
except Exception as ex:
LOG.exception(ex)
else:
if re.search(r"\d+\.\d+", output) is None:
continue
if os.path.exists(cancel_file):
try:
process.kill()
except Exception as ex:
LOG.exception(ex)
pass
process_status = process.poll()
percentage = re.search(r"\d+\.\d+", output).group(0)
LOG.debug(
(
"copying from %(backup_path)s to "
"%(volume_path)s %(percentage)s %% "
"completed\n"
)
% {
"backup_path": backup_image_file_path,
"volume_path": volume_path,
"percentage": str(percentage),
}
)
percentage = float(percentage)
contego_utils.update_progress(
tracking_file, "%s %% percentage complete\n" % str(percentage),
)
except Exception as ex:
LOG.exception(ex)
pass
process.stdin.close()
_returncode = process.returncode # pylint: disable=E1101
# on S3 backends, this will make sure the contents are properly flushed
with open(backup_image_file_path, "r") as f:
pass
if _returncode:
LOG.error(("Result was %d" % _returncode))
if _returncode == -9:
msg = "User initiated cancel request"
else:
msg = "Execution error %(exit_code)d (%(stderr)s). cmd %(cmd)s" % {
"exit_code": _returncode,
"stderr": process.stderr.read(),
"cmd": cmd,
}
raise Exception(msg)
def _execute_qemu_img_and_track_progress_for_backup(
self, cmdspec, curr_snap_path, qcow2file, progress_tracker_path
):
qcow2_info = qemuimages.qemu_img_info(qcow2file)
vsize = qcow2_info.virtual_size
for iter in range(2):
try:
if qcow2file and os.path.exists(qcow2file):
qcow2_info = qemuimages.qemu_img_info(qcow2file)
assert qcow2_info.cluster_size == 1024 * 64
qemu_img_bin = self.get_qemu_img_path()
# cmdspec = ['sleep', '1200']
cmd = " ".join(cmdspec)
LOG.debug(
("_execute_qemu_img_and_track_progress_for_backup cmd %s " % cmd)
)
process = subprocess.Popen(
cmdspec,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=-1,
close_fds=True,
shell=False,
)
queue = Queue()
read_thread = Thread(
target=enqueue_output, args=(process.stdout, queue)
)
read_thread.daemon = True # thread dies with the program
read_thread.start()
self.update_tracking_file(
progress_tracker_path,
process,
queue,
curr_snap_path,
qcow2file,
cmd,
retry=iter == 1,
)
qcow2_info = qemuimages.qemu_img_info(qcow2file)
if qcow2_info.virtual_size == 0:
time.sleep(10)
qemuimages.qemu_check_n_resize(qcow2file, vsize)
return
except Exception as ex:
LOG.exception(ex)
try:
os.remove(qcow2file)
except Exception as ex:
LOG.exception(ex)
pass
qemuimages.create_cow_image(None, qcow2file, size=vsize)
if iter == 1:
raise
LOG.info("Retrying qemu-img convert request again: Iteration %d" % iter)
# for s3 backends, we may have to retry one more time
time.sleep(30)
def _execute_qemu_img_and_track_progress_for_restore(
self, cmdspec, curr_snap_path, qcow2file, progress_tracker_path
):
for iter in range(2):
try:
if qcow2file and os.path.exists(qcow2file):
qcow2_info = qemuimages.qemu_img_info(qcow2file)
assert qcow2_info.cluster_size == 1024 * 64
qemu_img_bin = self.get_qemu_img_path()
# cmdspec = ['sleep', '1200']
cmd = " ".join(cmdspec)
LOG.debug(
("_execute_qemu_img_and_track_progress_for_restore cmd %s " % cmd)
)
process = subprocess.Popen(
cmdspec,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=-1,
close_fds=True,
shell=False,
)
queue = Queue()
read_thread = Thread(
target=enqueue_output, args=(process.stdout, queue)
)
read_thread.daemon = True # thread dies with the program
read_thread.start()
self.update_tracking_file(
progress_tracker_path,
process,
queue,
curr_snap_path,
qcow2file,
cmd,
retry=iter == 1,
)
return
except Exception as ex:
LOG.exception(ex)
if iter == 1:
raise
LOG.info("Retrying qemu-img convert request again: Iteration %d" % iter)
# for s3 backends, we may have to retry one more time
time.sleep(30)