Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2018 Trilio Data, Inc.
# All Rights Reserved.

"""Implementation of Vault backend

"""
import base64
import functools
import os
import shutil
import socket
import stat
import subprocess
import time
import uuid
from os import walk
from os.path import isfile, isdir, join

from six.moves.urllib.parse import urlparse

try:
    from oslo_log import log as logging
except ImportError:
    from nova.openstack.common import log as logging

try:
    from oslo_config import cfg
except ImportError:
    from oslo.config import cfg

from nova import exception
from contego import autolog
from contego import utils
from contego import exception as contego_exception


# Module-level logger plus the autolog wrapper used by @autolog.log_method.
LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)


# Configuration options for the vault (backup media) backend.
contego_vault_opts = [
    cfg.StrOpt(
        "vault_storage_type",
        default="nfs",
        help="Storage type: nfs, s3",
    ),
    # swift-i: integrated(keystone), swift-s: standalone
    cfg.StrOpt(
        "vault_data_directory",
        default="/var/triliovault-mounts",
        help="Location where snapshots will be stored",
    ),
    # Legacy location, still referenced by flush_cache().
    cfg.StrOpt(
        "vault_data_directory_old",
        default="/var/triliovault",
        help="Location where snapshots will be stored",
    ),
    # Comma-separated list of NFS export URLs; each is mounted under a
    # base64-encoded subdirectory of vault_data_directory.
    cfg.StrOpt("vault_storage_nfs_export", default="local", help="NFS Export"),
    cfg.StrOpt("vault_storage_nfs_options", default="nolock", help="NFS Options"),
    cfg.StrOpt(
        "vault_swift_auth_version",
        default="KEYSTONE_V2",
        help="KEYSTONE_V2 KEYSTONE_V3 TEMPAUTH",
    ),
    cfg.StrOpt(
        "vault_swift_auth_url",
        default="http://localhost:5000/v2.0",
        help="Keystone Authorization URL",
    ),
    cfg.StrOpt("vault_swift_tenant", default="admin", help="Swift tenant"),
    cfg.StrOpt("vault_swift_username", default="admin", help="Swift username"),
    cfg.StrOpt("vault_swift_password", default="password", help="Swift password"),
    cfg.StrOpt(
        "vault_swift_container_prefix",
        default="TrilioVault",
        help="Swift Container Prefix",
    ),
    cfg.StrOpt(
        "vault_swift_segment_size",
        default="524288000",  # 500MB
        help="Default segment size 500MB",
    ),
    cfg.IntOpt(
        "vault_retry_count", default=2, help="The number of times we retry on failures",
    ),
    cfg.StrOpt(
        "vault_swift_url_template",
        default="http://localhost:8080/v1/AUTH_%(project_id)s",
        help="The URL of the Swift endpoint",
    ),
    cfg.StrOpt("guestfs_backend", default="direct", help="Guestfs backend"),
    cfg.StrOpt(
        "guestfs_backend_settings",
        default="",
        help="Qemu tcg mode on or off or other settings",
    ),
]

# Register the options globally.  state_path only exists when running inside
# nova, so a failed import is tolerated.
CONF = cfg.CONF
CONF.register_opts(contego_vault_opts)
try:
    CONF.import_opt("state_path", "nova.paths")
except BaseException:
    pass


def ensure_nfs_mounted():
    """Decorator factory: validate the backup-target mount before a call.

    The wrapped function only runs when the configured storage backend is
    "nfs", "swift-s" or "s3" and its expected mount point is present; for
    any other storage type the call is silently skipped (returns ``None``).

    For NFS the mount point is derived from the first positional argument,
    which must be a mapping with a ``backend_endpoint`` URL: the URL's path
    component is base64-encoded and joined onto ``CONF.vault_data_directory``.
    For the FUSE-backed types the data directory itself is the mount point.

    Raises:
        contego_exception.InvalidNFSMountPoint: when the expected path is
            not a mount point (NFS only), or /proc/mounts shows it mounted
            from a device that is not one of the configured
            ``vault_storage_nfs_export`` entries.
    """

    def wrap(func):
        # functools.wraps preserves the wrapped function's name and
        # docstring, which the original implementation discarded.
        @functools.wraps(func)
        def new_function(*args, **kw):
            storage_type = CONF.vault_storage_type.lower()
            if storage_type not in ("nfs", "swift-s", "s3"):
                # Nothing to validate for other backends; skip the call.
                return None

            if storage_type == "nfs":
                base64encode = base64.b64encode(
                    urlparse(args[0]["backend_endpoint"]).path.encode("utf-8")
                )
                mountpath = os.path.join(
                    CONF.vault_data_directory, base64encode.decode("utf-8")
                )
            else:
                mountpath = CONF.vault_data_directory

            if storage_type == "nfs" and not os.path.ismount(mountpath):
                raise contego_exception.InvalidNFSMountPoint(
                    reason="'%s' is not a valid mount point" % mountpath
                )

            # Cross-check /proc/mounts: the path must be mounted from one
            # of the configured exports.
            with open("/proc/mounts", "r") as f:
                mounts = [
                    {fields[1]: fields[0]}
                    for fields in (line.split() for line in f)
                    if fields[1] == mountpath
                ]

            exports = CONF.vault_storage_nfs_export.split(",")
            if len(mounts) == 0 or mounts[0].get(mountpath, None) not in exports:
                raise contego_exception.InvalidNFSMountPoint(
                    reason="'%s' is not '%s' mounted"
                    % (mountpath, CONF.vault_storage_nfs_export)
                )

            return func(*args, **kw)

        return new_function

    return wrap


def _samefile(src, dst):
    """Return True when *src* and *dst* refer to the same file."""
    # Macintosh, Unix: compare the underlying inodes when the platform
    # provides samefile(); a failed stat means "not the same".
    if hasattr(os.path, "samefile"):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            return False

    # Everywhere else: fall back to comparing normalized absolute paths.
    left = os.path.normcase(os.path.abspath(src))
    right = os.path.normcase(os.path.abspath(dst))
    return left == right


def _copyfile(
    src,
    dst,
    progress_tracking_file_path,
    follow_symlinks=True,
    chunk_size=64 * 1024 * 1024,
):
    """Copy data from src to dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Progress lines are appended to *progress_tracking_file_path* via
    utils.update_progress, and the tracker's mtime is touched after every
    chunk so external watchers can detect a stalled copy.  If a file named
    "cancelled" appears in the tracking directory, the copy aborts.

    Raises:
        exception.NovaException: if src and dst are the same file, or if
            either path is a named pipe.
        Exception: when a user-initiated cancel is detected mid-copy.
    """
    if _samefile(src, dst):
        raise exception.NovaException(
            msg=("{!r} and {!r} are the same file".format(src, dst))
        )

    # A "cancelled" marker file in the tracking directory requests an abort.
    tracking_dir = os.path.dirname(progress_tracking_file_path)
    cancel_file = os.path.join(tracking_dir, "cancelled")

    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise exception.NovaException(msg=("`%s` is a named pipe" % fn))

    if not follow_symlinks and os.path.islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        total_bytes = (os.stat(src)).st_size
        with open(src, "rb") as fsrc:
            with open(dst, "wb") as fdst:
                copied_bytes = 0
                while True:
                    buf = fsrc.read(chunk_size)
                    if not buf:
                        break
                    fdst.write(buf)
                    copied_bytes += len(buf)
                    percentage_copied = (float(copied_bytes) / total_bytes) * 100
                    if os.path.exists(cancel_file):
                        raise Exception("Error: User initiated backup cancel")
                    # Touch the tracker so watchers see the copy is alive.
                    os.utime(progress_tracking_file_path, None)
                    # NOTE(review): percentage_copied is a float, so the
                    # "% 5 == 0" test fires only on exact multiples (in
                    # practice mostly the final 100% chunk) -- confirm
                    # whether regular progress lines were intended.
                    if ((percentage_copied % 5) == 0) or (
                        copied_bytes % (1024 * 1024 * 1024) == 0
                    ):  # noqa
                        utils.update_progress(
                            progress_tracking_file_path,
                            "%s %% percentage complete\n" % str(percentage_copied),
                        )
                        utils.update_progress(
                            progress_tracking_file_path,
                            "%s bytes copied\n" % str(copied_bytes),
                        )
    return dst


def _copymode(src, dst, follow_symlinks=True):
    """Copy the permission bits of *src* onto *dst*.

    When *follow_symlinks* is false and both paths are symlinks, the mode
    is transferred between the links themselves using lstat/lchmod; on
    platforms without lchmod (e.g. Linux) that case is a no-op.  Likewise
    nothing happens on platforms lacking chmod altogether.
    """
    if not follow_symlinks and os.path.islink(src) and os.path.islink(dst):
        if not hasattr(os, "lchmod"):
            # Cannot change a symlink's own mode on this platform.
            return
        stat_func, chmod_func = os.lstat, os.lchmod
    else:
        if not hasattr(os, "chmod"):
            return
        stat_func, chmod_func = os.stat, os.chmod

    mode = stat.S_IMODE(stat_func(src).st_mode)
    chmod_func(dst, mode)


def copy_file(src, dst, progress_tracking_file_path, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst") and return the destination.

    *dst* may name a directory, in which case the file is copied into it
    under its original basename.  Progress is recorded through
    *progress_tracking_file_path*.  Copying a file onto itself raises
    a NovaException (see _copyfile).
    """
    target = dst
    if os.path.isdir(target):
        target = os.path.join(target, os.path.basename(src))
    _copyfile(src, target, progress_tracking_file_path,
              follow_symlinks=follow_symlinks)
    _copymode(src, target, follow_symlinks=follow_symlinks)
    return target


def unmount_backup_media():
    """Lazily unmount every configured NFS export's mount point (best effort).

    Each export URL in CONF.vault_storage_nfs_export maps to a
    base64-encoded subdirectory of CONF.vault_data_directory; any failure
    is logged and the remaining shares are still processed.
    """
    for nfsshare in CONF.vault_storage_nfs_export.split(","):
        encoded_share = base64.b64encode(
            urlparse(nfsshare).path.encode("utf-8"))
        mountpath = os.path.join(
            CONF.vault_data_directory, encoded_share.decode("utf-8"))
        try:
            umount_cmd = [
                "sudo",
                "nova-rootwrap",
                CONF.rootwrap_config,
                "umount",
                "-l",
                mountpath,
            ]
            subprocess.call(umount_cmd, shell=False)
        except Exception as ex:
            # Best effort: keep unmounting the remaining shares.
            LOG.exception(ex)


def mount_backup_media():
    """Mount the configured backup storage.

    Side effect: sets ``CONF.vault_using_fuse`` so later calls can tell
    FUSE-backed object stores ("swift-s", "s3") apart from other backends.
    NFS shares are remounted from scratch; "local" and "vault" types need
    no mounting at all.
    """

    if (
        CONF.vault_storage_type.lower() == "swift-s"
        or CONF.vault_storage_type.lower() == "s3"
    ):
        CONF.vault_using_fuse = True
    else:
        CONF.vault_using_fuse = False

    if CONF.vault_storage_type.lower() == "local":
        pass
    elif CONF.vault_storage_type.lower() == "vault":
        pass
    elif CONF.vault_storage_type.lower() == "nfs":
        # Drop any stale mounts before remounting every share.
        unmount_backup_media()
        nfsoptions = CONF.vault_storage_nfs_options
        for idx, nfsshare in enumerate(CONF.vault_storage_nfs_export.split(",")):
            # Each share mounts under a directory named after the
            # base64-encoded path component of its export URL.
            base64encode = base64.b64encode(urlparse(
                            nfsshare).path.encode("utf-8"))
            mountpath = os.path.join(
                CONF.vault_data_directory, base64encode.decode("utf-8")
            )

            utils.ensure_tree(mountpath)
            # SIGKILL the mount attempt after 30s so an unreachable NFS
            # server cannot hang the service.
            command = [
                "timeout",
                "-s",
                "9",
                "30s",
                "sudo",
                "nova-rootwrap",
                CONF.rootwrap_config,
                "mount",
                "-o",
                nfsoptions,
                nfsshare,
                mountpath,
            ]

            subprocess.check_call(command, shell=False)
            """
            if idx == 0:
                command = ['timeout', '-sKILL', '30', 'sudo', 'mount',
                           '--bind', mountpath, CONF.vault_data_directory_old]
                subprocess.check_call(command, shell=False)
            """
    elif CONF.vault_using_fuse:
        # FUSE backends mount themselves; just clear any stale NFS mounts.
        unmount_backup_media()
    else:
        # das, swift-i, swift-s, s3
        if CONF.vault_storage_das_device != "none":
            try:
                command = [
                    "sudo",
                    "nova-rootwrap",
                    CONF.rootwrap_config,
                    "mount",
                    CONF.vault_storage_das_device,
                    CONF.vault_data_directory,
                ]
                subprocess.check_call(command, shell=False)
            except Exception as ex:
                # Best effort: the device may already be mounted.
                LOG.exception(ex)
                pass


def update_in_progress_files_on_exit():
    """Mark this node's in-flight progress trackers as "Down" on shutdown.

    Walks every contego_tasks tree on the backup media; any tracking file
    whose status is not "Completed" and that references this node's IP gets
    a "Down" line appended, so other services can tell the task died with
    the node.  Per-share errors are logged and swallowed.
    """
    ip_address = CONF.my_ip
    if (
        CONF.vault_storage_type.lower() != "nfs"
        and CONF.vault_storage_type.lower() != "swift-s"
        and CONF.vault_storage_type.lower() != "s3"
    ):
        return

    for idx, nfsshare in enumerate(CONF.vault_storage_nfs_export.split(",")):
        try:
            if CONF.vault_storage_type.lower() == "nfs":
                base64encode = base64.b64encode(urlparse(
                                nfsshare).path.encode("utf-8"))
                mountpath = os.path.join(
                    CONF.vault_data_directory, base64encode.decode("utf-8")
                )
            elif CONF.vault_using_fuse:
                mountpath = CONF.vault_data_directory
            # NOTE(review): if neither branch matches (e.g. vault_using_fuse
            # was never set for swift-s/s3), mountpath is undefined and the
            # resulting NameError is swallowed by the broad except below --
            # confirm this is intended.

            progress_tracker_directory = os.path.join(mountpath, "contego_tasks")

            if not os.path.exists(progress_tracker_directory):
                continue

            # Layout: contego_tasks/<snapshot dir>/<resource tracking file>.
            for x in os.listdir(progress_tracker_directory):
                snapshot_dirs = os.path.join(progress_tracker_directory, x)
                if isdir(snapshot_dirs):
                    for z in os.listdir(snapshot_dirs):
                        snapshot_resource_file = os.path.join(snapshot_dirs, z)
                        if isfile(snapshot_resource_file):
                            async_task_status = {}
                            with open(
                                snapshot_resource_file, "r"
                            ) as progress_tracking_file:  # noqa
                                async_task_status[
                                    "status"
                                ] = progress_tracking_file.readlines()
                            if (
                                async_task_status
                                and "status" in async_task_status
                                and len(async_task_status["status"])
                            ):
                                # Only mark files for unfinished tasks that
                                # were running on this node.
                                update_file_or_not = True
                                is_running_on_same_node = False
                                for line in async_task_status["status"]:
                                    if "Completed" in line:
                                        update_file_or_not = False
                                    if ip_address in line:
                                        is_running_on_same_node = True
                                if (
                                    update_file_or_not is True
                                    and is_running_on_same_node is True
                                ):
                                    utils.update_progress(
                                        snapshot_resource_file, "Down \n"
                                    )
        except Exception as ex:
            LOG.exception(ex)
            pass


def flush_cache():
    """Remove the legacy data directory tree when a FUSE-backed store is used.

    Bug fix: this function calls ``shutil.rmtree`` but ``shutil`` was never
    imported anywhere in the module, so every call raised NameError; the
    import is now added to the module's import block.
    """
    if CONF.vault_using_fuse:
        # NOTE(review): rmtree raises if the directory is missing --
        # presumably the fuse setup guarantees it exists; confirm before
        # adding an existence guard.
        shutil.rmtree(CONF.vault_data_directory_old)


@ensure_nfs_mounted()
def ensure_progress_written(snapshot_resource_file):
    """Return True when the tracking file contains a 'Completed' status line."""
    with open(snapshot_resource_file, "r") as tracker:
        status_lines = tracker.readlines()
    # Empty file -> False; otherwise True iff any line records completion.
    return any("Completed" in line for line in status_lines)


def get_progress_tracker_directory(tracker_metadata):
    """Return (and create) the progress-tracker directory for a snapshot.

    For NFS backends the directory lives under the per-share mount point,
    derived by base64-encoding the path component of
    ``tracker_metadata["backend_endpoint"]``; for every other backend it is
    directly under ``CONF.vault_data_directory``.

    Cleanup: the original evaluated the NFS condition twice and computed a
    ``mountpath`` that was never used; the duplicate branch and dead
    assignment are removed without changing the returned path.
    """
    if CONF.vault_storage_type.lower() == "nfs":
        base64encode = base64.b64encode(urlparse(
            tracker_metadata["backend_endpoint"]).path.encode("utf-8")
        )
        progress_tracker_directory = os.path.join(
            CONF.vault_data_directory,
            base64encode.decode("utf-8"),
            "contego_tasks",
            "snapshot_%s" % (tracker_metadata["snapshot_id"]),
        )
    else:
        progress_tracker_directory = os.path.join(
            CONF.vault_data_directory,
            "contego_tasks",
            "snapshot_%s" % (tracker_metadata["snapshot_id"]),
        )

    # Make sure the directory exists before handing it back.
    utils.ensure_tree(progress_tracker_directory)
    return progress_tracker_directory


@ensure_nfs_mounted()
def get_progress_tracker_path(tracker_metadata):
    """Return the path of the resource's progress-tracking file, or None."""
    tracker_dir = get_progress_tracker_directory(tracker_metadata)
    if not tracker_dir:
        return None
    return os.path.join(tracker_dir, tracker_metadata["resource_id"])


def get_vault_data_directory():
    """Return the directory snapshots are written to, creating it if needed.

    NFS and FUSE-backed stores write directly into the configured data
    directory; other backends stage uploads in a per-host subdirectory.
    """
    if CONF.vault_storage_type.lower() == "nfs" or CONF.vault_using_fuse:
        data_dir = CONF.vault_data_directory
    else:
        data_dir = os.path.join(
            CONF.vault_data_directory, "staging", socket.gethostname()
        )

    # Appending a separator makes split() yield the directory itself as the
    # head, which is the path we must ensure exists.
    parent = os.path.split(data_dir + "/")[0]
    utils.ensure_tree(parent)
    return data_dir


def get_workload_path(workload_metadata):
    """Return the on-media directory for the given workload."""
    workload_dir = "workload_%s" % (workload_metadata["workload_id"])
    if CONF.vault_storage_type.lower() == "nfs":
        # NFS: the workload lives under the per-share subdirectory derived
        # from the backend endpoint's path.
        encoded = base64.b64encode(urlparse(
            workload_metadata["backend_endpoint"]).path.encode("utf-8")
        )
        return os.path.join(
            get_vault_data_directory(),
            encoded.decode("utf-8"),
            workload_dir,
        )
    return os.path.join(get_vault_data_directory(), workload_dir)


def get_snapshot_path(snapshot_metadata):
    """Return the snapshot directory beneath the workload path."""
    return os.path.join(
        get_workload_path(snapshot_metadata),
        "snapshot_%s" % (snapshot_metadata["snapshot_id"]),
    )


def get_snapshot_vm_path(snapshot_vm_metadata):
    """Return the per-VM directory inside the snapshot path."""
    return os.path.join(
        get_snapshot_path(snapshot_vm_metadata),
        "vm_id_%s" % (snapshot_vm_metadata["snapshot_vm_id"]),
    )


def get_snapshot_vm_resource_path(snapshot_vm_resource_metadata):
    """Return the per-resource directory inside the VM snapshot path."""
    resource_id = snapshot_vm_resource_metadata["snapshot_vm_resource_id"]
    # Spaces are stripped from the resource name to keep the path clean.
    resource_name = snapshot_vm_resource_metadata[
        "snapshot_vm_resource_name"].replace(" ", "")
    return os.path.join(
        get_snapshot_vm_path(snapshot_vm_resource_metadata),
        "vm_res_id_%s_%s" % (resource_id, resource_name),
    )


@ensure_nfs_mounted()
def get_snapshot_vm_disk_resource_path(snapshot_vm_disk_resource_metadata):
    """Return the full path of a VM disk resource snapshot on backup media."""
    resource_dir = get_snapshot_vm_resource_path(
        snapshot_vm_disk_resource_metadata
    )
    snap_id = snapshot_vm_disk_resource_metadata["vm_disk_resource_snap_id"]
    return os.path.join(resource_dir, snap_id)


def get_swift_container(context, workload_metadata):
    """Return the swift container name for a workload.

    First refreshes and consults the container listing cached in
    /tmp/swift.out (written by swift_list_all); when an existing container
    ends with "_<workload_id>" it is reused.  Otherwise a new name is
    composed from the configured prefix, workload name and workload id.
    """
    swift_list_all(context, container=None)
    if os.path.isfile("/tmp/swift.out"):
        with open("/tmp/swift.out") as listing:
            entries = listing.readlines()
        suffix = "_" + workload_metadata["workload_id"]
        for entry in entries:
            entry = entry.replace("\n", "")
            if entry.endswith(suffix):
                return entry

    prefix = ""
    if len(CONF.vault_swift_container_prefix):
        prefix = CONF.vault_swift_container_prefix + "_"
    return (
        prefix
        + workload_metadata["workload_name"]
        + "_"
        + workload_metadata["workload_id"]
    )


@autolog.log_method(logger=Logger)
def upload_snapshot_vm_disk_resource(
    context, snapshot_vm_disk_resource_metadata, snapshot_vm_disk_resource_path
):
    """Upload a VM disk resource snapshot to the backup media.

    For NFS or FUSE-backed media the file is copied into place on the
    mount; for integrated swift ("swift-i") it is uploaded to the
    workload's container with a single retry on failure.  "s3" and any
    other backends are no-ops here.
    """

    if CONF.vault_storage_type.lower() == "nfs" or CONF.vault_using_fuse:

        # TODO: Turn vault.py into backup media classes and make this a wrapper
        # function
        copy_to_file_path = get_snapshot_vm_disk_resource_path(
            snapshot_vm_disk_resource_metadata
        )
        head, tail = os.path.split(copy_to_file_path)
        utils.ensure_tree(head)
        progress_msg = (
            "Uploading '"
            + snapshot_vm_disk_resource_metadata["snapshot_vm_resource_name"]
            + "' of '"
            + snapshot_vm_disk_resource_metadata["snapshot_vm_name"]
            + "' to backup media"
        )
        LOG.info(progress_msg)
        copy_file(
            snapshot_vm_disk_resource_path,
            copy_to_file_path,
            snapshot_vm_disk_resource_metadata["progress_tracking_file_path"],
        )
        progress_msg = (
            "Finished Uploading '"
            + snapshot_vm_disk_resource_metadata["snapshot_vm_resource_name"]
            + "' of '"
            + snapshot_vm_disk_resource_metadata["snapshot_vm_name"]
            + "' to backup media"
        )
        LOG.info(progress_msg)

    # NOTE(review): the "or CONF.vault_using_fuse" below is unreachable --
    # the first branch already captures the fuse case -- so this branch
    # effectively handles only "swift-i".  Confirm before simplifying.
    elif CONF.vault_storage_type.lower() == "swift-i" or CONF.vault_using_fuse:
        progress_msg = (
            "Uploading '"
            + snapshot_vm_disk_resource_metadata["snapshot_vm_resource_name"]
            + "' of '"
            + snapshot_vm_disk_resource_metadata["snapshot_vm_name"]
            + "' to object store"
        )
        LOG.info(progress_msg)

        # The object name is the media-relative path of the resource.
        object_name = get_snapshot_vm_disk_resource_path(
            snapshot_vm_disk_resource_metadata
        )
        object_name = object_name.replace(get_vault_data_directory(), "", 1)
        container = get_swift_container(context, snapshot_vm_disk_resource_metadata)
        try:
            swift_upload_files(
                context,
                [snapshot_vm_disk_resource_path],
                container,
                object_name=object_name,
            )
        except Exception as ex:
            # Log the first failure and retry exactly once.
            LOG.exception(ex)
            rname = "snapshot_vm_resource_name"
            progress_msg = (
                "Retrying to upload '"
                + snapshot_vm_disk_resource_metadata[rname]
                + "' of '"
                + snapshot_vm_disk_resource_metadata["snapshot_vm_name"]
                + "' to object store"
            )
            LOG.info(progress_msg)
            swift_upload_files(
                context,
                [snapshot_vm_disk_resource_path],
                container,
                object_name=object_name,
            )
    elif CONF.vault_storage_type == "s3":
        pass
    else:
        pass


@autolog.log_method(logger=Logger)
def swift_upload_files(context, files, container, object_name=None):
    """Upload local files or directories to a swift container.

    Args:
        context: request context; required (with a valid auth token) when
            the backend is integrated swift ("swift-i").
        files: list of local file or directory paths to upload.
        container: target swift container name.
        object_name: optional object name; only allowed with a single
            file/directory, whose local path prefix it replaces.

    Raises:
        exception.NovaException: on invalid arguments, missing local files,
            container-creation failures or upload errors.

    Bug fixes relative to the original:
      * ``options.segment_size = ...`` raised AttributeError (options is a
        dict) -- the value is now stored through the key.
      * the container-creation failure raised NovaException with a tuple as
        its message -- it is now a formatted string.
      * the option dict contained a duplicate "verbose" key.
    """
    if CONF.vault_storage_type == "swift-i":
        # Integrated swift: reuse the caller's keystone token and build the
        # storage URL from the configured template.
        if not context:
            raise exception.NovaException(
                msg=("Error: context is none with integrated swift")
            )
        auth_token = context.auth_token
        object_storage_url = CONF.vault_swift_url_template % context.to_dict()
        auth_url = None
        user_name = None
        password = None
        tenant_name = None
    else:
        # Standalone swift: authenticate with the configured credentials.
        auth_token = None
        object_storage_url = None
        auth_url = CONF.vault_swift_auth_url
        user_name = CONF.vault_swift_username
        password = CONF.vault_swift_password
        tenant_name = CONF.vault_swift_tenant

    # The TEMPAUTH and keystone flavors share almost the entire option set;
    # only the auth version and which credential fields are populated differ.
    if CONF.vault_swift_auth_version == "TEMPAUTH":
        auth_version = "1.0"
        os_username = None
        os_password = None
        os_tenant_name = None
        os_auth_url = None
    else:
        if CONF.vault_swift_auth_version == "KEYSTONE_V2":
            auth_version = "2.0"
        else:
            auth_version = "3"
        os_username = user_name
        os_password = password
        os_tenant_name = tenant_name
        os_auth_url = auth_url

    options = {
        "use_slo": False,
        "verbose": 1,
        "os_username": os_username,
        "os_user_domain_name": None,
        "os_cacert": None,
        "os_tenant_name": os_tenant_name,
        "os_user_domain_id": None,
        "header": [],
        "auth_version": auth_version,
        "ssl_compression": True,
        "os_password": os_password,
        "os_user_id": None,
        "skip_identical": True,
        "segment_container": None,
        "os_project_id": None,
        "snet": False,
        "object_uu_threads": 10,
        "object_name": object_name,
        "os_tenant_id": None,
        "os_project_name": None,
        "os_service_type": None,
        "segment_size": CONF.vault_swift_segment_size,
        "os_help": None,
        "object_threads": 10,
        "os_storage_url": object_storage_url,
        "insecure": False,
        "segment_threads": 10,
        "auth": auth_url,
        "os_auth_url": os_auth_url,
        "user": user_name,
        "key": password,
        "os_region_name": None,
        "info": False,
        "retries": 5,
        "os_project_domain_id": None,
        "checksum": True,
        "changed": True,
        "leave_segments": False,
        "os_auth_token": auth_token,
        "os_options": {
            "project_name": None,
            "region_name": None,
            "tenant_name": os_tenant_name,
            "user_domain_name": None,
            "endpoint_type": None,
            "object_storage_url": object_storage_url,
            "project_domain_id": None,
            "user_id": None,
            "user_domain_id": None,
            "tenant_id": None,
            "service_type": None,
            "project_id": None,
            "auth_token": auth_token,
            "project_domain_name": None,
        },
        "debug": False,
        "os_project_domain_name": None,
        "os_endpoint_type": None,
    }

    if options["object_name"] is not None:
        if len(files) > 1:
            raise exception.NovaException(
                msg="object-name only be used with 1 file or dir"
            )
        orig_path = files[0]

    if options["segment_size"]:
        try:
            # If segment size only has digits assume it is bytes
            int(options["segment_size"])
        except ValueError:
            try:
                # Trailing B/K/M/G suffix scales the numeric prefix.
                size_mod = "BKMG".index(options["segment_size"][-1].upper())
                multiplier = int(options["segment_size"][:-1])
            except ValueError:
                raise exception.NovaException(msg="Invalid segment size")

            # BUG FIX: was ``options.segment_size = ...`` (AttributeError on
            # a dict); assign through the key instead.
            options["segment_size"] = str((1024 ** size_mod) * multiplier)

    with SwiftService(options=options) as swift:
        try:
            objs = []
            dir_markers = []
            for f in files:
                if isfile(f):
                    objs.append(f)
                elif isdir(f):
                    for (_dir, _ds, _fs) in walk(f):
                        if not (_ds + _fs):
                            # Empty directory: upload a dir-marker object.
                            dir_markers.append(_dir)
                        else:
                            objs.extend([join(_dir, _f) for _f in _fs])
                else:
                    raise exception.NovaException(msg="Local file '%s' not found." % f)

            # Now that we've collected all the required files and dir markers
            # build the tuples for the call to upload.  Object names are the
            # local paths with either the caller's prefix substituted or the
            # vault data directory stripped.
            if options["object_name"] is not None:
                prefix, replacement = orig_path, options["object_name"]
            else:
                prefix, replacement = get_vault_data_directory(), ""
            objs = [
                SwiftUploadObject(
                    o, object_name=o.replace(prefix, replacement, 1),
                )
                for o in objs
            ]
            dir_markers = [
                SwiftUploadObject(
                    None,
                    object_name=d.replace(prefix, replacement, 1),
                    options={"dir_marker": True},
                )
                for d in dir_markers
            ]

            for r in swift.upload(container, objs + dir_markers):
                if r["success"]:
                    if options["verbose"]:
                        if "attempts" in r and r["attempts"] > 1:
                            if "object" in r:
                                LOG.info(
                                    "%s [after %d attempts]"
                                    % (r["object"], r["attempts"])
                                )
                        else:
                            if "object" in r:
                                LOG.info(r["object"])
                            elif "for_object" in r:
                                LOG.info(
                                    "%s segment %s"
                                    % (r["for_object"], r["segment_index"])
                                )
                else:
                    error = r["error"]
                    if "action" in r and r["action"] == "create_container":
                        # Build a human-readable reason for the failure.
                        if isinstance(error, ClientException):
                            if r["headers"] and "X-Storage-Policy" in r["headers"]:
                                msg = (
                                    " with Storage Policy %s"
                                    % r["headers"]["X-Storage-Policy"].strip()
                                )
                            else:
                                msg = " ".join(
                                    str(x)
                                    for x in (error.http_status, error.http_reason,)
                                )
                                if error.http_response_content:
                                    if msg:
                                        msg += ": "
                                    msg += error.http_response_content[:60]
                                msg = ": %s" % msg
                        else:
                            msg = ": %s" % error
                        LOG.warning(
                            "Warning: failed to create container %r%s", container, msg,
                        )
                        # BUG FIX: msg= previously received a tuple; format
                        # the pieces into a single string.
                        raise exception.NovaException(
                            msg="Warning: failed to create container %r%s"
                            % (container, msg)
                        )
                    else:
                        LOG.warning("%s" % error)
                        too_large = (
                            isinstance(error, ClientException)
                            and error.http_status == 413
                        )
                        if too_large and options["verbose"] > 0:
                            LOG.error(
                                "Consider using the --segment-size "
                                "option to chunk the object"
                            )
                        raise exception.NovaException(msg=error)

        except SwiftError as ex:
            LOG.exception(ex)
            raise
        except Exception as ex:
            LOG.exception(ex)
            raise


@autolog.log_method(logger=Logger)
def get_swift_base_cmd(context):
    """Build the base ``swift`` CLI argument list for the configured backend.

    Passwords are masked as ``******`` in the returned list; callers
    substitute the real credential before executing (and log the masked
    form), so the command is safe to print.
    """
    if CONF.vault_storage_type == "swift-i":
        # Integrated swift: authenticate with the caller's keystone token.
        if not context:
            raise exception.NovaException(
                msg=("Error: context is none with integrated swift")
            )
        object_storage_url = CONF.vault_swift_url_template % context.to_dict()
        auth_version = (
            "2" if CONF.vault_swift_auth_version == "KEYSTONE_V2" else "3"
        )
        return [
            "swift",
            "--auth-version",
            auth_version,
            "--os-auth-token",
            context.auth_token,
            "--os-storage-url",
            object_storage_url,
        ]

    # Standalone swift: authenticate with configured credentials.
    if CONF.vault_swift_auth_version == "TEMPAUTH":
        return [
            "swift",
            "-A",
            CONF.vault_swift_auth_url,
            "-U",
            CONF.vault_swift_username,
            "-K",
            "******",
        ]

    auth_version = (
        "2" if CONF.vault_swift_auth_version == "KEYSTONE_V2" else "3"
    )
    return [
        "swift",
        "--auth-version",
        auth_version,
        "--os-auth-url",
        CONF.vault_swift_auth_url,
        "--os-tenant-name",
        CONF.vault_swift_tenant,
        "--os-username",
        CONF.vault_swift_username,
        "--os-password",
        "******",
    ]


@autolog.log_method(logger=Logger)
def swift_list_all(context, container):
    """List swift objects into /tmp/swift.out.

    Lists the objects of *container* (or all containers when *container*
    is falsy) with the swift CLI, retrying up to CONF.vault_retry_count
    times and re-raising the last error when every attempt fails.
    """
    cmd = get_swift_base_cmd(context)
    if container:
        cmd_list = cmd + ["list", container]
    else:
        cmd_list = cmd + ["list"]

    # Capture the loggable command *before* substituting the real password,
    # so the "******" mask from get_swift_base_cmd is what gets logged.
    cmd_list_str = " ".join(cmd_list)
    for idx, opt in enumerate(cmd_list):
        if opt in ("--os-password", "-K"):
            cmd_list[idx + 1] = CONF.vault_swift_password
            break
    if os.path.isfile("/tmp/swift.out"):
        os.remove("/tmp/swift.out")

    LOG.debug(cmd_list_str)
    for attempt in range(CONF.vault_retry_count):
        try:
            with open("/tmp/swift.out", "w") as f:
                subprocess.check_call(cmd_list, shell=False, stdout=f)
                break
        except Exception as ex:
            LOG.exception(ex)
            # Bug fix: range() never yields vault_retry_count itself, so the
            # old "i == CONF.vault_retry_count" check could never be true and
            # a persistent failure was silently swallowed. Re-raise (bare
            # raise preserves the traceback) on the final attempt.
            if attempt == CONF.vault_retry_count - 1:
                raise

@autolog.log_method(logger=Logger)
def backup_config_files(context, progress_tracking_file_path, params):
    """copy all config path for given services on target backup media
       given by the user, For ex:

       service_to_backup :
           compute:
                  config_dir:
                     - /etc/nova
                     - /var/lib/nova

                  log_dir:
                     - /var/log/nova

           glance:
                 config_dir:
                     - /etc/glance
                     - /var/lib/glance

                 log_dir:
                     - /var/log/glance

       For the "controller" target the directories are copied from the
       remote host over ssh/scp (staged through a temp dir so ownership
       allows scp to read them); for the "compute" target they are copied
       locally through nova-rootwrap.

       :param context: request context (not used by the copy itself)
       :param progress_tracking_file_path: progress file handed to
           utils.poll_process for the long-running copy commands
       :param params: dict with "host", "trusted_user", "authorized_key",
           "target", "backup_directory", optional "remote_host" and
           "services_to_backup"
    """
    try:
        host = params["host"]
        user = params["trusted_user"]["username"]
        authorized_key = params["authorized_key"]

        if params["target"] == "controller":
            remote_host = params.get("remote_host")
            host = remote_host["hostname"]

        service_to_backup = params.get("services_to_backup", None)
        if service_to_backup is None:
            return
        # Bug fix: the inner loop previously reused the name "config" for
        # both the per-service category map and its keys, shadowing the
        # outer loop variable.
        for service, categories in service_to_backup.items():
            for category, dirs in categories.items():
                for dir_path in dirs:
                    if params["target"] == "compute" and not os.path.exists(
                        dir_path
                    ):
                        LOG.warning(
                            "Given backup path:%s for service: %s doesn't exist"
                            % (dir_path, service)
                        )
                        continue
                    backup_directory = params["backup_directory"]
                    head, tail = os.path.split(dir_path)
                    if tail == "":
                        # dir_path had a trailing slash; drop one more level.
                        service_dir_path = os.path.split(head)[0]
                    else:
                        service_dir_path = head
                    backup_dest_path = os.path.join(
                        backup_directory,
                        service,
                        host,
                        # os.path.join discards earlier components when a
                        # later one is absolute, so strip the leading "/".
                        service_dir_path[1:]
                        if service_dir_path.find("/") == 0
                        else service_dir_path,
                    )
                    utils.ensure_tree(backup_dest_path)
                    temp_dir = "/tmp/" + str(uuid.uuid4())
                    if params["target"] == "controller":
                        kwargs = {
                            "authorized_key": authorized_key,
                            "user": user,
                            "host": host,
                            "source_path": temp_dir,
                            "dest_path": backup_dest_path,
                        }
                        ssh_cmd = utils.get_cmd_specs("ssh", **kwargs)
                        # Create the temp dir, copy the content into it and
                        # chown it so the trusted user can scp it off-host.
                        # (Bug fix: "sudo " had a stray trailing space.)
                        cmd1 = ssh_cmd + [
                            "sudo",
                            "mkdir",
                            temp_dir,
                            "&&",
                            "sudo",
                            "cp",
                            "-r",
                            dir_path,
                            temp_dir,
                            "&&",
                            "sudo",
                            "chown",
                            "-R",
                            user + ":" + user,
                            temp_dir,
                        ]

                        # Copy files from temp dir to storage backend
                        cmd2 = utils.get_cmd_specs("scp", **kwargs)

                        # Remove temp folder
                        cmd3 = ssh_cmd + [
                            "sudo",
                            "rm",
                            "-rf",
                            temp_dir,
                        ]
                        for cmdspec in [cmd1, cmd2, cmd3]:
                            cmd = " ".join(cmdspec)
                            LOG.debug(("backup_config_files cmd %s " % cmd))
                            process = utils.run_cmd(cmdspec)
                            utils.poll_process(
                                process,
                                cmd,
                                progress_file=progress_tracking_file_path,
                            )
                    else:
                        # Local (compute) copy via nova-rootwrap.
                        cmdspec = [
                            "sudo",
                            "nova-rootwrap",
                            CONF.rootwrap_config,
                            "cp",
                            "-R",
                            dir_path,
                            backup_dest_path,
                        ]
                        cmd = " ".join(cmdspec)
                        LOG.debug(("backup_config_files cmd %s " % cmd))
                        process = utils.run_cmd(cmdspec)
                        utils.poll_process(
                            process,
                            cmd,
                            progress_file=progress_tracking_file_path,
                        )
    except Exception as ex:
        LOG.exception(ex)
        # Bare raise keeps the original traceback (raise ex truncated it).
        raise


@autolog.log_method(logger=Logger)
def backup_database(context, progress_tracking_file_path, params):
    """Take dump of given databases.

    Runs ``mysqldump --all-databases`` over ssh on each configured database
    host and streams the output into a timestamped .sql file under
    params["backup_directory"].

    :param context: request context (not used by the dump itself)
    :param progress_tracking_file_path: progress file handed to
        utils.poll_process
    :param params: dict with optional "databases" (name -> creds with
        "user", "password", "host"), plus "trusted_user",
        "authorized_key" and "backup_directory"
    :raises contego_exception.ErrorOccurred: when mysqldump exits non-zero
    """
    try:
        databases = params.get("databases", None)
        trusted_user = params["trusted_user"]["username"]
        authorized_key = params["authorized_key"]
        if databases is not None:
            for database, database_creds in databases.items():
                user = database_creds["user"]
                password = database_creds["password"]
                host = database_creds["host"]
                backup_directory = params["backup_directory"]
                backup_dest_path = os.path.join(
                    backup_directory,
                    host + "_" + str(int(time.time())) + "_database_dump.sql",
                )
                kwargs = {
                    "authorized_key": authorized_key,
                    "user": trusted_user,
                    "host": host,
                }
                # NOTE(review): the password ends up on the remote command
                # line (and in the debug log below); consider
                # --defaults-extra-file to avoid exposing it.
                mysql_dump = "mysqldump -u%s -p%s --all-databases" % (user, password,)
                ssh_cmd = utils.get_cmd_specs("ssh", **kwargs)
                cmdspec = ssh_cmd + [mysql_dump]
                cmd = " ".join(cmdspec)
                LOG.debug(("Taking backup of database cmd %s " % cmd))
                # Bug fix: each dump file is now closed deterministically per
                # database. The old finally-clause check
                # ("if dump_file in locals()") compared the file object
                # against the locals() *keys* (always False, so nothing was
                # ever closed) and raised UnboundLocalError when "databases"
                # was absent and dump_file had never been bound.
                with open(backup_dest_path, "w") as dump_file:
                    process = utils.run_cmd(cmdspec, stdout=dump_file)
                    return_code = utils.poll_process(
                        process, cmd, progress_file=progress_tracking_file_path
                    )
                if return_code:
                    message = "Error while taking database backup on node %s: %s" % (
                        host,
                        process.stderr.read(),
                    )
                    raise contego_exception.ErrorOccurred(reason=message)
    except Exception as ex:
        LOG.exception(ex)
        # Bare raise keeps the original traceback (raise ex truncated it).
        raise