# Repository URL to install this package:
# Version: 6.0.19
# Copyright 2014 TrilioData Inc.
# All Rights Reserved.
"""Handles all requests relating to Contego functionality."""
import socket
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
try:
from oslo_log import log as logging
except ImportError:
from nova.openstack.common import log as logging
try:
import oslo_messaging as messaging
except ImportError:
from oslo import messaging
try:
from oslo_config import cfg
except ImportError:
from oslo.config import cfg
from contego.objects import base as objects_base
from contego import rpc
# New API capabilities should be added here
CAPABILITIES = ["live-snapshot"]

# Module-level logger and the shared cfg.CONF configuration object.
LOG = logging.getLogger(__name__)
CONF = cfg.CONF

# Registered without a group, so the topic is read as CONF.contego_topic.
contego_api_opts = [
    cfg.StrOpt(
        "contego_topic", default="contego", help="the topic Contego nodes listen on",
    )
]
CONF.register_opts(contego_api_opts)

# Optional RPC version cap, registered under the "contego_upgrade_levels"
# group and read as CONF.contego_upgrade_levels.contego.
rpcapi_cap_opt = cfg.StrOpt(
    "contego", help="Set a version cap for messages sent to contego services",
)
CONF.register_opt(rpcapi_cap_opt, "contego_upgrade_levels")
class API(base.Base):
    """Client side of the contego RPC API.

    API version history:

        * 1.0 - Initial version.
    """

    # NOTE(review): ``base`` is not among the imports visible at the top of
    # this file (only ``objects_base`` is) -- confirm it is in scope.

    # Release-name aliases that may be used as the value of the
    # ``contego_upgrade_levels.contego`` option; every release listed here
    # maps to RPC version 1.0.
    VERSION_ALIASES = {
        "grizzly": "1.0",
        "havana": "1.0",
        "icehouse": "1.0",
        "juno": "1.0",
        "kilo": "1.0",
    }
# Allow passing in dummy image_service, but normally use the default
def __init__(self, image_service=None, **kwargs):
    """Set up the RPC client used to talk to contego services.

    :param image_service: accepted by the signature; not referenced by this
        constructor
    :param kwargs: forwarded unchanged to the base class constructor
    """
    super(API, self).__init__(**kwargs)
    self.CAPABILITIES = CAPABILITIES

    rpc_target = messaging.Target(topic=CONF.contego_topic, version="1.0")

    # The configured cap is either a release alias from VERSION_ALIASES or
    # some other value, which is then passed through untouched.
    configured_cap = CONF.contego_upgrade_levels.contego
    effective_cap = self.VERSION_ALIASES.get(configured_cap, configured_cap)

    self.client = rpc.get_client(
        rpc_target,
        version_cap=effective_cap,
        serializer=objects_base.ContegoObjectSerializer(),
    )
def call(self, context, method, host, **kwargs):
    """Invoke ``method`` on ``host`` and return the remote result.

    :param context: request context handed to the RPC layer
    :param method: name of the remote method
    :param host: server the prepared call context is addressed to
    :param kwargs: arguments for the remote method
    :returns: whatever the prepared context's ``call`` returns
    """
    cctxt = self.client.prepare(server=host, version="1.0")
    # Remote arguments are passed as keywords, the same way
    # _send_contego_message invokes the prepared context; passing the dict
    # positionally does not match that calling convention.
    return cctxt.call(context, method, **kwargs)
def cast(self, context, method, host, **kwargs):
    """Send ``method`` to ``host`` as a cast.

    :param context: request context handed to the RPC layer
    :param method: name of the remote method
    :param host: server the prepared cast context is addressed to
    :param kwargs: arguments for the remote method
    :returns: whatever the prepared context's ``cast`` returns
    """
    cctxt = self.client.prepare(server=host, version="1.0")
    # Remote arguments are passed as keywords, the same way
    # _send_contego_message invokes the prepared context; passing the dict
    # positionally does not match that calling convention.
    return cctxt.cast(context, method, **kwargs)
def get_info(self):
    """Return the capability list wrapped in a ``capabilities`` mapping."""
    info = dict(capabilities=self.CAPABILITIES)
    return info
def get(self, context, instance_uuid, want_object=False):
    """Fetch a single instance by its UUID.

    :param context: request context used for the lookup
    :param instance_uuid: UUID of the instance to fetch
    :param want_object: when true, return the result of
        ``instance_obj.Instance.get_by_uuid``; otherwise return a plain
        ``dict`` copied from what ``self.db.instance_get_by_uuid`` returns
    """
    if not want_object:
        db_row = self.db.instance_get_by_uuid(context, instance_uuid)
        return dict(db_row.items())

    # NOTE(review): ``instance_obj`` is not among the imports visible in
    # this file -- confirm it is in scope before relying on this path.
    return instance_obj.Instance.get_by_uuid(context, instance_uuid)
def _send_contego_message(
    self,
    method,
    context,
    instance,
    host=None,
    params=None,
    is_call=False,
    timeout=6000,
):
    """Generic handler for RPC casts/calls to contego topic.

    Blocks for a response when ``is_call`` is true, and also whenever
    ``instance`` is ``None`` (that path always uses a call).

    :param method: name of the remote method
    :param context: request context handed to the RPC layer
    :param instance: mapping providing ``uuid`` (and ``host`` when ``host``
        is not given), or ``None`` for messages not tied to an instance
    :param host: server to address; defaults to ``instance["host"]``
    :param params: Optional dictionary of arguments to be passed to the
        contego worker
    :param is_call: use a call instead of a cast
    :param timeout: passed to ``client.prepare`` as ``timeout``
    :returns: the result of the call, or of the cast when ``is_call`` is
        false
    :raises ValueError: if neither ``host`` nor ``instance`` is supplied
    """
    # A fresh dict per invocation; a literal ``{}`` default would be one
    # object shared by every call.
    if params is None:
        params = {}

    if not host:
        # Resolve the host from the instance only when there is one, so a
        # missing target is reported clearly instead of as a TypeError.
        if instance is None:
            raise ValueError(
                "either host or instance is required to send %r" % (method,)
            )
        host = instance["host"]

    cctxt = self.client.prepare(server=host, version="1.0", timeout=timeout)

    if instance is None:
        return cctxt.call(context, method, params=params)

    message = {
        "instance_uuid": instance["uuid"],
        "instance_ref": instance,
        "params": params,
    }
    # Cast through the context prepared above: self.cast() requires a host
    # argument that this code path never passed.
    send = cctxt.call if is_call else cctxt.cast
    return send(context, method, **message)
def vast_prepare(self, context, instance_uuid, params):
    """Prepare to VAST the instance.

    Looks the instance up by UUID and issues a blocking ``vast_prepare``
    call to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("prepare to vast the instance")
    return self._send_contego_message(
        "vast_prepare", context, vm, host=vm["host"], params=params, is_call=True
    )
def vast_freeze(self, context, instance_uuid, params):
    """Freeze the instance.

    Looks the instance up by UUID and issues a blocking ``vast_freeze``
    call to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("freeze the instance")
    return self._send_contego_message(
        "vast_freeze", context, vm, host=vm["host"], params=params, is_call=True
    )
def vast_thaw(self, context, instance_uuid, params):
    """Thaw the instance.

    Looks the instance up by UUID and issues a blocking ``vast_thaw`` call
    to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("thaw the instance")
    return self._send_contego_message(
        "vast_thaw", context, vm, host=vm["host"], params=params, is_call=True
    )
def vast_instance(self, context, instance_uuid, params):
    """VAST the instance.

    Looks the instance up by UUID and issues a blocking ``vast_instance``
    call (timeout 6000) to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("vast the instance")
    return self._send_contego_message(
        "vast_instance",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def vast_get_info(self, context, instance_uuid, params):
    """Get details of a VASTed instance.

    Looks the instance up by UUID and issues a blocking ``vast_get_info``
    call to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("get details of vasted instance")
    return self._send_contego_message(
        "vast_get_info", context, vm, host=vm["host"], params=params, is_call=True
    )
def vast_data_url(self, context, instance_uuid, params):
    """Get the URL for the vast data server.

    Looks the instance up by UUID and issues a blocking ``vast_data_url``
    call to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("Get the URL for the vast data server")
    return self._send_contego_message(
        "vast_data_url", context, vm, host=vm["host"], params=params, is_call=True
    )
def vast_data_transfer(self, context, instance_uuid, params):
    """Initiate data transfer.

    Looks the instance up by UUID and issues a blocking
    ``vast_data_transfer`` call (timeout 6000) to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("initiate data tranfer")
    return self._send_contego_message(
        "vast_data_transfer",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def vast_check_prev_snapshot(self, context, instance_uuid, params):
    """Verify the existence of the previous snapshot.

    Looks the instance up by UUID and issues a blocking
    ``vast_check_prev_snapshot`` call (timeout 6000) to the host recorded
    on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("Verifying previous snapshot")
    return self._send_contego_message(
        "vast_check_prev_snapshot",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def vast_async_task_status(self, context, instance_uuid, params):
    """Get data transfer job status for the given token.

    When ``params["metadata"]`` names a ``host``, the request goes straight
    to that host and only the UUID is sent as the instance reference.
    Otherwise the instance is looked up and the host recorded on it is
    used.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument; may carry
        a ``metadata`` mapping with a ``host`` entry
    :returns: the result of the blocking remote call
    """
    # Treat missing or empty metadata like metadata without a host, so the
    # lookup fallback below is used instead of raising on the key access.
    metadata = params.get("metadata") or {}
    if "host" in metadata:
        host = metadata["host"]
        instance = {"uuid": instance_uuid}
    else:
        instance = self.get(context, instance_uuid)
        host = instance["host"]

    LOG.debug("Get data transfer job status for the given token")
    return self._send_contego_message(
        "vast_async_task_status",
        context,
        instance,
        host=host,
        params=params,
        is_call=True,
        timeout=6000,
    )
def vast_finalize(self, context, instance_uuid, params):
    """Finalize VAST.

    Looks the instance up by UUID and issues a blocking ``vast_finalize``
    call (timeout 6000) to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("finalize the vast for the instance")
    return self._send_contego_message(
        "vast_finalize",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def vast_reset(self, context, instance_uuid, params):
    """Reset the instance so other openstack operations can be unblocked.

    The lookup uses ``context.elevated(read_deleted="yes")``; the RPC itself
    is sent with the original ``context`` as a blocking ``vast_reset`` call
    to the host recorded on the instance.

    :param context: request context
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    lookup_context = context.elevated(read_deleted="yes")
    vm = self.get(lookup_context, instance_uuid)
    LOG.debug("vast reset the instance")
    return self._send_contego_message(
        "vast_reset", context, vm, host=vm["host"], params=params, is_call=True
    )
def vast_commit_image(self, context, instance_uuid, params):
    """Commit the snapshot image.

    Looks the instance up by UUID and issues a blocking
    ``vast_commit_image`` call to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("Commiting snapshot image")
    return self._send_contego_message(
        "vast_commit_image",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
    )
def map_snapshot_files(self, context, instance_uuid, params):
    """Map snapshot files to the file manager instance ``instance_uuid``.

    Looks the instance up by UUID and issues a blocking
    ``map_snapshot_files`` call (timeout 6000) to the host recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the file manager instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("map snapshot files to instance")
    return self._send_contego_message(
        "map_snapshot_files",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def copy_backup_image_to_volume(self, context, instance_uuid, params):
    """Copy a backup image to a volume.

    Looks the instance up by UUID and issues a blocking
    ``copy_backup_image_to_volume`` call (timeout 6000) to the host
    recorded on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("Copy backup image to volume that is mapped to the instance")
    return self._send_contego_message(
        "copy_backup_image_to_volume",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def vast_config_backup(self, context, backup_id, params):
    """Backup OpenStack configuration for given services.

    Sends a blocking ``vast_config_backup`` call (timeout 6000), with no
    instance attached, to the host named by ``params["host"]``.

    :param context: request context for the RPC
    :param backup_id: not referenced by this method
    :param params: must contain ``host``; passed through as the RPC
        ``params`` argument
    :returns: the result of the remote call
    :raises KeyError: if ``params`` has no ``host`` entry
    """
    return self._send_contego_message(
        "vast_config_backup",
        context,
        None,
        host=params["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def validate_trusted_user_and_key(self, context, params):
    """Validate trusted user and private key.

    Adds the controller node list to ``params`` (the caller's dict is
    updated in place under ``controller_nodes``) and sends a blocking
    ``validate_trusted_user_and_key`` call (timeout 6000), with no instance
    attached, to the host named by ``params["host"]``.

    :param context: request context for the RPC
    :param params: must contain ``host``; passed through as the RPC
        ``params`` argument
    :returns: the result of the remote call
    :raises KeyError: if ``params`` has no ``host`` entry
    """
    target_host = params["host"]
    node_info = self.get_controller_nodes(context)
    params["controller_nodes"] = node_info["controller_nodes"]
    return self._send_contego_message(
        "validate_trusted_user_and_key",
        context,
        None,
        host=target_host,
        params=params,
        is_call=True,
        timeout=6000,
    )
def get_controller_nodes(self, context):
    """Return list of RabbitMQ hosts known to Nova.

    Hosts are collected from ``CONF.oslo_messaging_rabbit["rabbit_hosts"]``
    and, when set, from ``CONF.transport_url``. Ports are stripped,
    duplicates and ``localhost`` are dropped, and each remaining address is
    reverse-resolved to a host name where possible.

    :param context: not referenced by this method
    :returns: ``{"controller_nodes": [...]}`` with the names in sorted
        order of their addresses; the list is empty if collecting them
        fails (the failure is logged)
    """

    def _get_hostname(address):
        # Best effort: keep the raw address when it cannot be resolved.
        try:
            return socket.gethostbyaddr(address)[0]
        except Exception:
            return address

    def _get_hosts_from_transport_url(url):
        netloc = urlparse.urlparse(url).netloc
        # Credentials are optional per entry, so take whatever follows the
        # last "@" instead of assuming an "@" is always present.
        return [
            entry.rsplit("@", 1)[-1] for entry in netloc.split(",") if entry
        ]

    try:
        try:
            # Copy the configured value so extending the list below cannot
            # modify the object held by the configuration.
            hosts = list(CONF.oslo_messaging_rabbit["rabbit_hosts"] or [])
        except (AttributeError, KeyError):
            # NOTE(review): presumably the option or group can be absent in
            # some deployments -- fall back to transport_url alone rather
            # than discarding every host.
            hosts = []

        transport_url = getattr(CONF, "transport_url", None)
        if transport_url:
            hosts.extend(_get_hosts_from_transport_url(transport_url))

        addresses = {host.split(":")[0] for host in hosts}
        addresses.discard("localhost")
        # Sorted so the result does not depend on set iteration order.
        return {
            "controller_nodes": [_get_hostname(a) for a in sorted(addresses)]
        }
    except Exception as ex:
        LOG.exception(ex)
        return {"controller_nodes": []}
def validate_database_creds(self, context, params):
    """Validate given database credentials.

    Sends a blocking ``validate_database_creds`` call (timeout 6000), with
    no instance attached, to the host named by ``params["host"]``.

    :param context: request context for the RPC
    :param params: must contain ``host``; passed through as the RPC
        ``params`` argument
    :returns: the result of the remote call
    :raises KeyError: if ``params`` has no ``host`` entry
    """
    return self._send_contego_message(
        "validate_database_creds",
        context,
        None,
        host=params["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )
def populate_instance_with_virt_v2v(self, context, instance_uuid, params):
    """Send the ``populate_instance_with_virt_v2v`` request for an instance.

    The debug message describes the operation as running virt-v2v to
    migrate a VM from ESXi to the compute node. The instance is looked up
    by UUID and a blocking call (timeout 6000) is sent to the host recorded
    on it.

    :param context: request context for the lookup and the RPC
    :param instance_uuid: UUID of the instance
    :param params: passed through as the RPC ``params`` argument
    :returns: the result of the remote call
    """
    vm = self.get(context, instance_uuid)
    LOG.debug("Run virt-v2v to migration vm from ESXi to compute node")
    return self._send_contego_message(
        "populate_instance_with_virt_v2v",
        context,
        vm,
        host=vm["host"],
        params=params,
        is_call=True,
        timeout=6000,
    )