# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013-2017 Trilio Data, Inc.
# All Rights Reserved.
"""
Implementation of a backup target endpoint for TrilioVault
"""
import abc
import base64
import glob
import pickle
import json
import os
import io
import types
import time
from ctypes import *
import subprocess
from subprocess import check_output
import re
import shutil
import socket
import uuid
import threading
from urllib.parse import urlparse
from oslo_config import cfg
from workloadmgr.db import base
from workloadmgr import exception
from workloadmgr import utils
from workloadmgr.openstack.common import fileutils
from workloadmgr.openstack.common import log as logging
from workloadmgr.openstack.common import timeutils
from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr.virt import qemuimages
from workloadmgr import autolog
from workloadmgr.openstack.common.gettextutils import _
from os.path import isfile, isdir, join
from os import environ, walk, _exit as os_exit
from threading import Thread
from functools import wraps
from keystoneauth1.identity.generic import password as passMod
from keystoneauth1 import session
from keystoneclient import client
LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)
wlm_vault_opts = [
cfg.StrOpt('vault_storage_type',
default='none',
help='Storage type: local, das, vault, nfs, swift-i, swift-s, s3'),
# swift-i: integrated(keystone), swift-s: standalone
cfg.StrOpt('vault_data_directory',
default='/var/triliovault-mounts',
help='Location where snapshots will be stored'),
cfg.StrOpt('vault_data_directory_old',
default='/var/triliovault',
help='Legacy location where snapshots will be stored'),
cfg.StrOpt('vault_storage_nfs_export',
default='local',
help='NFS Export'),
cfg.StrOpt('vault_storage_nfs_options',
default='nolock',
help='NFS Options'),
cfg.StrOpt('vault_storage_das_device',
default='none',
help='das device /dev/sdb'),
cfg.StrOpt('vault_swift_auth_version',
default='KEYSTONE_V2',
help='KEYSTONE_V2 KEYSTONE_V3 TEMPAUTH'),
cfg.StrOpt('vault_swift_auth_url',
default='http://localhost:5000/v2.0',
help='Keystone Authorization URL'),
cfg.StrOpt('vault_swift_tenant',
default='admin',
help='Swift tenant'),
cfg.StrOpt('vault_swift_username',
default='admin',
help='Swift username'),
cfg.StrOpt('vault_swift_password',
default='password',
help='Swift password'),
cfg.StrOpt('vault_swift_container_prefix',
default='TrilioVault',
help='Swift Container Prefix'),
cfg.StrOpt('vault_swift_segment_size',
# default='5368709120', 5GB
default='524288000', # 500MB
help='Default segment size 500MB'),
cfg.IntOpt('vault_retry_count',
default=2,
help='The number of times we retry on failures'),
cfg.StrOpt('vault_swift_url_template',
default='http://localhost:8080/v1/AUTH_%(project_id)s',
help='The URL of the Swift endpoint'),
    cfg.IntOpt('vault_read_chunk_size_kb',
               default=128,
               help='Read size in KB'),
    cfg.IntOpt('vault_write_chunk_size_kb',
               default=32,
               help='Write size in KB'),
cfg.StrOpt('trustee_role',
default='Member',
help='Role that trustee will impersonate'),
    cfg.StrOpt('triliovault_public_key',
               default='/etc/triliovault-wlm/triliovault.pub',
               help='Location of the TrilioVault public key'),
    cfg.StrOpt('domain_name',
               default='default',
               help='Cloud admin user domain name'),
    cfg.StrOpt('triliovault_user_domain_id',
               default='default',
               help='TrilioVault user domain ID'),
cfg.StrOpt('keystone_auth_version',
default='2.0',
help='Keystone authentication version'),
cfg.IntOpt('workload_full_backup_factor',
default=50,
help='The size of full backup compared to actual resource size in percentage'),
cfg.IntOpt('workload_incr_backup_factor',
default=10,
help='The size of incremental backup compared to full backup in percentage'),
cfg.BoolOpt('global_job_scheduler_override',
default=False,
help='If true, global job scheduler gets disabled irrespective of settings value'),
cfg.StrOpt('cloud_admin_role',
default='admin',
help='Cloud admin role on admin tenant'),
]
CONF = cfg.CONF
CONF.register_opts(wlm_vault_opts)
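# Illustrative sketch only: the options registered above are normally supplied
# through the workloadmgr configuration file. The endpoint addresses and mount
# options below are assumptions for demonstration, not defaults or
# recommendations.
#
#   [DEFAULT]
#   vault_storage_type = nfs
#   vault_storage_nfs_export = 192.168.1.34:/mnt/tvault,192.168.1.35:/mnt/tvault
#   vault_storage_nfs_options = nolock,soft,timeo=180
#   vault_data_directory = /var/triliovault-mounts
#   vault_retry_count = 2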
def run_async(func):
"""
run_async(func)
function decorator, intended to make "func" run in a separate
thread (asynchronously).
Returns the created Thread object
E.g.:
@run_async
def task1():
do_something
@run_async
def task2():
do_something_too
t1 = task1()
t2 = task2()
...
t1.join()
t2.join()
"""
@wraps(func)
def async_func(*args, **kwargs):
func_hl = Thread(target=func, args=args, kwargs=kwargs)
func_hl.start()
return func_hl
return async_func
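# A minimal, self-contained sketch of how run_async might be used. The helper
# below is hypothetical, only illustrates the decorator, is not part of the
# module API, and is never called at import time.
def _run_async_example(seconds=1):
    @run_async
    def _background_task(delay):
        # Runs in its own thread; the decorator returns the Thread object.
        time.sleep(delay)

    t = _background_task(seconds)
    t.join()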
class TrilioVaultBackupTarget(object, metaclass=abc.ABCMeta):
def __init__(self, backupendpoint, backup_target_type, mountpath=None):
self.__backup_endpoint = backupendpoint
self.__backup_target_type = backup_target_type
self.__mountpath = mountpath or backupendpoint
@property
def backup_endpoint(self):
return self.__backup_endpoint
@property
def backup_target_type(self):
return self.__backup_target_type
@property
def mount_path(self):
return self.__mountpath
def __str__(self):
return "%s:%s" % (self.backup_target_type,
self.backup_endpoint)
###
# All path manipulation methods
###
@abc.abstractmethod
def get_progress_tracker_directory(self, tracker_metadata):
"""
        Get the location where all tracking objects are stored. The tracking
        object is a file on NFS or an object in an object store.
"""
raise NotImplementedError()
@abc.abstractmethod
def get_progress_tracker_path(self, tracker_metadata):
"""
        Get the path of the tracker object based on the tracker metadata.
"""
raise NotImplementedError()
@abc.abstractmethod
def get_workload_transfers_directory(self):
"""
        Get the path of the directory where transfer authentication keys are stored when
        transferring workload ownership between tenants of two different clouds.
"""
raise NotImplementedError()
@abc.abstractmethod
def get_workload_transfers_path(self, transfers_metadata):
"""
The absolute path of workload transfer file for the workload id
defined in transfers_metadata
"""
raise NotImplementedError()
@abc.abstractmethod
def get_workload_path(self, workload_metadata):
pass
@abc.abstractmethod
def get_snapshot_path(self, snapshot_metadata):
pass
@abc.abstractmethod
def get_snapshot_vm_path(self, snapshot_vm_metadata):
pass
@abc.abstractmethod
def get_snapshot_vm_resource_path(self, snapshot_vm_resource_metadata):
pass
@abc.abstractmethod
def get_snapshot_vm_disk_resource_path(
self, snapshot_vm_disk_resource_metadata):
pass
@abc.abstractmethod
def get_restore_staging_path(self, restore_metadata):
pass
@abc.abstractmethod
def get_restore_vm_staging_path(self, restore_vm_metadata):
pass
@abc.abstractmethod
def get_restore_vm_resource_staging_path(
self, restore_vm_resource_metadata):
pass
@abc.abstractmethod
def get_restore_vm_disk_resource_staging_path(
self, restore_vm_disk_resource_metadata):
pass
##
# purge staging area functions
##
def purge_snapshot_from_staging_area(self, context, snapshot_metadata):
try:
directory = self.get_progress_tracker_directory(snapshot_metadata)
if os.path.isdir(directory) is True:
shutil.rmtree(directory)
except Exception as ex:
LOG.debug("Failed to remove snapshot having data: {}".format(snapshot_metadata))
LOG.debug(ex)
def purge_snapshot_vm_from_staging_area(
self, context, snapshot_vm_metadata):
pass
def purge_snapshot_vm_resource_from_staging_area(
self, context, snapshot_vm_resource_metadata):
pass
def purge_restore_vm_from_staging_area(self, context, restore_vm_metadata):
pass
def purge_restore_vm_resource_from_staging_area(
self, context, restore_vm_resource_metadata):
pass
##
# backup target capabilities
##
@abc.abstractmethod
def commit_supported(self):
pass
##
# backup target capabilities
##
@abc.abstractmethod
def requires_staging(self):
pass
##
# backup target capabilities
##
@abc.abstractmethod
def tracking_supported(self):
"""
        Can the backup media be used for maintaining progress
tracking files for tracking various snapshot and upload
operations between data movers and triliovault backup engines
"""
pass
##
# backup target availability status
##
@abc.abstractmethod
def is_online(self):
pass
@abc.abstractmethod
def mount_backup_target(self):
pass
@abc.abstractmethod
def get_total_capacity(self, context):
"""
        Return the total capacity of the backup target and the
        amount of storage that is utilized.
"""
pass
##
# object manipulation methods on the backup target
##
##
# for workload transfers
##
@abc.abstractmethod
def get_all_workload_transfers(self):
"""
List of workload transfers on this particular backup media
"""
raise NotImplementedError()
@abc.abstractmethod
def transfers_delete(self, context, transfers_metadata):
"""
        Delete a workload transfer from this particular backup media.
"""
raise NotImplementedError()
##
# triliovault object (json) access methods
@abc.abstractmethod
def put_object(self, path, json_data):
pass
@abc.abstractmethod
def get_object(self, path):
pass
@abc.abstractmethod
def object_exists(self, path):
pass
@abc.abstractmethod
def get_object_size(self, vault_path):
pass
@abc.abstractmethod
def workload_delete(self, context, workload_metadata):
pass
@abc.abstractmethod
def snapshot_delete(self, context, snapshot_metadata):
pass
##
# upload workloadmgr objects metadata functions
##
def upload_snapshot_metatdata_to_object_store(self, context,
snapshot_metadata):
pass
def download_metadata_from_object_store(self, context):
return 0
def ensure_mounted():
    '''Make sure the NFS share is mounted at the designated location;
    otherwise raise an exception.'''
    def wrap(func):
        def new_function(*args, **kw):
            if not args[0].is_mounted():
                try:
                    args[0].mount_backup_target()
                except Exception:
                    raise exception.InvalidNFSMountPoint(
                        reason="'%s' is not mounted at '%s'" %
                        (args[0].backup_endpoint, args[0].mount_path))
            return func(*args, **kw)
        return new_function
    return wrap
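# Illustrative note (assumed usage, mirroring the decorated methods below):
# ensure_mounted() wraps a backup-target method so the share is remounted if
# needed before the method runs, e.g.:
#
#   @ensure_mounted()
#   def get_workload_path(self, workload_metadata):
#       ...
#
# If remounting fails, InvalidNFSMountPoint is raised instead of calling the
# wrapped method.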
def to_abs():
    '''Convert the path to an absolute path when it is called with a relative path.'''
def wrap(func):
def wrap_to_abs(*args, **kw):
path = args[1]
if not os.path.isabs(path):
path = os.path.join(args[0].mount_path, path)
new_args = (args[0], path)
new_args += args[2:]
return func(*new_args, **kw)
return wrap_to_abs
return wrap
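# Illustrative note (assumed usage): to_abs() rewrites the first path argument
# of an object-access method relative to the target's mount_path, so a caller
# may pass either form, e.g.:
#
#   target.get_object('workload_<id>/workload_db')
#   target.get_object(os.path.join(target.mount_path, 'workload_<id>/workload_db'))
#
# Both resolve to the same file under the mount point; absolute paths are
# passed through unchanged.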
class NfsTrilioVaultBackupTarget(TrilioVaultBackupTarget):
def __init__(self, backupendpoint):
if CONF.vault_storage_type == 'nfs':
base64encode = base64.b64encode(str.encode(
urlparse(backupendpoint).path)).decode()
mountpath = os.path.join(CONF.vault_data_directory,
base64encode)
self.umount_backup_target_object_store()
fileutils.ensure_tree(mountpath)
self.__mountpath = mountpath
super(
NfsTrilioVaultBackupTarget,
self).__init__(
backupendpoint,
"nfs",
mountpath=mountpath)
if not self.is_mounted():
utils.chmod(mountpath, '0740')
elif (CONF.vault_storage_type.lower() == 'swift-s' or
CONF.vault_storage_type.lower() == 's3'):
mountpath = CONF.vault_data_directory
self.__mountpath = mountpath
super(
NfsTrilioVaultBackupTarget,
self).__init__(
backupendpoint,
CONF.vault_storage_type.lower(),
mountpath=mountpath)
def get_progress_tracker_directory(self, tracker_metadata):
"""
        Get the location where all tracking objects are stored. The tracking
        object is a file on NFS or an object in an object store.
"""
mountpath = self.mount_path
progress_tracker_directory = os.path.join(
mountpath, "contego_tasks", 'snapshot_%s' %
(tracker_metadata['snapshot_id']))
return progress_tracker_directory
def get_progress_tracker_path(self, tracker_metadata):
"""
        Get the path of the tracker object based on the tracker metadata.
"""
progress_tracker_directory = self.get_progress_tracker_directory(
tracker_metadata)
if progress_tracker_directory:
progress_tracking_file_path = os.path.join(
progress_tracker_directory,
tracker_metadata['resource_id'])
return progress_tracking_file_path
else:
return None
def read_progress_tracking_file(self, progress_tracking_path):
'''
cmd:- dd if=/var/trilio/triliovault-mounts/contego_tasks/snapshot_18116db5-95f5-45fd-a6d2-a8e557121ba9/48abf568-5571-444d-b4c8-aa8e698f38be iflag=direct
        Response:- 10.10.10.121
In Progress
Completed
Completed
0+1 records in
0+1 records out
'''
try:
# call dd command ##
LOG.debug('Reading Progress Tracking file: {}'.format(progress_tracking_path))
p = subprocess.Popen(['dd',
'if={}'.format(progress_tracking_path),
'iflag=direct'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode == 0:
LOG.debug('Reading Progress Tracking File Completed Successfully.')
return stdout.decode('utf-8').split('\n')
else:
                LOG.debug('Error occurred while reading progress tracking file. {}'.format(stderr.decode('utf-8')))
                return ['']
        except Exception:
LOG.warning("Could not read progress tracking file : %s" % progress_tracking_path)
return ['']
def get_workload_transfers_directory(self):
"""
        Get the path of the directory where transfer authentication keys are stored when
        transferring workload ownership between tenants of two different clouds.
"""
workload_transfers_directory = os.path.join(self.mount_path,
"workload_transfers")
fileutils.ensure_tree(workload_transfers_directory)
utils.chmod(workload_transfers_directory, '0740')
return workload_transfers_directory
def get_workload_transfers_path(self, transfers_metadata):
"""
The absolute path of workload transfer file for the workload id
defined in transfers_metadata
"""
workload_transfers_directory = self.get_workload_transfers_directory()
if workload_transfers_directory:
workload_transfers_file_path = os.path.join(
workload_transfers_directory,
transfers_metadata['workload_id'])
return workload_transfers_file_path
else:
return None
@ensure_mounted()
def get_workload_path(self, workload_metadata):
workload_path = os.path.join(
self.mount_path, 'workload_%s' %
(workload_metadata['workload_id']))
return workload_path
@ensure_mounted()
def get_config_workload_path(self):
config_workload_path = os.path.join(
self.mount_path, CONF.cloud_unique_id, 'config_workload')
return config_workload_path
def get_config_backup_path(self, backup_id):
workload_path = self.get_config_workload_path()
backup_path = os.path.join(workload_path, 'backup_%s' % (backup_id))
return backup_path
def get_snapshot_path(self, snapshot_metadata):
workload_path = self.get_workload_path(snapshot_metadata)
snapshot_path = os.path.join(
workload_path, 'snapshot_%s' %
(snapshot_metadata['snapshot_id']))
return snapshot_path
def get_snapshot_vm_path(self, snapshot_vm_metadata):
snapshot_path = self.get_snapshot_path(snapshot_vm_metadata)
snapshot_vm_path = os.path.join(
snapshot_path, 'vm_id_%s' %
(snapshot_vm_metadata['snapshot_vm_id']))
return snapshot_vm_path
def get_snapshot_vm_resource_path(self, snapshot_vm_resource_metadata):
snapshot_vm_path = self.get_snapshot_vm_path(
snapshot_vm_resource_metadata)
snapshot_vm_resource_path = os.path.join(
snapshot_vm_path,
'vm_res_id_%s_%s' %
(snapshot_vm_resource_metadata['snapshot_vm_resource_id'],
snapshot_vm_resource_metadata['snapshot_vm_resource_name'].replace(' ', '')))
return snapshot_vm_resource_path
def get_snapshot_vm_disk_resource_path(
self, snapshot_vm_disk_resource_metadata):
snapshot_vm_resource_path = \
self.get_snapshot_vm_resource_path(
snapshot_vm_disk_resource_metadata)
snapshot_vm_disk_resource_path = os.path.join(
snapshot_vm_resource_path,
snapshot_vm_disk_resource_metadata['vm_disk_resource_snap_id'])
return snapshot_vm_disk_resource_path
def get_restore_staging_path(self, restore_metadata):
vault_data_directory = os.path.join(self.mount_path,
"staging",
socket.gethostname())
restore_staging_path = os.path.join(
vault_data_directory, 'restore_%s' %
(restore_metadata['restore_id']))
return restore_staging_path
def get_restore_vm_staging_path(self, restore_vm_metadata):
restore_staging_path = self.get_restore_staging_path(
restore_vm_metadata)
restore_vm_staging_path = os.path.join(
restore_staging_path, 'vm_id_%s' %
(restore_vm_metadata['snapshot_vm_id']))
return restore_vm_staging_path
def get_restore_vm_resource_staging_path(
self, restore_vm_resource_metadata):
restore_vm_staging_path = self.get_restore_vm_staging_path(
restore_vm_resource_metadata)
restore_vm_resource_staging_path = os.path.join(
restore_vm_staging_path,
'vm_res_id_%s_%s' %
(restore_vm_resource_metadata['snapshot_vm_resource_id'],
restore_vm_resource_metadata['snapshot_vm_resource_name'].replace(
' ',
'')))
return restore_vm_resource_staging_path
def get_restore_vm_disk_resource_staging_path(
self, restore_vm_disk_resource_metadata):
restore_vm_resource_staging_path = self.get_restore_vm_resource_staging_path(
restore_vm_disk_resource_metadata)
restore_vm_disk_resource_staging_path = os.path.join(
restore_vm_resource_staging_path,
restore_vm_disk_resource_metadata['vm_disk_resource_snap_id'])
return restore_vm_disk_resource_staging_path
@ensure_mounted()
def get_policy_path(self):
policy_path = os.path.join(
self.mount_path, CONF.cloud_unique_id, 'workload_policy')
return policy_path
def remove_directory(self, path):
try:
if os.path.isdir(path):
shutil.rmtree(path)
except Exception as ex:
raise ex
##
# backup target capabilities
##
def commit_supported(self):
return True
##
# backup target capabilities
##
def requires_staging(self):
return False
def tracking_supported(self):
return True
##
# backup target availability status
##
@autolog.log_method(logger=Logger)
def is_online(self):
status = False
try:
nfsshare = self.backup_endpoint
nfsserver = nfsshare.split(":/")[0]
nfsserver = nfsserver.split('[')[1] if '[' in nfsserver else nfsserver
nfsserver = nfsserver.split(']')[0] if ']' in nfsserver else nfsserver
rpcinfo = utils.execute("rpcinfo", "-s", nfsserver)
for i in rpcinfo[0].split("\n")[1:]:
if len(i.split()) and i.split()[3] in ['mountd', 'nfs']:
status = True
break
except Exception as ex:
LOG.debug("Failed to verify backup endpoint status: {}".format(self.backup_endpoint))
LOG.debug(ex)
return status
@autolog.log_method(logger=Logger)
def is_mounted(self):
'''Make sure backup endpoint is mounted at mount_path'''
mountpath = self.mount_path
nfsshare = self.backup_endpoint
if not os.path.ismount(mountpath):
return False
with open('/proc/mounts', 'r') as f:
mounts = [{line.split()[1]: line.split()[0]}
for line in f.readlines() if line.split()[1] == mountpath]
return len(mounts) and mounts[0].get(mountpath, None) == nfsshare
def umount_backup_target_object_store(self):
try:
command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', '-f', CONF.vault_data_directory]
subprocess.call(command, shell=False)
except Exception as exception:
LOG.exception(exception)
@autolog.log_method(logger=Logger)
    def umount_backup_target(self):
        """Unmount the backup target (NFS share) from its mount path."""
        nfsshare = self.backup_endpoint
        mountpath = self.mount_path
try:
command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', nfsshare]
subprocess.call(command, shell=False)
except Exception as exception:
LOG.exception(exception)
try:
command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', nfsshare]
subprocess.call(command, shell=False)
except Exception as exception:
LOG.exception(exception)
try:
command = ['sudo', CONF.wlm_rootwrap, CONF.rootwrap_config, 'umount', '-l', nfsshare]
subprocess.call(command, shell=False)
except Exception as exception:
LOG.exception(exception)
@autolog.log_method(logger=Logger)
def mount_backup_target(self, old_share=False):
self.umount_backup_target()
nfsshare = self.backup_endpoint
mountpath = self.mount_path
nfsoptions = CONF.vault_storage_nfs_options
if self.is_online():
command = ['timeout', '-sKILL', '30', 'sudo', CONF.wlm_rootwrap, CONF.rootwrap_config,
'mount', '-o', nfsoptions, nfsshare,
mountpath]
subprocess.check_call(command, shell=False)
if old_share is True:
command = ['timeout', '-sKILL', '30', 'sudo', CONF.wlm_rootwrap, CONF.rootwrap_config,
'mount', '--bind', mountpath,
CONF.vault_data_directory_old]
subprocess.check_call(command, shell=False)
else:
raise exception.BackupTargetOffline(endpoint=nfsshare)
@autolog.log_method(logger=Logger)
def get_total_capacity(self, context):
"""
        Return the total capacity of the backup target and the
        amount of storage that is utilized.
"""
total_capacity = 1
total_utilization = 1
try:
mountpath = self.mount_path
nfsshare = self.backup_endpoint
stdout, stderr = utils.execute('df', mountpath)
            if stderr != '':
                msg = _('Could not execute df command successfully. '
                        'Error %s') % stderr
                raise exception.ErrorOccurred(reason=msg)
# Filesystem 1K-blocks Used Available Use% Mounted on
# /dev/sda1 464076568 248065008 192431096 57% /
fields = stdout.split('\n')[0].split()
values = stdout.split('\n')[1].split()
total_capacity = int(values[1]) * 1024
# Used entry in df command is not reliable indicator. Hence we use
# size - available as total utilization
total_utilization = total_capacity - int(values[3]) * 1024
"""
try:
stdout, stderr = utils.execute('du', '-shb', mountpath, run_as_root=True)
if stderr != '':
msg = _('Could not execute du command successfully. Error %s'), (stderr)
raise exception.ErrorOccurred(reason=msg)
#196022926557 /var/triliovault
du_values = stdout.split()
total_utilization = int(du_values[0])
except Exception as ex:
LOG.exception(ex)
"""
except Exception as ex:
LOG.debug("Failed to fetch details of mount path: {}".format(self.mount_path))
LOG.debug(ex)
return total_capacity, total_utilization
##
# object manipulation methods on the backup target
##
##
# for workload transfers
##
def get_all_workload_transfers(self):
"""
List of workload transfers on this particular backup media
"""
workload_transfers_directory = self.get_workload_transfers_directory()
transfers = []
if workload_transfers_directory:
pattern = os.path.join(workload_transfers_directory, "*")
for transfer_file in glob.glob(pattern):
tran = json.loads(self.get_object(transfer_file))
transfers.append(tran)
return transfers
@autolog.log_method(logger=Logger)
def transfers_delete(self, context, transfers_metadata):
"""
        Delete the workload transfer file for the workload defined in transfers_metadata.
"""
try:
transfer_path = self.get_workload_transfers_path(
transfers_metadata)
if isfile(transfer_path):
os.remove(transfer_path)
except Exception as ex:
LOG.debug("Failed to delete workload data from: {}".format(transfers_metadata))
LOG.debug(ex)
##
# triliovault object (json) access methods
@to_abs()
@autolog.log_method(logger=Logger)
def put_object(self, path, json_data):
head, tail = os.path.split(path)
fileutils.ensure_tree(head)
with open(path, 'w') as json_file:
json_file.write(json_data)
return
@to_abs()
@autolog.log_method(logger=Logger)
def get_object(self, path):
with open(path, 'r') as json_file:
return json_file.read()
@to_abs()
def object_exists(self, path):
return os.path.isfile(path)
@to_abs()
def get_object_size(self, path):
size = 0
try:
statinfo = os.stat(path)
size = statinfo.st_size
except Exception as ex:
LOG.debug("Path: {} may not be accessible for now".format(path))
LOG.debug(ex)
return size
@autolog.log_method(logger=Logger)
def get_workloads(self, context):
self.download_metadata_from_object_store(context)
parent_path = self.mount_path
workload_urls = []
try:
for name in os.listdir(parent_path):
if os.path.isdir(os.path.join(parent_path, name)
) and name.startswith('workload_'):
workload_urls.append(os.path.join(parent_path, name))
except Exception as ex:
LOG.debug("Failed to fetch workload, with error: {}".format(ex))
return workload_urls
@autolog.log_method(logger=Logger)
def workload_delete(self, context, workload_metadata):
workload_path = None
try:
workload_path = self.get_workload_path(workload_metadata)
self.remove_directory(workload_path)
except Exception as ex:
LOG.debug("Failed to delete workload, at path: {}".format(workload_path))
LOG.debug(ex)
@autolog.log_method(logger=Logger)
def snapshot_delete(self, context, snapshot_metadata):
try:
snapshot_path = self.get_snapshot_path(snapshot_metadata)
self.remove_directory(snapshot_path)
except Exception as ex:
LOG.debug("Failed to delete snapshot data from: {}".format(snapshot_metadata))
LOG.debug(ex)
@autolog.log_method(logger=Logger)
def config_backup_delete(self, context, backup_id):
try:
backup_path = self.get_config_backup_path(backup_id)
self.remove_directory(backup_path)
except Exception as ex:
LOG.debug("Failed to delete config backup for id: {}".format(backup_id))
LOG.debug(ex)
@autolog.log_method(logger=Logger)
def policy_delete(self, context, policy_id):
try:
policy_path = self.get_policy_path()
policy_path = os.path.join(
policy_path, 'policy' + '_' + str(policy_id))
if isfile(policy_path):
os.remove(policy_path)
except Exception as ex:
LOG.debug("Failed to delete policy data for id: {}".format(policy_id))
LOG.debug(ex)
##
# Object specific operations
@autolog.log_method(logger=Logger)
def _update_workload_ownership_on_media(self, context, workload_id):
try:
workload_path = self.get_workload_path(
{'workload_id': workload_id})
def _update_metadata_file(pathname):
metadata = json.loads(self.get_object(pathname))
metadata['user_id'] = context.user_id
metadata['project_id'] = context.project_id
self.put_object(pathname, json.dumps(metadata))
for snap in glob.glob(os.path.join(workload_path, "snapshot_*")):
_update_metadata_file(os.path.join(snap, "snapshot_db"))
_update_metadata_file(os.path.join(workload_path, "workload_db"))
except Exception as ex:
LOG.exception(ex)
raise
class ObjectStoreTrilioVaultBackupTarget(NfsTrilioVaultBackupTarget):
def __init__(self, backupendpoint):
super(ObjectStoreTrilioVaultBackupTarget, self).__init__(backupendpoint)
def _delete_path(self, path):
retry = 0
while os.path.isdir(path):
try:
command = ['rm', '-rf', path]
subprocess.check_call(command, shell=False)
except BaseException:
pass
retry += 1
if retry >= 1:
break
def remove_directory(self, path):
self._delete_path(path)
@autolog.log_method(logger=Logger)
def get_progress_tracker_directory(self, tracker_metadata):
"""
        Get the location where all tracking objects are stored. The tracking
        object is a file on NFS or an object in an object store.
"""
mountpath = self.mount_path
progress_tracker_directory = os.path.join(
mountpath, "contego_tasks", 'snapshot_%s' %
(tracker_metadata['snapshot_id']))
fileutils.ensure_tree(progress_tracker_directory)
return progress_tracker_directory
@autolog.log_method(logger=Logger)
def mount_backup_target(self, old_share=False):
LOG.error("Cannot mount backup target, verify whether s3-fuse-plugin is working or not")
raise Exception("Cannot mount backup target")
@autolog.log_method(logger=Logger)
def is_online(self):
return self.is_mounted()
@autolog.log_method(logger=Logger)
def umount_backup_target(self):
LOG.error("Cannot unmount backup target, verify whether s3-fuse-plugin is working or not")
raise Exception("Cannot unmount backup target")
@to_abs()
@autolog.log_method(logger=Logger)
def put_object(self, path, json_data):
head, tail = os.path.split(path)
fileutils.ensure_tree(head)
try:
with open(path, 'w') as json_file:
json_file.write(json_data)
except BaseException:
with open(path, 'w') as json_file:
json_file.write(json_data)
return
@autolog.log_method(logger=Logger)
def snapshot_delete(self, context, snapshot_metadata):
try:
snapshot_path = self.get_snapshot_path(snapshot_metadata)
self._delete_path(snapshot_path)
except Exception as ex:
LOG.debug("Failed to delete snapshot data from backend with matching data: {}".format(snapshot_metadata))
LOG.debug(ex)
@autolog.log_method(logger=Logger)
def config_backup_delete(self, context, backup_id):
try:
backup_path = self.get_config_backup_path(backup_id)
self._delete_path(backup_path)
except Exception as ex:
LOG.debug("Failed to delete config backup with id: {}".format(backup_id))
LOG.debug(ex)
@autolog.log_method(logger=Logger)
def get_total_capacity(self, context):
"""
        Return the total capacity of the backup target and the
        amount of storage that is utilized.
"""
total_capacity = 1
total_utilization = 1
try:
mountpath = self.mount_path
stdout, stderr = utils.execute('stat', '-f', mountpath)
if stderr != '':
                msg = _('Could not execute stat command successfully. '
                        'Error %s') % stderr
raise exception.ErrorOccurred(reason=msg)
total_capacity = int(
stdout.split('\n')[3].split('Blocks:')[1].split(' ')[2])
try:
total_free = int(
stdout.split('\n')[3].split('Blocks:')[1].split(' ')[4])
except BaseException:
total_free = int(stdout.split('\n')[3].split(
'Blocks:')[1].split('Available: ')[1])
total_utilization = abs(total_capacity - total_free)
except Exception as ex:
LOG.debug("Failed to fetch details of mount: {}".format(self.mount_path))
LOG.debug(ex)
return total_capacity, total_utilization
triliovault_backup_targets = {}
@autolog.log_method(logger=Logger)
def mount_backup_media():
for idx, backup_target in enumerate(
CONF.vault_storage_nfs_export.split(',')):
backup_target = backup_target.strip()
if backup_target == '':
continue
if CONF.vault_storage_type.lower() == 'nfs':
backend = NfsTrilioVaultBackupTarget(backup_target)
elif (CONF.vault_storage_type.lower() == 'swift-s' or
CONF.vault_storage_type.lower() == 's3'):
backend = ObjectStoreTrilioVaultBackupTarget(backup_target)
triliovault_backup_targets[backup_target] = backend
if not backend.is_mounted():
LOG.debug("Trying to mount the backup target, since mount point does not exists.")
backend.mount_backup_target()
def get_backup_target(backup_endpoint):
backup_endpoint = backup_endpoint.strip()
backup_target = triliovault_backup_targets.get(backup_endpoint, None)
if backup_target is None:
mount_backup_media()
backup_target = triliovault_backup_targets.get(backup_endpoint, None)
return backup_target
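# Minimal usage sketch: resolve a configured backup endpoint to its backup
# target and read a stored JSON object relative to the mount point. The helper
# below is hypothetical and not part of the module API.
def _example_read_workload_db(backup_endpoint, workload_id):
    target = get_backup_target(backup_endpoint)
    if target is None or not target.is_online():
        return None
    rel_path = os.path.join('workload_%s' % workload_id, 'workload_db')
    if not target.object_exists(rel_path):
        return None
    # get_object/object_exists accept mount-relative paths via the to_abs()
    # decorator defined above.
    return json.loads(target.get_object(rel_path))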
def get_settings_backup_target():
settings_path_new = os.path.join(CONF.cloud_unique_id, "settings_db")
for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
get_backup_target(backup_endpoint.strip())
for endpoint, backup_target in list(triliovault_backup_targets.items()):
if backup_target.object_exists(settings_path_new):
return (backup_target, settings_path_new)
list(triliovault_backup_targets.values())[0].put_object(settings_path_new,
json.dumps([]))
settings_path = "settings_db"
for endpoint, backup_target in list(triliovault_backup_targets.items()):
if backup_target.object_exists(settings_path):
return (backup_target, settings_path)
return (list(triliovault_backup_targets.values())[0], settings_path_new)
def get_allowed_quota_backup_target():
allowed_quota_path_new = os.path.join(CONF.cloud_unique_id, "allowed_quota_db")
for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
get_backup_target(backup_endpoint.strip())
for endpoint, backup_target in list(triliovault_backup_targets.items()):
if backup_target.object_exists(allowed_quota_path_new):
return backup_target, allowed_quota_path_new
if triliovault_backup_targets:
list(triliovault_backup_targets.values())[0].put_object(
allowed_quota_path_new, json.dumps([]))
allowed_quota_path = "allowed_quota_db"
for endpoint, backup_target in list(triliovault_backup_targets.items()):
if backup_target.object_exists(allowed_quota_path):
return backup_target, allowed_quota_path
return list(triliovault_backup_targets.values())[0], allowed_quota_path_new
def get_capacities_utilizations(context):
def fill_capacity_utilization(context, backup_target, stats):
nfsshare = backup_target.backup_endpoint
cap, util = backup_target.get_total_capacity(context)
stats[nfsshare] = {'total_capacity': cap,
'total_utilization': util,
'nfsstatus': True}
stats = {}
threads = []
for nfsshare in CONF.vault_storage_nfs_export.split(','):
nfsshare = nfsshare.strip()
backup_target = get_backup_target(nfsshare)
nfsstatus = backup_target.is_online()
stats[nfsshare] = {'total_capacity': -1,
'total_utilization': -1,
'nfsstatus': nfsstatus}
if nfsstatus is True:
t = threading.Thread(target=fill_capacity_utilization,
args=[context, backup_target, stats])
t.start()
threads.append(t)
for t in threads:
t.join()
return stats
def get_workloads(context):
workloads = []
for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
get_backup_target(backup_endpoint)
for endpoint, backup_target in list(triliovault_backup_targets.items()):
workloads += backup_target.get_workloads(context)
return workloads
def validate_workload(workload_url):
    return os.path.isdir(workload_url) and os.path.exists(
        os.path.join(workload_url, "workload_db"))
def get_all_workload_transfers(context):
transfers = []
for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
get_backup_target(backup_endpoint)
for endpoint, backup_target in list(triliovault_backup_targets.items()):
transfers += backup_target.get_all_workload_transfers()
return transfers
def get_nfs_share_for_workload_by_free_overcommit(context, workload):
"""
workload is a dict with id, name, description and metadata.
metadata includes size of the workload and approximate backup storage needed
to hold all backups
"""
shares = {}
caps = get_capacities_utilizations(context)
for endpoint, backend in list(triliovault_backup_targets.items()):
if caps[endpoint]['nfsstatus'] is False:
continue
shares[endpoint] = {
'noofworkloads': 0,
'totalcommitted': 0,
'endpoint': endpoint,
'capacity': caps[endpoint]['total_capacity'],
'used': caps[endpoint]['total_utilization']
}
if len(shares) == 0:
raise exception.InvalidState(reason="No NFS shares mounted")
# if only one nfs share is configured, then return that share
if len(shares) == 1:
return list(shares.keys())[0]
for endpoint, values in list(shares.items()):
base64encode = base64.b64encode(str.encode(
urlparse(endpoint).path)).decode()
mountpath = os.path.join(CONF.vault_data_directory, base64encode)
for w in os.listdir(mountpath):
try:
if 'workload_' not in w:
continue
workload_path = os.path.join(mountpath, w)
with open(os.path.join(workload_path, "workload_db"), "r") as f:
wjson = json.load(f)
values['noofworkloads'] += 1
workload_approx_backup_size = 0
for meta in wjson['metadata']:
if meta['key'] == 'workload_approx_backup_size':
workload_approx_backup_size = int(meta['value'])
if workload_approx_backup_size == 0:
workload_backup_media_size = 0
for result in glob.iglob(os.path.join(
workload_path, 'snapshot_*/snapshot_db')):
with open(result, "r") as snaprecf:
snaprec = json.load(snaprecf)
if snaprec['snapshot_type'] == "full":
workload_backup_media_size = snaprec['size'] / \
1024 / 1024 / 1024
# workload_backup_media_size is in GBs
workload_backup_media_size = workload_backup_media_size or 10
jobschedule = pickle.loads(bytes(wjson['jobschedule'], 'utf-8'))
if jobschedule['retention_policy_type'] == 'Number of Snapshots to Keep':
incrs = int(jobschedule['retention_policy_value'])
else:
jobsperday = int(
jobschedule['interval'].split("hr")[0])
incrs = int(
jobschedule['retention_policy_value']) * jobsperday
if jobschedule['fullbackup_interval'] == '-1':
fulls = 1
else:
fulls = incrs / int(jobschedule['fullbackup_interval'])
incrs = incrs - fulls
workload_approx_backup_size = \
(fulls * workload_backup_media_size * CONF.workload_full_backup_factor +
incrs * workload_backup_media_size * CONF.workload_incr_backup_factor) / 100
values['totalcommitted'] += workload_approx_backup_size * \
1024 * 1024 * 1024
else:
values['totalcommitted'] += workload_approx_backup_size * \
1024 * 1024 * 1024
except Exception as ex:
LOG.debug("Failed fetch data of backend from mount path: {}".format(mountpath))
LOG.debug(ex)
def getKey(item):
item['free'] = item['capacity'] - item['totalcommitted']
return min(item['capacity'] - item['totalcommitted'],
item['capacity'] - item['used'])
sortedlist = sorted(list(shares.values()), reverse=True, key=getKey)
return sortedlist[0]['endpoint']
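# Worked example of the sizing arithmetic above (numbers are assumptions): a
# 10 GB full-backup media size with 'Number of Snapshots to Keep' = 12 and a
# full backup every 6 snapshots gives fulls = 12 / 6 = 2 and incrs = 12 - 2 = 10,
# so workload_approx_backup_size = (2 * 10 * 50 + 10 * 10 * 10) / 100 = 20 GB
# with the default workload_full_backup_factor=50 and
# workload_incr_backup_factor=10. The share with the largest
# min(capacity - totalcommitted, capacity - used) is selected.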
def get_workloads_for_tenant(context, tenant_ids):
workload_ids = []
for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
backup_target = None
try:
backup_target = get_backup_target(backup_endpoint)
for workload_url in backup_target.get_workloads(context):
workload_values = json.loads(backup_target.get_object(
os.path.join(workload_url, 'workload_db')))
project_id = workload_values.get('project_id')
workload_id = workload_values.get('id')
if project_id in tenant_ids:
workload_ids.append(workload_id)
except Exception as ex:
LOG.exception(ex)
return workload_ids
def update_workload_db(context, workloads_to_update, new_tenant_id, user_id):
workload_urls = []
jobscheduler_map = {}
try:
        # Get the list of workload directory paths for the workloads that need updating
for workload_id in workloads_to_update:
for backup_endpoint in CONF.vault_storage_nfs_export.split(','):
backup_target = None
backup_target = get_backup_target(backup_endpoint)
workload_url = os.path.join(
backup_target.mount_path, "workload_" + workload_id)
if os.path.isdir(workload_url):
workload_urls.append(workload_url)
break
        # Iterate through each workload directory and update workload_db and
        # snapshot_db with the new values
for workload_path in workload_urls:
for path, subdirs, files in os.walk(workload_path):
for name in files:
if name.endswith("snapshot_db") or name.endswith(
"workload_db") or name.endswith("network_topology_db"):
LOG.debug("Updating %s" % os.path.join(path, name))
with open(os.path.join(path, name), 'r') as db_file:
db_values = json.loads(db_file.read())
if name.endswith("network_topology_db"):
for resource in db_values:
for metadata in resource['metadata']:
if metadata['key'] == 'json_data':
metadata_json = json.loads(metadata['value'])
metadata_json['tenant_id'] = new_tenant_id
metadata_json['project_id'] = new_tenant_id
metadata['value'] = json.dumps(metadata_json)
else:
if db_values.get('project_id', None) is not None:
db_values['project_id'] = new_tenant_id
else:
db_values['tenant_id'] = new_tenant_id
db_values['user_id'] = user_id
if db_values.get('jobschedule', None) is not None:
jobschedule = pickle.loads(bytes(
db_values['jobschedule'], 'utf-8'))
if jobschedule.get('appliance_timezone'):
jobschedule['timezone'] = jobschedule['appliance_timezone']
jobschedule = utils.convert_jobschedule_date_tz(jobschedule)
if jobschedule.get('timezone') and jobschedule['timezone'] != 'UTC':
jobschedule = utils.convert_jobschedule_date_tz(jobschedule)
if jobschedule['enabled'] is True:
jobschedule['enabled'] = False
db_values['jobschedule'] = str(pickle.dumps(jobschedule, 0), 'utf-8')
jobscheduler_map[db_values['id']
] = db_values['jobschedule']
try:
with open(os.path.join(path, name), 'w+') as fil:
json.dump(db_values, fil)
except Exception as ex:
time.sleep(2)
with open(os.path.join(path, name), 'w+') as fil:
json.dump(db_values, fil)
return jobscheduler_map
except Exception as ex:
LOG.debug("Error occurred while updating data for workloads: {}".format(workloads_to_update))
LOG.debug(ex)
def create_backup_directory(context, services, backup_directory_path):
try:
fileutils.ensure_tree(backup_directory_path)
for service, config_path in services.items():
service_name = service
fileutils.ensure_tree(
os.path.join(
backup_directory_path,
service_name))
except Exception as ex:
LOG.debug("Failed to create backup directory: {}".format(backup_directory_path))
LOG.debug(ex)
def get_directory_size(path):
try:
cmd = ['du', '-sc', path]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
out, err = p.communicate()
        return int(out.decode('utf-8').split('\t')[0]) * 1024
except Exception as ex:
LOG.debug("Failed to fetch size of directory: {}".format(path))
LOG.debug(ex)
def get_key_file(key_data, temp=False):
try:
backup_target, path = get_settings_backup_target()
config_workload_path = backup_target.get_config_workload_path()
if temp is True:
file_path = os.path.join(
config_workload_path, "authorized_key_temp")
else:
file_path = os.path.join(config_workload_path, "authorized_key")
backup_target.put_object(file_path, key_data)
os.chmod(file_path, 0o600)
return file_path
except Exception as ex:
LOG.exception(ex)
raise ex
"""
if __name__ == '__main__':
nfsbackend =
nfsbackend.umount_backup_target()
nfsbackend.mount_backup_target()
workload_path = nfsbackend.get_workload_path({'workload_id': str(uuid.uuid4())})
print(workload_path)
import pdb;pdb.set_trace()
print(nfsbackend.get_total_capacity(None))
nfsbackend.umount_backup_target()
workload_path = nfsbackend.get_workload_path({'workload_id': str(uuid.uuid4())})
print(workload_path)
"""