Repository URL to install this package:
|
Version:
2.5 ▾
|
# Copyright 2015 TrilioData Inc.
# All Rights Reserved.
import copy
import os
import stat
import uuid
import json
import math
from tempfile import mkstemp
from lxml import etree
import re
import time
try:
from oslo_log import log as logging
except ImportError:
from nova.openstack.common import log as logging
try:
from oslo_config import cfg
except ImportError:
from oslo.config import cfg
try:
from oslo_utils import fileutils
except ImportError:
from nova.openstack.common import fileutils
from oslo_privsep import priv_context
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient import session
try:
from nova import volume
except BaseException:
pass
try:
from nova.volume import cinder as volume1
except BaseException:
pass
from nova import exception
from nova.objects import block_device
from nova.volume import encryptors
import nova.virt.libvirt.utils as libvirt_utils
from nova.virt.libvirt import config as vconfig
from nova import utils as nova_utils
from contego import utils as contego_utils
from contego.nova.extension.driver import qemuimages
from contego.nova.extension.driver import vault
from contego.nova.extension.driver import loopingcall
from backend import Backend
LOG = logging.getLogger(__name__)
CONF = cfg.CONF

# vault_using_fuse is derived from the storage type at import time: it is
# True only for the 'swift-s' and 's3' vault types (case-insensitive).
CONF.vault_using_fuse = CONF.vault_storage_type.lower() in ('swift-s', 's3')

# Setting helper command for privsep
priv_context.init(
    root_helper=['sudo', 'nova-rootwrap', CONF.rootwrap_config])
def is_blk_device(dev):
    """Return True if *dev* is an existing block device, else False.

    Any error raised while stat-ing the path is logged at debug level and
    reported as "not a block device".
    """
    try:
        mode = os.stat(dev).st_mode
    except Exception:
        LOG.debug('Path %s not found in is_blk_device check', dev)
        return False
    return bool(stat.S_ISBLK(mode))
def check_for_odirect_support(src, dest, flag='oflag=direct'):
    """Probe whether a direct-I/O ``dd`` from *src* to *dest* succeeds.

    Runs a zero-block ``dd`` as root with *flag* appended.

    :returns: True if the command ran without raising, False otherwise.
    """
    # NOTE(review): dd without conv=notrunc may truncate *dest* when it is a
    # regular file; the visible caller only probes block devices - confirm.
    dd_args = ('dd', 'count=0', 'if=%s' % src, 'of=%s' % dest, flag)
    try:
        nova_utils.execute(*dd_args, run_as_root=True)
    except Exception:
        return False
    return True
def get_new_context(context):
    """Return a copy of *context* carrying a freshly issued token.

    A token is requested for user "admin" (password and auth URL from the
    ``[neutron]`` config section) scoped to ``context.tenant``. The result is
    a deep copy of *context* with ``auth_token`` replaced. If anything fails,
    the original *context* is returned unchanged.
    """
    try:
        password_auth = v2_auth.Password(
            auth_url=CONF.neutron.admin_auth_url,
            username="admin",
            password=CONF.neutron.admin_password,
            tenant_id=context.tenant)
        neutron_session = session.Session.load_from_conf_options(
            CONF, "neutron")
        fresh_token = password_auth.get_token(neutron_session)
        refreshed = copy.deepcopy(context)
        refreshed.auth_token = fresh_token
    except Exception:
        return context
    return refreshed
class CinderBackend(Backend):
    """Backend that snapshots and uploads cinder volumes of a libvirt guest.

    Snapshots are created through the cinder API. Uploads clone snapshots
    into temporary cinder volumes, attach them to this host, and copy their
    contents into the vault with qemu-img.
    """

    def __init__(self, **kwargs):
        # nova.volume / nova.volume.cinder are both optional imports at
        # module level; use whichever one provides API().
        try:
            self._volume_api = volume.API()
        except BaseException:
            self._volume_api = volume1.API()
        self.virt_driver = kwargs['virt_driver']
        self.backend = 'cinder'

    def prepare_snapshot(self, devices, **kwargs):
        """Nothing to prepare for cinder volumes."""
        pass

    def _volume_snapshot_create(self, context, domain):
        """Create a forced cinder snapshot of every cinder disk of *domain*.

        :param context: request context used for the cinder calls
        :param domain: libvirt domain whose XML lists the guest disks
        :returns: list of snapshot dicts returned by cinder, each extended
                  with 'backend', 'dev', 'path', 'size' (bytes) and
                  'backings'; empty when the domain has no cinder disks
        :raises Exception: re-raises a create_snapshot_force failure, or
                  raises when a snapshot ends in 'error' state (all new
                  snapshots are then deleted, best-effort)
        """
        xml_doc = etree.fromstring(domain.XMLDesc(0))
        device_info = vconfig.LibvirtConfigGuest()
        device_info.parse_dom(xml_doc)

        cinder_disks_to_snap = []  # to be snapshotted through cinder
        snapshots = []
        for guest_disk in device_info.devices:
            if guest_disk.root_name != 'disk' or guest_disk.target_dev is None:
                continue
            if guest_disk.serial is None:
                # disks without a serial are not snapshotted
                continue
            try:
                self._volume_api.get(context, guest_disk.serial)
            except Exception:
                # cinder lookup failed: treat it as a local disk and skip
                continue
            cinder_disks_to_snap.append({
                'dev': guest_disk.target_dev,
                'serial': guest_disk.serial,
                'current_file': guest_disk.source_path,
                'source_protocol': guest_disk.source_protocol,
                'source_name': guest_disk.source_name,
                'source_hosts': guest_disk.source_hosts,
                'source_ports': guest_disk.source_ports,
            })

        if not cinder_disks_to_snap:
            LOG.info('Found no cinder disks to snapshot.')
            return snapshots

        try:
            for disk_info in cinder_disks_to_snap:
                # Use force as we know the volume is mapped to the vm
                snapshot = self._volume_api.create_snapshot_force(
                    context, disk_info['serial'],
                    'TrilioVaultSnapshot',
                    'TrilioVault initiated snapshot')
                snapshot['backend'] = (disk_info['source_protocol'] or
                                       self.backend)
                snapshot['dev'] = disk_info['dev']
                snapshot['path'] = (disk_info['current_file'] or
                                    disk_info['source_name'])
                # cinder reports the size in GB; convert it to bytes
                snapshot['size'] = snapshot['size'] * 1024 * 1024 * 1024
                snapshot['backings'] = [{'path': snapshot['id'],
                                         'size': snapshot['size']}]
                snapshots.append(snapshot)
        except Exception as ex:
            LOG.exception('Error occurred during '
                          'creating snapshot of volume %s' %
                          disk_info['serial'])
            raise ex

        # wait until each snapshot gets to either available or error state
        for snapshot in snapshots:
            def _wait_for_snapshot():
                snapobj = self._volume_api.get_snapshot(
                    context, snapshot['id'])
                if snapobj['status'] in ('available', 'error'):
                    raise loopingcall.LoopingCallDone()

            timer = loopingcall.FixedIntervalLoopingCall(_wait_for_snapshot)
            timer.start(interval=10).wait()

        try:
            for snapshot in snapshots:
                snapobj = self._volume_api.get_snapshot(
                    context, snapshot['id'])
                if snapobj['status'] == 'error':
                    LOG.error('snapshot %s failed' % snapobj['id'])
                    raise Exception('snapshot %s failed' % snapobj['id'])
        except Exception as ex:
            # one failed snapshot invalidates the set: delete all of them
            for snapshot in snapshots:
                try:
                    self._volume_api.delete_snapshot(context, snapshot['id'])
                except Exception:
                    LOG.warning('Could not delete snapshot %s',
                                snapshot['id'])
            raise ex
        return snapshots

    def create_snapshot(self, devices, **kwargs):
        """Create snapshots of cinder volumes attached to the instance.

        :param devices: unused
        :keyword instance_uuid: instance id, used in the raised error
        :keyword instance_name: libvirt domain name to look up
        :keyword context: request context for the cinder calls
        :returns: the list produced by _volume_snapshot_create
        :raises InstanceNotRunning: when the lookup raises InstanceNotFound
        """
        instance_uuid = kwargs['instance_uuid']
        instance_name = kwargs['instance_name']
        context = kwargs['context']
        try:
            virt_dom = self.virt_driver._conn.lookupByName(instance_name)
        except exception.InstanceNotFound:
            raise exception.InstanceNotRunning(instance_id=instance_uuid)
        try:
            return self._volume_snapshot_create(context, virt_dom)
        except Exception as ex:
            LOG.error('Error occurred during '
                      'create_snapshot. ')
            LOG.exception(ex)
            raise ex

    def delete_snapshot(self, disk_info, **kwargs):
        """Delete TrilioVault cinder snapshots of this disk's volume.

        If ``params['workload_failed']`` is truthy, only this disk's snapshot
        is deleted. Otherwise every other TrilioVault snapshot of the volume
        (left behind by failed backups) is deleted and this one is kept.
        Errors are logged rather than raised.
        """
        try:
            params = kwargs['params']
            if params["workload_failed"]:
                # If the snapshot failed, delete the corresponding snapshot
                kwargs['delete_only'] = [uuid.UUID(disk_info['id'])]
            else:
                # Delete any snapshots that were left behind due to failed
                # backup operations
                kwargs['ignore_snapshots'] = [uuid.UUID(disk_info['id'])]
            kwargs['volume_id_to_reset'] = uuid.UUID(disk_info['volume_id'])
            self.reset_snapshot(None, **kwargs)
        except Exception as ex:
            LOG.debug('Cannot delete snapshot %s', disk_info['path'])
            LOG.exception(ex)

    def update_snapshot_info(self, disk_info, **kwargs):
        """Return *disk_info* unchanged."""
        # TODO update the snapshot size
        return disk_info

    def check_prev_snapshot(self, disk_info, **kwargs):
        """Check that the previous snapshot of this disk still exists.

        :returns: {'result': 'success'} if the snapshot referenced by
                  ``disk_info['prev_disk_info']['id']`` can be fetched from
                  cinder, otherwise {'result': 'invalid'}
        """
        context = kwargs['context']
        prev_snap_info = None
        try:
            prev_disk_info = disk_info['prev_disk_info']
            if prev_disk_info:
                prev_snap_info = self._volume_api.get_snapshot(
                    get_new_context(context), prev_disk_info['id'])
        except Exception as ex:
            LOG.exception(ex)
        return {'result': 'success' if prev_snap_info else 'invalid'}

    def _delete_temp_volume(self, context, volume_id):
        """Best-effort delete of a temporary volume; failures are logged."""
        try:
            self._volume_api.delete(get_new_context(context), volume_id)
        except Exception:
            LOG.info("volume %s cannot be deleted because of token "
                     "expiration. Will cleanup during vast_finalize()" %
                     volume_id)

    def _disconnect_temp_volume(self, connection, dev):
        """Detach a temporary volume from this host.

        Tries ``virt_driver._disconnect_volume`` first and falls back to
        ``volume_driver_method('disconnect_volume', ...)`` (presumably for
        nova releases without the former - TODO confirm).

        :raises Exception: when both disconnect paths fail
        """
        try:
            self.virt_driver._disconnect_volume(connection, dev)
        except BaseException as ex:
            LOG.exception(ex)
            try:
                self.virt_driver.volume_driver_method(
                    'disconnect_volume', connection, dev)
            except Exception as fallback_ex:
                LOG.exception(fallback_ex)
                raise Exception("Cannot disconnect volume from host. "
                                "Please check tvault-contego.log for "
                                "detailed errors")

    def _create_temp_volume_from_snapshot(self, context, snapshot_id, size):
        """Clone a cinder snapshot into a temporary volume attached here.

        :param context: request context used to create the volume
        :param snapshot_id: id of the cinder snapshot to clone
        :param size: snapshot size in bytes; rounded up to whole GB
        :returns: dict with 'volume', 'connection' (whose 'data' holds the
                  host 'device_path') and 'path' (always ""), or None when
                  the volume did not become 'available' (that volume is then
                  deleted, best-effort)
        :raises Exception: re-raises an attach failure after terminating
                  the connection and deleting the volume (best-effort)
        """
        admin_context = context.elevated()
        # float division so a partial GB rounds up on Python 2 as well
        sizeingb = int(math.ceil(size / float(1024 * 1024 * 1024)))
        snap_volume = self._volume_api.create(
            context, sizeingb,
            "temp_volume-%s" % snapshot_id,
            "from triliovault",
            snapshot={'id': snapshot_id},
            metadata={'fortriliovault': "True"})

        def _wait_for_volume():
            new_context = get_new_context(context)
            vol = self._volume_api.get(new_context, snap_volume['id'])
            if vol['status'] in ('available', 'error'):
                raise loopingcall.LoopingCallDone()

        timer = loopingcall.FixedIntervalLoopingCall(_wait_for_volume)
        timer.start(interval=10).wait()

        vol = self._volume_api.get(get_new_context(context),
                                   snap_volume['id'])
        if vol['status'] != 'available':
            # don't leak the volume cinder failed to populate
            self._delete_temp_volume(context, snap_volume['id'])
            return None

        connector = self.virt_driver.get_volume_connector(None)
        # Add to the initiator so the volume can be accessible from the
        # host
        connection = self._volume_api.initialize_connection(
            admin_context, snap_volume['id'], connector)
        encryptors.get_encryption_metadata(
            admin_context, self._volume_api,
            snap_volume['id'], connection)
        new_context = get_new_context(context)
        try:
            attach_info = {'bus': 'virtio', 'type': 'disk', 'dev': u'vdc'}
            try:
                self.virt_driver._connect_volume(connection, attach_info)
            except BaseException:
                # fallback presumably for nova releases without
                # _connect_volume - TODO confirm
                conf = self.virt_driver.volume_driver_method(
                    'connect_volume', connection, attach_info)
                if 'data' in connection:
                    connection['data']['device_path'] = conf.source_path
        except Exception as ex:
            LOG.exception(("Driver failed to attach volume "
                           "%(volume_id)s at %(mountpoint)s") %
                          {'volume_id': snap_volume['id'],
                           'mountpoint': snap_volume['mountpoint']},
                          context=context)
            # cleanup is best-effort so the attach error is what surfaces
            try:
                self._volume_api.terminate_connection(
                    admin_context, snap_volume['id'], connector)
            except Exception:
                LOG.exception("Failed to terminate connection of volume %s"
                              % snap_volume['id'])
            self._delete_temp_volume(new_context, snap_volume['id'])
            raise ex
        return {'volume': snap_volume,
                'connection': connection,
                'path': ""}

    def upload_snapshot(self, disk_info, **kwargs):
        """Copy a cinder snapshot's data into the vault as a qcow2 image.

        The snapshot (and, for incrementals, the previous snapshot in
        ``disk_info['prev_disk_info']``) is cloned into a temporary volume
        and attached to this host. A full backup converts the current
        volume with qemu-img. An incremental writes only the difference via
        _create_qcow2_from_diff. Temporary volumes are disconnected and
        deleted (best-effort) afterwards.

        :param disk_info: snapshot dict with 'id', 'size' (bytes), 'dev'
                          and 'prev_disk_info'
        :keyword context: request context
        :keyword params: dict whose 'metadata' locates the vault resource
        :raises Exception: for unsupported vault storage types or when a
                           temporary volume cannot be created
        """
        context = kwargs['context']
        params = kwargs['params']
        if CONF.vault_storage_type != 'nfs' and not CONF.vault_using_fuse:
            raise Exception("Only NFS/Swift/S3 is supported for now")

        curr_snap_volume_info = self._create_temp_volume_from_snapshot(
            context, disk_info['id'], disk_info['size'])
        if curr_snap_volume_info is None:
            raise Exception("Cannot create volume from snapshot. "
                            "Please check cinder logs for the root cause")
        try:
            conn = curr_snap_volume_info['connection']
            curr_snap_path = conn['data']['device_path']
            snapshot_vm_disk_resource_path = \
                vault.get_snapshot_vm_disk_resource_path(params['metadata'])
            progress_tracker_path = vault.get_progress_tracker_path(
                params['metadata'])
            if disk_info['prev_disk_info']:
                prev_disk_info = disk_info['prev_disk_info']
                context = get_new_context(context)
                prev_snap_volume_info = \
                    self._create_temp_volume_from_snapshot(
                        context, prev_disk_info['id'], prev_disk_info['size'])
                if prev_snap_volume_info is None:
                    raise Exception("Cannot create volume from prev snapshot")
                try:
                    pconn = prev_snap_volume_info['connection']
                    prev_snap_path = pconn['data']['device_path']
                    vsize = str(prev_snap_volume_info['volume']['size']) + 'G'
                    head, tail = os.path.split(snapshot_vm_disk_resource_path)
                    fileutils.ensure_tree(head)
                    if CONF.vault_storage_type.lower() in ('nfs', 'swift-s',
                                                           's3'):
                        try:
                            libvirt_utils.create_cow_image(
                                None, snapshot_vm_disk_resource_path, vsize)
                        except Exception:
                            # s3 sometimes may throw exception in
                            # create_cow_images due to its eventual
                            # consistency nature; give S3 some time to
                            # propagate the changes, then inspect the image
                            time.sleep(30)
                            qemuimages.qemu_img_info(
                                snapshot_vm_disk_resource_path)
                        self._create_qcow2_from_diff(
                            prev_snap_path, curr_snap_path,
                            snapshot_vm_disk_resource_path,
                            progress_tracker_path)
                    else:
                        # NOTE(review): looks unreachable given the storage
                        # type check at the top of this method - confirm.
                        qcow2file = self._get_temp_file_path(True)
                        try:
                            libvirt_utils.create_cow_image(
                                None, qcow2file, vsize)
                            self._create_qcow2_from_diff(
                                prev_snap_path, curr_snap_path,
                                qcow2file, progress_tracker_path)
                            vault.upload_snapshot_vm_disk_resource(
                                context, params['metadata'], qcow2file)
                        finally:
                            # remove the local temp image even on failure
                            if os.path.exists(qcow2file):
                                os.remove(qcow2file)
                finally:
                    self._disconnect_temp_volume(
                        prev_snap_volume_info['connection'],
                        disk_info['dev'])
                    self._delete_temp_volume(
                        context, prev_snap_volume_info['volume']['id'])
            else:
                head, tail = os.path.split(snapshot_vm_disk_resource_path)
                fileutils.ensure_tree(head)
                cmdspec = ['qemu-img', 'convert', '-p', '-O', 'qcow2',
                           curr_snap_path, snapshot_vm_disk_resource_path]
                self._execute_qemu_img_and_track_progress(
                    cmdspec, curr_snap_path,
                    snapshot_vm_disk_resource_path, progress_tracker_path)
                qemuimages.qemu_img_info(snapshot_vm_disk_resource_path)
        finally:
            self._disconnect_temp_volume(
                curr_snap_volume_info['connection'], disk_info['dev'])
            self._delete_temp_volume(
                context, curr_snap_volume_info['volume']['id'])

    def _volume_snapshot_delete(self, context, domain,
                                volume_id_to_reset=None,
                                ignore_snapshots=None,
                                delete_only=None):
        """Delete TrilioVault-created cinder snapshots of the domain's volumes.

        For each cinder disk (only the one whose serial equals
        *volume_id_to_reset*, when given), every snapshot of that volume
        whose name contains 'triliovault' is processed. Its
        'temp_volume-<snapshot id>' volume is deleted. The snapshot itself
        is deleted unless it is in *ignore_snapshots* or, when *delete_only*
        is given, not in *delete_only*. Errors on individual snapshots are
        logged and ignored.

        :param volume_id_to_reset: uuid.UUID of the only volume to process
        :param ignore_snapshots: uuid.UUID snapshot ids to keep
        :param delete_only: uuid.UUID snapshot ids to restrict deletion to
        :returns: always an empty list
        """
        xml_doc = etree.fromstring(domain.XMLDesc(0))
        device_info = vconfig.LibvirtConfigGuest()
        device_info.parse_dom(xml_doc)
        for guest_disk in device_info.devices:
            if (guest_disk.root_name != 'disk' or
                    guest_disk.target_dev is None or
                    guest_disk.serial is None):
                continue
            if volume_id_to_reset and \
                    volume_id_to_reset != uuid.UUID(guest_disk.serial):
                continue
            try:
                attached_volume = self._volume_api.get(context,
                                                       guest_disk.serial)
            except Exception:
                continue
            if not attached_volume:
                continue

            volumes = self._volume_api.get_all(context)
            for snapshot in self._volume_api.get_all_snapshots(context):
                try:
                    key = 'display_name' if 'display_name' in snapshot \
                        else 'name'
                    # ignore snapshots of other volumes and snapshots whose
                    # name does not contain the triliovault string
                    if (snapshot['volume_id'] != attached_volume['id'] or
                            'triliovault' not in snapshot[key].lower()):
                        continue
                    # delete temporary volumes created for this snapshot
                    # regardless of whether the snapshot is excluded below
                    try:
                        for vol in volumes:
                            if vol[key] == 'temp_volume-%s' % snapshot['id']:
                                self._volume_api.delete(context, vol['id'])
                                break
                    except Exception:
                        LOG.debug('Ignoring failure to delete a temporary '
                                  'volume', exc_info=True)
                    if ignore_snapshots and \
                            uuid.UUID(snapshot['id']) in ignore_snapshots:
                        continue
                    if delete_only and \
                            uuid.UUID(snapshot['id']) not in delete_only:
                        continue
                    # we definitely need to delete the snapshot
                    self._volume_api.delete_snapshot(context, snapshot['id'])
                except Exception:
                    LOG.exception('Ignoring error while cleaning up a '
                                  'TrilioVault snapshot')
        return []

    def reset_snapshot(self, devices, **kwargs):
        """Delete TrilioVault-created snapshots left behind for an instance.

        :param devices: unused
        :keyword instance_uuid: instance id, used in the raised error
        :keyword instance_name: libvirt domain name to look up
        :keyword context: request context
        :keyword ignore_snapshots: optional, see _volume_snapshot_delete
        :keyword delete_only: optional, see _volume_snapshot_delete
        :keyword volume_id_to_reset: optional, see _volume_snapshot_delete
        :returns: [] (errors during cleanup are logged)
        :raises InstanceNotRunning: when the lookup raises InstanceNotFound
        """
        instance_uuid = kwargs['instance_uuid']
        instance_name = kwargs['instance_name']
        context = kwargs['context']
        try:
            virt_dom = self.virt_driver._conn.lookupByName(instance_name)
        except exception.InstanceNotFound:
            raise exception.InstanceNotRunning(instance_id=instance_uuid)
        # remove all snapshots that triliovault created and left behind
        try:
            return self._volume_snapshot_delete(
                context, virt_dom,
                volume_id_to_reset=kwargs.get('volume_id_to_reset'),
                ignore_snapshots=kwargs.get('ignore_snapshots'),
                delete_only=kwargs.get('delete_only'))
        except Exception as ex:
            LOG.exception(ex)
            return []

    def copy_backup_image_to_volume(self, context, instance_uuid,
                                    instance_name, params):
        """Write a backup image onto a cinder volume attached to the instance.

        The host path of the volume is taken from the instance's block
        device mapping connection_info ('device_path', else 'name'). The
        image is then written with transfer_qemu_image_to_volume. If that
        fails and this class lives in a module whose name contains
        'cinder_backend', the copy is retried with the default cache mode.

        :param params: dict with 'volume_id', 'backup_image_file_path' and
                       'progress_tracking_file_path'
        :raises Exception: on inconsistent connection info, when no host
                           path is found for the volume, or when the copy
                           fails
        """
        volume_id = params['volume_id']
        backup_path = params['backup_image_file_path']
        progress_tracking_file_path = params['progress_tracking_file_path']
        # The cinder lookup is only logged on failure; the block device
        # mapping search below decides whether the volume can be used.
        try:
            if not self._volume_api.get(context, volume_id):
                raise Exception("Cinder volume by id %(volume_id)s not found" %
                                {'volume_id': volume_id})
        except Exception as ex:
            LOG.exception(ex)

        volume_path = None
        # find the device path that is mapped
        bdms = block_device.BlockDeviceMappingList.get_by_instance_uuid(
            context, instance_uuid)
        for bdm in bdms:
            if not bdm.volume_id or \
                    uuid.UUID(bdm.volume_id) != uuid.UUID(volume_id):
                continue
            conninfo = json.loads(bdm.connection_info)
            if uuid.UUID(conninfo['serial']) != uuid.UUID(volume_id):
                raise Exception("Serial attribute of connection information"
                                " does not match with volume_id")
            if 'data' not in conninfo:
                raise Exception("data attribute is "
                                " not set in the conninfo")
            data = conninfo['data']
            if data.get('device_path') is not None:
                volume_path = data['device_path']
                try:
                    os.stat(volume_path)
                except Exception as ex:
                    LOG.exception(ex)
                    raise
            elif data.get('name') is not None:
                volume_path = data['name']
            else:
                raise Exception("Data attribute or device_path attribute is "
                                " not set in the conninfo")
            break

        # also covers "no mapping found", which previously surfaced as an
        # obscure error from qemu-img command construction
        if not volume_path:
            raise Exception("volume path for the volume id %s not found" %
                            volume_id)

        try:
            self.transfer_qemu_image_to_volume(
                volume_path, backup_path,
                progress_tracking_file_path)
        except Exception as ex:
            LOG.exception(ex)
            if 'cinder_backend' in self.__module__:
                self.transfer_qemu_image_to_volume(
                    volume_path, backup_path,
                    progress_tracking_file_path, default_cache=True)
            else:
                raise Exception("Potential reason for this failure could " +
                                "be ceph credentials are not available in " +
                                "nova.conf, Please add them to " +
                                "tvault-contego.conf")

    def _create_qcow2_from_diff(self, prev_snap_path, curr_snap_path,
                                qcow2file, progress_tracker_path):
        """Write a qcow2 built from two raw snapshot devices.

        Runs, with the qemu-img returned by get_qemu_img_path():
          qemu-img convert -p -f raw -O qcow2 -W -D <prev> -F raw <curr> <qcow2>
        then 'qemu-img rebase -u <qcow2>' to drop the backing file reference.
        '-D <prev>' presumably limits output to blocks that differ from the
        previous snapshot (TODO confirm against the bundled qemu-img).
        """
        qemu_img_bin = self.get_qemu_img_path()
        cmdspec = [qemu_img_bin, 'convert', '-p', '-f',
                   'raw', '-O', 'qcow2', '-W', '-D',
                   prev_snap_path, '-F', 'raw', curr_snap_path,
                   qcow2file]
        self._execute_qemu_img_and_track_progress(cmdspec, curr_snap_path,
                                                  qcow2file,
                                                  progress_tracker_path)
        qemuimages.qemu_img_info(qcow2file)
        # remove backing file
        nova_utils.execute(qemu_img_bin, 'rebase', '-u', qcow2file)

    def get_dev_size(self, filename):
        """Return the size in bytes of *filename* by seeking to its end."""
        with open(filename, 'rb') as f:
            f.seek(0, os.SEEK_END)
            return f.tell()

    def transfer_qemu_image_to_volume(
            self,
            volume_path,
            backup_image_file_path,
            progress_tracking_file_path,
            default_cache=False):
        """Convert a backup image to raw directly onto *volume_path*.

        qemu-img runs through nova-rootwrap. Cache mode 'none' ('-t none')
        is used only when *default_cache* is False and *volume_path* is a
        block device that accepts O_DIRECT; otherwise qemu-img's default
        cache mode applies.
        """
        cmdspec = [
            'sudo',
            'nova-rootwrap',
            CONF.rootwrap_config,
            'qemu-img',
            'convert',
            '-p',
        ]
        # Decide the cache mode up front: removing '-t' afterwards failed
        # with ValueError whenever it had never been added.
        if not default_cache and is_blk_device(volume_path) and \
                check_for_odirect_support(backup_image_file_path,
                                          volume_path, flag='oflag=direct'):
            cmdspec += ['-t', 'none']
        cmdspec += ['-O', 'raw', backup_image_file_path, volume_path]
        LOG.debug('transfer_qemu_image_to_volume cmd %s ', " ".join(cmdspec))
        # NOTE(review): argument order here is (dest, source) while the
        # upload path passes (source, dest) - confirm against
        # _execute_qemu_img_and_track_progress.
        self._execute_qemu_img_and_track_progress(
            cmdspec, volume_path, backup_image_file_path,
            progress_tracking_file_path)
class LVMBackend(CinderBackend):
    """CinderBackend variant that reads snapshots straight from LVM devices.

    Snapshot data is read from
    /dev/stack-volumes-lvmdriver-1/_snapshot-<snapshot id> on this host
    instead of going through temporary cinder volumes.
    """

    def __init__(self, **kwargs):
        super(LVMBackend, self).__init__(**kwargs)
        self.backend = 'lvm'

    def upload_snapshot(self, disk_info, **kwargs):
        """Copy an LVM snapshot into the vault as a qcow2 image.

        With ``disk_info['prev_disk_info']`` set, only the difference from
        the previous snapshot is written; otherwise the whole snapshot is
        converted.

        :param disk_info: snapshot dict with 'id', 'size' and
                          'prev_disk_info'
        :keyword context: required by callers (not used here)
        :keyword params: dict whose 'metadata' locates the vault resource
        :raises Exception: for unsupported vault storage types
        """
        context = kwargs['context']
        params = kwargs['params']
        if CONF.vault_storage_type != 'nfs' and not CONF.vault_using_fuse:
            raise Exception("Only NFS/Swift/S3 is supported for now")
        lvm_snapshot_prefix = '/dev/stack-volumes-lvmdriver-1/_snapshot-'
        curr_snap_path = lvm_snapshot_prefix + disk_info['id']
        snapshot_vm_disk_resource_path = \
            vault.get_snapshot_vm_disk_resource_path(params['metadata'])
        if disk_info['prev_disk_info']:
            prev_disk_info = disk_info['prev_disk_info']
            prev_snap_path = lvm_snapshot_prefix + prev_disk_info['id']
            vsize = str(disk_info['size'])
            # _create_qcow2_from_diff requires a progress tracker path;
            # it was previously omitted, causing a TypeError here.
            progress_tracker_path = vault.get_progress_tracker_path(
                params['metadata'])
            head, tail = os.path.split(snapshot_vm_disk_resource_path)
            fileutils.ensure_tree(head)
            try:
                libvirt_utils.create_cow_image(
                    prev_snap_path,
                    snapshot_vm_disk_resource_path, vsize)
            except Exception:
                # s3 sometimes may throw exception in create_cow_images
                # due to its eventual consistency nature; give S3 some time
                # to propagate the changes, then inspect the image
                time.sleep(30)
                qemuimages.qemu_img_info(snapshot_vm_disk_resource_path)
            self._create_qcow2_from_diff(prev_snap_path, curr_snap_path,
                                         snapshot_vm_disk_resource_path,
                                         progress_tracker_path)
        else:
            # full backup: convert the whole snapshot device
            head, tail = os.path.split(snapshot_vm_disk_resource_path)
            fileutils.ensure_tree(head)
            qemuimages.convert_image(
                curr_snap_path,
                snapshot_vm_disk_resource_path,
                'qcow2')