Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2015 TrilioData, Inc.
# All Rights Reserved.

import os
import re
import sys
import subprocess
import time
from threading import Thread
from tempfile import mkstemp

try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty

try:
    from oslo_log import log as logging
except ImportError:
    from nova.openstack.common import log as logging

try:
    from oslo_config import cfg
except ImportError:
    from oslo.config import cfg

from contego import utils as contego_utils
from contego.nova.extension.driver import qemuimages


backend_opts = [
    cfg.StrOpt(
        "override_block",
        default="cinder",
        help="by default block disk_type is mapped to cinder"
        " It can be override to lvm by this option",
    ),
    cfg.StrOpt(
        "contego_staging_dir",
        default=None,
        help="by default uses CONF.instances_path"
        " It can be override depending on user configuration",
    ),
]

LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.register_opts(backend_opts, "backends")


def enqueue_output(out, queue):
    line = out.read(17)
    while line:
        line = out.read(17)
        queue.put(line)
    out.close()


class Backend(object):
    def __init__(self):
        self.BACKEND = {
            "file": "contego.nova.extension.driver."
            "backends.qcow2_backend.QCOW2Backend",
            "qcow2": "contego.nova.extension.driver."
            "backends.qcow2_backend.QCOW2Backend",
            "cinder": "contego.nova.extension.driver."
            "backends.cinder_backend.CinderBackend",
            "rbd": "contego.nova.extension.driver.backends." "rdb_backend.RBDBackend",
            "ceph": "contego.nova.extension.driver.backends." "rdb_backend.RBDBackend",
            "rbdboot": "contego.nova.extension.driver.backends."
            "rbdboot_backend.RBDBootBackend",
            "cephboot": "contego.nova.extension.driver.backends."
            "rbdboot_backend.RBDBootBackend",
            "lvm": "contego.nova.extension.driver.backends."
            "cinder_backend.LVMBackend",
        }

    def get(self, backend, **kwargs):
        if backend in ("block", "iscsi", "cinder"):
            backend = CONF.backends.override_block
        backend_class_name = self.BACKEND.get(backend)

        if not backend:
            raise RuntimeError("Unknown backend type=%s" % backend)

        parts = backend_class_name.split(".")
        module = ".".join(parts[:-1])
        backend_class = __import__(module)

        for comp in parts[1:]:
            backend_class = getattr(backend_class, comp)

        return backend_class(**kwargs)

    def configure_security_profile(self, instance_uuid, additional_files=None):
        pass

    def reset_security_profile(self, instance_uuid, additional_files=None):
        pass

    def create_snapshot(self, devices, **kwargs):
        raise NotImplementedError("create_snapshot() method must be implemented")

    def delete_snapshot(self, disk_info, **kwargs):
        raise NotImplementedError("delete_snapshot() method must be implemented")

    def check_prev_snapshot(self, disk_info, **kwargs):
        raise NotImplementedError("check_prev_snapshot() method must be implemented")

    def update_snapshot_info(self, disk_info, **kwargs):
        raise NotImplementedError("update_snapshot_info() method must be implemented")

    def upload_snapshot(self, disk_info, **kwargs):
        raise NotImplementedError("update_snapshot() method must be implemented")

    def _get_temp_file_path(self, remove=False):
        """Makes tmpfile under CONF.instances_path."""
        dirpath = CONF.backends.contego_staging_dir or CONF.instances_path
        fd, tmp_file = mkstemp(dir=dirpath)
        os.close(fd)
        if remove:
            os.remove(tmp_file)
        return tmp_file

    def get_distro_version(self):
        kargs = {}
        with open("/etc/os-release", "r") as f:
            for line in f:
                if line.strip().startswith("#"):
                    continue
                value = line.split("=")
                if len(value) == 2:
                    kargs[value[0]] = str(value[1].strip("\n").strip('"'))

        return (
            kargs["ID"],
            kargs.get("ID_LIKE", kargs["ID"]),
            kargs["VERSION_ID"],
        )

    def get_qemu_img_path(self):
        # We need to go with more specific to least specific until we find
        # the qemu-img that works for the Linux distro
        specific, likes, version = self.get_distro_version()

        virtenv_path, binary = os.path.split("/home/tvault/.virtenv/bin/python")

        virtenv_path = os.path.join(virtenv_path, "qemu-img")

        for distro in specific.split() + likes.split():
            qemu_path = os.path.join(virtenv_path, distro, version, "qemu-img")
            if os.path.exists(qemu_path):
                return qemu_path

            qemu_path = os.path.join(
                virtenv_path, distro, version.split(".")[0], "qemu-img"
            )
            if os.path.exists(qemu_path):
                return qemu_path

            qemu_path = os.path.join(virtenv_path, distro, "qemu-img")

            if os.path.exists(qemu_path):
                return qemu_path

    def update_tracking_file(
        self,
        tracking_file,
        process,
        update_queue,
        volume_path,
        backup_image_file_path,
        cmd,
        retry=False,
    ):
        """ Utility method that updates a tracking file with information dury an image
            copy.

        Args:
            tracking_file (str): Path to the tracking file to update.
            process (process): Process handle of the sub process.
            update_queue (queue): Queue of file copy percentages from process.
            volume_path (str): Volume path used for logging.
            backup_image_file_path (str): Backup image path used for logging.
            cmd (str): Comand the process is executing. Used for logging.
        """
        percentage = 0.0
        process_status = None
        tracking_dir = os.path.dirname(tracking_file)
        cancel_file = os.path.join(tracking_dir, "cancelled")

        # Keep updating the progress tracking file while the
        # process is still running and there are items the queue.
        while process_status is None or not update_queue.empty():
            process_status = process.poll()
            try:
                try:
                    output = update_queue.get(timeout=120).decode('utf-8')
                except Empty:
                    if os.path.exists(cancel_file):
                        try:
                            process.kill()
                        except Exception as ex:
                            LOG.exception(ex)
                            pass
                        process_status = process.poll()
                    contego_utils.touch_file(tracking_file)
                    continue
                except Exception as ex:
                    LOG.exception(ex)
                else:
                    if re.search(r"\d+\.\d+", output) is None:
                        continue

                    if os.path.exists(cancel_file):
                        try:
                            process.kill()
                        except Exception as ex:
                            LOG.exception(ex)
                            pass
                        process_status = process.poll()

                    percentage = re.search(r"\d+\.\d+", output).group(0)

                    LOG.debug(
                        (
                            "copying from %(backup_path)s to "
                            "%(volume_path)s %(percentage)s %% "
                            "completed\n"
                        )
                        % {
                            "backup_path": backup_image_file_path,
                            "volume_path": volume_path,
                            "percentage": str(percentage),
                        }
                    )

                    percentage = float(percentage)

                    contego_utils.update_progress(
                        tracking_file, "%s %% percentage complete\n" % str(percentage),
                    )

            except Exception as ex:
                LOG.exception(ex)
                pass

        process.stdin.close()

        _returncode = process.returncode  # pylint: disable=E1101

        # on S3 backends, this will make sure the contents are properly flushed
        with open(backup_image_file_path, "r") as f:
            pass

        if _returncode:
            LOG.error(("Result was %d" % _returncode))
            if _returncode == -9:
                msg = "User initiated cancel request"
            else:
                msg = "Execution error %(exit_code)d (%(stderr)s). cmd %(cmd)s" % {
                    "exit_code": _returncode,
                    "stderr": process.stderr.read(),
                    "cmd": cmd,
                }
            raise Exception(msg)

    def _execute_qemu_img_and_track_progress_for_backup(
        self, cmdspec, curr_snap_path, qcow2file, progress_tracker_path
    ):

        qcow2_info = qemuimages.qemu_img_info(qcow2file)
        vsize = qcow2_info.virtual_size
        for iter in range(2):
            try:
                if qcow2file and os.path.exists(qcow2file):
                    qcow2_info = qemuimages.qemu_img_info(qcow2file)

                    assert qcow2_info.cluster_size == 1024 * 64

                qemu_img_bin = self.get_qemu_img_path()
                # cmdspec = ['sleep', '1200']
                cmd = " ".join(cmdspec)
                LOG.debug(
                    ("_execute_qemu_img_and_track_progress_for_backup cmd %s " % cmd)
                )
                process = subprocess.Popen(
                    cmdspec,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=-1,
                    close_fds=True,
                    shell=False,
                )

                queue = Queue()
                read_thread = Thread(
                    target=enqueue_output, args=(process.stdout, queue)
                )

                read_thread.daemon = True  # thread dies with the program
                read_thread.start()

                self.update_tracking_file(
                    progress_tracker_path,
                    process,
                    queue,
                    curr_snap_path,
                    qcow2file,
                    cmd,
                    retry=iter == 1,
                )

                qcow2_info = qemuimages.qemu_img_info(qcow2file)
                if qcow2_info.virtual_size == 0:
                    time.sleep(10)
                    qemuimages.qemu_check_n_resize(qcow2file, vsize)
                return
            except Exception as ex:
                LOG.exception(ex)

                try:
                    os.remove(qcow2file)
                except Exception as ex:
                    LOG.exception(ex)
                    pass

                qemuimages.create_cow_image(None, qcow2file, size=vsize)

                if iter == 1:
                    raise

                LOG.info("Retrying qemu-img convert request again: Iteration %d" % iter)
                # for s3 backends, we may have to retry one more time
                time.sleep(30)

    def _execute_qemu_img_and_track_progress_for_restore(
        self, cmdspec, curr_snap_path, qcow2file, progress_tracker_path
    ):

        for iter in range(2):
            try:
                if qcow2file and os.path.exists(qcow2file):
                    qcow2_info = qemuimages.qemu_img_info(qcow2file)

                    assert qcow2_info.cluster_size == 1024 * 64

                qemu_img_bin = self.get_qemu_img_path()
                # cmdspec = ['sleep', '1200']
                cmd = " ".join(cmdspec)
                LOG.debug(
                    ("_execute_qemu_img_and_track_progress_for_restore cmd %s " % cmd)
                )
                process = subprocess.Popen(
                    cmdspec,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    bufsize=-1,
                    close_fds=True,
                    shell=False,
                )

                queue = Queue()
                read_thread = Thread(
                    target=enqueue_output, args=(process.stdout, queue)
                )

                read_thread.daemon = True  # thread dies with the program
                read_thread.start()

                self.update_tracking_file(
                    progress_tracker_path,
                    process,
                    queue,
                    curr_snap_path,
                    qcow2file,
                    cmd,
                    retry=iter == 1,
                )

                return
            except Exception as ex:
                LOG.exception(ex)
                if iter == 1:
                    raise

                LOG.info("Retrying qemu-img convert request again: Iteration %d" % iter)
                # for s3 backends, we may have to retry one more time
                time.sleep(30)