Repository URL to install this package:
Version:
5.0.6.dev15 ▾
|
# Copyright 2018 TrilioData Inc.
# All Rights Reserved .
"""Handles all requests relating to Contego functionality."""
import socket
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
try:
from oslo_log import log as logging
except ImportError:
from dmapi.openstack.common import log as logging
try:
import oslo_messaging as messaging
except ImportError:
from oslo import messaging
try:
from oslo_config import cfg
except ImportError:
from oslo.config import cfg
from oslo_utils import timeutils
from dmapi import context as dmapi_context
from dmapi import objects
from dmapi.db import base
from dmapi import rpc
from dmapi.objects import base as objects_base
# Capabilities advertised by this API; append new entries to this list.
CAPABILITIES = ['live-snapshot']

LOG = logging.getLogger(__name__)
CONF = cfg.CONF

# Default-group option: the messaging topic Contego nodes listen on.
contego_api_opts = [
    cfg.StrOpt(
        'contego_topic',
        default='contego',
        help='the topic Contego nodes listen on',
    ),
]
CONF.register_opts(contego_api_opts)

# Option 'contego' in the 'contego_upgrade_levels' group: version cap for
# messages sent to contego services (no default is declared here).
rpcapi_cap_opt = cfg.StrOpt(
    'contego',
    help='Set a version cap for messages sent to contego services',
)
CONF.register_opt(rpcapi_cap_opt, 'contego_upgrade_levels')
class API(base.Base):
    """Client side of the contego RPC API.

    Wraps an RPC client bound to the ``CONF.contego_topic`` topic and exposes
    one method per remote contego operation, plus helpers that report the
    state of registered services.

    API version history:

        * 1.0 - Initial version.
    """

    # Release-name aliases accepted for CONF.contego_upgrade_levels.contego;
    # each alias resolves to the RPC version used as the client version cap.
    VERSION_ALIASES = {
        'grizzly': '1.0',
        'havana': '1.0',
        'icehouse': '1.0',
        'juno': '1.0',
        'kilo': '1.0',
    }

    # Allow passing in dummy image_service, but normally use the default
    def __init__(self, image_service=None, **kwargs):
        """Build the RPC client used to talk to contego services.

        :param image_service: accepted for interface compatibility; it is not
                              used by this constructor.
        :param kwargs: forwarded unchanged to the base class constructor.
        """
        super(API, self).__init__(**kwargs)
        self.CAPABILITIES = CAPABILITIES
        self.service_down_time = CONF.service_down_time

        target = messaging.Target(topic=CONF.contego_topic, version='1.0')
        # A configured alias (e.g. 'kilo') maps to its version; any other
        # configured value is used as the cap verbatim.
        configured_cap = CONF.contego_upgrade_levels.contego
        version_cap = self.VERSION_ALIASES.get(configured_cap, configured_cap)
        serializer = objects_base.ContegoObjectSerializer()
        self.client = rpc.get_client(target,
                                     version_cap=version_cap,
                                     serializer=serializer)
        objects.register_all()

    def call(self, context, method, host, **kwargs):
        """Invoke ``method`` on ``host`` and return the remote result.

        :param context: request context passed to the RPC layer
        :param method: name of the remote method
        :param host: server the message is addressed to
        :param kwargs: arguments of the remote method
        """
        cctxt = self.client.prepare(server=host, version='1.0')
        # RPC arguments are keyword arguments, exactly as in
        # _send_contego_message; a positional dict is not accepted.
        return cctxt.call(context, method, **kwargs)

    def cast(self, context, method, host, **kwargs):
        """Invoke ``method`` on ``host`` without waiting for a result.

        :param context: request context passed to the RPC layer
        :param method: name of the remote method
        :param host: server the message is addressed to
        :param kwargs: arguments of the remote method
        """
        cctxt = self.client.prepare(server=host, version='1.0')
        return cctxt.cast(context, method, **kwargs)

    def get_info(self):
        """Return the capabilities of this API as ``{'capabilities': [...]}``."""
        return {'capabilities': self.CAPABILITIES}

    def _get_service_detail(self, svc):
        """Build the dictionary describing a single service record.

        :param svc: service record supporting ``[]`` and ``.get()`` access
        :returns: dict with the keys binary, host, id, status, state,
                  updated_at, version and disabled_reason
        """
        state = 'up' if self.is_up(svc) else 'down'
        status = 'disabled' if svc.get('disabled') else 'enabled'
        return {'binary': svc['binary'],
                'host': svc['host'],
                'id': svc['uuid'],
                'status': status,
                'state': state,
                'updated_at': svc['updated_at'],
                'version': svc['version'],
                'disabled_reason': svc['disabled_reason']}

    def list_services(self, context):
        """Return details of every known service.

        :returns: ``{'services': [...]}``; the list is empty when no service
                  is found.
        """
        found = objects.ServiceList.get_all(context)
        return {'services': [self._get_service_detail(svc)
                             for svc in (found or [])]}

    def _send_contego_message(self, method, context, instance_uuid,
                              params=None, is_call=False, timeout=6000):
        """Generic handler for RPC casts/calls to contego topic.

        This only blocks for a response with is_call=True.

        The destination host is read from
        ``params['server_obj']['OS-EXT-SRV-ATTR:host']``.

        :param method: name of the remote method
        :param context: request context passed to the RPC layer
        :param instance_uuid: forwarded to the remote method as-is
        :param params: dictionary of arguments to be passed to the contego
                       worker; it must carry the ``server_obj`` entry above
        :param is_call: wait for and return the response when True
        :param timeout: timeout handed to the RPC client's ``prepare()``
                        (seconds, per oslo.messaging)
        :returns: the remote result when ``is_call`` is True, otherwise
                  whatever the client's ``cast`` returns
        :raises KeyError: if ``params`` lacks the host information
        """
        if params is None:
            params = {}
        host = params['server_obj']['OS-EXT-SRV-ATTR:host']
        cctxt = self.client.prepare(server=host, version='1.0',
                                    timeout=timeout)
        send = cctxt.call if is_call else cctxt.cast
        # The non-blocking path casts on the prepared context directly;
        # self.cast() cannot be used without its positional host argument.
        return send(context, method, instance_uuid=instance_uuid,
                    params=params)

    def vast_prepare(self, context, instance_uuid, params):
        """Prepare to VAST the instance."""
        LOG.debug("prepare to vast the instance")
        return self._send_contego_message('vast_prepare', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def vast_freeze(self, context, instance_uuid, params):
        """Freeze the instance."""
        LOG.debug("freeze the instance")
        return self._send_contego_message('vast_freeze', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def vast_thaw(self, context, instance_uuid, params):
        """Thaw the instance."""
        LOG.debug("thaw the instance")
        return self._send_contego_message('vast_thaw', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def vast_instance(self, context, instance_uuid, params):
        """VAST the instance."""
        LOG.debug("vast the instance")
        return self._send_contego_message('vast_instance', context,
                                          instance_uuid,
                                          params=params, is_call=True,
                                          timeout=6000)

    def vast_get_info(self, context, instance_uuid, params):
        """Get details of a VASTed instance."""
        LOG.debug("get details of vasted instance")
        return self._send_contego_message('vast_get_info', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def vast_data_url(self, context, instance_uuid, params):
        """Get URL for the vast data server."""
        LOG.debug("Get the URL for the vast data server")
        return self._send_contego_message('vast_data_url', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def vast_data_transfer(self, context, instance_uuid, params):
        """Initiate data transfer."""
        LOG.debug("initiate data tranfer")
        return self._send_contego_message('vast_data_transfer', context,
                                          instance_uuid,
                                          params=params, is_call=True,
                                          timeout=6000)

    def vast_check_prev_snapshot(self, context, instance_uuid, params):
        """Verify the existence of previous snapshot."""
        LOG.debug("Verifying previous snapshot")
        return self._send_contego_message('vast_check_prev_snapshot', context,
                                          instance_uuid,
                                          params=params, is_call=True,
                                          timeout=6000)

    def vast_async_task_status(self, context, instance_uuid, params):
        """Get data transfer job status for the given token."""
        LOG.debug("Get data transfer job status for "
                  "the given token")
        return self._send_contego_message('vast_async_task_status',
                                          context, instance_uuid,
                                          params=params,
                                          is_call=True, timeout=6000)

    def vast_finalize(self, context, instance_uuid, params):
        """Finalize VAST."""
        LOG.debug("finalize the vast for the instance")
        return self._send_contego_message('vast_finalize', context,
                                          instance_uuid,
                                          params=params,
                                          is_call=True, timeout=6000)

    def vast_reset(self, context, instance_uuid, params):
        """Reset the instance so other openstack operations can be unblocked."""
        LOG.debug("vast reset the instance")
        return self._send_contego_message('vast_reset', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def vast_commit_image(self, context, instance_uuid, params):
        """Commit snapshot image."""
        LOG.debug("Commiting snapshot image")
        return self._send_contego_message('vast_commit_image', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def vast_clean_nbd_devices(self, context, instance_uuid, params):
        """Clean nbd devices."""
        LOG.debug("vast call to clean nbd devices")
        return self._send_contego_message('vast_clean_nbd_devices', context,
                                          instance_uuid,
                                          params=params, is_call=True)

    def map_snapshot_files(self, context, instance_uuid, params):
        """Map snapshot files to file manager instance identified by
        instance_uuid.
        """
        LOG.debug("map snapshot files to instance")
        return self._send_contego_message('map_snapshot_files', context,
                                          instance_uuid, params=params,
                                          is_call=True, timeout=6000)

    def copy_backup_image_to_volume(self, context, instance_uuid, params):
        """Copy backup image to volume."""
        LOG.debug("Copy backup image to volume that is mapped "
                  "to the instance")
        return self._send_contego_message('copy_backup_image_to_volume',
                                          context, instance_uuid,
                                          params=params,
                                          is_call=True, timeout=6000)

    # Disabled code, kept for reference as an inert string literal. It cannot
    # be re-enabled as-is: it passes host= to _send_contego_message, which
    # has no such parameter.
    '''
    def vast_config_backup(self, context, backup_id, params):
        """
        Backup OpenStack configuration for given services
        """
        host = params['host']
        return self._send_contego_message(
            'vast_config_backup',
            context,
            None,
            host=host,
            params=params,
            is_call=True,
            timeout=6000)

    def validate_trusted_user_and_key(self, context, params):
        """
        Validate trusted user and private key
        """
        host = params['host']
        controller_nodes = self.get_controller_nodes(context)
        params['controller_nodes'] = controller_nodes['controller_nodes']
        return self._send_contego_message(
            'validate_trusted_user_and_key',
            context,
            None,
            host=host,
            params=params,
            is_call=True,
            timeout=6000)

    def get_controller_nodes(self, context):
        """
        Return list of RabbitMQ hosts known to dmapi
        """
        def _get_hostname(address):
            try:
                return socket.gethostbyaddr(address)[0]
            except Exception as ex:
                return address

        def _get_hosts_from_transport_url(url):
            hosts = []
            for u in url.split(','):
                parsed_url = urlparse(u)
                hosts.append(parsed_url.hostname)
            return hosts

        try:
            hosts = []
            hosts = CONF.oslo_messaging_rabbit['rabbit_hosts']
            if hasattr(
                    CONF,
                    'transport_url') and getattr(
                    CONF,
                    'transport_url') not in [
                    '',
                    None]:
                transport_hosts = _get_hosts_from_transport_url(
                    CONF.transport_url)
                hosts.extend(transport_hosts)
            hosts = list(set(hosts))
            for host in hosts[:]:
                if 'localhost' in host:
                    hosts.remove(host)
            hosts = [_get_hostname(host) for host in hosts]
            return {'controller_nodes': hosts}
        except Exception as ex:
            LOG.exception(ex)
            return {'controller_nodes': []}

    def validate_database_creds(self, context, params):
        """
        Validate given database credentials.
        """
        host = params['host']
        return self._send_contego_message('validate_database_creds',
                                          context, None, host=host, params=params,
                                          is_call=True, timeout=6000)
    '''

    def vast_disk_check(self, context, instance_uuid, params):
        """Get disk check for VASTed instance."""
        LOG.debug("Get disk check for vasted instance")
        # NOTE(review): None is sent instead of instance_uuid, unlike the
        # other vast_* calls. Preserved as-is; confirm whether the remote
        # side expects the UUID before changing it.
        return self._send_contego_message('vast_disk_check', context, None,
                                          params=params, is_call=True)

    def is_up(self, service_ref):
        """Check whether a service is up based on last heartbeat.

        The heartbeat is ``last_seen_up`` when set, otherwise ``created_at``.

        :param service_ref: service record supporting ``[]`` and ``.get()``
        :returns: True when the absolute time elapsed since the heartbeat is
                  at most ``self.service_down_time`` seconds
        """
        last_heartbeat = (service_ref.get('last_seen_up') or
                          service_ref['created_at'])
        # Timestamps in DB are UTC; drop tzinfo so the value can be compared
        # with timeutils.utcnow().
        last_heartbeat = last_heartbeat.replace(tzinfo=None)
        elapsed = timeutils.delta_seconds(last_heartbeat, timeutils.utcnow())
        is_svc_up = abs(elapsed) <= self.service_down_time
        if not is_svc_up:
            LOG.debug('Seems service %(binary)s on host %(host)s is down. '
                      'Last heartbeat was %(lhb)s. Elapsed time is %(el)s',
                      {'binary': service_ref.get('binary'),
                       'host': service_ref.get('host'),
                       'lhb': str(last_heartbeat), 'el': str(elapsed)})
        return is_svc_up