Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
idna / lib / python2.7 / site-packages / contego / nova / extension / driver / backends / cinder_backend.py
Size: Mime:
# Copyright 2015 TrilioData Inc.
# All Rights Reserved.

import copy
import os
import stat
import uuid
import json
import math
from tempfile import mkstemp
from lxml import etree
import re
import time

try:
    from oslo_log import log as logging
except ImportError:
    from nova.openstack.common import log as logging

try:
    from oslo_config import cfg
except ImportError:
    from oslo.config import cfg

try:
    from oslo_utils import fileutils
except ImportError:
    from nova.openstack.common import fileutils
from oslo_privsep import priv_context

from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient import session

try:
    from nova import volume
except BaseException:
    pass

try:
    from nova.volume import cinder as volume1
except BaseException:
    pass
from nova import exception
from nova.objects import block_device
from nova.volume import encryptors

import nova.virt.libvirt.utils as libvirt_utils
from nova.virt.libvirt import config as vconfig
from nova import utils as nova_utils

from contego import utils as contego_utils
from contego.nova.extension.driver import qemuimages
from contego.nova.extension.driver import vault
from contego.nova.extension.driver import loopingcall

from backend import Backend

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

# vault_using_fuse is switched on only for the 'swift-s' and 's3' vault
# storage types (compared case-insensitively); any other type turns it off.
CONF.vault_using_fuse = CONF.vault_storage_type.lower() in ('swift-s', 's3')

# Privileged helpers escalate through sudo + nova-rootwrap with the
# configured rootwrap config file.
_privsep_root_helper = ['sudo', 'nova-rootwrap', CONF.rootwrap_config]
priv_context.init(root_helper=_privsep_root_helper)

def is_blk_device(dev):
    """Tell whether *dev* names a block device.

    :param dev: filesystem path to inspect.
    :returns: True for a block device, False for anything else. Any error
        raised while stat-ing *dev* is logged at debug level and reported
        as False.
    """
    try:
        mode = os.stat(dev).st_mode
    except Exception:
        LOG.debug('Path %s not found in is_blk_device check', dev)
        return False
    return bool(stat.S_ISBLK(mode))


def check_for_odirect_support(src, dest, flag='oflag=direct'):
    """Probe whether *dest* accepts I/O with the given ``dd`` *flag*.

    Runs a zero-block ``dd`` copy from *src* to *dest* as root.

    :param flag: extra ``dd`` operand, ``oflag=direct`` by default.
    :returns: True if the probe command succeeds, False on any error.
    """
    dd_args = ('dd', 'count=0', 'if=%s' % src, 'of=%s' % dest, flag)
    try:
        nova_utils.execute(*dd_args, run_as_root=True)
    except Exception:
        return False
    return True


def get_new_context(context):
    """Return a copy of *context* carrying a freshly issued auth token.

    The token is requested with keystone v2 password auth as user ``admin``
    (password ``CONF.neutron.admin_password``, auth URL
    ``CONF.neutron.admin_auth_url``) scoped to ``context.tenant``, using the
    session options of the ``neutron`` config group.

    Best effort: if anything fails, the reason is logged at debug level and
    the original *context* is returned unchanged.

    :param context: request context; must expose ``tenant`` and allow
        ``auth_token`` to be set on a deep copy.
    :returns: a deep copy with a new ``auth_token``, or *context* itself on
        failure.
    """
    try:
        auth_plugin = v2_auth.Password(
            auth_url=CONF.neutron.admin_auth_url,
            username="admin",
            password=CONF.neutron.admin_password,
            tenant_id=context.tenant)

        keystone_session = session.Session.load_from_conf_options(
            CONF, "neutron")
        new_token = auth_plugin.get_token(keystone_session)

        new_context = copy.deepcopy(context)
        new_context.auth_token = new_token
        return new_context
    except Exception as ex:
        # Non-fatal by design: callers keep using the token they already
        # hold. Log the reason so repeated auth failures are diagnosable.
        LOG.debug('Could not refresh auth token, reusing existing '
                  'context: %s', ex)
        return context


class CinderBackend(Backend):
    """Snapshot/upload backend for instance disks backed by Cinder volumes.

    Snapshots are created through the Cinder volume API. Uploading creates a
    temporary volume from a snapshot, attaches it to this host and converts
    it into a qcow2 image in the vault. For incremental backups, only the
    difference from the previous snapshot is written.
    """

    def __init__(self, **kwargs):
        """Create the volume API client and remember the virt driver.

        :param kwargs: must contain ``virt_driver``, the libvirt driver whose
            connection and volume helpers this backend uses.
        """
        # Either ``volume`` or ``volume1`` may be unbound if its guarded
        # import failed, so fall back to the other on any error.
        try:
            self._volume_api = volume.API()
        except Exception:
            self._volume_api = volume1.API()
        self.virt_driver = kwargs['virt_driver']
        self.backend = 'cinder'

    def prepare_snapshot(self, devices, **kwargs):
        """No-op: this backend does nothing before a snapshot."""
        pass

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _guest_disks(domain):
        """Return *domain*'s libvirt ``disk`` devices that have a target dev."""
        device_info = vconfig.LibvirtConfigGuest()
        device_info.parse_dom(etree.fromstring(domain.XMLDesc(0)))
        return [dev for dev in device_info.devices
                if dev.root_name == 'disk' and dev.target_dev is not None]

    @staticmethod
    def _wait_for_cinder_status(fetch, interval=10):
        """Block until ``fetch()['status']`` is 'available' or 'error'.

        Polls every *interval* seconds. There is no timeout.
        """
        def _poll():
            if fetch()['status'] in ('available', 'error'):
                raise loopingcall.LoopingCallDone()

        timer = loopingcall.FixedIntervalLoopingCall(_poll)
        timer.start(interval=interval).wait()

    def _delete_snapshots_best_effort(self, context, snapshots):
        """Delete each snapshot in *snapshots*; failures are only logged."""
        for snapshot in snapshots:
            try:
                self._volume_api.delete_snapshot(context, snapshot['id'])
            except Exception as ex:
                LOG.warning('Failed to delete snapshot %s during cleanup: %s',
                            snapshot['id'], ex)

    # ------------------------------------------------------------------
    # Snapshot creation
    # ------------------------------------------------------------------

    def _volume_snapshot_create(self, context, domain):
        """Snapshot every Cinder volume attached to *domain*.

        A disk counts as a Cinder volume when it has a serial that
        ``self._volume_api.get`` resolves. All other disks are skipped.

        :param context: request context for the Cinder calls.
        :param domain: libvirt domain of the VM the volumes are attached to.
        :returns: list of snapshot dicts as returned by Cinder, extended with
            ``backend``, ``dev``, ``path``, ``size`` (Cinder's value times
            1024**3, i.e. bytes if Cinder reports GiB) and ``backings``.
            The list is empty if no Cinder disk was found.
        :raises Exception: if creating a snapshot fails or any snapshot ends
            in the ``error`` state. Snapshots created by this call are
            deleted (best effort) before the exception propagates.
        """
        cinder_disks_to_snap = []
        for guest_disk in self._guest_disks(domain):
            if guest_disk.serial is None:
                continue
            try:
                self._volume_api.get(context, guest_disk.serial)
            except Exception:
                # Serial does not resolve to a Cinder volume.
                continue
            cinder_disks_to_snap.append({
                'dev': guest_disk.target_dev,
                'serial': guest_disk.serial,
                'current_file': guest_disk.source_path,
                'source_protocol': guest_disk.source_protocol,
                'source_name': guest_disk.source_name,
            })

        snapshots = []
        if not cinder_disks_to_snap:
            LOG.info('Found no cinder disks to snapshot.')
            return snapshots

        try:
            for disk_info in cinder_disks_to_snap:
                # Force: the volume is attached to (in use by) this VM.
                snapshot = self._volume_api.create_snapshot_force(
                    context, disk_info['serial'],
                    'TrilioVaultSnapshot',
                    'TrilioVault initiated snapshot')
                # Track the snapshot before decorating it, so that a failure
                # below still gets it cleaned up.
                snapshots.append(snapshot)
                snapshot['backend'] = (disk_info['source_protocol'] or
                                       self.backend)
                snapshot['dev'] = disk_info['dev']
                snapshot['path'] = (disk_info['current_file'] or
                                    disk_info['source_name'])
                snapshot['size'] = snapshot['size'] * 1024 * 1024 * 1024
                snapshot['backings'] = [{'path': snapshot['id'],
                                         'size': snapshot['size']}]
        except Exception:
            LOG.exception('Error occurred during '
                          'creating snapshot of volume %s' %
                          disk_info['serial'])
            self._delete_snapshots_best_effort(context, snapshots)
            raise

        try:
            # Wait until every snapshot is either available or error.
            for snapshot in snapshots:
                self._wait_for_cinder_status(
                    lambda sid=snapshot['id']:
                        self._volume_api.get_snapshot(context, sid))

            for snapshot in snapshots:
                snapobj = self._volume_api.get_snapshot(
                    context, snapshot['id'])
                if snapobj['status'] == 'error':
                    msg = 'snapshot %s failed' % snapobj['id']
                    LOG.error(msg)
                    raise Exception(msg)
        except Exception:
            self._delete_snapshots_best_effort(context, snapshots)
            raise

        return snapshots

    def create_snapshot(self, devices, **kwargs):
        """Create Cinder snapshots of the Cinder volumes of an instance.

        :param devices: not used by this backend.
        :param kwargs: ``instance_uuid``, ``instance_name`` and ``context``.
        :returns: the list produced by ``_volume_snapshot_create``.
        :raises exception.InstanceNotRunning: if the domain lookup raises
            ``exception.InstanceNotFound``.
        """
        instance_uuid = kwargs['instance_uuid']
        instance_name = kwargs['instance_name']
        context = kwargs['context']

        # NOTE(review): libvirt's lookupByName usually reports a missing
        # domain with libvirtError rather than InstanceNotFound; confirm
        # what ``_conn`` raises here.
        try:
            virt_dom = self.virt_driver._conn.lookupByName(instance_name)
        except exception.InstanceNotFound:
            raise exception.InstanceNotRunning(instance_id=instance_uuid)

        try:
            return self._volume_snapshot_create(context, virt_dom)
        except Exception:
            LOG.exception('Error occurred during create_snapshot.')
            raise

    def delete_snapshot(self, disk_info, **kwargs):
        """Delete Cinder snapshots of the volume described by *disk_info*.

        If ``params['workload_failed']`` is true, only the snapshot
        ``disk_info['id']`` is deleted. Otherwise every other TrilioVault
        snapshot of ``disk_info['volume_id']`` is deleted and this one is
        kept. Errors are logged and never raised.

        :param kwargs: ``params`` plus what ``reset_snapshot`` needs
            (``instance_uuid``, ``instance_name``, ``context``).
        """
        try:
            snapshot_id = uuid.UUID(disk_info['id'])
            if kwargs['params']['workload_failed']:
                # The backup failed: drop only the snapshot it produced.
                kwargs['delete_only'] = [snapshot_id]
            else:
                # Keep this snapshot (presumably the baseline for the next
                # incremental) and delete any left behind by earlier failed
                # backups.
                kwargs['ignore_snapshots'] = [snapshot_id]
            kwargs['volume_id_to_reset'] = uuid.UUID(disk_info['volume_id'])

            self.reset_snapshot(None, **kwargs)
        except Exception:
            LOG.exception('Cannot delete snapshot %s', disk_info.get('path'))

    def update_snapshot_info(self, disk_info, **kwargs):
        """Return *disk_info* unchanged."""
        # TODO update the snapshot size
        return disk_info

    def check_prev_snapshot(self, disk_info, **kwargs):
        """Report whether this disk's previous snapshot can still be fetched.

        :returns: ``{'result': 'success'}`` if ``disk_info['prev_disk_info']``
            is set and Cinder returns its snapshot, otherwise
            ``{'result': 'invalid'}``. Lookup errors are logged.
        """
        context = kwargs['context']

        prev_snap_info = None
        try:
            prev_disk_info = disk_info['prev_disk_info']
            if prev_disk_info:
                prev_snap_info = self._volume_api.get_snapshot(
                    get_new_context(context), prev_disk_info['id'])
        except Exception as ex:
            LOG.exception(ex)

        return {'result': 'success' if prev_snap_info else 'invalid'}

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------

    def upload_snapshot(self, disk_info, **kwargs):
        """Write a Cinder snapshot into the vault as a qcow2 image.

        A temporary volume is created from snapshot ``disk_info['id']`` and
        attached to this host. If ``disk_info['prev_disk_info']`` is set, the
        previous snapshot is attached the same way and only the difference
        is written (see ``_create_qcow2_from_diff``). Otherwise the whole
        volume is converted. Temporary volumes are detached and deleted
        afterwards.

        :param disk_info: snapshot description with ``id``, ``size`` (bytes),
            ``dev`` and ``prev_disk_info`` keys.
        :param kwargs: ``context`` and ``params``; ``params['metadata']``
            locates the vault resource and the progress-tracker file.
        :raises Exception: if the vault storage type is unsupported, a
            temporary volume cannot be created, or the conversion fails.
        """
        context = kwargs['context']
        params = kwargs['params']

        # Case-insensitive, like the other vault_storage_type checks.
        if CONF.vault_storage_type.lower() != 'nfs' and \
                not CONF.vault_using_fuse:
            raise Exception("Only NFS/Swift/S3 is supported for now")

        curr_snap_volume_info = self._attach_temp_volume_from_snapshot(
            context, disk_info['id'], disk_info['size'])
        if curr_snap_volume_info is None:
            raise Exception("Cannot create volume from snapshot. "
                            "Please check cinder logs for the root cause")

        try:
            curr_snap_path = \
                curr_snap_volume_info['connection']['data']['device_path']
            metadata = params['metadata']
            resource_path = vault.get_snapshot_vm_disk_resource_path(metadata)
            progress_tracker_path = vault.get_progress_tracker_path(metadata)
            fileutils.ensure_tree(os.path.dirname(resource_path))

            prev_disk_info = disk_info['prev_disk_info']
            if prev_disk_info:
                context = get_new_context(context)
                self._upload_incremental_diff(
                    context, metadata, prev_disk_info, curr_snap_path,
                    resource_path, progress_tracker_path, disk_info['dev'])
            else:
                cmdspec = ['qemu-img', 'convert', '-p', '-O', 'qcow2',
                           curr_snap_path, resource_path]
                self._execute_qemu_img_and_track_progress(
                    cmdspec, curr_snap_path, resource_path,
                    progress_tracker_path)
                qemuimages.qemu_img_info(resource_path)
        finally:
            self._cleanup_temp_volume(
                context, curr_snap_volume_info, disk_info['dev'])

    def _upload_incremental_diff(self, context, metadata, prev_disk_info,
                                 curr_snap_path, resource_path,
                                 progress_tracker_path, dev):
        """Write the difference between the previous and current snapshots.

        The previous snapshot is exposed through its own temporary volume,
        which is always cleaned up afterwards.
        """
        prev_snap_volume_info = self._attach_temp_volume_from_snapshot(
            context, prev_disk_info['id'], prev_disk_info['size'])
        if prev_snap_volume_info is None:
            raise Exception("Cannot create volume from prev snapshot")

        try:
            prev_snap_path = \
                prev_snap_volume_info['connection']['data']['device_path']
            vsize = str(prev_snap_volume_info['volume']['size']) + 'G'

            if CONF.vault_storage_type.lower() in ('nfs', 'swift-s', 's3'):
                # The vault is a mounted filesystem: write into it directly.
                try:
                    libvirt_utils.create_cow_image(None, resource_path, vsize)
                except Exception:
                    # s3 sometimes throws in create_cow_image due to its
                    # eventual consistency; give it time to propagate, then
                    # inspect the image (presumably raising if it is
                    # missing).
                    time.sleep(30)
                    qemuimages.qemu_img_info(resource_path)
                self._create_qcow2_from_diff(
                    prev_snap_path, curr_snap_path,
                    resource_path, progress_tracker_path)
            else:
                # Build the image in a local temp file, then upload it.
                qcow2file = self._get_temp_file_path(True)
                try:
                    libvirt_utils.create_cow_image(None, qcow2file, vsize)
                    self._create_qcow2_from_diff(
                        prev_snap_path, curr_snap_path,
                        qcow2file, progress_tracker_path)
                    vault.upload_snapshot_vm_disk_resource(
                        context, metadata, qcow2file)
                finally:
                    if os.path.exists(qcow2file):
                        os.remove(qcow2file)
        finally:
            self._cleanup_temp_volume(context, prev_snap_volume_info, dev)

    def _attach_temp_volume_from_snapshot(self, context, snapshot_id, size):
        """Create a temporary volume from a snapshot and attach it here.

        :param size: snapshot size in bytes; the volume size is rounded up
            to whole GiB.
        :returns: dict with ``volume`` (the created volume), ``connection``
            (connection info; callers read the host path from
            ``connection['data']['device_path']``) and ``path`` (always
            ``""``). Returns None if the volume ends in the ``error`` state.
        :raises Exception: if attaching fails; the connection is then
            terminated and the volume deleted.
        """
        admin_context = context.elevated()
        # Divide as float: Python 2 integer division would truncate before
        # ceil() and request a volume smaller than the snapshot.
        size_in_gb = int(math.ceil(size / float(1024 ** 3)))
        snap_volume = self._volume_api.create(
            context, size_in_gb,
            "temp_volume-%s" % snapshot_id,
            "from triliovault",
            snapshot={'id': snapshot_id},
            metadata={'fortriliovault': "True"})
        volume_id = snap_volume['id']

        # Refresh the token on every poll, presumably because volume
        # creation can outlive it.
        self._wait_for_cinder_status(
            lambda: self._volume_api.get(get_new_context(context), volume_id))

        vol = self._volume_api.get(get_new_context(context), volume_id)
        if vol['status'] != 'available':
            # The errored volume is left in place. _volume_snapshot_delete
            # removes volumes named temp_volume-<snapshot id>.
            return None

        connector = self.virt_driver.get_volume_connector(None)
        connection = self._volume_api.initialize_connection(
            admin_context, volume_id, connector)

        # NOTE(review): the return value is discarded; confirm whether this
        # call is needed for a side effect.
        encryptors.get_encryption_metadata(
            admin_context, self._volume_api, volume_id, connection)

        new_context = get_new_context(context)
        attach_info = {'bus': 'virtio', 'type': 'disk', 'dev': u'vdc'}
        try:
            try:
                self.virt_driver._connect_volume(connection, attach_info)
            except Exception:
                # Presumably for drivers without _connect_volume.
                conf = self.virt_driver.volume_driver_method(
                    'connect_volume', connection, attach_info)
                if 'data' in connection:
                    connection['data']['device_path'] = conf.source_path
        except Exception:
            # snap_volume may have no 'mountpoint' key (it was never attached
            # through Nova). Indexing it would raise KeyError, mask the real
            # error and skip the cleanup, so report the attach dev instead.
            LOG.exception(("Driver failed to attach volume "
                           "%(volume_id)s at %(mountpoint)s") %
                          {'volume_id': volume_id,
                           'mountpoint': attach_info['dev']},
                          context=context)
            self._volume_api.terminate_connection(
                admin_context, volume_id, connector)
            self._volume_api.delete(new_context, volume_id)
            raise

        return {'volume': snap_volume,
                'connection': connection,
                'path': ""}

    def _cleanup_temp_volume(self, context, volume_info, dev):
        """Disconnect a temporary volume from this host, then delete it.

        :param volume_info: dict returned by
            ``_attach_temp_volume_from_snapshot``.
        :raises Exception: if the volume cannot be disconnected; it is then
            not deleted. Deletion failures are only logged, leaving the
            volume to be cleaned up during vast_finalize().
        """
        connection = volume_info['connection']
        volume_id = volume_info['volume']['id']
        try:
            self.virt_driver._disconnect_volume(connection, dev)
        except Exception as ex:
            LOG.debug('_disconnect_volume failed for volume %s (%s); '
                      'trying volume_driver_method', volume_id, ex)
            try:
                self.virt_driver.volume_driver_method(
                    'disconnect_volume', connection, dev)
            except Exception as ex:
                LOG.exception(ex)
                raise Exception("Cannot disconnect volume from host. "
                                "Please check tvault-contego.log for "
                                "detailed errors")

        try:
            self._volume_api.delete(get_new_context(context), volume_id)
        except Exception:
            LOG.info("volume %s cannot be deleted because of token "
                     "expiration. Will cleanup during vast_finalize()" %
                     volume_id)

    # ------------------------------------------------------------------
    # Snapshot cleanup
    # ------------------------------------------------------------------

    def _volume_snapshot_delete(self, context, domain,
                                volume_id_to_reset=None,
                                ignore_snapshots=None,
                                delete_only=None):
        """Delete TrilioVault-created snapshots of the VM's Cinder volumes.

        A snapshot is treated as TrilioVault-created when its name contains
        ``triliovault`` (case-insensitive). For each such snapshot of an
        attached volume, the volume named ``temp_volume-<snapshot id>`` (if
        any) is deleted. The snapshot itself is then deleted unless the
        filters exclude it. Per-snapshot failures are logged.

        :param volume_id_to_reset: ``uuid.UUID``; if set, only this volume's
            snapshots are considered.
        :param ignore_snapshots: ``uuid.UUID`` values of snapshots to keep.
        :param delete_only: ``uuid.UUID`` values; if set, only these
            snapshots are deleted.
        :returns: always an empty list.
        """
        for guest_disk in self._guest_disks(domain):
            if guest_disk.serial is None:
                continue

            if volume_id_to_reset and \
                    volume_id_to_reset != uuid.UUID(guest_disk.serial):
                continue

            try:
                vol_obj = self._volume_api.get(context, guest_disk.serial)
            except Exception:
                continue
            if not vol_obj:
                continue

            volumes = self._volume_api.get_all(context)
            for snapshot in self._volume_api.get_all_snapshots(context):
                try:
                    key = 'display_name' if 'display_name' in snapshot \
                        else 'name'
                    snap_name = snapshot[key] or ''
                    # Only TrilioVault snapshots of this volume are touched.
                    if snapshot['volume_id'] != vol_obj['id'] or \
                            'triliovault' not in snap_name.lower():
                        continue

                    # The temporary volume made from this snapshot is
                    # deleted even when the snapshot itself is kept.
                    temp_name = 'temp_volume-%s' % snapshot['id']
                    try:
                        for vol in volumes:
                            if vol.get(key) == temp_name:
                                self._volume_api.delete(context, vol['id'])
                                break
                    except Exception as ex:
                        LOG.debug('Could not delete %s: %s', temp_name, ex)

                    snapshot_uuid = uuid.UUID(snapshot['id'])
                    if ignore_snapshots and snapshot_uuid in ignore_snapshots:
                        continue
                    if delete_only and snapshot_uuid not in delete_only:
                        continue

                    self._volume_api.delete_snapshot(context, snapshot['id'])
                except Exception as ex:
                    LOG.warning('Failed to clean up a snapshot of volume '
                                '%s: %s', guest_disk.serial, ex)
        return []

    def reset_snapshot(self, devices, **kwargs):
        """Delete snapshots that TrilioVault created and left behind.

        :param devices: not used by this backend.
        :param kwargs: ``instance_uuid``, ``instance_name``, ``context`` and
            optionally ``ignore_snapshots``, ``delete_only`` and
            ``volume_id_to_reset`` (see ``_volume_snapshot_delete``).
        :returns: the (empty) list from ``_volume_snapshot_delete``, or an
            empty list if it raised; such failures are logged.
        :raises exception.InstanceNotRunning: if the domain lookup raises
            ``exception.InstanceNotFound``.
        """
        instance_uuid = kwargs['instance_uuid']
        instance_name = kwargs['instance_name']
        context = kwargs['context']

        try:
            virt_dom = self.virt_driver._conn.lookupByName(instance_name)
        except exception.InstanceNotFound:
            raise exception.InstanceNotRunning(instance_id=instance_uuid)

        try:
            return self._volume_snapshot_delete(
                context, virt_dom,
                volume_id_to_reset=kwargs.get('volume_id_to_reset'),
                ignore_snapshots=kwargs.get('ignore_snapshots'),
                delete_only=kwargs.get('delete_only'))
        except Exception as ex:
            LOG.exception(ex)

        return []

    # ------------------------------------------------------------------
    # Restore
    # ------------------------------------------------------------------

    def copy_backup_image_to_volume(self, context, instance_uuid,
                                    instance_name, params):
        """Write a backup image onto a Cinder volume attached to an instance.

        The volume's host path comes from the instance's block device
        mapping connection info: ``device_path`` if set, otherwise ``name``.
        If the transfer fails and this class lives in a ``cinder_backend``
        module, it is retried once with ``default_cache=True``.

        :param params: dict with ``volume_id``, ``backup_image_file_path``
            and ``progress_tracking_file_path``.
        :raises Exception: if no usable path is found for the volume, the
            connection info is inconsistent, or the transfer fails.
        """
        volume_id = params['volume_id']
        backup_path = params['backup_image_file_path']
        progress_tracking_file_path = params['progress_tracking_file_path']
        volume_uuid = uuid.UUID(volume_id)

        # Advisory check only: problems are logged, and the block device
        # mapping lookup below decides whether the restore can proceed.
        try:
            if not self._volume_api.get(context, volume_id):
                LOG.error("Cinder volume by id %(volume_id)s not found" %
                          {'volume_id': volume_id})
        except Exception as ex:
            LOG.exception(ex)

        volume_path = None

        # Find the device path of the mapped volume.
        bdms = block_device.BlockDeviceMappingList.get_by_instance_uuid(
            context, instance_uuid)
        for bdm in bdms:
            if not bdm.volume_id or uuid.UUID(bdm.volume_id) != volume_uuid:
                continue

            conninfo = json.loads(bdm.connection_info)

            if uuid.UUID(conninfo['serial']) != volume_uuid:
                raise Exception("Serial attribute of connection information"
                                " does not match with volume_id")

            if 'data' not in conninfo:
                raise Exception("data attribute is "
                                " not set in the conninfo")

            data = conninfo['data']
            if data.get('device_path') is not None:
                volume_path = data['device_path']
                try:
                    os.stat(volume_path)
                except Exception as ex:
                    LOG.exception(ex)
                    raise
            elif data.get('name') is not None:
                # Presumably network-backed volumes (e.g. RBD) addressed by
                # name rather than a host device path.
                volume_path = data['name']
            else:
                raise Exception("Data attribute or device_path attribute is "
                                " not set in the conninfo")
            break

        # Covers both an empty path and no matching mapping at all.
        if not volume_path:
            raise Exception("volume path for the volume id %s not found" %
                            volume_id)

        try:
            self.transfer_qemu_image_to_volume(
                volume_path, backup_path,
                progress_tracking_file_path)
        except Exception:
            if 'cinder_backend' not in self.__module__:
                LOG.exception('Transfer of %s to %s failed',
                              backup_path, volume_path)
                raise Exception("Potential reason for this failure could "
                                "be ceph credentials are not available in "
                                "nova.conf, Please add them to "
                                "tvault-contego.conf")
            # Retry with qemu-img's default cache mode instead of -t none.
            self.transfer_qemu_image_to_volume(
                volume_path, backup_path,
                progress_tracking_file_path, default_cache=True)

    # ------------------------------------------------------------------
    # qemu-img helpers
    # ------------------------------------------------------------------

    def _create_qcow2_from_diff(self, prev_snap_path, curr_snap_path,
                                qcow2file, progress_tracker_path):
        """Write *curr_snap_path* relative to *prev_snap_path* as qcow2.

        Runs ``<qemu-img> convert -p -f raw -O qcow2 -W -D <prev> -F raw
        <curr> <qcow2file>`` using the binary from ``get_qemu_img_path``.
        NOTE(review): ``-D`` is not an upstream qemu-img convert option;
        presumably that binary writes only the changed data; confirm.
        The result is checked with ``qemu_img_info``, then the backing-file
        reference is removed with an unsafe rebase.
        """
        qemu_img_bin = self.get_qemu_img_path()
        cmdspec = [qemu_img_bin, 'convert', '-p', '-f',
                   'raw', '-O', 'qcow2', '-W', '-D',
                   prev_snap_path, '-F', 'raw', curr_snap_path,
                   qcow2file]
        self._execute_qemu_img_and_track_progress(
            cmdspec, curr_snap_path, qcow2file, progress_tracker_path)
        qemuimages.qemu_img_info(qcow2file)
        # Remove the backing file reference.
        nova_utils.execute(qemu_img_bin, 'rebase', '-u', qcow2file)

    def get_dev_size(self, filename):
        """Return the size in bytes of *filename* (file or block device).

        Seeks to the end instead of using stat(), presumably because
        ``st_size`` is 0 for block devices on Linux.
        """
        with open(filename, 'rb') as f:
            f.seek(0, os.SEEK_END)
            return f.tell()

    def transfer_qemu_image_to_volume(
            self,
            volume_path,
            backup_image_file_path,
            progress_tracking_file_path,
            default_cache=False):
        """Convert a backup image to raw and write it onto *volume_path*.

        ``qemu-img convert`` runs through ``sudo nova-rootwrap``. The host
        page cache is bypassed (``-t none``) when *volume_path* is a block
        device that passes the O_DIRECT probe, unless *default_cache* is
        True.

        :param default_cache: force qemu-img's default cache mode; used as a
            retry after a failed transfer.
        """
        cmdspec = [
            'sudo',
            'nova-rootwrap',
            CONF.rootwrap_config,
            'qemu-img',
            'convert',
            '-p',
        ]

        # Decide on -t none up front instead of adding it and later removing
        # it: removal raised ValueError whenever the flags were never added
        # (non-block target or no O_DIRECT support). This also skips the dd
        # probe when the caller already opted out.
        if not default_cache and is_blk_device(volume_path) and \
                check_for_odirect_support(backup_image_file_path,
                                          volume_path, flag='oflag=direct'):
            cmdspec += ['-t', 'none']

        cmdspec += ['-O', 'raw', backup_image_file_path, volume_path]

        LOG.debug('transfer_qemu_image_to_volume cmd %s ',
                  " ".join(cmdspec))
        self._execute_qemu_img_and_track_progress(
            cmdspec, volume_path, backup_image_file_path,
            progress_tracking_file_path)



class LVMBackend(CinderBackend):
    def __init__(self, **kwargs):
        """Initialise exactly like CinderBackend, but label the backend 'lvm'.

        :param kwargs: forwarded unchanged to the parent initialiser
            (``virt_driver`` is required there).
        """
        parent_init = super(LVMBackend, self).__init__
        parent_init(**kwargs)
        # Replace the 'cinder' label assigned by the parent initialiser.
        self.backend = 'lvm'

    def upload_snapshot(self, disk_info, **kwargs):
        """Write an LVM snapshot into the vault as a qcow2 image.

        Snapshots are read straight from the LVM device
        ``/dev/stack-volumes-lvmdriver-1/_snapshot-<id>``. If
        ``disk_info['prev_disk_info']`` is set, the difference from the
        previous snapshot is written via ``_create_qcow2_from_diff``.
        Otherwise the whole snapshot is converted to qcow2.

        :param disk_info: snapshot description with ``id``, ``size`` and
            ``prev_disk_info`` keys.
        :param kwargs: ``context`` and ``params``; ``params['metadata']``
            locates the vault resource and the progress-tracker file.
        :raises Exception: if the vault storage type is unsupported.
        """
        context = kwargs['context']
        params = kwargs['params']

        # Case-insensitive, like the other vault_storage_type checks.
        if CONF.vault_storage_type.lower() != 'nfs' and \
                not CONF.vault_using_fuse:
            raise Exception("Only NFS/Swift/S3 is supported for now")

        snapshot_dev_prefix = '/dev/stack-volumes-lvmdriver-1/_snapshot-'
        curr_snap_path = snapshot_dev_prefix + disk_info['id']
        snapshot_vm_disk_resource_path = \
            vault.get_snapshot_vm_disk_resource_path(params['metadata'])
        if disk_info['prev_disk_info']:
            prev_disk_info = disk_info['prev_disk_info']

            prev_snap_path = snapshot_dev_prefix + prev_disk_info['id']
            vsize = str(disk_info['size'])
            head, tail = os.path.split(snapshot_vm_disk_resource_path)
            fileutils.ensure_tree(head)

            try:
                libvirt_utils.create_cow_image(
                    prev_snap_path,
                    snapshot_vm_disk_resource_path, vsize)
            except Exception:
                # s3 sometimes throws in create_cow_image due to its
                # eventual consistency; give it time to propagate, then
                # inspect the image.
                time.sleep(30)
                qemuimages.qemu_img_info(snapshot_vm_disk_resource_path)

            # _create_qcow2_from_diff takes the progress-tracker path as a
            # required argument; omitting it raises TypeError.
            progress_tracker_path = \
                vault.get_progress_tracker_path(params['metadata'])
            self._create_qcow2_from_diff(prev_snap_path, curr_snap_path,
                                         snapshot_vm_disk_resource_path,
                                         progress_tracker_path)
        else:
            # Full backup: convert the whole snapshot.
            # NOTE(review): an earlier comment said previously created
            # TrilioVault snapshots are deleted on this path; nothing in this
            # branch does that. Confirm whether code after it handles it.
            head, tail = os.path.split(snapshot_vm_disk_resource_path)
            fileutils.ensure_tree(head)

            qemuimages.convert_image(
                curr_snap_path,
                snapshot_vm_disk_resource_path,
                'qcow2')