Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2018 TrilioData Inc.
# All Rights Reserved .

"""Handles all requests relating to Contego functionality."""

import socket

try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

try:
    from oslo_log import log as logging
except ImportError:
    from dmapi.openstack.common import log as logging

try:
    import oslo_messaging as messaging
except ImportError:
    from oslo import messaging

try:
    from oslo_config import cfg
except ImportError:
    from oslo.config import cfg

from dmapi import context as dmapi_context
from dmapi import objects

from dmapi.db import base
from dmapi import rpc
from dmapi.objects import base as objects_base

# New API capabilities should be added here
CAPABILITIES = ['live-snapshot',
                ]

LOG = logging.getLogger(__name__)
CONF = cfg.CONF


contego_api_opts = [cfg.StrOpt('contego_topic',
                               default='contego_1',
                               help='the topic Contego nodes listen on')]
CONF.register_opts(contego_api_opts)

rpcapi_cap_opt = cfg.StrOpt('contego',
                            help='Set a version cap for messages '
                                 'sent to contego services')
CONF.register_opt(rpcapi_cap_opt, 'contego_upgrade_levels')


class API(base.Base):
    """Client side of the contego RPC API

    API version history:

    * 1.0 - Initial version.

    """

    VERSION_ALIASES = {
        'grizzly': '1.0',
        'havana': '1.0',
        'icehouse': '1.0',
        'juno': '1.0',
        'kilo': '1.0',
    }

    # Allow passing in dummy image_service, but normally use the default
    def __init__(self, image_service=None, **kwargs):
        super(API, self).__init__(**kwargs)
        self.CAPABILITIES = CAPABILITIES
        target = messaging.Target(topic=CONF.contego_topic, version='1.0')
        version_cap = self.VERSION_ALIASES.get(
            CONF.contego_upgrade_levels.contego,
            CONF.contego_upgrade_levels.contego)
        serializer = objects_base.ContegoObjectSerializer()

        self.client = rpc.get_client(target,
                                     version_cap=version_cap,
                                     serializer=serializer)
        objects.register_all()

    def call(self, context, method, host, **kwargs):
        cctxt = self.client.prepare(server=host, version='1.0')
        return cctxt.call(context, method, kwargs)

    def cast(self, context, method, host, **kwargs):
        cctxt = self.client.prepare(server=host, version='1.0')
        return cctxt.cast(context, method, kwargs)

    def get_info(self):
        return {'capabilities': self.CAPABILITIES}

    def get(self, context, instance_uuid):
        """Get a single instance with the given instance_uuid."""

        # Two part lookup 
        #ctxt = context.get_admin_context()
        ctxt = context
        mapping = objects.InstanceMapping.get_by_instance_uuid(ctxt, instance_uuid)

        if mapping.cell_mapping is None:
            raise Exception('Instance %s is not mapped to a cell' % instance_uuid)

        with dmapi_context.target_cell(ctxt, mapping.cell_mapping) as cctxt:
            instance = objects.Instance.get_by_uuid(
                cctxt, instance_uuid,
                expected_attrs=['info_cache', 'security_groups', 'metadata'])
            newcctxt = cctxt

        instance_dict = dict(iter(instance.items()))
        instance_dict['metadata'] = getattr(instance, 'metadata', {})
        return newcctxt, instance_dict

    def _send_contego_message(self, method, context, instance, host=None,
                              params={}, is_call=False, timeout=6000):
        """Generic handler for RPC casts/calls to contego topic.
           This only blocks for a response with is_call=True.

        :param params: Optional dictionary of arguments to be passed to the
                       contego worker

        :returns: None
        """
        if not host:
            host = instance['host']

        cctxt = self.client.prepare(server=host, version='1.0',
                                    timeout=timeout)

        if hasattr(context, 'mq_connection'):
            cctxt.transport = context.mq_connection

        if instance is None:
            return cctxt.call(context, method, params=params)

        if is_call:
            return cctxt.call(context, method, instance_uuid=instance['uuid'],
                              instance_ref=instance, params=params)
        else:
            return self.cast(context, method, instance_uuid=instance['uuid'],
                             instance_ref=instance, params=params)

    def vast_prepare(self, context, instance_uuid, params):
        """
        Prepare to VAST the instance
        """
        #
        context, instance = self.get(context, instance_uuid)

        LOG.debug(("prepare to vast the instance") % locals())
        return self._send_contego_message('vast_prepare', context, instance,
                                          host=instance['host'],
                                          params=params, is_call=True)

    def vast_freeze(self, context, instance_uuid, params):
        """
        Freeze the instance
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("freeze the instance") % locals())
        return self._send_contego_message('vast_freeze', context, instance,
                                          host=instance['host'],
                                          params=params, is_call=True)

    def vast_thaw(self, context, instance_uuid, params):
        """
        Thaw the instance
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("thaw the instance") % locals())
        return self._send_contego_message('vast_thaw', context, instance,
                                          host=instance['host'],
                                          params=params, is_call=True)

    def vast_instance(self, context, instance_uuid, params):
        """
        VAST the instance
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("vast the instance") % locals())
        return self._send_contego_message('vast_instance', context,
                                          instance, host=instance['host'],
                                          params=params, is_call=True,
                                          timeout=6000)

    def vast_get_instance_user_data(self, context, instance_uuid, params):
        """
        get userdata of the instance
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("get userdata of the instance") % locals())
        return {"user_data": instance.get('user_data', None)}

    def vast_get_info(self, context, instance_uuid, params):
        """
        Get details of a VASTed the instance
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("get details of vasted instance") % locals())
        return self._send_contego_message('vast_get_info', context, instance,
                                          host=instance['host'],
                                          params=params, is_call=True)

    def vast_data_url(self, context, instance_uuid, params):
        """
        Get URL for the vast data server
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("Get the URL for the vast data server") % locals())
        return self._send_contego_message('vast_data_url', context, instance,
                                          host=instance['host'],
                                          params=params, is_call=True)

    def vast_data_transfer(self, context, instance_uuid, params):
        """
        Initiate data tranfer
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("initiate data tranfer") % locals())
        return self._send_contego_message('vast_data_transfer', context,
                                          instance, host=instance['host'],
                                          params=params, is_call=True,
                                          timeout=6000)

    def vast_check_prev_snapshot(self, context, instance_uuid, params):
        """
        Verify the existence of previous snapshot
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("Verifying previous snapshot") % locals())
        return self._send_contego_message('vast_check_prev_snapshot', context,
                                          instance, host=instance['host'],
                                          params=params, is_call=True,
                                          timeout=6000)

    def vast_async_task_status(self, context, instance_uuid, params):
        """
        Get data transfer job status for the given token
        """
        if 'host' in params['metadata']:
            host = params['metadata']['host']
            instance = {'uuid': instance_uuid}
        else:
            context, instance = self.get(context, instance_uuid)
            host = instance['host']
        LOG.debug(("Get data transfer job status for "
                   "the given token") % locals())
        return self._send_contego_message('vast_async_task_status',
                                          context, instance,
                                          host=host, params=params,
                                          is_call=True, timeout=6000)

    def vast_finalize(self, context, instance_uuid, params):
        """
        Finalize VAST
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("finalize the vast for the instance") % locals())
        return self._send_contego_message('vast_finalize', context, instance,
                                          host=instance['host'], params=params,
                                          is_call=True, timeout=6000)

    def vast_reset(self, context, instance_uuid, params):
        """
        Reset the instance so other openstack operations can be unblocked
        """
        context, instance = self.get(context.elevated(read_deleted='yes'),
                            instance_uuid)
        LOG.debug(("vast reset the instance") % locals())
        return self._send_contego_message('vast_reset', context, instance,
                                          host=instance['host'], params=params,
                                          is_call=True)

    def vast_commit_image(self, context, instance_uuid, params):
        """
        Commit snapshot image
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("Commiting snapshot image") % locals())
        return self._send_contego_message('vast_commit_image', context,
                                          instance, host=instance['host'],
                                          params=params, is_call=True)

    def map_snapshot_files(self, context, instance_uuid, params):
        """
        Map snapshot files to file manager instance identified by instance_uuid
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("map snapshot files to instance") % locals())
        return self._send_contego_message('map_snapshot_files', context,
                                          instance, host=instance['host'],
                                          params=params,
                                          is_call=True, timeout=6000)

    def copy_backup_image_to_volume(self, context, instance_uuid, params):
        """
        Copy backup image to volume
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("Copy backup image to volume that is mapped "
                   "to the instance") % locals())
        return self._send_contego_message('copy_backup_image_to_volume',
                                          context, instance,
                                          host=instance['host'], params=params,
                                          is_call=True, timeout=6000)

    def vast_config_backup(self, context, backup_id, params):
        """
        Backup OpenStack configuration for given services
        """
        host = params['host']
        return self._send_contego_message(
            'vast_config_backup',
            context,
            None,
            host=host,
            params=params,
            is_call=True,
            timeout=6000)

    def validate_trusted_user_and_key(self, context, params):
        """
        Validate trusted user and private key
        """
        host = params['host']
        controller_nodes = self.get_controller_nodes(context)
        params['controller_nodes'] = controller_nodes['controller_nodes']
        return self._send_contego_message(
            'validate_trusted_user_and_key',
            context,
            None,
            host=host,
            params=params,
            is_call=True,
            timeout=6000)

    def get_controller_nodes(self, context):
        """
        Return list of RabbitMQ hosts known to dmapi
        """
        def _get_hostname(address):
            try:
                return socket.gethostbyaddr(address)[0]
            except Exception as ex:
                return address

        def _get_hosts_from_transport_url(url):
            hosts = []
            for u in url.split(','):
                parsed_url = urlparse(u)
                hosts.append(parsed_url.hostname)
            return hosts

        try:
            hosts = []
            hosts = CONF.oslo_messaging_rabbit['rabbit_hosts']
            if hasattr(
                CONF,
                'transport_url') and getattr(
                CONF,
                'transport_url') not in [
                '',
                    None]:
                transport_hosts = _get_hosts_from_transport_url(
                    CONF.transport_url)
                hosts.extend(transport_hosts)
            hosts = list(set(hosts))
            for host in hosts[:]:
                if 'localhost' in host:
                    hosts.remove(host)
            hosts = [_get_hostname(host) for host in hosts]
            return {'controller_nodes': hosts}
        except Exception as ex:
            LOG.exception(ex)
            return {'controller_nodes': []}

    def validate_database_creds(self, context, params):
        """
        Validate given database credentials.
        """
        host = params['host']
        return self._send_contego_message('validate_database_creds',
                                          context, None, host=host, params=params,
                                          is_call=True, timeout=6000)

    def vast_disk_check(self, context, instance_uuid, params):
        """
        Get disk check for VASTed instance 
        """
        context, instance = self.get(context, instance_uuid)
        LOG.debug(("Get disk check for vasted instance") % locals())
        return self._send_contego_message('vast_disk_check', context, None,
                                          host=instance['host'],
                                          params=params, is_call=True)