# Package: python3-tvault-contego 4.2.64.8
# File: /usr/lib/python3/dist-packages/contego/nova/extension/manager.py

# Copyright 2014 TrilioData Inc.
# All Rights Reserved.
"""
Handles all processes relating to Contego functionality
The :py:class:`ContegoManager` class is a :py:class:`nova.manager.Manager` that
handles RPC calls relating to Contego functionality creating instances.
"""
import uuid
import time
import greenlet
import os
from eventlet.green import threading as gthreading
from threading import Thread, Event, BoundedSemaphore
try:
    from oslo_log import log as logging
except ImportError:
    from nova.openstack.common import log as logging
try:
    import oslo_messaging as messaging
except ImportError:
    from oslo import messaging
try:
    from eventlet import sleep
except ImportError:
    from time import sleep
try:
    from oslo_config import cfg
except ImportError:
    from oslo.config import cfg
from nova import manager
from nova.compute import manager as compute_manager
from nova.compute import api as compute_api
# We need to import this module because other nova modules use the flags that
# it defines (without actually importing this module). So we need to ensure
# this module is loaded so that we can have access to those flags.
from nova.objects import instance as instance_obj
from contego import autolog
from contego import utils
from contego import async_utils
from contego.nova.extension.driver import vault
from contego.nova.extension.driver.libvirtdriver import LibvirtDriver
from contego.exception import RetryException, ErrorOccurred
extension_opts = [
    cfg.IntOpt(
        "max_uploads_pending",
        default=3,
        help="Maximum upload requests that a "
             "hypervisor can handle. 3 is a guesstimate "
             "which can be overridden based on the "
             "environment",
    ),
    cfg.IntOpt(
        "max_commit_pending",
        default=3,
        help="Maximum commit image requests that a "
             "hypervisor can handle. 3 is a guesstimate "
             "which can be overridden based on the "
             "environment",
    ),
]
LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)
CONF = cfg.CONF
CONF.register_opts(extension_opts)
upload_jobs_throttling = BoundedSemaphore(CONF.max_uploads_pending)
commit_jobs_throttling = BoundedSemaphore(CONF.max_commit_pending)
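# Illustrative sketch (async_utils is external to this file, so this is an
# assumption about its behavior, not the actual implementation): the callers
# below catch RetryException around throttled calls, which suggests that
# run_async_by_throttle attempts a non-blocking acquire on the given semaphore
# and asks the caller to retry when all job slots are busy, e.g.:
#
#     import eventlet
#
#     def run_async_by_throttle(semaphore):
#         def decorator(fn):
#             def wrapper(*args, **kwargs):
#                 if not semaphore.acquire(blocking=False):
#                     raise RetryException()  # all slots busy; caller retries
#                 def job():
#                     try:
#                         fn(*args, **kwargs)
#                     finally:
#                         semaphore.release()  # free the slot when done
#                 eventlet.spawn_n(job)
#             return wrapper
#         return decorator
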
if CONF.vault_storage_type.lower() in ("swift-s", "s3"):
    CONF.vault_using_fuse = True
else:
    CONF.vault_using_fuse = False
def _lock_call(fn):
    """
    A decorator to lock methods to ensure that multiple operations do not
    occur on the same instance at a time. Note that this is a local lock
    only, so it just prevents concurrent operations on the same host.
    """

    def wrapped_fn(self, context, **kwargs):
        instance_uuid = kwargs.get("instance_uuid", None)
        instance_ref = kwargs["params"]["server_obj"]
        kwargs["instance_ref"] = instance_ref
        LOG.debug("Locking instance %s (fn: %s)" % (instance_uuid, fn.__name__))
        self._lock_instance(instance_uuid)
        try:
            return fn(self, context, **kwargs)
        finally:
            self._unlock_instance(instance_uuid)
            LOG.debug("Unlocked instance %s (fn: %s)" % (instance_uuid, fn.__name__))

    wrapped_fn.__name__ = fn.__name__
    wrapped_fn.__doc__ = fn.__doc__
    return wrapped_fn
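# Usage note: methods decorated with _lock_call are invoked with keyword
# arguments (call shape inferred from wrapped_fn above), e.g.:
#
#     self.vast_prepare(context, instance_uuid=uuid, params=params)
#
# The decorator reads params["server_obj"], injects it as the instance_ref
# keyword, and holds the host-local per-instance lock for the duration of
# the call.
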
def _log_error(operation):
    """Log exceptions with a common format."""
    LOG.exception("Error during %s" % operation)
def _retry_rpc(fn):
    def wrapped_fn(*args, **kwargs):
        timeout = CONF.contego_compute_timeout
        i = 0
        start = time.time()
        while True:
            try:
                return fn(*args, **kwargs)
            except messaging.MessagingTimeout:
                elapsed = time.time() - start
                if elapsed > timeout:
                    raise
                LOG.debug(
                    "%s timing out after %d seconds, try %d.",
                    fn.__name__,
                    elapsed,
                    i,
                )
                i += 1

    wrapped_fn.__name__ = fn.__name__
    wrapped_fn.__doc__ = fn.__doc__
    return wrapped_fn
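# Illustrative usage (hypothetical function, not from this module): _retry_rpc
# keeps re-issuing a call that raises messaging.MessagingTimeout until more
# than CONF.contego_compute_timeout seconds have elapsed, then re-raises:
#
#     @_retry_rpc
#     def _call_compute(rpc_client, context, method, **kwargs):
#         return rpc_client.call(context, method, **kwargs)
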
class ContegoManager(manager.Manager):
    def __init__(self, *args, **kwargs):
        self.compute_manager = compute_manager.ComputeManager()
        self.compute_api = compute_api.API()
        self.driver = LibvirtDriver(self.compute_manager.driver)
        # Use an eventlet green thread condition lock instead of the regular
        # threading module. This is required for eventlet threads because
        # they essentially run on a single system thread. All of the green
        # threads would share the same base lock, defeating the point of
        # using it. Since the main threading module is not monkey patched
        # we cannot use it directly.
        self.cond = gthreading.Condition()
        self.locked_instances = {}
        self.ip_address = CONF.my_ip
        self.snaps_data = {}
        self.snaps_transfer = {}
        super(ContegoManager, self).__init__(service_name="contego", *args, **kwargs)

    def _lock_instance(self, instance_uuid):
        self.cond.acquire()
        try:
            LOG.debug("Acquiring lock for instance %s" % instance_uuid)
            current_thread = id(greenlet.getcurrent())
            while True:
                (locking_thread, refcount) = self.locked_instances.get(
                    instance_uuid, (current_thread, 0)
                )
                if locking_thread != current_thread:
                    LOG.debug(
                        "Lock for instance %s already acquired by %s (me: %s)"
                        % (instance_uuid, locking_thread, current_thread)
                    )
                    self.cond.wait()
                else:
                    break
            LOG.debug(
                "Acquired lock for instance %s (me: %s, refcount=%s)"
                % (instance_uuid, current_thread, refcount + 1)
            )
            self.locked_instances[instance_uuid] = (
                locking_thread,
                refcount + 1,
            )
        finally:
            self.cond.release()

    def _unlock_instance(self, instance_uuid):
        self.cond.acquire()
        try:
            if instance_uuid in self.locked_instances:
                (locking_thread, refcount) = self.locked_instances[instance_uuid]
                if refcount == 1:
                    del self.locked_instances[instance_uuid]
                    # The lock is now available for other threads to
                    # take, so wake them up.
                    self.cond.notifyAll()
                else:
                    self.locked_instances[instance_uuid] = (
                        locking_thread,
                        refcount - 1,
                    )
        finally:
            self.cond.release()
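    # Note on the two methods above: the lock is reference-counted per green
    # thread, so a thread that already holds an instance's lock may re-acquire
    # it, as long as every _lock_instance() is balanced by one
    # _unlock_instance(). For a single green thread:
    #
    #     self._lock_instance(uuid)      # refcount 1
    #     self._lock_instance(uuid)      # re-entrant, refcount 2
    #     self._unlock_instance(uuid)    # refcount 1
    #     self._unlock_instance(uuid)    # released, waiters are notified
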
    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_prepare(self, context, instance_uuid, instance_ref, params):
        """
        Prepare for the VAST snapshot.
        """
        return self.driver.vast_prepare(context, instance_uuid, instance_ref, params)

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_freeze(self, context, instance_uuid, instance_ref, params):
        """
        Freeze the instance.
        """
        return self.driver.vast_freeze(context, instance_uuid, instance_ref, params)

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_thaw(self, context, instance_uuid, instance_ref, params):
        """
        Thaw the instance.
        """
        return self.driver.vast_thaw(context, instance_uuid, instance_ref, params)

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_instance(self, context, instance_uuid, instance_ref, params):
        """
        VAST the instance.
        """
        # snaps = self.driver.vast_instance(context, instance_uuid,
        #                                   instance_ref, params)
        LOG.debug("Initiating snapshot for instance: %s", instance_uuid)
        metadata = {}
        metadata["resource_id"] = params["instance_vm_id"]
        metadata["snapshot_id"] = params["snapshot_id"]
        metadata["backend_endpoint"] = params["backend_endpoint"]
        utils.update_progress(
            vault.get_progress_tracker_path(metadata), self.ip_address + "\n"
        )
        utils.update_progress(
            vault.get_progress_tracker_path(metadata), "In Progress\n"
        )
        self._vast_snapshot_async_fn(context, instance_uuid, instance_ref, params)
        LOG.debug("Scheduled snapshot for instance: %s", instance_uuid)
        sleep(1)
        status = {"result": "success"}
        return status
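    # The pattern above (shared by the other vast_* entry points) is
    # fire-and-forget: the RPC returns {"result": "success"} once the job is
    # scheduled, and the caller polls vast_async_task_status() below, which
    # reads the progress tracker file. A hypothetical caller-side loop:
    #
    #     status = self.vast_async_task_status(
    #         context, instance_uuid=uuid, params=params)
    #     while not any(line.startswith(("Completed", "Error"))
    #                   for line in status["status"]):
    #         sleep(5)
    #         status = self.vast_async_task_status(
    #             context, instance_uuid=uuid, params=params)
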
    @autolog.log_method(logger=Logger)
    @async_utils.run_async
    def _vast_snapshot_async_fn(self, context, instance_uuid, instance_ref, params):
        sleep(1)
        try:
            LOG.info("Snapshot in progress: %s", params["snapshot_id"])
            snaps = self.driver.vast_instance(
                context, instance_uuid, instance_ref, params
            )
            metadata = {}
            metadata["resource_id"] = params["instance_vm_id"]
            metadata["snapshot_id"] = params["snapshot_id"]
            metadata["backend_endpoint"] = params["backend_endpoint"]
            self.snaps_data[instance_uuid] = {"fetched": True, "snaps": snaps}
            LOG.info("Snapshot done: %s", metadata["snapshot_id"])
            utils.update_progress(
                vault.get_progress_tracker_path(metadata), "Completed\n"
            )
            LOG.info("Progress updated: %s", metadata["snapshot_id"])
            if CONF.vault_using_fuse:
                time.sleep(4)
                utils.update_progress(
                    vault.get_progress_tracker_path(metadata), "Completed\n"
                )
        except Exception as ex:
            metadata = {}
            metadata["resource_id"] = params["instance_vm_id"]
            metadata["snapshot_id"] = params["snapshot_id"]
            metadata["backend_endpoint"] = params["backend_endpoint"]
            utils.update_progress(
                vault.get_progress_tracker_path(metadata),
                "Error: " + str(ex) + "\n",
            )

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_get_info(self, context, instance_uuid, instance_ref, params):
        """
        Get the info (disks etc.) of a VAST snapshot.
        """
        disks_info = self.driver.vast_get_info(
            context, instance_uuid, instance_ref, params
        )
        return disks_info

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_data_url(self, context, instance_uuid, instance_ref, params):
        """
        Get the VAST data URL.
        """
        return {"urls": []}

    @autolog.log_method(logger=Logger)
    @async_utils.run_async_by_throttle(commit_jobs_throttling)
    def _vast_commit_image_async_fn(self, context, instance_uuid, instance_ref, params):
        try:
            LOG.debug("Commit image in progress: %s", instance_uuid)
            self.driver.vast_commit_image(context, instance_uuid, instance_ref, params)
            LOG.debug("Commit image done: %s", instance_uuid)
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]), "Completed\n"
            )
        except Exception as ex:
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]),
                "Error: " + str(ex) + "\n",
            )

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_commit_image(self, context, instance_uuid, instance_ref, params):
        """
        Initiate async image commit job.
        """
        try:
            status = {"result": "retry"}
            LOG.debug("Initiating image commit for instance: %s", instance_uuid)
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]),
                self.ip_address + "\n",
            )
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]), "In Progress\n"
            )
            self._vast_commit_image_async_fn(
                context, instance_uuid, instance_ref, params
            )
            LOG.debug("Scheduled commit image for instance: %s", instance_uuid)
            status = {"result": "success"}
        except RetryException:
            pass
        return status

    @autolog.log_method(logger=Logger)
    @async_utils.run_async_by_throttle(upload_jobs_throttling)
    def _vast_data_transfer_async_fn(
        self, context, instance_uuid, instance_ref, params
    ):
        try:
            LOG.info(
                "Data transfer in progress: %s %s",
                params["metadata"]["snapshot_id"],
                params["metadata"]["vm_disk_resource_snap_id"],
            )
            self.driver.vast_data_transfer(
                context, instance_uuid, instance_ref, params
            )
            LOG.info(
                "Data transfer done: %s %s",
                params["metadata"]["snapshot_id"],
                params["metadata"]["vm_disk_resource_snap_id"],
            )
            self.snaps_transfer[params["metadata"]["vm_disk_resource_snap_id"]] = {
                "fetched": True
            }
            sleep(1)
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]), "Completed\n"
            )
            LOG.info("Progress updated: %s", params["metadata"]["snapshot_id"])
        except Exception as ex:
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]),
                "Error: " + str(ex) + "\n",
            )

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_data_transfer(self, context, instance_uuid, instance_ref, params):
        """
        Initiate async data transfer job.
        """
        try:
            status = {"result": "retry"}
            LOG.info(
                "Initiating data transfer for instance: %s",
                params["metadata"]["vm_disk_resource_snap_id"],
            )
            params["metadata"]["resource_id"] = params["metadata"][
                "vm_disk_resource_snap_id"
            ]
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]),
                self.ip_address + "\n",
            )
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]), "In Progress\n"
            )
            self._vast_data_transfer_async_fn(
                context, instance_uuid, instance_ref, params
            )
            LOG.info(
                "Scheduled data transfer for instance: %s",
                params["metadata"]["snapshot_id"],
            )
            sleep(1)
            status = {"result": "success"}
        except RetryException:
            pass
        return status

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_check_prev_snapshot(self, context, instance_uuid, instance_ref, params):
        """
        Check whether the previous snapshot exists.
        """
        return self.driver.vast_check_prev_snapshot(
            context, instance_uuid, instance_ref, params
        )

    @_lock_call
    def vast_async_task_status(self, context, instance_uuid, instance_ref, params):
        """
        Get data transfer job status.
        """
        async_task_status = {"status": []}
        try:
            progress_tracking_file_path = vault.get_progress_tracker_path(
                params["metadata"]
            )
            LOG.info("Get vast async snapshot: %s", progress_tracking_file_path)
            async_task_status["status"] = utils.get_progress(
                progress_tracking_file_path
            )
            LOG.info("Progress tracking status: %s", async_task_status["status"])
            if (
                instance_uuid in self.snaps_data
                and self.snaps_data[instance_uuid]["fetched"] is True
                and ("fetched" in params and params["fetched"] is True)
            ):
                snaps = self.snaps_data[instance_uuid]["snaps"]
                self.snaps_data.pop(instance_uuid)
                return snaps
            if (
                params["metadata"]["resource_id"] in self.snaps_transfer
                and self.snaps_transfer[params["metadata"]["resource_id"]]["fetched"]
                is True
            ):
                self.snaps_transfer.pop(params["metadata"]["resource_id"])
                return {"status": ["Completed"]}
        except Exception as ex:
            async_task_status["status"] = ["Error: " + str(ex) + "\n"]
        return async_task_status
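    # For orientation, a progress tracker file (as written by the
    # utils.update_progress() calls in this module) accumulates one line per
    # update, e.g.:
    #
    #     10.0.0.12        <- self.ip_address of the node running the job
    #     In Progress
    #     Completed
    #
    # with a final "Error: <exception text>" line on failure. The sample IP
    # address is illustrative.
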
    @async_utils.run_async
    def _vast_finalize_async_fn(self, context, instance_uuid, instance_ref, params):
        sleep(1)
        try:
            LOG.debug("VAST Finalize in progress: %s", instance_uuid)
            self.driver.vast_finalize(context, instance_uuid, instance_ref, params)
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]),
                "VAST Finalize Completed\n",
            )
        except Exception as ex:
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]),
                "Error: " + str(ex) + "\n",
            )
        sleep(1)

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_finalize(self, context, instance_uuid, instance_ref, params):
        """
        Finalize the VAST snapshot.
        """
        LOG.debug("Initiating VAST Finalize for instance: %s", instance_uuid)
        params["metadata"]["resource_id"] = params["metadata"]["snapshot_vm_id"]
        utils.update_progress(
            vault.get_progress_tracker_path(params["metadata"]),
            self.ip_address + "\n",
        )
        utils.update_progress(
            vault.get_progress_tracker_path(params["metadata"]), "In Progress\n"
        )
        self._vast_finalize_async_fn(context, instance_uuid, instance_ref, params)
        LOG.debug("Scheduled VAST Finalize for instance: %s", instance_uuid)
        sleep(1)
        return {"result": "success"}

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_reset(self, context, instance_uuid, instance_ref, params):
        """
        Reset the VAST state of the instance.
        """
        self.driver.vast_reset(context, instance_uuid, instance_ref, params)
        return

    @autolog.log_method(logger=Logger)
    @async_utils.run_async
    def _map_snapshot_files_async_fn(
        self, context, instance_uuid, instance_ref, params
    ):
        try:
            LOG.debug("Map snapshot files in progress: %s", instance_uuid)
            self.driver.map_snapshot_files(context, instance_uuid, instance_ref, params)
            LOG.debug("Map snapshot files done: %s", instance_uuid)
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]), "Completed\n"
            )
            if CONF.vault_using_fuse:
                time.sleep(4)
                utils.update_progress(
                    vault.get_progress_tracker_path(params["metadata"]), "Completed\n"
                )
        except Exception as ex:
            utils.update_progress(
                vault.get_progress_tracker_path(params["metadata"]),
                "Error: " + str(ex) + "\n",
            )

    @_lock_call
    @autolog.log_method(logger=Logger)
    def map_snapshot_files(self, context, instance_uuid, instance_ref, params):
        """
        Map snapshot files to fmsnapshot.
        """
        LOG.debug("Initiating map snapshot files for instance: %s", instance_uuid)
        utils.update_progress(
            vault.get_progress_tracker_path(params["metadata"]),
            self.ip_address + "\n",
        )
        utils.update_progress(
            vault.get_progress_tracker_path(params["metadata"]), "In Progress\n"
        )
        self._map_snapshot_files_async_fn(context, instance_uuid, instance_ref, params)
        LOG.debug("Scheduled map snapshot files for instance: %s", instance_uuid)
        status = {"result": "success"}
        return status

    @async_utils.run_async
    def _copy_backup_image_to_volume(
        self, context, instance_uuid, instance_ref, params
    ):
        sleep(1)
        # Resolve the tracker path before entering the try block so the
        # exception handler can always reference it.
        progress_tracking_file_path = params["progress_tracking_file_path"]
        try:
            LOG.debug("Copy backup image to volume in progress: %s", instance_uuid)
            self.driver.copy_backup_image_to_volume(
                context, instance_uuid, instance_ref, params
            )
            utils.update_progress(
                progress_tracking_file_path,
                "Copy backup image to volume is Completed\n",
            )
        except Exception as ex:
            utils.update_progress(
                progress_tracking_file_path, "Error: " + str(ex) + "\n"
            )
        sleep(1)

    @_lock_call
    @autolog.log_method(logger=Logger)
    def copy_backup_image_to_volume(self, context, instance_uuid, instance_ref, params):
        """
        Copy backup image to volume.
        """
        LOG.debug(
            "Initiating copy_backup_image_to_volume for instance: %s", instance_uuid
        )
        progress_tracking_file_path = params["progress_tracking_file_path"]
        progress_dir = os.path.split(progress_tracking_file_path)[0]
        if not os.path.exists(progress_dir):
            os.makedirs(progress_dir)
        utils.update_progress(progress_tracking_file_path, self.ip_address + "\n")
        utils.update_progress(progress_tracking_file_path, "In Progress\n")
        self._copy_backup_image_to_volume(context, instance_uuid, instance_ref, params)
        LOG.debug(
            "Scheduled copy_backup_image_to_volume for instance: %s", instance_uuid
        )
        sleep(1)
        return {"result": "success"}

    @autolog.log_method(logger=Logger)
    @async_utils.run_async
    def _vast_config_backup_database_async_fn(
        self, context, progress_tracking_file_path, params
    ):
        try:
            LOG.info("Config backup for database in progress: %s", params["backup_id"])
            vault.backup_database(context, progress_tracking_file_path, params)
            metadata = params["metadata"]
            LOG.info("Config backup of remote files done: %s", params["backup_id"])
            utils.update_progress(
                vault.get_progress_tracker_path(metadata), "Completed\n"
            )
            LOG.info("Progress updated: %s", params["backup_id"])
            if CONF.vault_using_fuse:
                time.sleep(4)
                utils.update_progress(
                    vault.get_progress_tracker_path(metadata), "Completed\n"
                )
        except Exception as ex:
            metadata = params["metadata"]
            utils.update_progress(
                vault.get_progress_tracker_path(metadata), "Error: " + str(ex) + "\n"
            )

    @autolog.log_method(logger=Logger)
    @async_utils.run_async
    def _vast_config_backup_async_fn(
        self, context, progress_tracking_file_path, params
    ):
        try:
            LOG.info("Config backup in progress: %s", params["backup_id"])
            vault.backup_config_files(context, progress_tracking_file_path, params)
            metadata = params["metadata"]
            LOG.info("Config backup done: %s", params["backup_id"])
            utils.update_progress(
                vault.get_progress_tracker_path(metadata), "Completed\n"
            )
            LOG.info("Progress updated: %s", params["backup_id"])
            if CONF.vault_using_fuse:
                time.sleep(4)
                utils.update_progress(
                    vault.get_progress_tracker_path(metadata), "Completed\n"
                )
        except Exception as ex:
            metadata = params["metadata"]
            utils.update_progress(
                vault.get_progress_tracker_path(metadata), "Error: " + str(ex) + "\n"
            )

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_config_backup(self, context, params):
        """
        Backup configuration files and database.
        """
        LOG.debug("Initiating backup for OpenStack configuration.")
        metadata = params["metadata"]
        progress_tracking_file_path = vault.get_progress_tracker_path(metadata)
        utils.update_progress(
            progress_tracking_file_path, self.ip_address + "\nIn Progress\n"
        )
        if params.get("target") == "database":
            self._vast_config_backup_database_async_fn(
                context, progress_tracking_file_path, params
            )
        else:
            self._vast_config_backup_async_fn(
                context, progress_tracking_file_path, params
            )
        LOG.debug("Scheduled backup for OpenStack configuration")
        sleep(1)
        status = {"result": "success"}
        return status

    @_lock_call
    @autolog.log_method(logger=Logger)
    def validate_trusted_user_and_key(self, context, params):
        """
        Validate trusted user and private key.
        """
        LOG.debug("Validating trusted nodes.")
        status = {"result": "success"}
        try:
            user = params["trust_creds"]["trusted_user"]["username"]
            authorized_key = params["trust_creds"]["authorized_key"]
            for controller_node in params["controller_nodes"]:
                temp_folder = "/tmp/" + str(uuid.uuid4())
                kwargs = {
                    "authorized_key": authorized_key,
                    "user": user,
                    "host": controller_node,
                }
                ssh_cmd = utils.get_cmd_specs("ssh", **kwargs)
                cmdspec = ssh_cmd + ["sudo", "mkdir", temp_folder]
                cmd = " ".join(cmdspec)
                LOG.debug("validate_trusted_user_and_key cmd %s" % cmd)
                process = utils.run_cmd(cmdspec)
                returncode = utils.poll_process(process, cmd, no_of_count=6)
                if returncode:
                    message = (
                        "Error while making key based ssh connection to %s: %s"
                        % (controller_node, str(process.stderr.read(), "utf-8"))
                    )
                    raise ErrorOccurred(reason=message)
            return status
        except Exception as ex:
            LOG.exception(ex)
            status = {"result": "failed"}
            return status
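    # Illustrative command shape (utils.get_cmd_specs lives outside this file,
    # so the exact ssh argument list is an assumption): each controller node
    # check above effectively runs something like
    #
    #     ssh -i <authorized_key> <user>@<controller_node> sudo mkdir /tmp/<uuid4>
    #
    # and treats a non-zero exit status as a failed trust validation.
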
    @_lock_call
    @autolog.log_method(logger=Logger)
    def validate_database_creds(self, context, params):
        """
        Validate given database credentials.
        """
        LOG.debug("Validating database credentials")
        status = {"result": "success"}
        trusted_user = params["trust_creds"]["trusted_user"]["username"]
        authorized_key = params["trust_creds"]["authorized_key"]
        try:
            for database, database_creds in params["databases"].items():
                mysql_connect = "mysql -u%s -p%s" % (
                    database_creds["user"],
                    database_creds["password"],
                )
                kwargs = {
                    "authorized_key": authorized_key,
                    "user": trusted_user,
                    "host": database_creds["host"],
                }
                ssh_cmd = utils.get_cmd_specs("ssh", **kwargs)
                cmdspec = ssh_cmd + [mysql_connect]
                cmd = " ".join(cmdspec)
                LOG.debug("validate_database_creds cmd %s" % cmd)
                process = utils.run_cmd(cmdspec)
                returncode = utils.poll_process(process, cmd, no_of_count=6)
                if returncode:
                    message = "Error while making connection to mysql: %s" % (
                        process.stderr.read()
                    )
                    raise ErrorOccurred(reason=message)
        except Exception as ex:
            LOG.exception(ex)
            status = {"result": "failed"}
        return status

    @_lock_call
    @autolog.log_method(logger=Logger)
    def vast_disk_check(self, context, instance_uuid, instance_ref, params):
        """
        Run a disk integrity check on a VASTed instance.
        """
        disk_check = self.driver.vast_disk_check(context, params)
        return disk_check

    @autolog.log_method(logger=Logger)
    def vast_clean_nbd_devices(self, context, instance_uuid, params):
        """
        Clean up NBD devices.
        """
        return self.driver.vast_clean_nbd_devices(context, instance_uuid, params)