# Package distribution metadata (residue from the package index page):
# workloadmgr version 4.3.8.4
# Copyright 2013 TrilioData Inc.
# All Rights Reserved.
"""
WLM-Cron Service
"""
import pickle
import time
from oslo_config import cfg
from workloadmgr import exception
from workloadmgr import flags
from workloadmgr import manager
from workloadmgr.openstack.common import log as logging
from workloadmgr.apscheduler.scheduler import Scheduler, SchedulerAlreadyRunningError
from workloadmgr.apscheduler.jobstores.sqlalchemy_store import SQLAlchemyJobStore
from workloadmgr.compute import nova
from munch import munchify
from workloadmgr import autolog
from workloadmgr.db import base
from workloadmgr.common import context as wlm_context
import workloadmgrclient.v1.client as wlmclient
# Module logger plus the autolog wrapper consumed by @autolog.log_method below.
LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)
FLAGS = flags.FLAGS

# Service-user credentials registered under the [keystone_authtoken] config
# section; these values are read by _get_client() to authenticate the
# workloadmgr API client.
keystone_opts = [
    cfg.StrOpt('username',
               default='triliovault',
               help="Username name of service user"),
    cfg.StrOpt('password',
               secret=True,  # secret=True keeps the value out of config logs
               help="User's password"),
    cfg.StrOpt('project_name',
               default='services',
               help="User's project"),
    cfg.StrOpt('user_domain_id',
               default='default',
               help="User's domain id")
]
cfg.CONF.register_opts(keystone_opts, group='keystone_authtoken')
CONF = cfg.CONF
def _snapshot_create_callback(*args, **kwargs):
    """Scheduled-job callback: create a snapshot for one workload.

    Invoked by the APScheduler job store with kwargs carrying at least
    'workload_id', 'user_id' and 'project_id'.  Waits up to ~5 minutes for
    the workload to reach 'available', chooses a full or incremental
    snapshot based on the workload's 'fullbackup_interval' schedule policy,
    then submits the snapshot through the WLM API client.  On failure an
    error-state snapshot record is written so the failed run is visible.
    """
    try:
        db_base = base.Base()
        db_driver = db_base.db
        arg_str = autolog.format_args(args, kwargs)
        LOG.info("_snapshot_create_callback Enter - " + arg_str)

        workload_id = kwargs.get('workload_id', None)
        user_id = kwargs.get('user_id', None)
        project_id = kwargs.get('project_id', None)
        # nova._get_tenant_context expects 'tenant_id'; mirror project_id.
        kwargs['tenant_id'] = kwargs.get('project_id', None)
        tenantcontext = nova._get_tenant_context(munchify(kwargs))

        workload = db_driver.workload_get(tenantcontext, workload_id)
        if workload.status == 'error':
            LOG.info(
                ("Workload %s is in error state. Cannot schedule snapshot operation") %
                (workload.display_name))
            LOG.info("_snapshot_create_callback Exit")
            return

        # Wait up to 5 minutes (10 x 30s) for the workload to settle into a
        # terminal state before attempting the snapshot.
        for _ in range(10):
            if workload.status == "available" or workload.status == 'error':
                break
            time.sleep(30)
            workload = db_driver.workload_get(tenantcontext, workload_id)

        # If the workload never reached 'available', give up for this run.
        if workload.status != 'available':
            LOG.info(
                ("Workload %s is not in available state. Cannot schedule snapshot operation") %
                (workload.display_name))
            LOG.info("_snapshot_create_callback Exit")
            return

        # Determine whether this run should be a full or incremental
        # snapshot, based on how many incrementals follow the last full.
        snapshots = db_driver.snapshot_get_all_by_project_workload(
            tenantcontext, project_id, workload_id)
        # NOTE(review): jobschedule is stored pickled; pickle.loads is unsafe
        # on untrusted data -- confirm this DB column is only ever written by
        # this service.
        jobscheduler = pickle.loads(bytes(workload.jobschedule, 'utf-8'))

        # fullbackup_interval semantics:
        #   -1  -> never take full backups
        #    0  -> always take full backups
        #   +ve -> take a full backup after that many incrementals
        # Missing or falsy values default to "-1" (same as the original
        # 'key in dict and value or "-1"' expression).
        jobscheduler['fullbackup_interval'] = \
            jobscheduler.get('fullbackup_interval') or "-1"

        snapshot_type = "incremental"
        interval = int(jobscheduler['fullbackup_interval'])
        if interval == 0:
            snapshot_type = "full"
        elif interval > 0:
            # Count available incrementals since the most recent full.
            num_of_incr_in_current_chain = 0
            for snap in snapshots:
                if snap.status == 'available':
                    if snap.snapshot_type == 'full':
                        break
                    num_of_incr_in_current_chain += 1
            if num_of_incr_in_current_chain >= interval:
                snapshot_type = "full"

        # BUG FIX: the original compared the bound method 'snapshots.__len__'
        # to 0 (always False), so the very first snapshot of a workload was
        # never forced to be full.  A chain must start with a full snapshot.
        if len(snapshots) == 0:
            snapshot_type = 'full'

        full_snapshot = (snapshot_type == 'full')
        client = _get_client()
        snapshot = client.workloads.snapshot(workload_id, full=full_snapshot,
                                             name='jobscheduler',
                                             is_scheduled=True)
    except Exception as ex:
        LOG.exception(ex)
        LOG.error(
            ("Error creating a snapshot for workload %s") % (workload_id))
        # Only record the failure if we got far enough to load the workload;
        # user_id/project_id/db_driver are bound before 'workload' is.
        if 'workload' in locals():
            options = {'user_id': user_id,
                       'project_id': project_id,
                       'workload_id': workload_id,
                       'snapshot_type': "full",
                       'display_name': "jobscheduler",
                       'display_description': "",
                       'host': '',
                       'status': 'error',
                       'error_msg': str(ex),
                       'metadata': {}, }
            context = wlm_context.get_admin_context()
            db_driver.snapshot_create(context, options)
    LOG.info("_snapshot_create_callback Exit")
def _get_client():
    """Return a workloadmgr API client authenticated as the service user.

    Credentials come from the [keystone_authtoken] config section; endpoint
    type and TLS options come from [clients].  Any failure is logged and
    re-raised unchanged.
    """
    try:
        return wlmclient.Client(
            auth_url=CONF.keystone_authtoken.www_authenticate_uri,
            username=CONF.keystone_authtoken.username,
            password=CONF.keystone_authtoken.password,
            # NOTE(review): both tenant_id and project_id are set to the
            # project *name* -- confirm the client accepts names here.
            tenant_id=CONF.keystone_authtoken.project_name,
            project_id=CONF.keystone_authtoken.project_name,
            domain_name=CONF.keystone_authtoken.user_domain_id,
            endpoint_type=CONF.clients.endpoint_type,
            insecure=CONF.clients.insecure,
            cacert=CONF.clients.cafile)
    except Exception as ex:
        LOG.exception(ex)
        # Bare 'raise' preserves the original traceback; 'raise ex' was the
        # less idiomatic equivalent.
        raise
class CronManager(manager.Manager):
    """Manager that drives the per-workload snapshot (cron) scheduler.

    Owns an APScheduler instance backed by a SQLAlchemy job store and
    exposes RPC entry points to add/pause workload snapshot jobs and to
    query or toggle the global job scheduler.
    """

    RPC_API_VERSION = '2.0'

    def __init__(self, service_name=None, *args, **kwargs):
        super(CronManager, self).__init__(*args, **kwargs)

    @autolog.log_method(logger=Logger)
    def init_host(self):
        """Create the scheduler, attach the DB-backed job store and apply
        the persisted global-scheduler on/off state."""
        context = wlm_context.get_admin_context()
        _jobstore = SQLAlchemyJobStore(self.db)
        _job_scheduler = Scheduler()
        _job_scheduler.add_jobstore(context, _jobstore, 'jobscheduler_store')
        self._scheduler = _job_scheduler
        self.workload_ensure_global_job_scheduler(context)

    @autolog.log_method(logger=Logger)
    def workload_add_scheduler_job(self, context, jobschedule, workload):
        """Register a periodic snapshot job for *workload*.

        Raises InvalidState if a job is already scheduled for the workload.
        A job is only added when *jobschedule* is non-empty and enabled.
        """
        try:
            LOG.info("Adding job for workload: %s" % (workload.get('id', None)))
            job = self.db.job_get(context, workload['id'])
            if job:
                msg = 'Workload %s job is already scheduled. Pause workload before changing job parameters' % job.workload_id
                raise exception.InvalidState(reason=msg)
            if jobschedule and len(jobschedule):
                if str(jobschedule.get('enabled', False)).lower() == 'true':
                    # Contexts from older callers may expose 'user_domain'
                    # instead of 'user_domain_id'.
                    user_domain_id = getattr(context, 'user_domain_id',
                                             getattr(context, 'user_domain',
                                                     'default'))
                    # These kwargs are persisted with the job and handed to
                    # _snapshot_create_callback on every trigger.
                    kwargs = {'workload_id': workload.get('id', None),
                              'user_id': workload.get('user_id', None),
                              'project_id': workload.get('project_id', None),
                              'user_domain_id': user_domain_id,
                              'user': workload.get('user_id', None),
                              'tenant': workload.get('project_id', None),
                              }
                    self._scheduler.add_workloadmgr_job(
                        context,
                        _snapshot_create_callback,
                        jobschedule,
                        jobstore='jobscheduler_store',
                        kwargs=kwargs)
        except Exception as ex:
            LOG.exception(ex)
            # Bare 'raise' keeps the original traceback intact.
            raise

    @autolog.log_method(logger=Logger)
    def workload_ensure_global_job_scheduler(self, context):
        """Sync the running scheduler with the persisted DB flag.

        Starts the scheduler when the 'global-job-scheduler' setting is
        truthy (or not yet persisted); shuts it down when falsy.  Never
        raises: unexpected errors are logged and swallowed.
        """
        try:
            global_scheduler = self.db.setting_get(context,
                                                   'global-job-scheduler',
                                                   get_hidden=True,
                                                   cloud_setting=True)
            db_status = bool(int(global_scheduler['value']))
            if db_status:
                LOG.info("Starting global job scheduler")
                self._scheduler.start(context)
            else:
                LOG.info("Shutting down global job scheduler")
                self._scheduler.shutdown()
        except exception.SettingNotFound:
            # No persisted setting yet: default to running.
            LOG.info("Starting global job scheduler")
            self._scheduler.start(context)
        except SchedulerAlreadyRunningError:
            # start() on an already-running scheduler is not an error.
            pass
        except Exception as ex:
            LOG.exception(ex)

    @autolog.log_method(logger=Logger)
    def workload_pause(self, ctxt, workload_id, request_spec=None,
                       filter_properties=None):
        """Unschedule the snapshot job for *workload_id*, if one exists."""
        try:
            LOG.info("Disabling workload: %s" % (workload_id))
            job = self._scheduler.get_job_by_workload_id(ctxt, workload_id)
            if job:
                self._scheduler.unschedule_job(ctxt, job)
        except Exception as ex:
            LOG.exception(ex)

    @autolog.log_method(logger=Logger)
    def workload_get_global_job_scheduler(self, context, request_spec=None,
                                          filter_properties=None):
        """Method to return global job scheduler status"""
        try:
            try:
                global_scheduler = self.db.setting_get(context,
                                                       'global-job-scheduler',
                                                       get_hidden=True,
                                                       cloud_setting=True)
                db_status = bool(int(global_scheduler['value']))
            except exception.SettingNotFound:
                # Absent setting means the scheduler should be running.
                db_status = True
            cron_status = self._scheduler.running
            # Case where DB status for GJS and cron engine status is not in
            # sync: re-apply the DB status, then re-check.
            if db_status != cron_status:
                LOG.info("Global job scheduler status is not in sync between "
                         "DB (%s) and cron engine (%s), syncing with the DB status"
                         % (db_status, cron_status))
                self.workload_ensure_global_job_scheduler(context)
                cron_status = self._scheduler.running
                if cron_status != db_status:
                    raise Exception("Global job scheduler status is not in sync between "
                                    "DB (%s) and cron engine (%s), Please retry after sometime"
                                    % (db_status, cron_status))
            return cron_status
        except Exception as ex:
            # NOTE(review): failures are logged and None is returned;
            # callers appear to tolerate this -- confirm before tightening.
            LOG.exception(ex)

    @autolog.log_method(logger=Logger)
    def workload_enable_global_job_scheduler(self, context, request_spec=None,
                                             filter_properties=None):
        """Method to enable global job scheduler status"""
        LOG.info("Starting global job scheduler")
        return self._scheduler.start(context)

    @autolog.log_method(logger=Logger)
    def workload_disable_global_job_scheduler(self, context, request_spec=None,
                                              filter_properties=None):
        """Method to disable global job scheduler status"""
        LOG.info("Disabling global job scheduler")
        return self._scheduler.shutdown()