# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2013-2017 Trilio Data, Inc.
# All Rights Reserved.

"""
Implementation of a backup target endpoint for TrilioVault
"""

import abc
import base64
import glob
import pickle
import json
import os
import io
import types
import time
from ctypes import *
import subprocess
from subprocess import check_output
import re
import shutil
import socket
import uuid
import threading
from urllib.parse import urlparse

from oslo_config import cfg

from workloadmgr.db import base
from workloadmgr import exception
from workloadmgr import utils
from workloadmgr.openstack.common import fileutils
from workloadmgr.openstack.common import log as logging
from workloadmgr.openstack.common import timeutils
from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr.virt import qemuimages
from workloadmgr import autolog
from workloadmgr.openstack.common.gettextutils import _
from os.path import isfile, isdir, join
from os import environ, walk, _exit as os_exit

from threading import Thread
from functools import wraps

from keystoneauth1.identity.generic import password as passMod
from keystoneauth1 import session
from keystoneclient import client

LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)

wlm_vault_opts = [
    cfg.StrOpt('vault_storage_type',
               default='none',
               help='Storage type: local, das, vault, nfs, swift-i, swift-s, s3'),
    # swift-i: integrated(keystone), swift-s: standalone
    cfg.StrOpt('vault_data_directory',
               default='/var/triliovault-mounts',
               help='Location where snapshots will be stored'),
    cfg.StrOpt('vault_data_directory_old',
               default='/var/triliovault',
               help='Legacy location where snapshots will be stored'),
    cfg.StrOpt('vault_storage_nfs_export',
               default='local',
               help='NFS Export'),
    cfg.StrOpt('vault_storage_nfs_options',
               default='nolock',
               help='NFS Options'),
    cfg.StrOpt('vault_storage_das_device',
               default='none',
               help='das device /dev/sdb'),
    cfg.StrOpt('vault_swift_auth_version',
               default='KEYSTONE_V2',
               help='One of KEYSTONE_V2, KEYSTONE_V3 or TEMPAUTH'),
    cfg.StrOpt('vault_swift_auth_url',
               default='http://localhost:5000/v2.0',
               help='Keystone Authorization URL'),
    cfg.StrOpt('vault_swift_tenant',
               default='admin',
               help='Swift tenant'),
    cfg.StrOpt('vault_swift_username',
               default='admin',
               help='Swift username'),
    cfg.StrOpt('vault_swift_password',
               default='password',
               help='Swift password'),
    cfg.StrOpt('vault_swift_container_prefix',
               default='TrilioVault',
               help='Swift Container Prefix'),
    cfg.StrOpt('vault_swift_segment_size',
               # default='5368709120', 5GB
               default='524288000',  # 500MB
               help='Default segment size 500MB'),
    cfg.IntOpt('vault_retry_count',
               default=2,
               help='The number of times we retry on failures'),
    cfg.StrOpt('vault_swift_url_template',
               default='http://localhost:8080/v1/AUTH_%(project_id)s',
               help='The URL of the Swift endpoint'),
    cfg.StrOpt('vault_read_chunk_size_kb',
               default=128,
               help='Read size in KB'),
    cfg.StrOpt('vault_write_chunk_size_kb',
               default=32,
               help='Write size in KB'),
    cfg.StrOpt('trustee_role',
               default='Member',
               help='Role that trustee will impersonate'),
    cfg.StrOpt('triliovault_public_key',
               default='/etc/triliovault-wlm/triliovault.pub',
               help='Location of the TrilioVault public key'),
    cfg.StrOpt('domain_name',
               default='default',
               help='cloud-admin user domain name'),
    cfg.StrOpt('triliovault_user_domain_id',
               default='default',
               help='triliovault user domain id'),
    cfg.StrOpt('keystone_auth_version',
               default='2.0',
               help='Keystone authentication version'),
    cfg.IntOpt('workload_full_backup_factor',
               default=50,
               help='The size of full backup compared to actual resource size in percentage'),
    cfg.IntOpt('workload_incr_backup_factor',
               default=10,
               help='The size of incremental backup compared to full backup in percentage'),
    cfg.BoolOpt('global_job_scheduler_override',
                default=False,
                help='If true, global job scheduler gets disabled irrespective of settings value'),
    cfg.StrOpt('cloud_admin_role',
               default='admin',
               help='Cloud admin role on admin tenant'),
]

CONF = cfg.CONF
CONF.register_opts(wlm_vault_opts)
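
# Illustrative only: these options are normally supplied through the
# workloadmgr configuration file. The section, endpoint and values below are
# assumptions for the sake of example, not taken from this module; a minimal
# NFS-backed setup might look like:
#
#   [DEFAULT]
#   vault_storage_type = nfs
#   vault_storage_nfs_export = 10.10.2.20:/upstream
#   vault_storage_nfs_options = nolock,soft,timeo=180
#   vault_data_directory = /var/triliovault-mounts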


def run_async(func):
    """
        run_async(func)
            function decorator, intended to make "func" run in a separate
            thread (asynchronously).
            Returns the created Thread object

            E.g.:
            @run_async
            def task1():
                do_something

            @run_async
            def task2():
                do_something_too

            t1 = task1()
            t2 = task2()
            ...
            t1.join()
            t2.join()
    """

    @wraps(func)
    def async_func(*args, **kwargs):
        func_hl = Thread(target=func, args=args, kwargs=kwargs)
        func_hl.start()
        return func_hl

    return async_func


class TrilioVaultBackupTarget(object, metaclass=abc.ABCMeta):

    def __init__(self, backupendpoint, backup_target_type, mountpath=None):
        self.__backup_endpoint = backupendpoint
        self.__backup_target_type = backup_target_type
        self.__mountpath = mountpath or backupendpoint

    @property
    def backup_endpoint(self):
        return self.__backup_endpoint

    @property
    def backup_target_type(self):
        return self.__backup_target_type

    @property
    def mount_path(self):
        return self.__mountpath

    def __str__(self):
        return "%s:%s" % (self.backup_target_type,
                          self.backup_endpoint)

    ###
    #   All path manipulation methods
    ###
    @abc.abstractmethod
    def get_progress_tracker_directory(self, tracker_metadata):
        """
        Get the location where all tracking objects are stored. A tracking
        object is a file on NFS, or an object in an object store.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_progress_tracker_path(self, tracker_metadata):
        """
        Get the path of the tracker object based on the tracker metadata.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_workload_transfers_directory(self):
        """
        Get the path of the directory where transfer authentication keys are stored
        when transferring workload ownership between tenants of two different clouds.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_workload_transfers_path(self, transfers_metadata):
        """
        The absolute path of workload transfer file for the workload id
        defined in transfers_metadata
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_workload_path(self, workload_metadata):
        pass

    @abc.abstractmethod
    def get_snapshot_path(self, snapshot_metadata):
        pass

    @abc.abstractmethod
    def get_snapshot_vm_path(self, snapshot_vm_metadata):
        pass

    @abc.abstractmethod
    def get_snapshot_vm_resource_path(self, snapshot_vm_resource_metadata):
        pass

    @abc.abstractmethod
    def get_snapshot_vm_disk_resource_path(
            self, snapshot_vm_disk_resource_metadata):
        pass

    @abc.abstractmethod
    def get_restore_staging_path(self, restore_metadata):
        pass

    @abc.abstractmethod
    def get_restore_vm_staging_path(self, restore_vm_metadata):
        pass

    @abc.abstractmethod
    def get_restore_vm_resource_staging_path(
            self, restore_vm_resource_metadata):
        pass

    @abc.abstractmethod
    def get_restore_vm_disk_resource_staging_path(
            self, restore_vm_disk_resource_metadata):
        pass

    ##
    # purge staging area functions
    ##
    def purge_snapshot_from_staging_area(self, context, snapshot_metadata):
        try:
            directory = self.get_progress_tracker_directory(snapshot_metadata)
            if os.path.isdir(directory) is True:
                shutil.rmtree(directory)
        except Exception as ex:
            LOG.debug("Failed to remove snapshot having data: {}".format(snapshot_metadata))
            LOG.debug(ex)

    def purge_snapshot_vm_from_staging_area(
            self, context, snapshot_vm_metadata):
        pass

    def purge_snapshot_vm_resource_from_staging_area(
            self, context, snapshot_vm_resource_metadata):
        pass

    def purge_restore_vm_from_staging_area(self, context, restore_vm_metadata):
        pass

    def purge_restore_vm_resource_from_staging_area(
            self, context, restore_vm_resource_metadata):
        pass

    ##
    # backup target capabilities
    ##
    @abc.abstractmethod
    def commit_supported(self):
        pass

    ##
    # backup target capabilities
    ##
    @abc.abstractmethod
    def requires_staging(self):
        pass

    ##
    # backup target capabilities
    ##
    @abc.abstractmethod
    def tracking_supported(self):
        """
        Whether the backup media can be used for maintaining progress
        tracking files that track various snapshot and upload
        operations between data movers and triliovault backup engines.
        """
        pass

    ##
    # backup target availability status
    ##
    @abc.abstractmethod
    def is_online(self):
        pass

    @abc.abstractmethod
    def mount_backup_target(self):
        pass

    @abc.abstractmethod
    def get_total_capacity(self, context):
        """
        return total capacity of the backup target and
        amount of storage that is utilized
        """
        pass

    ##
    # object manipulation methods on the backup target
    ##

    ##
    # for workload transfers
    ##
    @abc.abstractmethod
    def get_all_workload_transfers(self):
        """
        List of workload transfers on this particular backup media
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def transfers_delete(self, context, transfers_metadata):
        """
        Delete the workload transfer file identified by transfers_metadata
        from this particular backup media
        """
        raise NotImplementedError()

    ##
    # triliovault object (json) access methods
    @abc.abstractmethod
    def put_object(self, path, json_data):
        pass

    @abc.abstractmethod
    def get_object(self, path):
        pass

    @abc.abstractmethod
    def object_exists(self, path):
        pass

    @abc.abstractmethod
    def get_object_size(self, vault_path):
        pass

    @abc.abstractmethod
    def workload_delete(self, context, workload_metadata):
        pass

    @abc.abstractmethod
    def snapshot_delete(self, context, snapshot_metadata):
        pass

    ##
    # upload workloadmgr objects metadata functions
    ##
    def upload_snapshot_metatdata_to_object_store(self, context,
                                                  snapshot_metadata):
        pass

    def download_metadata_from_object_store(self, context):
        return 0


def ensure_mounted():
    '''Make sure the NFS share is mounted at the designated location;
       otherwise raise InvalidNFSMountPoint.'''

    def wrap(func):
        @wraps(func)
        def new_function(*args, **kw):
            if not args[0].is_mounted():
                try:
                    args[0].mount_backup_target()
                except Exception:
                    raise exception.InvalidNFSMountPoint(
                        reason="'%s' is not mounted at '%s'" %
                           (args[0].backup_endpoint, args[0].mount_path))

            return func(*args, **kw)
        return new_function
    return wrap
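
# Usage sketch for ensure_mounted(): the real call sites are the decorated
# NfsTrilioVaultBackupTarget methods further below; the method shown here is
# hypothetical.
#
#   @ensure_mounted()
#   def some_path_helper(self, metadata):
#       # by the time this body runs the backup target is mounted, or
#       # InvalidNFSMountPoint has been raised
#       return os.path.join(self.mount_path, 'some_subdir')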


def to_abs():
    '''Convert the path argument to an absolute path when the decorated
       method is called with a path relative to the mount point.'''

    def wrap(func):
        @wraps(func)
        def wrap_to_abs(*args, **kw):
            path = args[1]
            if not os.path.isabs(path):
                path = os.path.join(args[0].mount_path, path)
            new_args = (args[0], path)
            new_args += args[2:]
            return func(*new_args, **kw)
        return wrap_to_abs
    return wrap
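
# Behaviour sketch for to_abs(), assuming a target mounted at
# /var/triliovault-mounts/<base64-of-export-path>: a call such as
#
#   target.get_object('workload_1/workload_db')
#
# is rewritten to operate on
#
#   /var/triliovault-mounts/<base64-of-export-path>/workload_1/workload_db
#
# while calls that already pass an absolute path are forwarded unchanged.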


class NfsTrilioVaultBackupTarget(TrilioVaultBackupTarget):
    def __init__(self, backupendpoint):
        if CONF.vault_storage_type == 'nfs':
            base64encode = base64.b64encode(str.encode(
                            urlparse(backupendpoint).path)).decode()
            mountpath = os.path.join(CONF.vault_data_directory,
                                     base64encode)
            self.umount_backup_target_object_store()
            fileutils.ensure_tree(mountpath)
            self.__mountpath = mountpath
            super(
                NfsTrilioVaultBackupTarget,
                self).__init__(
                backupendpoint,
                "nfs",
                mountpath=mountpath)
            if not self.is_mounted():
                utils.chmod(mountpath, '0740')

        elif (CONF.vault_storage_type.lower() == 'swift-s' or
              CONF.vault_storage_type.lower() == 's3'):
            mountpath = CONF.vault_data_directory
            self.__mountpath = mountpath
            super(
                NfsTrilioVaultBackupTarget,
                self).__init__(
                backupendpoint,
                CONF.vault_storage_type.lower(),
                mountpath=mountpath)

    def get_progress_tracker_directory(self, tracker_metadata):
        """
        Get the location where all tracking objects are stored. A tracking
        object is a file on NFS, or an object in an object store.
        """
        mountpath = self.mount_path
        progress_tracker_directory = os.path.join(
            mountpath, "contego_tasks", 'snapshot_%s' %
            (tracker_metadata['snapshot_id']))

        return progress_tracker_directory

    def get_progress_tracker_path(self, tracker_metadata):
        """
        Get the path of the tracker object based on the tracker metadata.
        """
        progress_tracker_directory = self.get_progress_tracker_directory(
            tracker_metadata)
        if progress_tracker_directory:
            progress_tracking_file_path = os.path.join(
                progress_tracker_directory,
                tracker_metadata['resource_id'])
            return progress_tracking_file_path
        else:
            return None

    def read_progress_tracking_file(self, progress_tracking_path):
        '''
        cmd:- dd if=/var/trilio/triliovault-mounts/contego_tasks/snapshot_18116db5-95f5-45fd-a6d2-a8e557121ba9/48abf568-5571-444d-b4c8-aa8e698f38be iflag=direct
        Response:- 10.10.10.121
                   In Progress
                   Completed
                   Completed
                   0+1 records in
                   0+1 records out
        '''
        try:
            # Read the file with dd using direct I/O (iflag=direct) so the NFS
            # client cache does not hide the data mover's latest writes
            LOG.debug('Reading Progress Tracking file: {}'.format(progress_tracking_path))
            p = subprocess.Popen(['dd',
                                'if={}'.format(progress_tracking_path),
                                'iflag=direct'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            if p.returncode == 0:
                LOG.debug('Reading Progress Tracking File Completed Successfully.')
                return stdout.decode('utf-8').split('\n')
            else:
                LOG.debug('Error occurred while reading progress tracking file. {}'.format(stderr.decode('utf-8')))
                return ['']
        except Exception:
            LOG.warning("Could not read progress tracking file : %s" % progress_tracking_path)
            return ['']

    def get_workload_transfers_directory(self):
        """
        Get the path of the directory where transfer authentication keys are stored
        when transferring workload ownership between tenants of two different clouds.
        """
        workload_transfers_directory = os.path.join(self.mount_path,
                                                    "workload_transfers")

        fileutils.ensure_tree(workload_transfers_directory)
        utils.chmod(workload_transfers_directory, '0740')
        return workload_transfers_directory

    def get_workload_transfers_path(self, transfers_metadata):
        """
        The absolute path of workload transfer file for the workload id
        defined in transfers_metadata
        """
        workload_transfers_directory = self.get_workload_transfers_directory()
        if workload_transfers_directory:
            workload_transfers_file_path = os.path.join(
                workload_transfers_directory,
                transfers_metadata['workload_id'])
            return workload_transfers_file_path
        else:
            return None

    @ensure_mounted()
    def get_workload_path(self, workload_metadata):
        workload_path = os.path.join(
            self.mount_path, 'workload_%s' %
            (workload_metadata['workload_id']))
        return workload_path

    @ensure_mounted()
    def get_config_workload_path(self):
        config_workload_path = os.path.join(
            self.mount_path, CONF.cloud_unique_id, 'config_workload')
        return config_workload_path

    def get_config_backup_path(self, backup_id):
        workload_path = self.get_config_workload_path()
        backup_path = os.path.join(workload_path, 'backup_%s' % (backup_id))
        return backup_path

    def get_snapshot_path(self, snapshot_metadata):
        workload_path = self.get_workload_path(snapshot_metadata)
        snapshot_path = os.path.join(
            workload_path, 'snapshot_%s' %
            (snapshot_metadata['snapshot_id']))
        return snapshot_path

    def get_snapshot_vm_path(self, snapshot_vm_metadata):
        snapshot_path = self.get_snapshot_path(snapshot_vm_metadata)
        snapshot_vm_path = os.path.join(
            snapshot_path, 'vm_id_%s' %
            (snapshot_vm_metadata['snapshot_vm_id']))
        return snapshot_vm_path

    def get_snapshot_vm_resource_path(self, snapshot_vm_resource_metadata):
        snapshot_vm_path = self.get_snapshot_vm_path(
            snapshot_vm_resource_metadata)
        snapshot_vm_resource_path = os.path.join(
            snapshot_vm_path,
            'vm_res_id_%s_%s' %
            (snapshot_vm_resource_metadata['snapshot_vm_resource_id'],
             snapshot_vm_resource_metadata['snapshot_vm_resource_name'].replace(' ', '')))

        return snapshot_vm_resource_path

    def get_snapshot_vm_disk_resource_path(
            self, snapshot_vm_disk_resource_metadata):
        snapshot_vm_resource_path = \
            self.get_snapshot_vm_resource_path(
                snapshot_vm_disk_resource_metadata)
        snapshot_vm_disk_resource_path = os.path.join(
            snapshot_vm_resource_path,
            snapshot_vm_disk_resource_metadata['vm_disk_resource_snap_id'])

        return snapshot_vm_disk_resource_path
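
    # Putting the path helpers above together, the on-target layout for one
    # disk of one VM in one snapshot looks roughly like this (ids and names
    # are illustrative placeholders):
    #
    #   <mount_path>/workload_<workload_id>/
    #       snapshot_<snapshot_id>/
    #           vm_id_<snapshot_vm_id>/
    #               vm_res_id_<snapshot_vm_resource_id>_<resource_name>/
    #                   <vm_disk_resource_snap_id>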

    def get_restore_staging_path(self, restore_metadata):
        vault_data_directory = os.path.join(self.mount_path,
                                            "staging",
                                            socket.gethostname())
        restore_staging_path = os.path.join(
            vault_data_directory, 'restore_%s' %
            (restore_metadata['restore_id']))
        return restore_staging_path

    def get_restore_vm_staging_path(self, restore_vm_metadata):
        restore_staging_path = self.get_restore_staging_path(
            restore_vm_metadata)
        restore_vm_staging_path = os.path.join(
            restore_staging_path, 'vm_id_%s' %
            (restore_vm_metadata['snapshot_vm_id']))
        return restore_vm_staging_path

    def get_restore_vm_resource_staging_path(
            self, restore_vm_resource_metadata):
        restore_vm_staging_path = self.get_restore_vm_staging_path(
            restore_vm_resource_metadata)
        restore_vm_resource_staging_path = os.path.join(
            restore_vm_staging_path,
            'vm_res_id_%s_%s' %
            (restore_vm_resource_metadata['snapshot_vm_resource_id'],
             restore_vm_resource_metadata['snapshot_vm_resource_name'].replace(
             ' ',
             '')))
        return restore_vm_resource_staging_path

    def get_restore_vm_disk_resource_staging_path(
            self, restore_vm_disk_resource_metadata):
        restore_vm_resource_staging_path = self.get_restore_vm_resource_staging_path(
            restore_vm_disk_resource_metadata)
        restore_vm_disk_resource_staging_path = os.path.join(
            restore_vm_resource_staging_path,
            restore_vm_disk_resource_metadata['vm_disk_resource_snap_id'])
        return restore_vm_disk_resource_staging_path

    @ensure_mounted()
    def get_policy_path(self):
        policy_path = os.path.join(
            self.mount_path, CONF.cloud_unique_id, 'workload_policy')
        return policy_path

    def remove_directory(self, path):
        try:
            if os.path.isdir(path):
                shutil.rmtree(path)
        except Exception as ex:
            raise ex

    ##
    # backup target capabilities
    ##
    def commit_supported(self):
        return True

    ##
    # backup target capabilities
    ##
    def requires_staging(self):
        return False

    def tracking_supported(self):
        return True

    ##
    # backup target availability status
    ##
    @autolog.log_method(logger=Logger)
    def is_online(self):
        status = False
        try:
            nfsshare = self.backup_endpoint
            nfsserver = nfsshare.split(":/")[0]
            nfsserver = nfsserver.split('[')[1] if '[' in nfsserver else nfsserver
            nfsserver = nfsserver.split(']')[0] if ']' in nfsserver else nfsserver

            rpcinfo = utils.execute("rpcinfo", "-s", nfsserver)

            for i in rpcinfo[0].split("\n")[1:]:
                if len(i.split()) and i.split()[3] in ['mountd', 'nfs']:
                    status = True
                    break
        except Exception as ex:
            LOG.debug("Failed to verify backup endpoint status: {}".format(self.backup_endpoint))
            LOG.debug(ex)

        return status

    @autolog.log_method(logger=Logger)
    def is_mounted(self):
        '''Make sure backup endpoint is mounted at mount_path'''
        mountpath = self.mount_path
        nfsshare = self.backup_endpoint

        if not os.path.ismount(mountpath):
            return False

        with open('/proc/mounts', 'r') as f:
            mounts = [{line.split()[1]: line.split()[0]}
                      for line in f.readlines() if line.split()[1] == mountpath]

        return bool(mounts) and mounts[0].get(mountpath, None) == nfsshare
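
    # Illustrative sketch: a matching /proc/mounts entry for this target looks
    # roughly like the following (server, export and base64-encoded mount
    # directory are hypothetical):
    #
    #   10.10.2.20:/upstream /var/triliovault-mounts/L3Vwc3RyZWFt nfs rw,nolock 0 0
    #
    # is_mounted() only returns True when the entry mounted at mount_path has
    # this target's backup_endpoint as its device field.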

    def umount_backup_target_object_store(self):
        try:
            command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', '-f', CONF.vault_data_directory]
            subprocess.call(command, shell=False)
        except Exception as exception:
            LOG.exception(exception)

    @autolog.log_method(logger=Logger)
    def umount_backup_target(self):
        nfsshare = self.backup_endpoint
        mountpath = self.mount_path

        """ mounts storage """
        try:
            command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', nfsshare]
            subprocess.call(command, shell=False)
        except Exception as exception:
            LOG.exception(exception)

        try:
            command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', nfsshare]
            subprocess.call(command, shell=False)
        except Exception as exception:
            LOG.exception(exception)

        try:
            command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', '-l', nfsshare]
            subprocess.call(command, shell=False)
        except Exception as exception:
            LOG.exception(exception)

    @autolog.log_method(logger=Logger)
    def mount_backup_target(self, old_share=False):
        self.umount_backup_target()

        nfsshare = self.backup_endpoint
        mountpath = self.mount_path
        nfsoptions = CONF.vault_storage_nfs_options

        if self.is_online():
            command = ['timeout', '-sKILL', '30', 'sudo', CONF.wlm_rootwrap, CONF.rootwrap_config,
                       'mount', '-o', nfsoptions, nfsshare,
                       mountpath]
            subprocess.check_call(command, shell=False)
            if old_share is True:
                command = ['timeout', '-sKILL', '30', 'sudo', CONF.wlm_rootwrap, CONF.rootwrap_config,
                           'mount', '--bind', mountpath,
                           CONF.vault_data_directory_old]
                subprocess.check_call(command, shell=False)
        else:
            raise exception.BackupTargetOffline(endpoint=nfsshare)

    @autolog.log_method(logger=Logger)
    def get_total_capacity(self, context):
        """
        return total capacity of the backup target and
        amount of storage that is utilized
        """
        total_capacity = 1
        total_utilization = 1
        try:
            mountpath = self.mount_path
            nfsshare = self.backup_endpoint
            stdout, stderr = utils.execute('df', mountpath)
            if stderr != '':
                msg = _(
                    'Could not execute df command successfully. Error %s') % stderr
                raise exception.ErrorOccurred(reason=msg)

            # Filesystem     1K-blocks      Used Available Use% Mounted on
            # /dev/sda1      464076568 248065008 192431096  57% /

            fields = stdout.split('\n')[0].split()
            values = stdout.split('\n')[1].split()

            total_capacity = int(values[1]) * 1024
            # The 'Used' column reported by df is not a reliable indicator, so
            # we compute utilization as size - available instead.
            total_utilization = total_capacity - int(values[3]) * 1024
            """
            try:
                stdout, stderr = utils.execute('du', '-shb', mountpath, run_as_root=True)
                if stderr != '':
                    msg = _('Could not execute du command successfully. Error %s'), (stderr)
                    raise exception.ErrorOccurred(reason=msg)
                #196022926557    /var/triliovault
                du_values = stdout.split()
                total_utilization = int(du_values[0])
            except Exception as ex:
                LOG.exception(ex)
            """
        except Exception as ex:
            LOG.debug("Failed to fetch details of mount path: {}".format(self.mount_path))
            LOG.debug(ex)

        return total_capacity, total_utilization

    ##
    # object manipulation methods on the backup target
    ##

    ##
    # for workload transfers
    ##
    def get_all_workload_transfers(self):
        """
        List of workload transfers on this particular backup media
        """
        workload_transfers_directory = self.get_workload_transfers_directory()
        transfers = []
        if workload_transfers_directory:
            pattern = os.path.join(workload_transfers_directory, "*")
            for transfer_file in glob.glob(pattern):
                tran = json.loads(self.get_object(transfer_file))
                transfers.append(tran)

        return transfers

    @autolog.log_method(logger=Logger)
    def transfers_delete(self, context, transfers_metadata):
        """
        Delete the workload transfer file identified by transfers_metadata
        from this particular backup media
        """
        try:
            transfer_path = self.get_workload_transfers_path(
                transfers_metadata)
            if isfile(transfer_path):
                os.remove(transfer_path)
        except Exception as ex:
            LOG.debug("Failed to delete workload data from: {}".format(transfers_metadata))
            LOG.debug(ex)

    ##
    # triliovault object (json) access methods
    @to_abs()
    @autolog.log_method(logger=Logger)
    def put_object(self, path, json_data):
        head, tail = os.path.split(path)
        fileutils.ensure_tree(head)
        with open(path, 'w') as json_file:
            json_file.write(json_data)
        return

    @to_abs()
    @autolog.log_method(logger=Logger)
    def get_object(self, path):
        with open(path, 'r') as json_file:
            return json_file.read()

    @to_abs()
    def object_exists(self, path):
        return os.path.isfile(path)

    @to_abs()
    def get_object_size(self, path):
        size = 0
        try:
            statinfo = os.stat(path)
            size = statinfo.st_size
        except Exception as ex:
            LOG.debug("Path: {} may not be accessible for now".format(path))
            LOG.debug(ex)
        return size

    @autolog.log_method(logger=Logger)
    def get_workloads(self, context):
        self.download_metadata_from_object_store(context)
        parent_path = self.mount_path
        workload_urls = []
        try:
            for name in os.listdir(parent_path):
                if os.path.isdir(os.path.join(parent_path, name)
                                 ) and name.startswith('workload_'):
                    workload_urls.append(os.path.join(parent_path, name))
        except Exception as ex:
            LOG.debug("Failed to fetch workload, with error: {}".format(ex))
        return workload_urls
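
    # Illustrative return value (paths and ids are hypothetical):
    # get_workloads() yields the absolute directories of the workloads found
    # on this target, e.g.
    #
    #   ['/var/triliovault-mounts/<base64>/workload_42ef...',
    #    '/var/triliovault-mounts/<base64>/workload_9a1b...']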

    @autolog.log_method(logger=Logger)
    def workload_delete(self, context, workload_metadata):
        workload_path = None
        try:
            workload_path = self.get_workload_path(workload_metadata)
            self.remove_directory(workload_path)
        except Exception as ex:
            LOG.debug("Failed to delete workload, at path: {}".format(workload_path))
            LOG.debug(ex)

    @autolog.log_method(logger=Logger)
    def snapshot_delete(self, context, snapshot_metadata):
        try:
            snapshot_path = self.get_snapshot_path(snapshot_metadata)
            self.remove_directory(snapshot_path)
        except Exception as ex:
            LOG.debug("Failed to delete snapshot data from: {}".format(snapshot_metadata))
            LOG.debug(ex)

    @autolog.log_method(logger=Logger)
    def config_backup_delete(self, context, backup_id):
        try:
            backup_path = self.get_config_backup_path(backup_id)
            self.remove_directory(backup_path)
        except Exception as ex:
            LOG.debug("Failed to delete config backup for id: {}".format(backup_id))
            LOG.debug(ex)

    @autolog.log_method(logger=Logger)
    def policy_delete(self, context, policy_id):
        try:
            policy_path = self.get_policy_path()
            policy_path = os.path.join(
                policy_path, 'policy' + '_' + str(policy_id))
            if isfile(policy_path):
                os.remove(policy_path)
        except Exception as ex:
            LOG.debug("Failed to delete policy data for id: {}".format(policy_id))
            LOG.debug(ex)

    ##
    # Object specific operations
    @autolog.log_method(logger=Logger)
    def _update_workload_ownership_on_media(self, context, workload_id):
        try:
            workload_path = self.get_workload_path(
                {'workload_id': workload_id})

            def _update_metadata_file(pathname):
                metadata = json.loads(self.get_object(pathname))

                metadata['user_id'] = context.user_id
                metadata['project_id'] = context.project_id

                self.put_object(pathname, json.dumps(metadata))

            for snap in glob.glob(os.path.join(workload_path, "snapshot_*")):
                _update_metadata_file(os.path.join(snap, "snapshot_db"))

            _update_metadata_file(os.path.join(workload_path, "workload_db"))

        except Exception as ex:
            LOG.exception(ex)
            raise


class ObjectStoreTrilioVaultBackupTarget(NfsTrilioVaultBackupTarget):
    def __init__(self, backupendpoint):
        super(ObjectStoreTrilioVaultBackupTarget, self).__init__(backupendpoint)

    def _delete_path(self, path):
        retry = 0
        while os.path.isdir(path):
            try:
                command = ['rm', '-rf', path]
                subprocess.check_call(command, shell=False)
            except BaseException:
                pass
            retry += 1
            if retry >= 1:
                break

    def remove_directory(self, path):
        self._delete_path(path)

    @autolog.log_method(logger=Logger)
    def get_progress_tracker_directory(self, tracker_metadata):
        """
        Get the location where all tracking objects are stored. A tracking
        object is a file on NFS, or an object in an object store.
        """
        mountpath = self.mount_path
        progress_tracker_directory = os.path.join(
            mountpath, "contego_tasks", 'snapshot_%s' %
            (tracker_metadata['snapshot_id']))

        fileutils.ensure_tree(progress_tracker_directory)
        return progress_tracker_directory

    @autolog.log_method(logger=Logger)
    def mount_backup_target(self, old_share=False):
        LOG.error("Cannot mount backup target, verify whether s3-fuse-plugin is working or not")
        raise Exception("Cannot mount backup target")

    @autolog.log_method(logger=Logger)
    def is_online(self):
        return self.is_mounted()

    @autolog.log_method(logger=Logger)
    def umount_backup_target(self):
        LOG.error("Cannot unmount backup target, verify whether s3-fuse-plugin is working or not")
        raise Exception("Cannot unmount backup target")

    @to_abs()
    @autolog.log_method(logger=Logger)
    def put_object(self, path, json_data):
        head, tail = os.path.split(path)
        fileutils.ensure_tree(head)
        try:
            with open(path, 'w') as json_file:
                json_file.write(json_data)
        except BaseException:
            with open(path, 'w') as json_file:
                json_file.write(json_data)
        return

    @autolog.log_method(logger=Logger)
    def snapshot_delete(self, context, snapshot_metadata):
        try:
            snapshot_path = self.get_snapshot_path(snapshot_metadata)
            self._delete_path(snapshot_path)
        except Exception as ex:
            LOG.debug("Failed to delete snapshot data from backend with matching data: {}".format(snapshot_metadata))
            LOG.debug(ex)

    @autolog.log_method(logger=Logger)
    def config_backup_delete(self, context, backup_id):
        try:
            backup_path = self.get_config_backup_path(backup_id)
            self._delete_path(backup_path)
        except Exception as ex:
            LOG.debug("Failed to delete config backup with id: {}".format(backup_id))
            LOG.debug(ex)

    @autolog.log_method(logger=Logger)
    def get_total_capacity(self, context):
        """
        return total capacity of the backup target and
        amount of storage that is utilized
        """
        total_capacity = 1
        total_utilization = 1
        try:
            mountpath = self.mount_path
            stdout, stderr = utils.execute('stat', '-f', mountpath)
            if stderr != '':
                msg = _(
                    'Could not execute stat command successfully. Error %s') % stderr
                raise exception.ErrorOccurred(reason=msg)
            total_capacity = int(
                stdout.split('\n')[3].split('Blocks:')[1].split(' ')[2])
            try:
                total_free = int(
                    stdout.split('\n')[3].split('Blocks:')[1].split(' ')[4])
            except BaseException:
                total_free = int(stdout.split('\n')[3].split(
                    'Blocks:')[1].split('Available: ')[1])
            total_utilization = abs(total_capacity - total_free)

        except Exception as ex:
            LOG.debug("Failed to fetch details of mount: {}".format(self.mount_path))
            LOG.debug(ex)

        return total_capacity, total_utilization


triliovault_backup_targets = {}


@autolog.log_method(logger=Logger)
def mount_backup_media():
    for idx, backup_target in enumerate(
            CONF.vault_storage_nfs_export.split(',')):
        backup_target = backup_target.strip()
        if backup_target == '':
            continue
        if CONF.vault_storage_type.lower() == 'nfs':
            backend = NfsTrilioVaultBackupTarget(backup_target)
        elif (CONF.vault_storage_type.lower() == 'swift-s' or
              CONF.vault_storage_type.lower() == 's3'):
            backend = ObjectStoreTrilioVaultBackupTarget(backup_target)

        triliovault_backup_targets[backup_target] = backend
        if not backend.is_mounted():
            LOG.debug("Trying to mount the backup target, since mount point does not exists.")
            backend.mount_backup_target()


def get_backup_target(backup_endpoint):
    backup_endpoint = backup_endpoint.strip()
    backup_target = triliovault_backup_targets.get(backup_endpoint, None)

    if backup_target is None:
        mount_backup_media()
        backup_target = triliovault_backup_targets.get(backup_endpoint, None)

    return backup_target
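
# Minimal usage sketch (the endpoint value is an assumption): callers look up
# the backend object for a configured share and then use its path helpers.
#
#   target = get_backup_target('10.10.2.20:/upstream')
#   if target is not None and target.is_online():
#       path = target.get_workload_path({'workload_id': workload_id})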


def get_settings_backup_target():
    settings_path_new = os.path.join(CONF.cloud_unique_id, "settings_db")
    for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
        get_backup_target(backup_endpoint.strip())
    for endpoint, backup_target in list(triliovault_backup_targets.items()):
        if backup_target.object_exists(settings_path_new):
            return (backup_target, settings_path_new)

    list(triliovault_backup_targets.values())[0].put_object(
        settings_path_new, json.dumps([]))
    settings_path = "settings_db"
    for endpoint, backup_target in list(triliovault_backup_targets.items()):
        if backup_target.object_exists(settings_path):
            return (backup_target, settings_path)

    return (list(triliovault_backup_targets.values())[0], settings_path_new)


def get_allowed_quota_backup_target():
    allowed_quota_path_new = os.path.join(CONF.cloud_unique_id, "allowed_quota_db")
    for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
        get_backup_target(backup_endpoint.strip())
    for endpoint, backup_target in list(triliovault_backup_targets.items()):
        if backup_target.object_exists(allowed_quota_path_new):
            return backup_target, allowed_quota_path_new

    if triliovault_backup_targets:
        list(triliovault_backup_targets.values())[0].put_object(
            allowed_quota_path_new, json.dumps([]))
    allowed_quota_path = "allowed_quota_db"
    for endpoint, backup_target in list(triliovault_backup_targets.items()):
        if backup_target.object_exists(allowed_quota_path):
            return backup_target, allowed_quota_path

    return list(triliovault_backup_targets.values())[0], allowed_quota_path_new


def get_capacities_utilizations(context):
    def fill_capacity_utilization(context, backup_target, stats):
        nfsshare = backup_target.backup_endpoint
        cap, util = backup_target.get_total_capacity(context)

        stats[nfsshare] = {'total_capacity': cap,
                           'total_utilization': util,
                           'nfsstatus': True}

    stats = {}
    threads = []
    for nfsshare in CONF.vault_storage_nfs_export.split(','):
        nfsshare = nfsshare.strip()
        backup_target = get_backup_target(nfsshare)
        nfsstatus = backup_target.is_online()

        stats[nfsshare] = {'total_capacity': -1,
                           'total_utilization': -1,
                           'nfsstatus': nfsstatus}
        if nfsstatus is True:
            t = threading.Thread(target=fill_capacity_utilization,
                                 args=[context, backup_target, stats])
            t.start()
            threads.append(t)

    for t in threads:
        t.join()

    return stats
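
# Shape of the stats dict returned above (endpoints and numbers are made up):
#
#   {'10.10.2.20:/upstream': {'total_capacity': 1099511627776,
#                             'total_utilization': 274877906944,
#                             'nfsstatus': True},
#    '10.10.2.21:/backup': {'total_capacity': -1,
#                           'total_utilization': -1,
#                           'nfsstatus': False}}
#
# Offline shares keep the -1 placeholders because capacity collection is only
# attempted for shares whose rpcinfo check succeeded.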


def get_workloads(context):
    workloads = []

    for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
        get_backup_target(backup_endpoint)

    for endpoint, backup_target in list(triliovault_backup_targets.items()):
        workloads += backup_target.get_workloads(context)

    return workloads


def validate_workload(workload_url):
    if os.path.isdir(workload_url) and os.path.exists(
            os.path.join(workload_url, "workload_db")):
        return True
    else:
        return False


def get_all_workload_transfers(context):
    transfers = []

    for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
        get_backup_target(backup_endpoint)

    for endpoint, backup_target in list(triliovault_backup_targets.items()):
        transfers += backup_target.get_all_workload_transfers()

    return transfers


def get_nfs_share_for_workload_by_free_overcommit(context, workload):
    """
       workload is a dict with id, name, description and metadata.
       metadata includes size of the workload and approximate backup storage needed
       to hold all backups
    """

    shares = {}
    caps = get_capacities_utilizations(context)
    for endpoint, backend in list(triliovault_backup_targets.items()):
        if caps[endpoint]['nfsstatus'] is False:
            continue
        shares[endpoint] = {
            'noofworkloads': 0,
            'totalcommitted': 0,
            'endpoint': endpoint,
            'capacity': caps[endpoint]['total_capacity'],
            'used': caps[endpoint]['total_utilization']
        }
    if len(shares) == 0:
        raise exception.InvalidState(reason="No NFS shares mounted")

    # if only one nfs share is configured, then return that share
    if len(shares) == 1:
        return list(shares.keys())[0]

    for endpoint, values in list(shares.items()):
        base64encode = base64.b64encode(str.encode(
                        urlparse(endpoint).path)).decode()
        mountpath = os.path.join(CONF.vault_data_directory, base64encode)
        for w in os.listdir(mountpath):
            try:
                if 'workload_' not in w:
                    continue
                workload_path = os.path.join(mountpath, w)
                with open(os.path.join(workload_path, "workload_db"), "r") as f:
                    wjson = json.load(f)
                values['noofworkloads'] += 1
                workload_approx_backup_size = 0

                for meta in wjson['metadata']:
                    if meta['key'] == 'workload_approx_backup_size':
                        workload_approx_backup_size = int(meta['value'])

                if workload_approx_backup_size == 0:
                    workload_backup_media_size = 0
                    for result in glob.iglob(os.path.join(
                            workload_path, 'snapshot_*/snapshot_db')):
                        with open(result, "r") as snaprecf:
                            snaprec = json.load(snaprecf)
                        if snaprec['snapshot_type'] == "full":
                            workload_backup_media_size = snaprec['size'] / \
                                                         1024 / 1024 / 1024

                    # workload_backup_media_size is in GBs
                    workload_backup_media_size = workload_backup_media_size or 10
                    jobschedule = pickle.loads(bytes(wjson['jobschedule'], 'utf-8'))
                    if jobschedule['retention_policy_type'] == 'Number of Snapshots to Keep':
                        incrs = int(jobschedule['retention_policy_value'])
                    else:
                        jobsperday = int(
                            jobschedule['interval'].split("hr")[0])
                        incrs = int(
                            jobschedule['retention_policy_value']) * jobsperday

                    if jobschedule['fullbackup_interval'] == '-1':
                        fulls = 1
                    else:
                        fulls = incrs / int(jobschedule['fullbackup_interval'])
                        incrs = incrs - fulls

                    workload_approx_backup_size = \
                        (fulls * workload_backup_media_size * CONF.workload_full_backup_factor +
                         incrs * workload_backup_media_size * CONF.workload_incr_backup_factor) / 100
                    values['totalcommitted'] += workload_approx_backup_size * \
                                                1024 * 1024 * 1024
                else:
                    values['totalcommitted'] += workload_approx_backup_size * \
                                                1024 * 1024 * 1024
            except Exception as ex:
                LOG.debug("Failed fetch data of backend from mount path: {}".format(mountpath))
                LOG.debug(ex)

    def getKey(item):
        item['free'] = item['capacity'] - item['totalcommitted']
        return min(item['capacity'] - item['totalcommitted'],
                   item['capacity'] - item['used'])

    sortedlist = sorted(list(shares.values()), reverse=True, key=getKey)

    return sortedlist[0]['endpoint']
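
# Worked example of the sizing estimate above (all numbers hypothetical):
# with a 10 GB full-backup media size, retention of 24 snapshots, a full
# backup every 8 snapshots and the default factors (50% full, 10% incremental):
#
#   fulls = 24 / 8 = 3, incrs = 24 - 3 = 21
#   approx = (3 * 10 * 50 + 21 * 10 * 10) / 100 = 36 GB committed
#
# The share with the largest min(capacity - committed, capacity - used) is
# selected.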


def get_workloads_for_tenant(context, tenant_ids):
    workload_ids = []
    for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
        backup_target = None
        try:
            backup_target = get_backup_target(backup_endpoint)
            for workload_url in backup_target.get_workloads(context):
                workload_values = json.loads(backup_target.get_object(
                    os.path.join(workload_url, 'workload_db')))
                project_id = workload_values.get('project_id')
                workload_id = workload_values.get('id')
                if project_id in tenant_ids:
                    workload_ids.append(workload_id)
        except Exception as ex:
            LOG.exception(ex)
    return workload_ids


def update_workload_db(context, workloads_to_update, new_tenant_id, user_id):
    workload_urls = []
    jobscheduler_map = {}

    try:
        # Get the list of workload directory paths for the workloads that
        # need to be updated
        for workload_id in workloads_to_update:
            for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
                backup_target = None
                backup_target = get_backup_target(backup_endpoint)
                workload_url = os.path.join(
                    backup_target.mount_path, "workload_" + workload_id)
                if os.path.isdir(workload_url):
                    workload_urls.append(workload_url)
                    break

        # Iterate through each workload directory and update workload_db and
        # snapshot_db with new values
        for workload_path in workload_urls:
            for path, subdirs, files in os.walk(workload_path):
                for name in files:
                    if name.endswith("snapshot_db") or name.endswith(
                            "workload_db") or name.endswith("network_topology_db"):
                        LOG.debug("Updating %s" % os.path.join(path, name))
                        with open(os.path.join(path, name), 'r') as db_file:
                            db_values = json.loads(db_file.read())

                        if name.endswith("network_topology_db"):
                            for resource in db_values:
                                for metadata in resource['metadata']:
                                    if metadata['key'] == 'json_data':
                                        metadata_json = json.loads(metadata['value'])
                                        metadata_json['tenant_id'] = new_tenant_id
                                        metadata_json['project_id'] = new_tenant_id
                                        metadata['value'] = json.dumps(metadata_json)
                        else:
                            if db_values.get('project_id', None) is not None:
                                db_values['project_id'] = new_tenant_id
                            else:
                                db_values['tenant_id'] = new_tenant_id
                            db_values['user_id'] = user_id

                            if db_values.get('jobschedule', None) is not None:
                                jobschedule = pickle.loads(bytes(
                                    db_values['jobschedule'], 'utf-8'))

                                if jobschedule.get('appliance_timezone'):
                                    jobschedule['timezone'] = jobschedule['appliance_timezone']
                                    jobschedule = utils.convert_jobschedule_date_tz(jobschedule)
                                if jobschedule.get('timezone') and jobschedule['timezone'] != 'UTC':
                                    jobschedule = utils.convert_jobschedule_date_tz(jobschedule)

                                if jobschedule['enabled'] is True:
                                    jobschedule['enabled'] = False
                                    db_values['jobschedule'] = str(pickle.dumps(jobschedule, 0), 'utf-8')
                                jobscheduler_map[db_values['id']] = db_values['jobschedule']
                        try:
                            with open(os.path.join(path, name), 'w+') as fil:
                                json.dump(db_values, fil)
                        except Exception as ex:
                            time.sleep(2)
                            with open(os.path.join(path, name), 'w+') as fil:
                                json.dump(db_values, fil)
        return jobscheduler_map

    except Exception as ex:
        LOG.debug("Error occurred while updating data for workloads: {}".format(workloads_to_update))
        LOG.debug(ex)


def create_backup_directory(context, services, backup_directory_path):
    try:
        fileutils.ensure_tree(backup_directory_path)
        for service_name in services:
            fileutils.ensure_tree(
                os.path.join(
                    backup_directory_path,
                    service_name))
    except Exception as ex:
        LOG.debug("Failed to create backup directory: {}".format(backup_directory_path))
        LOG.debug(ex)


def get_directory_size(path):
    try:
        cmd = ['du', '-sc', path]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        out, err = p.communicate()
        # communicate() returns bytes; decode before parsing the 1K-block count
        return int(out.decode('utf-8').split('\t')[0]) * 1024
    except Exception as ex:
        LOG.debug("Failed to fetch size of directory: {}".format(path))
        LOG.debug(ex)


def get_key_file(key_data, temp=False):
    try:
        backup_target, path = get_settings_backup_target()
        config_workload_path = backup_target.get_config_workload_path()
        if temp is True:
            file_path = os.path.join(
                config_workload_path, "authorized_key_temp")
        else:
            file_path = os.path.join(config_workload_path, "authorized_key")
        backup_target.put_object(file_path, key_data)
        os.chmod(file_path, 0o600)
        return file_path
    except Exception as ex:
        LOG.exception(ex)
        raise ex


"""
if __name__ == '__main__':
    nfsbackend =

    nfsbackend.umount_backup_target()
    nfsbackend.mount_backup_target()
    workload_path = nfsbackend.get_workload_path({'workload_id': str(uuid.uuid4())})
    print(workload_path)
    import pdb;pdb.set_trace()
    print(nfsbackend.get_total_capacity(None))
    nfsbackend.umount_backup_target()
    workload_path = nfsbackend.get_workload_path({'workload_id': str(uuid.uuid4())})
    print(workload_path)
"""