Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2014 TrilioData Inc.
# All Rights Reserved.
"""Handles all requests relating to Contego functionality."""


import socket

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

try:
    from oslo_log import log as logging
except ImportError:
    from nova.openstack.common import log as logging

try:
    import oslo_messaging as messaging
except ImportError:
    from oslo import messaging

try:
    from oslo_config import cfg
except ImportError:
    from oslo.config import cfg

from contego.objects import base as objects_base
from contego import rpc

# New API capabilities should be added here
CAPABILITIES = ["live-snapshot"]

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

contego_api_opts = [
    cfg.StrOpt(
        "contego_topic", default="contego", help="the topic Contego nodes listen on",
    )
]
CONF.register_opts(contego_api_opts)

rpcapi_cap_opt = cfg.StrOpt(
    "contego", help="Set a version cap for messages sent to contego services",
)
CONF.register_opt(rpcapi_cap_opt, "contego_upgrade_levels")


class API(base.Base):
    """Client side of the contego RPC API

    API version history:

    * 1.0 - Initial version.

    """

    VERSION_ALIASES = {
        "grizzly": "1.0",
        "havana": "1.0",
        "icehouse": "1.0",
        "juno": "1.0",
        "kilo": "1.0",
    }

    # Allow passing in dummy image_service, but normally use the default
    def __init__(self, image_service=None, **kwargs):
        super(API, self).__init__(**kwargs)
        self.CAPABILITIES = CAPABILITIES
        target = messaging.Target(topic=CONF.contego_topic, version="1.0")
        version_cap = self.VERSION_ALIASES.get(
            CONF.contego_upgrade_levels.contego, CONF.contego_upgrade_levels.contego,
        )
        serializer = objects_base.ContegoObjectSerializer()
        self.client = rpc.get_client(
            target, version_cap=version_cap, serializer=serializer
        )

    def call(self, context, method, host, **kwargs):
        cctxt = self.client.prepare(server=host, version="1.0")
        return cctxt.call(context, method, kwargs)

    def cast(self, context, method, host, **kwargs):
        cctxt = self.client.prepare(server=host, version="1.0")
        return cctxt.cast(context, method, kwargs)

    def get_info(self):
        return {"capabilities": self.CAPABILITIES}

    def get(self, context, instance_uuid, want_object=False):
        """Get a single instance with the given instance_uuid."""
        if want_object:
            instance = instance_obj.Instance.get_by_uuid(context, instance_uuid)
            return instance

        rv = self.db.instance_get_by_uuid(context, instance_uuid)
        return dict(iter(rv.items()))

    def _send_contego_message(
        self,
        method,
        context,
        instance,
        host=None,
        params={},
        is_call=False,
        timeout=6000,
    ):
        """Generic handler for RPC casts/calls to contego topic.
           This only blocks for a response with is_call=True.

        :param params: Optional dictionary of arguments to be passed to the
                       contego worker

        :returns: None
        """
        if not host:
            host = instance["host"]

        cctxt = self.client.prepare(server=host, version="1.0", timeout=timeout)

        if instance is None:
            return cctxt.call(context, method, params=params)

        if is_call:
            return cctxt.call(
                context,
                method,
                instance_uuid=instance["uuid"],
                instance_ref=instance,
                params=params,
            )
        else:
            return self.cast(
                context,
                method,
                instance_uuid=instance["uuid"],
                instance_ref=instance,
                params=params,
            )

    def vast_prepare(self, context, instance_uuid, params):
        """
        Prepare to VAST the instance
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("prepare to vast the instance") % locals())
        return self._send_contego_message(
            "vast_prepare",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
        )

    def vast_freeze(self, context, instance_uuid, params):
        """
        Freeze the instance
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("freeze the instance") % locals())
        return self._send_contego_message(
            "vast_freeze",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
        )

    def vast_thaw(self, context, instance_uuid, params):
        """
        Thaw the instance
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("thaw the instance") % locals())
        return self._send_contego_message(
            "vast_thaw",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
        )

    def vast_instance(self, context, instance_uuid, params):
        """
        VAST the instance
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("vast the instance") % locals())
        return self._send_contego_message(
            "vast_instance",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
            timeout=6000,
        )

    def vast_get_info(self, context, instance_uuid, params):
        """
        Get details of a VASTed the instance
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("get details of vasted instance") % locals())
        return self._send_contego_message(
            "vast_get_info",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
        )

    def vast_data_url(self, context, instance_uuid, params):
        """
        Get URL for the vast data server
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("Get the URL for the vast data server") % locals())
        return self._send_contego_message(
            "vast_data_url",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
        )

    def vast_data_transfer(self, context, instance_uuid, params):
        """
        Initiate data tranfer
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("initiate data tranfer") % locals())
        return self._send_contego_message(
            "vast_data_transfer",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
            timeout=6000,
        )

    def vast_check_prev_snapshot(self, context, instance_uuid, params):
        """
        Verify the existence of previous snapshot
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("Verifying previous snapshot") % locals())
        return self._send_contego_message(
            "vast_check_prev_snapshot",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
            timeout=6000,
        )

    def vast_async_task_status(self, context, instance_uuid, params):
        """
        Get data transfer job status for the given token
        """
        if "host" in params["metadata"]:
            host = params["metadata"]["host"]
            instance = {"uuid": instance_uuid}
        else:
            instance = self.get(context, instance_uuid)
            host = instance["host"]
        LOG.debug(("Get data transfer job status for the given token") % locals())
        return self._send_contego_message(
            "vast_async_task_status",
            context,
            instance,
            host=host,
            params=params,
            is_call=True,
            timeout=6000,
        )

    def vast_finalize(self, context, instance_uuid, params):
        """
        Finalize VAST
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("finalize the vast for the instance") % locals())
        return self._send_contego_message(
            "vast_finalize",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
            timeout=6000,
        )

    def vast_reset(self, context, instance_uuid, params):
        """
        Reset the instance so other openstack operations can be unblocked
        """
        instance = self.get(context.elevated(read_deleted="yes"), instance_uuid)
        LOG.debug(("vast reset the instance") % locals())
        return self._send_contego_message(
            "vast_reset",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
        )

    def vast_commit_image(self, context, instance_uuid, params):
        """
        Commit snapshot image
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("Commiting snapshot image") % locals())
        return self._send_contego_message(
            "vast_commit_image",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
        )

    def map_snapshot_files(self, context, instance_uuid, params):
        """
        Map snapshot files to file manager instance identified by instance_uuid
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(("map snapshot files to instance") % locals())
        return self._send_contego_message(
            "map_snapshot_files",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
            timeout=6000,
        )

    def copy_backup_image_to_volume(self, context, instance_uuid, params):
        """
        Copy backup image to volume
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(
            ("Copy backup image to volume that is mapped to the instance") % locals()
        )
        return self._send_contego_message(
            "copy_backup_image_to_volume",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
            timeout=6000,
        )

    def vast_config_backup(self, context, backup_id, params):
        """
        Backup OpenStack configuration for given services
        """
        host = params["host"]
        return self._send_contego_message(
            "vast_config_backup",
            context,
            None,
            host=host,
            params=params,
            is_call=True,
            timeout=6000,
        )

    def validate_trusted_user_and_key(self, context, params):
        """
        Validate trusted user and private key
        """
        host = params["host"]
        controller_nodes = self.get_controller_nodes(context)
        params["controller_nodes"] = controller_nodes["controller_nodes"]
        return self._send_contego_message(
            "validate_trusted_user_and_key",
            context,
            None,
            host=host,
            params=params,
            is_call=True,
            timeout=6000,
        )

    def get_controller_nodes(self, context):
        """
        Return list of RabbitMQ hosts known to Nova
        """

        def _get_hostname(address):
            try:
                return socket.gethostbyaddr(address)[0]
            except Exception as ex:
                return address

        def _get_hosts_from_transport_url(url):
            hosts = []
            parsed_url = urlparse.urlparse(url)
            parsed_hosts = parsed_url.netloc.split(",")
            for host in parsed_hosts:
                hosts.append(host.split("@")[1])
            return hosts

        try:
            hosts = []
            hosts = CONF.oslo_messaging_rabbit["rabbit_hosts"]
            if hasattr(CONF, "transport_url") and getattr(
                CONF, "transport_url"
            ) not in ["", None]:
                transport_hosts = _get_hosts_from_transport_url(CONF.transport_url)
                hosts.extend(transport_hosts)
            hosts = [host.split(":")[0] for host in hosts]
            hosts = list(set(hosts))
            if "localhost" in hosts:
                hosts.remove("localhost")
            hosts = [_get_hostname(host) for host in hosts]
            return {"controller_nodes": hosts}
        except Exception as ex:
            LOG.exception(ex)
            return {"controller_nodes": []}

    def validate_database_creds(self, context, params):
        """
        Validate given database credentials.
        """
        host = params["host"]
        return self._send_contego_message(
            "validate_database_creds",
            context,
            None,
            host=host,
            params=params,
            is_call=True,
            timeout=6000,
        )

    def populate_instance_with_virt_v2v(self, context, instance_uuid, params):
        """
        Copy backup image to volume
        """
        instance = self.get(context, instance_uuid)
        LOG.debug(
            ("Run virt-v2v to migration vm from ESXi to compute node") % locals()
        )
        return self._send_contego_message(
            "populate_instance_with_virt_v2v",
            context,
            instance,
            host=instance["host"],
            params=params,
            is_call=True,
            timeout=6000,
        )