# workloadmgr -- version 6.1.1.dev2
# (distribution-page header text removed; it was page chrome, not code)
import copy
import os
import re
import sys
import ast
import json
import shutil
from uuid import UUID, uuid4
import subprocess
import importlib
from tempfile import mkstemp
import calendar
from calendar import monthrange
from datetime import datetime, timedelta, timezone
from dateutil.relativedelta import relativedelta
from itertools import islice
from oslo_config import cfg
from workloadmgr.openstack.common import log as logging
from workloadmgr import autolog
from workloadmgr import flags
from workloadmgr import settings
from workloadmgr.vault import vault
from workloadmgr.compute import nova
from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr.openstack.common import jsonutils
from workloadmgr.openstack.common import timeutils
from workloadmgr import utils
from workloadmgr import exception
from workloadmgr.common.workloadmgr_keystoneclient import KeystoneClientBase
import pickle as pickle
from workloadmgr.datamover import contego
from workloadmgr.common import clients
# Options registered by this module; cloud_unique_id prefixes the paths of
# the settings/quota objects written to the backup target.
workloads_manager_opts = [
    cfg.StrOpt('cloud_unique_id',
               default='test-cloud-id',
               help='cloud unique id.'),
]

FLAGS = flags.FLAGS
FLAGS.register_opts(workloads_manager_opts)
CONF = cfg.CONF

# Module logger plus the autolog wrapper used by @autolog.log_method below.
LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)

# Shared DB-API handle used by every helper in this module.
db = WorkloadMgrDB().db
def process_trust_values(name_in_backend, backend_settings_by_name, db_settings_by_name):
    """Drop a backend trust setting that the DB already holds.

    If ``name_in_backend`` is present in ``backend_settings_by_name`` and the
    DB already has a setting with the same ``user_id``/``project_id`` pair,
    the backend entry is removed in place (the DB value wins).

    :param name_in_backend: setting name to look up on the backend side
    :param backend_settings_by_name: backend settings keyed by name; may be
        mutated by this call
    :param db_settings_by_name: DB settings keyed by name (values are dicts
        carrying at least ``user_id`` and ``project_id``)
    """
    setting_values = backend_settings_by_name.get(name_in_backend)
    if not setting_values:
        return
    # Any DB setting owned by the same user/project makes the backend
    # copy redundant.  (Keys of db_settings_by_name are irrelevant here.)
    if any(db_value['user_id'] == setting_values['user_id'] and
           db_value['project_id'] == setting_values['project_id']
           for db_value in db_settings_by_name.values()):
        backend_settings_by_name.pop(name_in_backend, None)
@autolog.log_method(logger=Logger)
def upload_settings_db_entry(cntx):
    """Persist the settings table to the settings backup target.

    Masks password-like values, merges the DB settings with whatever is
    already stored on the backend (DB values win on conflict; ``trust_id``
    entries are matched per user/project via :func:`process_trust_values`)
    and writes the merged list back to ``<cloud_unique_id>/settings_db``.

    :param cntx: request context (settings of all users/tenants are read
        with a None context)
    """
    # use context as none since we want settings of all users/tenants
    # TODO: implement settings persistance per user/tenant
    (backup_target, path) = vault.get_settings_backup_target()
    settings_db = db.setting_get_all(
        None, read_deleted='no', backup_settings=True)

    # Never write clear-text secrets to the backup media.  Compare
    # case-insensitively throughout (the original mixed lower-cased and
    # raw comparisons); smtp_server_password is intentionally kept.
    for setting in settings_db:
        name_lower = setting.name.lower()
        if 'password' in name_lower and 'smtp_server_password' not in name_lower:
            setting.value = '******'
        for kvpair in setting.metadata:
            if 'password' in kvpair['key'].lower():
                kvpair['value'] = '******'

    settings_json = jsonutils.dumps(settings_db)
    settings_path = os.path.join(str(CONF.cloud_unique_id), "settings_db")
    db_settings = json.loads(settings_json)
    try:
        backend_settings = json.loads(backup_target.get_object(settings_path))
    except Exception:
        # Best effort: a missing/unreadable backend object means nothing
        # stored yet.
        backend_settings = []

    # If the backend holds more settings than the DB they have not been
    # imported yet; keep both sides, letting DB values override duplicates
    # of the same type (trust_id entries are matched on user_id/project_id).
    backend_settings_types = [s['type'] for s in backend_settings]
    backend_settings_by_name = {s['name']: s for s in backend_settings}
    db_settings_types = [s['type'] for s in db_settings]
    db_settings_by_name = {s['name']: s for s in db_settings}
    common_types = set(db_settings_types).intersection(backend_settings_types)

    new_backend_settings_by_name = copy.deepcopy(backend_settings_by_name)
    for key, value in backend_settings_by_name.items():
        if value['type'] not in common_types:
            continue
        if value['type'] == 'trust_id':
            process_trust_values(key, new_backend_settings_by_name,
                                 db_settings_by_name)
        else:
            new_backend_settings_by_name.pop(key, None)
    new_backend_settings_by_name.update(db_settings_by_name)

    new_settings = json.dumps(list(new_backend_settings_by_name.values()))
    backup_target.put_object(settings_path, new_settings)
@autolog.log_method(logger=Logger)
def import_backend_settings_to_db(cntx):
    """Import settings found on the backup target but missing from the DB.

    Reads ``<cloud_unique_id>/settings_db`` from the settings backup target
    and bulk-inserts any entries whose names do not exist in the DB.
    ``email_settings`` are skipped entirely when the DB already has any;
    ``trust_id`` entries are skipped when the DB already holds one for the
    same user_id/project_id pair.  Insert failures are logged, not raised.

    :param cntx: request context (project_id is used as a default owner)
    """
    # use context as none since we want settings of all users/tenants
    (backup_target, path) = vault.get_settings_backup_target()
    settings_db = db.setting_get_all(
        None, read_deleted='no', backup_settings=True)
    settings_json = jsonutils.dumps(settings_db)
    db_settings = json.loads(settings_json)
    settings_path = os.path.join(str(CONF.cloud_unique_id), "settings_db")
    try:
        backend_settings = json.loads(backup_target.get_object(settings_path))
    except Exception as ex:
        # Best effort: a missing/unreadable backend object means nothing
        # to import.
        backend_settings = []
    backend_settings_by_name = {backend_setting['name']: backend_setting for backend_setting in backend_settings}
    skip_email_settings_import = False
    db_settings_by_name = {}
    for db_setting in db_settings:
        # if user already has email_settings in db then skip importing the
        # email_settings from the backend settings
        if db_setting['type'] == 'email_settings':
            skip_email_settings_import = True
        db_settings_by_name[db_setting['name']] = db_setting
    # get the list of backend setting names which are not in DB
    extra_backend_settings_name = set(backend_settings_by_name.keys()).difference(set(db_settings_by_name.keys()))
    backend_setting_bulk_values_list = []
    backend_setting_metadata_bulk_values_list = []
    for extra_backend_setting_name in extra_backend_settings_name:
        extra_backend_setting = backend_settings_by_name[extra_backend_setting_name]
        # adding the params to the backend settings to be imported
        if not extra_backend_setting.get('status'):
            extra_backend_setting['status'] = 'available'
        if not extra_backend_setting.get('project_id'):
            extra_backend_setting['project_id'] = cntx.project_id
        if 'is_hidden' in extra_backend_setting:
            # DB column is 'hidden' (int) while the backend stores
            # 'is_hidden' (bool) -- convert on the way in.
            extra_backend_setting['hidden'] = int(extra_backend_setting['is_hidden'])
        # if email setting exist in backend but missing in DB then import into DB
        if extra_backend_setting.get('type') == 'email_settings' and not skip_email_settings_import:
            backend_setting_bulk_values_list.append(extra_backend_setting)
        # if trust_id exists in backend but missing in DB for respective
        # user and project then import into DB
        elif extra_backend_setting.get('type') == 'trust_id':
            # if trust_id already exists for the respective user_id &
            # project_id in DB then skip import (for/else: the else runs
            # only when no matching trust was found)
            for _, setting_value in db_settings_by_name.items():
                if setting_value['type'] == 'trust_id' and setting_value['user_id'] == extra_backend_setting['user_id'] \
                        and setting_value['project_id'] == extra_backend_setting['project_id']:
                    break
            else:
                backend_setting_bulk_values_list.append(extra_backend_setting)
        # NOTE(review): metadata is collected even when the setting itself
        # was skipped above -- confirm this is intended.
        if extra_backend_setting.get('metadata'):
            backend_setting_metadata_bulk_values_list.extend(extra_backend_setting['metadata'])
    if backend_setting_bulk_values_list:
        try:
            db.setting_bulk_create(cntx, backend_setting_bulk_values_list, backend_setting_metadata_bulk_values_list)
        except Exception as ex:
            # passing any exception as license addition should not be impacted
            LOG.exception(ex)
@autolog.log_method(logger=Logger)
def upload_allowed_quota_db_entry(cntx, project_id):
    """Merge a project's allowed-quota records into the backup target.

    Reads the existing ``allowed_quota_db`` object, replaces the entries of
    every project present in the DB result with the fresh DB values (an
    empty DB result clears this project's entries) and writes the merged
    list back.

    :param cntx: request context
    :param project_id: project whose quotas are being uploaded
    """
    (backup_target, path) = vault.get_allowed_quota_backup_target()
    # Use an admin context so quotas for every project are visible.
    admin_context = nova._get_tenant_context(cntx)
    admin_context.is_admin = True
    allowed_quota_db = db.get_allowed_quotas(
        admin_context, project_id=project_id)
    allowed_quota_json = jsonutils.dumps(allowed_quota_db)
    allowed_quota_path = os.path.join(str(CONF.cloud_unique_id),
                                      "allowed_quota_db")
    db_allowed_quotas = json.loads(allowed_quota_json)
    try:
        backend_allowed_quotas = json.loads(
            backup_target.get_object(allowed_quota_path))
    except Exception:
        backend_allowed_quotas = []

    # Group whatever the backend currently holds by project.
    quotas_by_project = {}
    for quota in backend_allowed_quotas:
        quotas_by_project.setdefault(quota['project_id'], []).append(quota)

    # Group the fresh DB values by project; an empty DB result still maps
    # this project to [] so its stale backend entries get dropped.
    fresh_by_project = {}
    if not db_allowed_quotas:
        fresh_by_project[project_id] = []
    for quota in db_allowed_quotas:
        fresh_by_project.setdefault(quota['project_id'], []).append(quota)

    quotas_by_project.update(fresh_by_project)
    merged = []
    for entries in quotas_by_project.values():
        merged.extend(entries)
    backup_target.put_object(allowed_quota_path, json.dumps(merged))
@autolog.log_method(logger=Logger)
def upload_workload_db_entry(cntx, workload_id):
    """Upload a workload's DB record and its workload-VM records to the
    workload's backup media target.

    :param cntx: request context
    :param workload_id: id of the workload to persist
    """
    workload_db = db.workload_get(cntx, workload_id)
    backup_endpoint = db.get_metadata_value(workload_db.metadata,
                                            'backup_media_target')
    backup_target = vault.get_backup_target(backup_endpoint)
    workload_path = backup_target.get_workload_path(
        {'workload_id': workload_id})

    # Mask password-like metadata before it leaves the DB.
    for kvpair in workload_db.metadata:
        if 'Password' in kvpair['key'] or 'password' in kvpair['key']:
            kvpair['value'] = '******'

    backup_target.put_object(os.path.join(workload_path, "workload_db"),
                             jsonutils.dumps(workload_db))

    workload_vms_db = db.workload_vms_get(cntx, workload_id)
    backup_target.put_object(os.path.join(workload_path, "workload_vms_db"),
                             jsonutils.dumps(workload_vms_db))
def _async_put_object(backup_target, path, workload_json):
fd, tmpfile = mkstemp(text=True)
os.close(fd)
with open(tmpfile, "w") as f:
f.write(workload_json)
return [tmpfile, path]
@autolog.log_method(logger=Logger)
def upload_snapshot_db_entry(cntx, snapshot_id, snapshot_status=None):
    """Serialize a snapshot's DB records and copy them to its backup target.

    Workload, workload-VM, snapshot, network-topology, snapshot-VM,
    resource and per-resource (network/disk/security-group) records are
    staged into temp files via ``_async_put_object`` and then uploaded in
    one batch by the external ``pcopy.py`` helper.  On an
    immutability-enabled target a retention date is computed and stored in
    the snapshot's metadata before the upload.

    :param cntx: request context
    :param snapshot_id: id of the snapshot to persist
    :param snapshot_status: optional status override written into the
        serialized snapshot record only (the DB row is left untouched)
    :raises Exception: when pcopy.py exits non-zero
    """
    # Each entry is a [tmpfile, destination_path] pair consumed by pcopy.py.
    put_object_threads = []
    try:
        snapshot = db.snapshot_get(cntx, snapshot_id, read_deleted='yes')
    except exception.SnapshotNotFound:
        return  # nothing to update in DB as snapshot is hard deleted
    if snapshot['data_deleted']:
        return
    workload_id = snapshot['workload_id']
    workload_db = db.workload_get(cntx, workload_id)
    backup_endpoint = db.get_metadata_value(workload_db.metadata,
                                            'backup_media_target')
    backup_target = vault.get_backup_target(backup_endpoint)
    bucket_immutable = backup_target.is_immutability_enabled(cntx, workload_db)
    parent = backup_target.get_workload_path({'workload_id': workload_id})
    # Mask password-like metadata before it leaves the DB.
    for kvpair in workload_db.metadata:
        if 'Password' in kvpair['key'] or 'password' in kvpair['key']:
            kvpair['value'] = '******'
    workload_json = jsonutils.dumps(workload_db)
    path = os.path.join(parent, "workload_db")
    put_object_threads.append(_async_put_object(backup_target, path, workload_json))
    workload_vms_db = db.workload_vms_get(cntx, snapshot['workload_id'])
    workload_vms_json = jsonutils.dumps(workload_vms_db)
    path = os.path.join(parent, "workload_vms_db")
    put_object_threads.append(_async_put_object(backup_target, path, workload_vms_json))
    snapshot_db = db.snapshot_get(cntx, snapshot['id'], read_deleted='yes')
    if snapshot_status:
        # Override applies to the serialized copy only, not the DB row.
        snapshot_db.status = snapshot_status
    snapshot_json = jsonutils.dumps(snapshot_db)
    # Workload records are duplicated under the snapshot directory as well.
    parent = backup_target.get_snapshot_path({'workload_id': workload_id,
                                              'snapshot_id': snapshot['id']})
    put_object_threads.append(_async_put_object(backup_target, os.path.join(parent, 'workload_db'), workload_json))
    put_object_threads.append(_async_put_object(backup_target, os.path.join(parent, 'workload_vms_db'), workload_vms_json))
    path = os.path.join(parent, "snapshot_db")
    # Add to the vault
    put_object_threads.append(_async_put_object(backup_target, path, snapshot_json))
    # Dump network topology related data
    snap_network_resources = db.snapshot_network_resources_get(
        cntx, snapshot_id)
    net_topology_json = jsonutils.dumps(snap_network_resources)
    path = os.path.join(parent, "network_topology_db")
    put_object_threads.append(_async_put_object(backup_target, path, net_topology_json))
    snapvms = db.snapshot_vms_get(cntx, snapshot['id'])
    snapvms_json = jsonutils.dumps(snapvms)
    path = os.path.join(parent, "snapshot_vms_db")
    # Add to the vault
    put_object_threads.append(_async_put_object(backup_target, path, snapvms_json))
    resources = db.snapshot_resources_get(cntx, snapshot['id'])
    resources_json = jsonutils.dumps(resources)
    path = os.path.join(parent, "resources_db")
    put_object_threads.append(_async_put_object(backup_target, path, resources_json))
    for res in resources:
        res_json = jsonutils.dumps(res, sort_keys=True, indent=2)
        # Directory name for this resource; the 'label' metadata value,
        # when present, is appended for readability.
        vm_res_id = 'vm_res_id_%s' % (res['id'])
        for meta in res.metadata:
            if meta.key == "label":
                vm_res_id = 'vm_res_id_%s_%s' % (res['id'], meta.value)
                break
        if res.resource_type == "network" or \
                res.resource_type == "subnet" or \
                res.resource_type == "router" or \
                res.resource_type == "nic":
            path = os.path.join(parent, "network", vm_res_id, "network_db")
            network = db.vm_network_resource_snaps_get(cntx, res.id)
            network_json = jsonutils.dumps(network)
            put_object_threads.append(_async_put_object(backup_target, path, network_json))
        elif res.resource_type == "disk":
            path = os.path.join(
                parent,
                "vm_id_" +
                res.vm_id,
                vm_res_id.replace(
                    ' ',
                    ''),
                "disk_db")
            disk = db.vm_disk_resource_snaps_get(cntx, res.id)
            disk_json = jsonutils.dumps(disk)
            put_object_threads.append(_async_put_object(backup_target, path, disk_json))
        elif res.resource_type == "security_group":
            path = os.path.join(
                parent,
                "security_group",
                vm_res_id,
                "security_group_db")
            security_group = db.vm_security_group_rule_snaps_get(cntx, res.id)
            security_group_json = jsonutils.dumps(security_group)
            put_object_threads.append(_async_put_object(backup_target, path, security_group_json))
    # Write the [tmpfile, path] manifest to its own temp file for pcopy.py;
    # the manifest is appended to the list so it gets cleaned up below too.
    fd, tmpfile = mkstemp(text=True)
    os.close(fd)
    with open(tmpfile, "w") as f:
        json.dump(put_object_threads, f)
    put_object_threads.append([tmpfile, tmpfile])
    retain_until = None
    if bucket_immutable:
        try:
            # NOTE(review): calculate_datetime_in_days_for_retention is
            # defined elsewhere in this module -- confirm availability.
            retain_until = calculate_datetime_in_days_for_retention(cntx, snapshot, workload_db, db)
        except exception.SnapshotNotFound:
            pass
    snapshot_metadata = {}
    snapshot_metadata['retainuntil'] = retain_until
    db.snapshot_update(cntx, snapshot.id, {'metadata': snapshot_metadata})
    try:
        # pcopy.py performs the actual uploads; a non-zero exit is failure.
        if subprocess.call([sys.executable, os.path.join(os.path.dirname(__file__), "pcopy.py"), tmpfile, str(bucket_immutable), str(retain_until)]):
            raise Exception("Cannot write snapshot record to backup target")
    finally:
        # Always remove the staged temp files (including the manifest).
        for tmpfile, x in put_object_threads:
            try:
                os.remove(tmpfile)
            except Exception as ex:
                LOG.exception(ex)
@autolog.log_method(logger=Logger)
def upload_config_workload_db_entry(cntx):
    """Best-effort upload of the config-workload record to its backup media.

    Any failure is logged and swallowed so callers are never interrupted.
    """
    try:
        config_workload_db = db.config_workload_get(cntx)
        target = vault.get_backup_target(
            config_workload_db['backup_media_target'])
        storage_path = target.get_config_workload_path()
        target.put_object(
            os.path.join(storage_path, "config_workload_db"),
            jsonutils.dumps(config_workload_db))
    except Exception as ex:
        LOG.exception(ex)
@autolog.log_method(logger=Logger)
def upload_policy_db_entry(cntx, policy_id):
    """Best-effort upload of a policy record to the settings backup target.

    The record is written as ``<policy_path>/policy_<policy_id>``; any
    failure is logged and swallowed.
    """
    try:
        policy = db.policy_get(cntx, policy_id)
        backup_target, path = vault.get_settings_backup_target()
        destination = os.path.join(backup_target.get_policy_path(),
                                   'policy' + '_' + str(policy_id))
        backup_target.put_object(destination, jsonutils.dumps(policy))
    except Exception as ex:
        LOG.exception(ex)
@autolog.log_method(logger=Logger)
def upload_migration_db_entry(cntx, migration_id, migration_status=None):
    # Intentional no-op: migration records are not persisted to backup
    # media.  Presumably a placeholder mirroring upload_snapshot_db_entry's
    # signature -- TODO confirm before implementing.
    pass
@autolog.log_method(logger=Logger)
def upload_migration_plan_db_entry(cntx, migration_plan_id):
    """Upload a migration plan and its plan-VM records to the plan's
    backup media target.

    :param cntx: request context
    :param migration_plan_id: id of the migration plan to persist
    """
    plan = db.migration_plan_get(cntx, migration_plan_id)
    endpoint = db.get_metadata_value(plan.metadata, 'backup_media_target')
    backup_target = vault.get_backup_target(endpoint)
    parent = backup_target.get_migration_plan_path(
        {'migration_plan_id': migration_plan_id})

    # Mask password-like metadata before writing it out.
    for kvpair in plan.metadata:
        if 'Password' in kvpair['key'] or 'password' in kvpair['key']:
            kvpair['value'] = '******'

    backup_target.put_object(os.path.join(parent, "migration_plan_db"),
                             jsonutils.dumps(plan))

    plan_vms = db.migration_plan_vms_get(cntx, migration_plan_id)
    backup_target.put_object(os.path.join(parent, "migration_plan_vms_db"),
                             jsonutils.dumps(plan_vms))
def _remove_data(context, snapshot_with_data):
    """Delete the on-media data of a DB-deleted snapshot, then hard-delete
    its DB row.  No-op unless the snapshot is soft-deleted and still holds
    data; media errors are logged, not raised."""
    snapshot_with_data = db.snapshot_get(
        context, snapshot_with_data.id, read_deleted='yes')
    # Only act on snapshots that are logically deleted but whose backup
    # data is still present.
    if snapshot_with_data.status != 'deleted':
        return
    if snapshot_with_data.data_deleted:
        return
    LOG.info(_('Deleting the data of snapshot %s of workload %s') %
             (snapshot_with_data.id, snapshot_with_data.workload_id))
    workload_obj = db.workload_get(context, snapshot_with_data.workload_id)
    endpoint = db.get_metadata_value(workload_obj.metadata,
                                     'backup_media_target')
    backup_target = vault.get_backup_target(endpoint)
    try:
        backup_target.snapshot_delete(
            context,
            {'workload_id': snapshot_with_data.workload_id,
             'workload_name': workload_obj.display_name,
             'snapshot_id': snapshot_with_data.id})
        db.snapshot_delete(context, snapshot_with_data.id, hard_delete=True)
    except Exception as ex:
        LOG.exception(str(ex))
@autolog.log_method(logger=Logger)
def _snapshot_delete(context, snapshot_id, database_only=False):
    """Soft-delete one snapshot; purge its media data only when every
    child snapshot is gone (deleted or errored).

    :param database_only: when True, touch only the DB -- no media
        removal and no re-upload of the snapshot record.
    """
    snapshot = db.snapshot_get(context, snapshot_id, read_deleted='yes')
    db.snapshot_delete(context, snapshot.id)

    # Media data may only be removed once no child snapshot still
    # depends on it.
    no_live_children = True
    for child_id in db.get_snapshot_children(context, snapshot_id):
        try:
            child = db.snapshot_get(context, child_id, read_deleted='yes')
            if child.status not in ('error', 'deleted'):
                no_live_children = False
                break
        except Exception as ex:
            LOG.exception(ex)

    if no_live_children and database_only is False:
        _remove_data(context, snapshot)
    if database_only is False:
        upload_snapshot_db_entry(context, snapshot_id)
@autolog.log_method(logger=Logger)
def snapshot_delete(context, snapshot_id, database_only=False):
    """Delete an existing snapshot, then re-examine its children and
    parents: any relative already soft-deleted but still holding data is
    re-run through the delete path so its data can be reclaimed."""
    _snapshot_delete(context, snapshot_id, database_only)

    for child_id in db.get_snapshot_children(context, snapshot_id):
        try:
            child = db.snapshot_get(context, child_id, read_deleted='yes')
            if child.status == 'deleted' and child.data_deleted == False:
                # Now see if the child's data can be deleted.
                _snapshot_delete(context, child_id, database_only)
        except Exception as ex:
            LOG.exception(ex)

    for parent_id in db.get_snapshot_parents(context, snapshot_id):
        try:
            parent = db.snapshot_get(context, parent_id, read_deleted='yes')
            if parent.status == 'deleted' and parent.data_deleted == False:
                # Now see if the parent's data can be deleted.
                _snapshot_delete(context, parent_id, database_only)
        except Exception as ex:
            LOG.exception(ex)
@autolog.log_method(logger=Logger)
def delete_if_chain(context, snapshot, snapshots_to_delete):
    """Purge whole snapshot chains whose every member is marked for
    deletion.

    The workload's snapshots are partitioned (newest DB order reversed)
    into chains, each starting at a 'full' snapshot; a chain is deleted
    only when all of its ids appear in ``snapshots_to_delete``.  Errors
    are logged, never raised.
    """
    try:
        doomed_ids = set()
        for doomed in snapshots_to_delete:
            doomed_ids.add(doomed.id)
        snapshot_obj = db.snapshot_type_time_size_update(
            context, snapshot['id'])
        workload_obj = db.workload_get(context, snapshot_obj.workload_id)
        snapshots_all = db.snapshot_get_all_by_project_workload(
            context, context.project_id, workload_obj.id, read_deleted='yes')

        # Build chains: every 'full' snapshot opens a new chain, and each
        # snapshot joins the most recently opened chain.
        chains = [set()]
        for snap in reversed(snapshots_all):
            if snap.snapshot_type == 'full':
                chains.append(set())
            chains[-1].add(snap.id)

        for chain in chains:
            if not chain.issubset(doomed_ids):
                continue
            # Soft-delete every member first, then purge their data.
            for snap_id in chain:
                db.snapshot_delete(context, snap_id)
            for snap_id in chain:
                snapshot_with_data = db.snapshot_get(
                    context, snap_id, read_deleted='yes')
                _remove_data(context, snapshot_with_data)
    except Exception as ex:
        LOG.exception(ex)
def download_snapshot_vm_from_object_store(
        context, restore_id, snapshot_id, snapshot_vm_id):
    """Fetch a VM's data for this snapshot, and for every parent snapshot
    containing the same VM, from the object store.

    :returns: cumulative download time reported by the vault layer
    """
    snapshot = db.snapshot_get(context, snapshot_id, read_deleted='yes')
    workload = db.workload_get(context, snapshot.workload_id)
    total_time = vault.download_snapshot_vm_from_object_store(
        context,
        {'restore_id': restore_id,
         'workload_id': snapshot.workload_id,
         'snapshot_id': snapshot.id,
         'snapshot_vm_id': snapshot_vm_id})

    # Incremental snapshots also need the VM's data from each parent.
    for parent_id in db.get_snapshot_parents(context, snapshot_id):
        for parent_vm in db.snapshot_vms_get(context, parent_id):
            if parent_vm.vm_id != snapshot_vm_id:
                continue
            total_time += vault.download_snapshot_vm_from_object_store(
                context,
                {'restore_id': restore_id,
                 'workload_id': snapshot.workload_id,
                 'workload_name': workload.display_name,
                 'snapshot_id': parent_id,
                 'snapshot_vm_id': snapshot_vm_id})
    return total_time
def download_snapshot_vm_resource_from_object_store(
        context, restore_id, snapshot_id, snapshot_vm_resource_id):
    """Download a VM disk resource and its whole backing chain from the
    object store.

    Starting from ``snapshot_vm_resource_id``, follows each disk snap's
    ``vm_disk_resource_snap_backing_id`` link into older snapshot
    resources until a snap without a backing (a full copy) is reached.

    :returns: cumulative download time reported by the vault layer
    """
    snapshot = db.snapshot_get(context, snapshot_id, read_deleted='yes')
    workload = db.workload_get(context, snapshot.workload_id)
    snapshot_vm_resource = db.snapshot_vm_resource_get(
        context, snapshot_vm_resource_id)
    snapshot_vm = db.snapshot_vm_get(
        context, snapshot_vm_resource.vm_id, snapshot.id)
    object_store_download_time = 0
    while snapshot_vm_resource:
        object_store_download_time += vault.download_snapshot_vm_resource_from_object_store(
            context,
            {
                'restore_id': restore_id,
                'workload_id': snapshot.workload_id,
                'workload_name': workload.display_name,
                'snapshot_id': snapshot_vm_resource.snapshot_id,
                'snapshot_vm_id': snapshot_vm_resource.vm_id,
                'snapshot_vm_name': snapshot_vm.vm_name,
                'snapshot_vm_resource_id': snapshot_vm_resource.id,
                'snapshot_vm_resource_name': snapshot_vm_resource.resource_name})
        vm_disk_resource_snaps = db.vm_disk_resource_snaps_get(
            context, snapshot_vm_resource.id)
        for vm_disk_resource_snap in vm_disk_resource_snaps:
            if vm_disk_resource_snap.vm_disk_resource_snap_backing_id:
                vm_disk_resource_snap_backing = db.vm_disk_resource_snap_get(
                    context, vm_disk_resource_snap.vm_disk_resource_snap_backing_id)
                # Hop to the older resource the backing points at; a
                # backing within the same resource keeps scanning.
                if vm_disk_resource_snap_backing.snapshot_vm_resource_id != snapshot_vm_resource.id:
                    snapshot_vm_resource = db.snapshot_vm_resource_get(
                        context, vm_disk_resource_snap_backing.snapshot_vm_resource_id)
                    break
            else:
                # No backing: the chain ends here.
                snapshot_vm_resource = None
                break
        # NOTE(review): if every disk snap backs into the same resource the
        # for-loop exits without a break and this while-loop would not
        # terminate -- confirm that case cannot occur in the data model.
    return object_store_download_time
def purge_snapshot_vm_from_staging_area(context, snapshot_id, snapshot_vm_id):
    """Remove a VM's staged files for this snapshot, and for every parent
    snapshot containing the same VM, from the backup target's staging
    area."""
    snapshot = db.snapshot_get(context, snapshot_id, read_deleted='yes')
    workload = db.workload_get(context, snapshot.workload_id)
    endpoint = db.get_metadata_value(workload.metadata, 'backup_media_target')
    backup_target = vault.get_backup_target(endpoint)
    backup_target.purge_snapshot_vm_from_staging_area(
        context,
        {'workload_id': snapshot.workload_id,
         'snapshot_id': snapshot_id,
         'snapshot_vm_id': snapshot_vm_id})

    for parent_id in db.get_snapshot_parents(context, snapshot_id):
        for parent_vm in db.snapshot_vms_get(context, parent_id):
            if parent_vm.vm_id != snapshot_vm_id:
                continue
            backup_target.purge_snapshot_vm_from_staging_area(
                context,
                {'workload_id': snapshot.workload_id,
                 'snapshot_id': parent_id,
                 'snapshot_vm_id': snapshot_vm_id})
def purge_snapshot_vm_resource_from_staging_area(
        context, snapshot_id, snapshot_vm_resource_id):
    """Walk a disk resource's backing chain and purge each hop from the
    backup target's staging area."""
    snapshot = db.snapshot_get(context, snapshot_id, read_deleted='yes')
    workload = db.workload_get(context, snapshot.workload_id)
    endpoint = db.get_metadata_value(workload.metadata, 'backup_media_target')
    backup_target = vault.get_backup_target(endpoint)
    resource = db.snapshot_vm_resource_get(context, snapshot_vm_resource_id)
    snapshot_vm = db.snapshot_vm_get(context, resource.vm_id, snapshot.id)
    while resource:
        backup_target.purge_snapshot_vm_resource_from_staging_area(
            context,
            {'workload_id': snapshot.workload_id,
             'snapshot_id': resource.snapshot_id,
             'snapshot_vm_id': resource.vm_id,
             'snapshot_vm_name': snapshot_vm.vm_name,
             'snapshot_vm_resource_id': resource.id,
             'snapshot_vm_resource_name': resource.resource_name})
        top_snap = db.vm_disk_resource_snap_get_top(context, resource.id)
        if not top_snap.vm_disk_resource_snap_backing_id:
            break  # base of the backing chain reached
        backing = db.vm_disk_resource_snap_get(
            context, top_snap.vm_disk_resource_snap_backing_id)
        resource = db.snapshot_vm_resource_get(
            context, backing.snapshot_vm_resource_id)
def purge_restore_vm_from_staging_area(
        context, restore_id, snapshot_id, snapshot_vm_id):
    """Remove a restored VM's staged files for the given snapshot from the
    backup target's staging area."""
    snapshot = db.snapshot_get(context, snapshot_id, read_deleted='yes')
    workload = db.workload_get(context, snapshot.workload_id)
    backup_endpoint = db.get_metadata_value(workload.metadata,
                                            'backup_media_target')
    backup_target = vault.get_backup_target(backup_endpoint)
    backup_target.purge_restore_vm_from_staging_area(
        context,
        {
            'restore_id': restore_id,
            'workload_id': snapshot.workload_id,
            'snapshot_id': snapshot_id,
            'snapshot_vm_id': snapshot_vm_id})


# NOTE(review): dead code below -- parent-snapshot purging was disabled by
# turning it into a bare string literal; confirm before removing it.
"""
parent_snapshots = db.get_snapshot_parents(context, snapshot_id)
for parent_snapshot_id in parent_snapshots:
    parent_snapshot_vms = db.snapshot_vms_get(context, parent_snapshot_id)
    for parent_snapshot_vm in parent_snapshot_vms:
        if parent_snapshot_vm.vm_id == snapshot_vm_id:
            vault.purge_restore_vm_from_staging_area(context, { 'restore_id': restore_id,
                'workload_id': snapshot.workload_id,
                'snapshot_id': parent_snapshot_id,
                'snapshot_vm_id': snapshot_vm_id})
"""
def purge_restore_vm_resource_from_staging_area(
        context, restore_id, snapshot_id, snapshot_vm_resource_id):
    """Walk a restored disk resource's backing chain and purge each hop
    from the backup target's staging area."""
    snapshot = db.snapshot_get(context, snapshot_id, read_deleted='yes')
    workload = db.workload_get(context, snapshot.workload_id)
    endpoint = db.get_metadata_value(workload.metadata, 'backup_media_target')
    backup_target = vault.get_backup_target(endpoint)
    resource = db.snapshot_vm_resource_get(context, snapshot_vm_resource_id)
    snapshot_vm = db.snapshot_vm_get(context, resource.vm_id, snapshot.id)
    while resource:
        backup_target.purge_restore_vm_resource_from_staging_area(
            context,
            {'restore_id': restore_id,
             'workload_id': snapshot.workload_id,
             'snapshot_id': resource.snapshot_id,
             'snapshot_vm_id': resource.vm_id,
             'snapshot_vm_name': snapshot_vm.vm_name,
             'snapshot_vm_resource_id': resource.id,
             'snapshot_vm_resource_name': resource.resource_name})
        top_snap = db.vm_disk_resource_snap_get_top(context, resource.id)
        if not top_snap.vm_disk_resource_snap_backing_id:
            break  # base of the backing chain reached
        backing = db.vm_disk_resource_snap_get(
            context, top_snap.vm_disk_resource_snap_backing_id)
        resource = db.snapshot_vm_resource_get(
            context, backing.snapshot_vm_resource_id)
def common_apply_retention_policy(cntx, instances, snapshot):
    """Apply the workload's retention policy after a snapshot.

    Purges fully-deleted snapshot chains, strips expired retention tags
    from the workload's snapshots (per the pickled jobschedule), and
    groups tag-less snapshots into commit/delete pairs.

    :param cntx: request context
    :param instances: unused here; kept for the caller's signature
    :param snapshot: dict-like with at least ``id``
    :returns: a 6-tuple ``(snapshot_to_commit, snapshots_to_delete,
        affected_snapshots, workload_obj, snapshot_obj, 0)`` when the
        backup target cannot commit chains (deletions already performed),
        otherwise a 5-tuple ``(commit_delete_pairs, affected_snapshots,
        workload_obj, snapshot_obj, 1)``.  NOTE(review): callers appear to
        dispatch on the trailing 0/1 flag -- confirm.
    :raises Exception: re-raises any failure after logging it
    """
    def _delete_deleted_snap_chains(cntx, snapshot):
        # Purge the backing data of snapshot chains whose members are all
        # soft-deleted in the DB.  Errors are logged, never raised.
        try:
            snapshot_obj = db.snapshot_type_time_size_update(
                cntx, snapshot['id'])
            workload_obj = db.workload_get(cntx, snapshot_obj.workload_id)
            backup_endpoint = db.get_metadata_value(workload_obj.metadata,
                                                    'backup_media_target')
            backup_target = vault.get_backup_target(backup_endpoint)
            snapshots_all = db.snapshot_get_all_by_project_workload(
                cntx, cntx.project_id, workload_obj.id, read_deleted='yes')
            # Partition into chains: each 'full' snapshot opens a new one.
            snap_chains = []
            snap_chain = []
            snap_chains.append(snap_chain)
            for snap in reversed(snapshots_all):
                if snap.snapshot_type == 'full':
                    snap_chain = []
                    snap_chains.append(snap_chain)
                snap_chain.append(snap)
            # A chain qualifies only if every member is soft-deleted.
            deleted_snap_chains = []
            for snap_chain in snap_chains:
                deleted_chain = True
                for snap in snap_chain:
                    if snap.status != 'deleted':
                        deleted_chain = False
                        break
                if deleted_chain:
                    deleted_snap_chains.append(snap_chain)
            for snap_chain in deleted_snap_chains:
                for snap in snap_chain:
                    if snap.deleted and snap.data_deleted == False:
                        LOG.info(
                            _('Deleting the data of snapshot %s %s %s of workload %s') %
                            (snap.display_name,
                             snap.id,
                             snap.created_at.strftime("%d-%m-%Y %H:%M:%S"),
                             workload_obj.display_name))
                        try:
                            backup_target.snapshot_delete(cntx,
                                                          {'workload_id': snap.workload_id,
                                                           'workload_name': workload_obj.display_name,
                                                           'snapshot_id': snap.id})
                            db.snapshot_update(
                                cntx, snap.id, {
                                    'data_deleted': True})
                        except Exception as ex:
                            LOG.exception(str(ex))
        except Exception as ex:
            LOG.exception(ex)

    try:
        db.snapshot_update(
            cntx, snapshot['id'], {
                'progress_msg': 'Applying retention policy', 'status': 'executing'})
        _delete_deleted_snap_chains(cntx, snapshot)
        affected_snapshots = []
        snapshot_obj = db.snapshot_get(cntx, snapshot['id'])
        workload_obj = db.workload_get(cntx, snapshot_obj.workload_id)
        backup_endpoint = db.get_metadata_value(workload_obj.metadata,
                                                'backup_media_target')
        backup_target = vault.get_backup_target(backup_endpoint)
        # NOTE(review): jobschedule is unpickled from a DB column; this is
        # trusted data -- confirm the column can never carry untrusted
        # input (pickle.loads on untrusted bytes is unsafe).
        jobschedule = pickle.loads(bytes(workload_obj.jobschedule, 'utf-8'))
        snapshots_all = db.snapshot_get_all_by_project_workload(
            cntx, cntx.project_id, workload_obj.id, read_deleted='yes')
        # Candidate set: this snapshot plus every snapshot that is either
        # available, or deleted with its media data still present.
        snapshots_valid = []
        snapshots_valid.append(snapshot_obj)
        for snap in snapshots_all:
            if snapshots_valid[0].id == snap.id:
                continue
            if snap.status == 'available':
                snapshots_valid.append(snap)
            elif snap.status == 'deleted' and snap.data_deleted == False:
                snapshots_valid.append(snap)
        snapshot_to_commit = None
        snapshots_to_delete = set()
        # Newest first from here on.
        snapshots_valid.sort(key=lambda x: x.created_at)
        snapshots_valid.reverse()
        # snapshot id -> list of retention tags parsed from metadata.
        snap_meta_map = {}
        for snap_obj in snapshots_valid:
            for meta in snap_obj.metadata:
                if meta.key == 'retention_tags':
                    if meta.value in ['null', None, '[]']:
                        continue
                    snap_meta_map[snap_obj.id] = ast.literal_eval(meta.value)

        def apply_retention_policy(retention_type):
            # Strip `retention_type` from every snapshot beyond the
            # configured retention count (counting newest-first).
            ret_obj = jobschedule.get(retention_type)
            if ret_obj:
                retention_count = int(ret_obj.get("retention", 0))
            else:
                return
            retention_cur_count = 0
            for snap_obj in snapshots_valid:
                if retention_type in snap_meta_map[snap_obj.id]:
                    retention_cur_count += 1
                    if retention_cur_count > retention_count:
                        snap_meta_map[snap_obj.id].remove(retention_type)

        def generate_commit_delete_pairs(valid_snapshot_obj_list, snaps_to_delete):
            # Walk the (oldest-first) list grouping consecutive to-delete
            # snapshots; each run is paired with the first kept snapshot
            # that follows it ("commit" target), a trailing run with None.
            delete_pairs = []
            current_sequence = []
            # Iterate through valid_snapshot_list
            for snapshot in valid_snapshot_obj_list:
                if snapshot.id in snaps_to_delete:
                    # If the snapshot is to be deleted, add it to the current sequence
                    current_sequence.append(snapshot)
                else:
                    # If the snapshot is not to be deleted, check if there's a sequence to add
                    if current_sequence:
                        # Add the current sequence to delete_pairs
                        delete_pairs.append({"commit": snapshot, "snapshots_to_delete": current_sequence})
                        current_sequence = []  # Reset current sequence
            # If there's still a sequence left, add it to delete_pairs
            if current_sequence:
                delete_pairs.append({"commit": None, "snapshots_to_delete": current_sequence})
            return delete_pairs

        cur_snap_tags = snap_meta_map[snapshot_obj.id]
        for ret_type in cur_snap_tags:
            apply_retention_policy(ret_type)
        # Snapshots left without retention tags are scheduled for
        # deletion; the rest get their reduced tag list written back.
        snap_delete_list = []
        for snap_obj in snapshots_valid:
            if len(snap_meta_map[snap_obj.id]) == 0:
                snap_delete_list.append(snap_obj.id)
            else:
                snapshot_metadata = {"retention_tags": json.dumps(snap_meta_map[snap_obj.id])}
                db.snapshot_update(cntx, snap_obj.id, {'metadata': snapshot_metadata})
        reversed_snapshots_valid = copy.deepcopy(snapshots_valid)
        reversed_snapshots_valid.reverse()
        final_snap_change_list = generate_commit_delete_pairs(
            reversed_snapshots_valid, snap_delete_list)
        snapshot_to_commit_delete = final_snap_change_list
        if backup_target.commit_supported() == False:
            # Target cannot commit/rebase chains: only delete whole chains.
            for snap in snapshot_to_commit_delete:
                snapshots_to_delete = snap.get("snapshots_to_delete", [])
                delete_if_chain(cntx, snapshot, snapshots_to_delete)
            return (snapshot_to_commit, snapshots_to_delete,
                    affected_snapshots, workload_obj, snapshot_obj, 0)
        return (snapshot_to_commit_delete,
                affected_snapshots, workload_obj, snapshot_obj, 1)
    except Exception as ex:
        LOG.exception(ex)
        raise ex
def common_apply_retention_disk_check(
        cntx, snapshot_to_commit, snap, workload_obj):
    """Soft/hard delete snapshot ``snap`` depending on its disk state.

    Refreshes the committed snapshot's type/time/size rows, then inspects
    the disk resources of ``snap``: if some disks are deleted the snapshot
    row is soft-deleted; if all disks are deleted the backend data is
    removed as well and the row is hard-deleted.

    :param cntx: request context
    :param snapshot_to_commit: snapshot that absorbed the retention commit
    :param snap: snapshot being evaluated for deletion
    :param workload_obj: owning workload (supplies the backup media target)
    """
    def _snapshot_disks_deleted(snap):
        # Return (all_disks_deleted, some_disks_deleted) for the disk
        # resources of ``snap``.  On resource-lookup failure report
        # (False, True) so the caller only soft-deletes the snapshot.
        try:
            all_disks_deleted = True
            some_disks_deleted = False
            snapshot_vm_resources = db.snapshot_resources_get(cntx, snap.id)
            for snapshot_vm_resource in snapshot_vm_resources:
                if snapshot_vm_resource.resource_type != 'disk':
                    continue
                # Full, not-yet-deleted disks are purged as we go while the
                # "all deleted" assumption still holds.
                # NOTE(review): this depends on iteration order — a live
                # disk seen later does not undo earlier deletions; confirm
                # that resources are ordered as this logic expects.
                if snapshot_vm_resource.snapshot_type == 'full' and \
                        snapshot_vm_resource.status != 'deleted' and all_disks_deleted:
                    db.snapshot_vm_resource_delete(
                        cntx, snapshot_vm_resource.id)
                    continue
                if snapshot_vm_resource.status != 'deleted':
                    all_disks_deleted = False
                else:
                    some_disks_deleted = True
            return all_disks_deleted, some_disks_deleted
        except exception.SnapshotVMResourcesNotFound as ex:
            LOG.exception(ex)
            return False, True
    # Refresh the aggregate rows of the snapshot that took the commit.
    db.snapshot_type_time_size_update(cntx, snapshot_to_commit.id)
    backup_endpoint = db.get_metadata_value(workload_obj.metadata,
                                            'backup_media_target')
    backup_target = vault.get_backup_target(backup_endpoint)
    all_disks_deleted, some_disks_deleted = _snapshot_disks_deleted(snap)
    if some_disks_deleted:
        # Partial deletion: soft-delete the DB record only.
        db.snapshot_delete(cntx, snap.id)
    if all_disks_deleted:
        # Every disk gone: remove backend data, then hard-delete the record.
        # Failures are logged and swallowed (best effort).
        try:
            LOG.info(
                _('Deleting the data of snapshot %s %s %s of workload %s') %
                (snap.display_name,
                 snap.id,
                 snap.created_at.strftime("%d-%m-%Y %H:%M:%S"),
                 workload_obj.display_name))
            backup_target.snapshot_delete(cntx,
                                          {'workload_id': snap.workload_id,
                                           'workload_name': workload_obj.display_name,
                                           'snapshot_id': snap.id})
            db.snapshot_delete(cntx, snap.id, hard_delete=True)
        except Exception as ex:
            LOG.exception(str(ex))
def common_apply_retention_snap_delete(cntx, snap, workload_obj):
    """Best-effort delete of the backend data of ``snap``.

    Skips snapshots whose data is already gone; otherwise removes the data
    from the backup target and soft-deletes the DB record.  Errors are
    logged and swallowed.

    :param cntx: request context
    :param snap: snapshot object to delete
    :param workload_obj: owning workload (supplies the backup media target)
    """
    endpoint = db.get_metadata_value(workload_obj.metadata,
                                     'backup_media_target')
    target = vault.get_backup_target(endpoint)
    if snap.data_deleted:
        # Nothing left on the backend for this snapshot.
        return
    try:
        LOG.info(
            _('Deleting the data of snapshot %s %s %s of workload %s') %
            (snap.display_name,
             snap.id,
             snap.created_at.strftime("%d-%m-%Y %H:%M:%S"),
             workload_obj.display_name))
        target.snapshot_delete(
            cntx,
            {'workload_id': snap.workload_id,
             'workload_name': workload_obj.display_name,
             'snapshot_id': snap.id})
        db.snapshot_delete(cntx, snap.id)
    except Exception as err:
        LOG.exception(str(err))
def common_apply_retention_db_backing_update(cntx, snapshot_vm_resource,
                                             vm_disk_resource_snap,
                                             vm_disk_resource_snap_backing,
                                             affected_snapshots):
    """Collapse a backing disk snap into its dependent after a commit.

    Re-points ``vm_disk_resource_snap`` at the backing entry's own backing,
    copies the backing resource's size/type/time onto
    ``snapshot_vm_resource``, deletes the backing rows, and records the
    backing snapshot id in ``affected_snapshots``.

    :returns: the (possibly extended) ``affected_snapshots`` list
    """
    # Re-point the dependent disk snap one level down the backing chain.
    db.vm_disk_resource_snap_update(
        cntx,
        vm_disk_resource_snap.id,
        {'size': vm_disk_resource_snap_backing.size,
         'vm_disk_resource_snap_backing_id':
             vm_disk_resource_snap_backing.vm_disk_resource_snap_backing_id})
    backing_res = db.snapshot_vm_resource_get(
        cntx, vm_disk_resource_snap_backing.snapshot_vm_resource_id)
    # The dependent resource inherits the backing resource's attributes.
    db.snapshot_vm_resource_update(
        cntx,
        snapshot_vm_resource.id,
        {'size': backing_res.size,
         'snapshot_type': backing_res.snapshot_type,
         'time_taken': backing_res.time_taken})
    db.vm_disk_resource_snap_delete(cntx, vm_disk_resource_snap_backing.id)
    db.snapshot_vm_resource_delete(
        cntx, vm_disk_resource_snap_backing.snapshot_vm_resource_id)
    # NOTE(review): re-reads the resource row just deleted above —
    # presumably a soft delete keeps it readable; confirm db semantics.
    backing_res = db.snapshot_vm_resource_get(
        cntx, vm_disk_resource_snap_backing.snapshot_vm_resource_id)
    if backing_res.snapshot_id not in affected_snapshots:
        affected_snapshots.append(backing_res.snapshot_id)
    return affected_snapshots
@autolog.log_method(logger=Logger)
def policy_delete(context, policy_id):
    """
    Delete an existing policy from the settings backup target.

    Best effort: any failure is logged and swallowed.
    """
    try:
        target, _path = vault.get_settings_backup_target()
        target.policy_delete(context, policy_id)
    except Exception as err:
        LOG.exception(err)
@autolog.log_method(logger=Logger)
def get_compute_host(context):
    """Return the host of the first contego compute node that is up.

    :param context: request context
    :returns: host of the first up compute node
    :raises exception.ErrorOccurred: when no compute node is up
    """
    # The original wrapped this in ``try/except Exception: raise ex``,
    # which only destroyed the traceback; exceptions now propagate as-is.
    compute_nodes = get_compute_nodes(context, up_only=True)
    if compute_nodes:
        # NOTE(review): attribute access here vs. dict access in
        # get_compute_nodes — confirm the service entries expose ``.host``.
        return compute_nodes[0].host
    message = "No compute node is up for validate database credentials."
    raise exception.ErrorOccurred(reason=message)
@autolog.log_method(logger=Logger)
def validate_database_creds(context, databases, trust_creds):
    """Validate database credentials via the contego service.

    :param context: request context
    :param databases: database descriptors to validate
    :param trust_creds: trust credentials to use
    :returns: True when validation succeeds
    :raises exception.ErrorOccurred: when validation fails
    """
    # Bug fix: the original ``except exception as ex`` caught the imported
    # ``workloadmgr.exception`` MODULE, which raises a TypeError the moment
    # any error occurred.  Errors now propagate naturally.
    host = get_compute_host(context)
    contego_service = contego.API(production=True)
    params = {'databases': databases,
              'host': host, 'trust_creds': trust_creds}
    status = contego_service.validate_database_creds(context, params)
    if status['result'] != "success":
        message = "Please verify given database credentials."
        raise exception.ErrorOccurred(reason=message)
    return True
@autolog.log_method(logger=Logger)
def validate_trusted_user_and_key(context, trust_creds):
    """Validate passwordless-sudo access for the trusted user/key pair.

    :param context: request context
    :param trust_creds: trusted user + private key credentials
    :returns: True when validation succeeds
    :raises exception.ErrorOccurred: when validation fails
    """
    # The original ``except Exception as ex: raise ex`` wrapper only
    # mangled the traceback; exceptions now propagate unchanged.
    host = get_compute_host(context)
    contego_service = contego.API(production=True)
    params = {'host': host, 'trust_creds': trust_creds}
    status = contego_service.validate_trusted_user_and_key(context, params)
    if status['result'] != "success":
        message = "Please verify, given trusted user should have passwordless sudo access using given private key."
        raise exception.ErrorOccurred(reason=message)
    return True
@autolog.log_method(logger=Logger)
def get_controller_nodes(context):
    """Return the controller node list reported by the contego service.

    :param context: request context
    :returns: list of controller nodes
    """
    # Bug fix: ``except exception as ex`` caught the ``workloadmgr.exception``
    # MODULE (a TypeError at catch time); errors now propagate naturally.
    contego_service = contego.API(production=True)
    result = contego_service.get_controller_nodes(context)
    return result['controller_nodes']
@autolog.log_method(logger=Logger)
def get_compute_nodes(context, host=None, up_only=False):
    """Return the tvault-contego services known to the contego client.

    :param context: request context
    :param host: unused (kept for interface compatibility)
    :param up_only: when True, only services whose state is 'up'
    :returns: list of matching service entries
    """
    # The original wrapped everything in ``try/except Exception: raise ex``
    # (traceback-destroying no-op) and staged the list in a throwaway dict;
    # both removed.
    contego_nodes = []
    admin_cntx = nova._get_tenant_context(context, cloud_admin=True)
    clients.initialise()
    contego_client = clients.Clients(admin_cntx).client("contego")
    service_info = contego_client.contego.get_service_list()
    # get_service_list returns a (resp, body) tuple on success.
    if service_info and isinstance(service_info, tuple) and service_info[1].get('services'):
        services = service_info[1]['services']
    else:
        services = []
    for service in services:
        if up_only is True:
            if service['binary'] == 'tvault-contego' and service['state'] == 'up':
                contego_nodes.append(service)
        elif service['binary'] == 'tvault-contego':
            contego_nodes.append(service)
    return contego_nodes
@autolog.log_method(logger=Logger)
def get_restore_options(
        name, desc, snapshot_obj, restore_type='selective',
        restore_topology=False):
    """Build the restore-options payload for an openstack restore.

    :param name: display name for the restore
    :param desc: description for the restore
    :param snapshot_obj: dict with an 'instances' list; each instance holds
        id/name/flavor/metadata and optional nics/vdisks
    :param restore_type: restore flavor (default 'selective')
    :param restore_topology: whether to restore the network topology
    :returns: restore-options dict
    """
    restore_options = {
        'name': name,
        'description': desc,
        'oneclickrestore': False,
        'restore_type': restore_type,
        'type': 'openstack',
        'openstack': {
            # Bug fix: 'restore_topology' was duplicated in this literal.
            'restore_topology': restore_topology,
            'instances': [],
            # TODO: Add support to populate 'networks_mapping' if needed.
            'networks_mapping': {
                'networks': [],
            },
        },
    }
    for instance in snapshot_obj['instances']:
        # Carry the instance's flavor over as-is.
        flavor = {
            "disk": instance['flavor']['disk'],
            "vcpus": instance['flavor']['vcpus'],
            "ram": instance['flavor']['ram'],
            "ephemeral": instance['flavor']['ephemeral'],
            "swap": ""
        }
        # TODO: check what should be availability_zone
        az = instance['metadata'].get('availability_zone', 'nova')
        vm_options = {
            'id': instance['id'],
            'include': True,
            # Use the instance's own name (the original shadowed the
            # function's ``name`` parameter with it).
            'name': instance['name'],
            'flavor': flavor,
            'availability_zone': az,
            'nics': [nic for nic in instance.get('nics', [])],
            "vdisks": [],
        }
        # Carry volume-backed disks with their (possibly new) volume types.
        for vdisk in instance['vdisks']:
            if not vdisk.get('volume_id', None):
                continue
            # TODO: check the default value
            vm_options['vdisks'].append({
                'id': vdisk['volume_id'],
                'new_volume_type': vdisk.get('volume_type', None),
                'availability_zone': vdisk.get('availability_zone', ''),
            })
        restore_options['openstack']['instances'].append(vm_options)
    return restore_options
# Template for the new-style job schedule: one section per scheduler
# frequency, each carrying a retention count and a snapshot type.
# List-valued fields (backup_time, backup_day, ...) are stored as
# JSON-style strings.
new_sch = {
    'enabled': "",
    'start_date': "",
    'end_date': "",
    'start_time': "",
    'manual': {
        'retention': 0,
        'snapshot_type': 'incremental'
    },
    'hourly': {
        'interval': 0,
        'retention': 0,
        'snapshot_type': 'incremental'
    },
    'daily': {
        'backup_time': "[]",
        'retention': 0,
        'snapshot_type': 'incremental'
    },
    'weekly': {
        'backup_day': "[]",
        'retention': 0,
        'snapshot_type': 'incremental'
    },
    'monthly': {
        'month_backup_day': "[]",
        'retention': 0,
        'snapshot_type': 'full'
    },
    'yearly': {
        'backup_month': "[]",
        'retention': 0,
        'snapshot_type': 'full'
    },
    'timezone': 'UTC'
}
# Allowed hourly intervals (divisors of 24); legacy interval values are
# rounded up to the next member of this list during conversion.
hourly_interval = [1, 2, 3, 4, 6, 8, 12, 24]
@autolog.log_method(logger=Logger)
def convert_old_to_new_schedule(workload_values, workload_backup_media_size):
    """Convert a legacy interval-based jobschedule to the new format.

    Legacy schedules carry a single "interval" plus full-backup cadence and
    a retention policy; the new format has per-frequency sections (see the
    module-level ``new_sch`` template).  The converted schedule is pickled
    back into ``workload_values['jobschedule']`` and the workload metadata
    is refreshed via update_other_workload_metadata().

    :param workload_values: workload record dict (mutated and returned)
    :param workload_backup_media_size: per-workload-id media size map
    :returns: updated ``workload_values``
    :raises Exception: re-raised on any conversion failure
    """
    try:
        # jobschedule is stored as a pickled (protocol 0), utf-8 string.
        old_jobschedule = pickle.loads(bytes(workload_values['jobschedule'], 'utf-8'))
        interval = old_jobschedule.get("interval")
        if not interval:
            # No legacy "interval" -> already new-style; just refresh
            # the derived metadata and return.
            workload_values = update_other_workload_metadata(
                workload_values, workload_backup_media_size)
            return workload_values
        # e.g. "1 hr" -> 1
        interval = int(re.search(r'\d+', interval).group())
        fullbackup_interval = old_jobschedule.get("fullbackup_interval")
        retention_policy_type = old_jobschedule.get("retention_policy_type")
        retention_policy_value = old_jobschedule.get("retention_policy_value")
        old_fields = ['enabled', 'start_date', 'end_date', 'start_time', 'timezone', 'appliance_timezone']
        old_interval = interval
        new_interval = 0
        # NOTE: local new_sch shadows the module-level template on purpose.
        new_sch = {}
        # Carry the simple scalar fields over unchanged.
        for old_field in old_fields:
            new_sch[old_field] = old_jobschedule.get(old_field)
        # Round the legacy interval up to the nearest allowed hourly
        # interval (a divisor of 24).
        if interval <= 24:
            if interval in hourly_interval:
                new_interval = interval
            else:
                while interval <= 24:
                    if interval in hourly_interval:
                        new_interval = interval
                        break
                    interval += 1
        new_retention = retention_policy_value
        manual_retention_days = retention_policy_value
        if retention_policy_type == "Number of days to retain Snapshots":
            # Day-based retention -> equivalent snapshot count.
            ret_hours = int(retention_policy_value) * 24
            new_retention = int(ret_hours/old_interval)
        else:
            # set a default value for immutable backups retention days
            manual_retention_days = 30
        if fullbackup_interval == '-1':
            # Never take scheduled fulls: hourly incrementals only.
            hourly = {
                "interval": new_interval,
                "snapshot_type": "incremental",
                "retention": new_retention
            }
            new_sch["hourly"] = hourly
        else:
            # Simulate the legacy snapshot timeline to map the periodic
            # full backups onto daily full-backup times.
            generate_snap_list = []
            start_date = old_jobschedule.get("start_date")
            start_time = old_jobschedule.get("start_time")
            start_date_string = f"{start_date} {start_time}"
            date_time_object = datetime.strptime(start_date_string, "%m/%d/%Y %I:%M %p")
            # NOTE(review): assumes new_retention is an int here; with a
            # count-based retention policy it may still be a string (the
            # sibling policy converter wraps it in int()) — confirm.
            for i in range(new_retention):
                generate_snap_list.append(date_time_object)
                date_time_object = date_time_object + timedelta(hours=old_interval)
            delta = generate_snap_list[-1] - generate_snap_list[0]
            number_of_day_span = delta.days
            hourly = {
                "interval": new_interval,
                "snapshot_type": "incremental",
                "retention": new_retention
            }
            new_sch["hourly"] = hourly
            idx = 0
            backup_time = []
            # Collect the full-backup times falling within the first day.
            while idx < new_retention and \
                    (generate_snap_list[idx] - generate_snap_list[0]).days < 1:
                backup_time.append(generate_snap_list[idx].strftime("%H:%M"))
                idx = idx + int(fullbackup_interval) + 1
            retention_per_day = (number_of_day_span + 1) * len(backup_time)
            daily = {
                "backup_time": backup_time,
                "snapshot_type": "full",
                "retention": retention_per_day
            }
            new_sch["daily"] = daily
        # Adding manual retention count
        new_sch["manual"] = {
            "retention": new_retention,
            "retention_days_to_keep": manual_retention_days
        }
        # Persist back in the same pickled-string representation.
        workload_values['jobschedule'] = str(pickle.dumps(new_sch, 0), 'utf-8')
        workload_values = update_other_workload_metadata(
            workload_values, workload_backup_media_size)
        return workload_values
    except Exception as ex:
        raise ex
def update_other_workload_metadata(workload_values, workload_backup_media_size):
    '''
    Update workload values with "backup_media_target"
    and "workload_approx_backup_size".

    The approximate backup size is a weighted sum of the retention counts
    of full and incremental snapshots across all scheduler frequencies,
    scaled by the workload's media size and the configured backup factors.
    Errors are logged and the (possibly unmodified) values are returned.
    '''
    try:
        workload_metadata = workload_values['metadata']
        if 'backup_media_target' not in workload_metadata:
            # jobschedule is stored as a pickled, utf-8 string.
            jobschedule = pickle.loads(bytes(workload_values['jobschedule'], 'utf-8'))
            fulls = 0
            incrs = 0
            # Sum retention counts per snapshot type over every frequency.
            scheduler_type = ["hourly", "daily", "weekly", "monthly", "yearly", "manual"]
            for key in scheduler_type:
                if jobschedule.get(key, None):
                    if jobschedule[key].get("snapshot_type", "") == "full":
                        fulls = fulls + int(jobschedule[key].get("retention", 0))
                    elif jobschedule[key].get("snapshot_type", "") == "incremental":
                        incrs = incrs + int(jobschedule[key].get("retention", 0))
            # Default the media size to 1 GiB when no measurement exists.
            if workload_backup_media_size.get(
                    workload_values['id'], None) is None:
                workload_backup_media_size[workload_values['id']
                                           ] = 1024 * 1024 * 1024
            # Weighted estimate; the factors are configured percentages.
            workload_approx_backup_size = \
                (fulls * workload_backup_media_size[workload_values['id']] * vault.CONF.workload_full_backup_factor +
                 incrs * workload_backup_media_size[
                     workload_values['id']] * vault.CONF.workload_incr_backup_factor) / 100
            # NOTE(review): assumes metadata is a non-empty list of dicts
            # and writes both values into its FIRST element — confirm the
            # metadata schema expected by callers.
            workload_values['metadata'][0]['backup_media_target'] = workload_backup_media_size[workload_values['id']]
            workload_values['metadata'][0]['workload_approx_backup_size'] = workload_approx_backup_size
    except Exception as ex:
        # Best effort: log and fall through to return workload_values.
        LOG.exception(ex)
    return workload_values
def convert_old_policy_to_new_policy_field(policy_field_values):
    """Convert legacy policy fields into the new per-frequency fields.

    Mirrors convert_old_to_new_schedule() but operates on a list of policy
    field dicts ({'policy_field_name': ..., 'value': ...}) and returns a
    dict whose values are pickled (protocol 0), utf-8 decoded strings —
    the storage format for policy fields.

    :param policy_field_values: legacy policy field list
    :returns: dict of new policy fields (pickled-string values)
    :raises Exception: re-raised on any conversion failure
    """
    try:
        old_jobschedule = policy_field_values
        # Pull the four legacy values out of the field list.
        for old_policy_field in old_jobschedule:
            if old_policy_field.get('policy_field_name') == 'interval':
                interval = old_policy_field.get('value')
                # e.g. "1 hr" -> 1
                interval = int(re.search(r'\d+', interval).group())
            elif old_policy_field.get('policy_field_name') == 'fullbackup_interval':
                fullbackup_interval = old_policy_field.get('value')
            elif old_policy_field.get('policy_field_name') == "retention_policy_type":
                retention_policy_type = old_policy_field.get('value')
            elif old_policy_field.get('policy_field_name') == "retention_policy_value":
                retention_policy_value = old_policy_field.get('value')
        # NOTE(review): assumes all four legacy fields are present in the
        # list; otherwise the names below are unbound — confirm callers.
        old_interval = interval
        new_interval = 0
        # Round the legacy interval up to the nearest allowed hourly one.
        if interval <= 24:
            if interval in hourly_interval:
                new_interval = interval
            else:
                while interval <= 24:
                    if interval in hourly_interval:
                        new_interval = interval
                        break
                    interval += 1
        new_retention = retention_policy_value
        new_sch = {}
        manual_retention_days = retention_policy_value
        if retention_policy_type == "Number of days to retain Snapshots":
            # Day-based retention -> equivalent snapshot count.
            ret_hours = int(retention_policy_value) * 24
            new_retention = int(ret_hours/old_interval)
        else:
            # set a default value for immutable backups retention days
            manual_retention_days = 30
        if fullbackup_interval == '-1':
            # Never take scheduled fulls: hourly incrementals only.
            hourly = {
                "interval": new_interval,
                "snapshot_type": "incremental",
                "retention": new_retention
            }
            new_sch["hourly"] = str(pickle.dumps(hourly, 0), 'utf-8')
        else:
            # Simulate the legacy timeline (starting today 09:00 AM) to map
            # the periodic full backups onto daily full-backup times.
            generate_snap_list = []
            start_date = datetime.utcnow().strftime('%m/%d/%Y')
            start_time = '09:00 AM'
            start_date_string = f"{start_date} {start_time}"
            date_time_object = datetime.strptime(start_date_string, "%m/%d/%Y %I:%M %p")
            for i in range(int(new_retention)):
                generate_snap_list.append(date_time_object)
                date_time_object = date_time_object + timedelta(hours=old_interval)
            delta = generate_snap_list[-1] - generate_snap_list[0]
            number_of_day_span = delta.days
            hourly = {
                "interval": new_interval,
                "snapshot_type": "incremental",
                "retention": new_retention
            }
            new_sch["hourly"] = str(pickle.dumps(hourly, 0), 'utf-8')
            idx = 0
            backup_time = []
            # Collect the full-backup times falling within the first day.
            while idx < int(new_retention) and \
                    (generate_snap_list[idx] - generate_snap_list[0]).days < 1:
                backup_time.append(generate_snap_list[idx].strftime("%H:%M"))
                idx = idx + int(fullbackup_interval) + 1
            retention_per_day = (number_of_day_span + 1) * len(backup_time)
            daily = {
                "backup_time": backup_time,
                "snapshot_type": "full",
                "retention": retention_per_day
            }
            new_sch["daily"] = str(pickle.dumps(daily, 0), 'utf-8')
            new_sch["start_time"] = str(pickle.dumps(start_time, 0), 'utf-8')
        # adding manual retention_count and retention_days_to_keep
        new_sch['retentionmanual'] = str(pickle.dumps({'retentionmanual': manual_retention_days}, 0), 'utf-8')
        new_sch['manual'] = str(pickle.dumps(
            {'retention': 30,
             'retention_days_to_keep': manual_retention_days}, 0), 'utf-8')
        return new_sch
    except Exception as ex:
        raise ex
def convert_to_new_snapshot_meta(snapshot_db_json_obj, backup_target_type):
    '''
    Update necessary snapshot metadata values
    metadata: retention_tags
        if 'retention_tags' key is not there, then
        add a new metadata key with a value either ["hourly"]
        or ["manual"] based on the display name of the snapshot.
    metadata: backup_target_types
        'backup_target_types' key has to be either added or updated
        to the value same as its source btt where the backend file
        must exist.

    Errors are logged and the (possibly partially updated) snapshot dict
    is returned either way.
    '''
    def update_backup_target_types(snapshot_metadata, backup_target_type):
        # Update the existing 'backup_target_types' entry in place, or
        # clone the first metadata entry as a template for a new one.
        current_datetime_gmt = datetime.now(timezone.utc). \
            strftime("%Y-%m-%dT%H:%M:%S.%f")
        for meta in snapshot_metadata:
            if meta['key'] == 'backup_target_types':
                meta['updated_at'] = current_datetime_gmt
                meta['value'] = backup_target_type
                return snapshot_metadata
        # No existing entry: fabricate one from the first metadata row.
        btt_meta = snapshot_metadata[0].copy()
        btt_meta['created_at'] = current_datetime_gmt
        btt_meta['updated_at'] = None
        btt_meta['key'] = 'backup_target_types'
        btt_meta['id'] = str(uuid4())
        btt_meta['value'] = backup_target_type
        snapshot_metadata.append(btt_meta)
        return snapshot_metadata
    try:
        # Update retention tags
        snapshot_metadata = snapshot_db_json_obj.get('metadata')
        snapshot_name = snapshot_db_json_obj.get('display_name')
        if snapshot_metadata:
            for meta in snapshot_metadata:
                if meta['key'] == 'retention_tags':
                    # Tags already present: only refresh backup target
                    # types and return early.
                    snapshot_metadata = update_backup_target_types(
                        snapshot_metadata, backup_target_type)
                    snapshot_db_json_obj['metadata'] = \
                        snapshot_metadata
                    return snapshot_db_json_obj
            # No retention_tags entry: clone the first metadata row as a
            # template for a fresh one.
            retention_tags_meta = snapshot_metadata[0].copy()
            current_datetime_gmt = datetime.now(timezone.utc). \
                strftime("%Y-%m-%dT%H:%M:%S.%f")
            retention_tags_meta['created_at'] = current_datetime_gmt
            retention_tags_meta['updated_at'] = None
            retention_tags_meta['key'] = 'retention_tags'
            retention_tags_meta['id'] = str(uuid4())
            # Scheduler-created snapshots are tagged hourly; user-created
            # ones are tagged manual.
            if snapshot_name == 'jobscheduler':
                retention_tags_meta['value'] = '["hourly"]'
            else:
                retention_tags_meta['value'] = '["manual"]'
            snapshot_metadata.append(retention_tags_meta)
            # Update Backup target types
            snapshot_metadata = update_backup_target_types(
                snapshot_metadata, backup_target_type)
            snapshot_db_json_obj['metadata'] = snapshot_metadata
    except Exception as ex:
        LOG.exception(ex)
    return snapshot_db_json_obj
def chunks(lst, chk_size):
    """Split *lst* into a list of consecutive tuples of at most *chk_size* items."""
    source = iter(lst)
    result = []
    while True:
        piece = tuple(islice(source, chk_size))
        if not piece:
            return result
        result.append(piece)
def is_valid_uuid(uuid_val):
try:
uuid_obj = UUID(uuid_val)
except ValueError:
return False
return str(uuid_obj) == uuid_val
def import_resource(context, workload_id, snapshot_id, resource, db_version):
    """Import one snapshot resource from its source backup target.

    Resolves the snapshot's source backup-target-type from its metadata,
    builds the on-disk resource path and delegates to
    workloadmgr.db.imports.import_workloads.import_workload.

    :param context: request context
    :param workload_id: workload uuid string
    :param snapshot_id: snapshot uuid string
    :param resource: resource kind ('network_db' or 'resources_db')
    :param db_version: database schema version to import against
    :raises FileNotFoundError: when the resource path does not exist
    """
    source_btt_name = [each.get('value')
                       for each in db.snapshot_get(context, snapshot_id).metadata
                       if each.get('key') == 'backup_target_types']
    source_btt = db.get_backup_target_by_btt(context, source_btt_name[0])
    source_btt_obj = {
        'backup_target_types':
            db.get_backup_target_by_btt(context, source_btt_name[0]).__dict__}
    res_dir = os.path.join(
        source_btt.filesystem_export_mount_path,
        'workload_' + workload_id,
        'snapshot_' + snapshot_id)
    if resource == 'network_db':
        res_dir = os.path.join(res_dir, 'network')
    if resource == 'resources_db':
        res_dir = os.path.join(res_dir, 'resources_db')
    # Bug fix: the original tested ``if not res_dir`` which is never true
    # for a non-empty joined path; check that the path actually exists.
    if not os.path.exists(res_dir):
        raise FileNotFoundError('File: {0} Not Found'.format(res_dir))
    snap_res_param = {"workload_id": workload_id,
                      "snapshot": snapshot_id,
                      "resource_type": resource,
                      "resource_path": res_dir}
    module_name = 'workloadmgr.db.imports.import_workloads'
    import_workload_module = importlib.import_module(module_name)
    import_workload_module.import_workload(
        context,
        None,
        workload_id,
        db_version,
        True,
        source_btt_obj,
        **snap_res_param)
def determine_retention_profile(context, snapshot):
    """Apply the retention policy after ``snapshot`` completes.

    Trims stale retention tags down to the configured per-frequency
    counts, deletes snapshots that carry no tags and are not backing
    files of a newer incremental, and extends the retain-until date of
    the current snapshot's backing chain.

    Args:
        context: request context
        snapshot: dict holding at least the snapshot 'id'
    Returns:
        None
    Raises:
        Exception: re-raised (after logging) on any failure
    """
    try:
        snapshot_obj = db.snapshot_get(context, snapshot['id'])
        workload_obj = db.workload_get(context, snapshot_obj.workload_id)
        backup_endpoint = db.get_metadata_value(workload_obj.metadata,
                                                'backup_media_target')
        backup_target = db.get_backup_target_by_backend_endpoint(
            context, backup_endpoint)
        curr_snap_retain_until = calculate_datetime_in_days_for_retention(
            context, snapshot_obj, workload_obj, db)
        snapshots_all = db.snapshot_get_all_by_project_workload(
            context, context.project_id, workload_obj.id, read_deleted='yes')
        snapshots_valid = [snap for snap in snapshots_all
                           if snap.status == 'available']
        snapshots_valid.append(snapshot_obj)
        # Most recent first.
        snapshots_valid.sort(key=lambda x: x.created_at, reverse=True)
        jobschedule = pickle.loads(bytes(workload_obj.jobschedule, 'utf-8'))
        # Map snapshot id -> list of retention tags (skip empty/null).
        snap_meta_map = {}
        for snap_obj in snapshots_valid:
            for meta in snap_obj.metadata:
                if meta.key == 'retention_tags':
                    if meta.value in ['null', None, '[]']:
                        continue
                    snap_meta_map[snap_obj.id] = ast.literal_eval(meta.value)
        # NOTE(review): raises KeyError when the current snapshot has no
        # retention_tags metadata — handled by the outer except.
        ret_tags = snap_meta_map[snapshot_obj.id]
        all_snap_tags = None

        def get_retention_tags(snap_obj):
            # Tags of one snapshot; [] when explicitly empty/null, None
            # when the metadata key is missing entirely.
            for meta in snap_obj.metadata:
                if meta.key == 'retention_tags':
                    if meta.value in ['null', None, '[]']:
                        return []
                    else:
                        return ast.literal_eval(meta.value)

        def find_snapshot_based_on_retention_tag(snapshots_valid, tag):
            # All snapshots (newest first) carrying ``tag``.
            tags_snap_list = []
            for snap_obj in snapshots_valid:
                if tag in get_retention_tags(snap_obj):
                    tags_snap_list.append(snap_obj)
            return tags_snap_list

        def create_retention_group(all_snap_tags, tag, ret_value):
            # Split into (keep-tagged, to-untag) per the retention count.
            if ret_value >= len(all_snap_tags):
                tag_snaps = all_snap_tags
                untag_snaps = []
            else:
                tag_snaps = all_snap_tags[:ret_value]
                untag_snaps = all_snap_tags[ret_value:]
            return tag_snaps, untag_snaps
        # Remove the retention tags from the snapshots
        # that are no longer needed to be retained
        # as per the retention count value
        # but those may be required to stay in the backend
        # to maintain the chain of the snapshots
        for tag in ret_tags:
            all_snap_tags = find_snapshot_based_on_retention_tag(
                snapshots_valid, tag
            )
            ret_value = int(jobschedule.get(tag)['retention'])
            tag_snaps, untag_snaps = create_retention_group(
                all_snap_tags, tag, ret_value
            )
            for snap_obj in untag_snaps:
                # Bug fix: list.remove() returns None, so chaining it onto
                # ast.literal_eval() persisted "null" and wiped EVERY tag;
                # remove just this tag and store the remaining list.
                val = ast.literal_eval(
                    db.get_metadata_value(
                        snap_obj.metadata,
                        'retention_tags'
                    )
                )
                if tag in val:
                    val.remove(tag)
                snapshot_metadata = {"retention_tags": json.dumps(val)}
                db.snapshot_update(
                    context, snap_obj.id, {'metadata': snapshot_metadata}
                )
        # Delete the snapshots which are no longer needed
        # Condition to check:
        # - it should not have any retention tags
        # - it should not be a part of active snapshot chain
        #   i.e if it is the top of the chain
        #
        # Do not update the retain-until date of the previous
        # snapshots if the current snapshot is of type Full
        update_backing_chain_retain_until = True
        if snapshot_obj.snapshot_type.lower() == 'full':
            update_backing_chain_retain_until = False
        # Get the list of snapshots of the current backing chain.
        modify_retain_until_snaps = []
        if update_backing_chain_retain_until:
            i = 1
            while i < len(snapshots_valid):
                modify_retain_until_snaps.append(snapshots_valid[i])
                if snapshots_valid[i].snapshot_type.lower() == 'full':
                    break
                i += 1
        # Traverse all the available snapshots of the workload.
        idx = 1
        while idx < len(snapshots_valid):
            prev_snap_type = None
            # Condition#1 : should not have any retention tags
            try:
                ret_tags = get_retention_tags(
                    db.snapshot_get(context, snapshots_valid[idx].id)
                )
                if ret_tags:
                    # this snapshot can not be deleted
                    # as it has a valid retention tag
                    idx += 1
                    continue
            except Exception as ex:
                LOG.exception(ex)
            # Condition#2 : should be top of the chain
            try:
                prev_snap_obj = None
                prev_snap_type = None
                prev_snap_obj = db.snapshot_get(
                    context, snapshots_valid[idx - 1].id
                )
                prev_snap_type = prev_snap_obj.snapshot_type
            except exception.SnapshotNotFound:
                # prev snapshot already deleted
                prev_snap_type = None
            # delete if it is not a backing file of any
            # other snapshot and has no retention tags
            if not prev_snap_type or prev_snap_type.lower() == 'full':
                db.snapshot_delete(
                    context, snapshots_valid[idx].id, hard_delete=True)
                # remove from modify snapshot list if present
                try:
                    index = modify_retain_until_snaps.index(
                        snapshots_valid[idx])
                    modify_retain_until_snaps.pop(index)
                except ValueError:
                    LOG.debug("Given snapshot not in the modify list")
            idx += 1
        if not update_backing_chain_retain_until:
            return
        # Push the current snapshot's retain-until onto every chain member
        # whose stored retain-until is earlier.
        for each in modify_retain_until_snaps:
            metadata_snap_path = os.path.join(
                backup_target.filesystem_export_mount_path,
                'workload_' + workload_obj.id,
                'snapshot_' + each.id
            )
            snap_retain_until = db.get_metadata_value(
                each.metadata, 'retainuntil')
            if datetime.strptime(snap_retain_until, "%Y-%m-%dT%H:%M:%S") < \
                    datetime.strptime(curr_snap_retain_until,
                                      "%Y-%m-%dT%H:%M:%S"):
                update_retain_until_timestamp(metadata_snap_path,
                                              curr_snap_retain_until)
                # update retain_until metadata
                snapshot_metadata = {}
                snapshot_metadata['retainuntil'] = curr_snap_retain_until
                db.snapshot_update(context, each.id,
                                   {'metadata': snapshot_metadata})
    except Exception as ex:
        LOG.exception(ex)
        raise
def update_retain_until_timestamp(snap_path, timestamp):
    """Recursively stamp every file under ``snap_path`` with ``timestamp``.

    Applies utils.setfattr_retain_until to each file found while walking
    the tree.  TypeError is ignored (kept from the original, best effort);
    any other error propagates — the original's ``except Exception as ex:
    raise ex`` wrapper only destroyed the traceback and was removed.

    :param snap_path: directory tree to walk
    :param timestamp: retain-until value to set on each file
    """
    for root, _dirs, files in os.walk(snap_path, topdown=True):
        for fname in files:
            try:
                utils.setfattr_retain_until(os.path.join(root, fname), timestamp)
            except TypeError:
                pass
def check_snapshot_meeting_retention_criteria(context, snapshots, workload, snapshot_obj):
    """It checks whether the current snapshot meets the retention criteria.

    1- sort the snapshot list in place, most recent to oldest
    2- find the retention value ret_val
       - if manual retention, then take the manual retention value
       - else take hourly (all other scheduling types derive from hourly)
    3- look at the most recent ret_val snapshots
    4- if no full-type snapshot is found among them, return True, else False

    Args:
        context: request context
        snapshots: snapshot objects of the workload (sorted in place)
        workload: workload object (supplies the pickled jobschedule)
        snapshot_obj: current snapshot object
    Returns:
        Bool, True when the criteria are met
    """
    snapshots.sort(key=lambda x: x.created_at, reverse=True)
    retention_tags = ast.literal_eval(db.get_metadata_value(
        db.snapshot_get(context, snapshot_obj.id).metadata, 'retention_tags'))
    jobschedule = pickle.loads(bytes(workload.jobschedule, 'utf-8'))
    # The original initialized ret_val to len(snapshots) + 1 first, but
    # that value was always overwritten below (dead code, removed).
    if 'manual' in retention_tags:
        ret_val = int(jobschedule['manual'].get('retention'))
    else:
        ret_val = int(jobschedule['hourly'].get('retention'))
    if ret_val > len(snapshots):
        return False
    for idx in range(ret_val):
        if snapshots[idx].snapshot_type.lower() == 'full':
            return False
    return True
def calculate_datetime_in_days_for_retention(cntx, snapshot, workload_db, db, delta=timedelta(days=1)):
    """Retain-until calculation based on retention type and retention value.

    Derives a retention horizon from the snapshot's retention tags and the
    workload's job schedule, then returns
    ``snapshot.created_at + horizon + delta`` as an ISO-8601 string.

    :param cntx: request context
    :param snapshot: snapshot whose metadata holds 'retention_tags'
    :param workload_db: workload object with the pickled jobschedule
    :param db: database API (passed explicitly; shadows the module global)
    :param delta: extra safety margin added on top (default one day)
    :returns: ISO-8601 timestamp string
    """
    workload_obj_ret = pickle.loads(bytes(workload_db.jobschedule, 'utf-8'))
    # NOTE(review): despite the name, ret_days accumulates HOURS — it is
    # fed to relativedelta(hours=...) at the end.  If several tags are
    # present the LAST matching branch wins (no max()); confirm intent.
    ret_days = 0
    for each in ast.literal_eval(db.get_metadata_value(
            db.snapshot_get(cntx, snapshot.id).metadata, 'retention_tags')):
        if each == 'manual':
            retention = workload_obj_ret[each]['retention']
            # 'retentionmanual' stores the days-to-keep for manual backups.
            interval = workload_obj_ret['retentionmanual'].get(
                'retentionmanual') if workload_obj_ret.get(
                'retentionmanual') else 0
            if not interval:
                interval = 0
            ret_days = int(interval) * 24
        if each == 'hourly':
            retention = int(workload_obj_ret[each]['retention'])
            interval = int(workload_obj_ret[each]['interval'])
            # NOTE(review): not multiplied by 24 like the branches below —
            # presumably because the hourly interval is already in hours.
            ret_days = (interval * retention + 1)
        if each == 'daily':
            # NOTE(review): the new-style 'daily' section in the module
            # template has no 'interval' key — confirm this branch's input.
            retention = int(workload_obj_ret[each]['retention'])
            interval = int(workload_obj_ret[each]['interval'])
            ret_days = (interval * retention + 1) * 24
        if each == 'weekly':
            retention = int(workload_obj_ret[each]['retention'])
            interval = int(workload_obj_ret[each]['interval'])
            ret_days = (interval * retention + 1) * 7 * 24
        if each == 'monthly':
            retention = int(workload_obj_ret[each]['retention'])
            interval = int(workload_obj_ret[each]['interval'])
            ret_days = (interval * retention + 1) * 30 * 24
        if each == 'yearly':
            retention = int(workload_obj_ret[each]['retention'])
            interval = int(workload_obj_ret[each]['interval'])
            ret_days = (interval * retention + 1) * 365 * 24
    datetime_after_num_days = (snapshot.created_at + relativedelta(hours=ret_days))
    # Round-trip through a second-resolution string before isoformat().
    datetime_after_num_days = (datetime_after_num_days + delta).strftime('%Y-%m-%d %H:%M:%S')
    return datetime.strptime(datetime_after_num_days, '%Y-%m-%d %H:%M:%S').isoformat()
def validate_snapshot_type(context, snapshots):
    """Decide whether the next snapshot must be triggered as a full one.

    Sorts *snapshots* in place, newest first, then walks toward the
    oldest: a deleted snapshot encountered before any full one breaks the
    chain (return True); an intact full snapshot means an incremental is
    fine (return False); no full snapshot at all also forces a full.

    Args:
        context: request context (unused)
        snapshots: snapshot objects with status/snapshot_type/created_at
    Returns:
        Bool, True when a full snapshot is required
    """
    snapshots.sort(key=lambda s: s.created_at, reverse=True)
    for snap in snapshots:
        deleted = snap.status.lower() == 'deleted'
        if deleted:
            # Broken chain: a member was deleted before a full was found.
            return True
        if snap.snapshot_type.lower() == 'full':
            # Intact chain anchored by a full snapshot.
            return False
    return True