# Copyright 2013 TrilioData Inc.
# All Rights Reserved.


"""
WLM-Cron Service
"""

import pickle
import time

from munch import munchify
from oslo_config import cfg

from workloadmgr import autolog
from workloadmgr import exception
from workloadmgr import flags
from workloadmgr import manager
from workloadmgr.apscheduler.jobstores.sqlalchemy_store import SQLAlchemyJobStore
from workloadmgr.apscheduler.scheduler import Scheduler, SchedulerAlreadyRunningError
from workloadmgr.common import context as wlm_context
from workloadmgr.compute import nova
from workloadmgr.db import base
from workloadmgr.openstack.common import log as logging
import workloadmgrclient.v1.client as wlmclient

LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)


FLAGS = flags.FLAGS

keystone_opts = [
    cfg.StrOpt('username',
               default='triliovault',
               help="Username name of service user"),
    cfg.StrOpt('password',
               secret=True,
               help="User's password"),
    cfg.StrOpt('project_name',
               default='services',
               help="User's project"),
    cfg.StrOpt('user_domain_id',
               default='default',
               help="User's domain id")
]

cfg.CONF.register_opts(keystone_opts, group='keystone_authtoken')
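# A minimal sketch of the matching workloadmgr.conf section (placeholder
# values; www_authenticate_uri is registered elsewhere, e.g. by
# keystonemiddleware, but is read by _get_client() below):
#
#   [keystone_authtoken]
#   www_authenticate_uri = http://keystone.example.com:5000/v3
#   username = triliovault
#   password = <service-password>
#   project_name = services
#   user_domain_id = default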

CONF = cfg.CONF


def _snapshot_create_callback(*args, **kwargs):
    try:
        db_base = base.Base()
        db_driver = db_base.db

        arg_str = autolog.format_args(args, kwargs)

        LOG.info("_snapshot_create_callback Enter - " + arg_str)

        workload_id = kwargs.get('workload_id', None)
        user_id = kwargs.get('user_id', None)
        project_id = kwargs.get('project_id', None)
        kwargs['tenant_id'] = kwargs.get('project_id', None)
        tenantcontext = nova._get_tenant_context(munchify(kwargs))

        workload = db_driver.workload_get(tenantcontext, workload_id)

        if workload.status == 'error':
            LOG.info("Workload %s is in error state. Cannot schedule "
                     "snapshot operation", workload.display_name)
            LOG.info("_snapshot_create_callback Exit")
            return

        # wait up to 5 minutes (10 polls, 30s apart) for the workload to
        # reach a terminal state
        for _ in range(10):
            if workload.status in ('available', 'error'):
                break
            time.sleep(30)
            workload = db_driver.workload_get(tenantcontext, workload_id)

        # if the workload still hasn't become available, give up
        if workload.status != 'available':
            LOG.info("Workload %s is not in available state. Cannot schedule "
                     "snapshot operation", workload.display_name)
            LOG.info("_snapshot_create_callback Exit")
            return

        # Determine whether this snapshot should be full or incremental:
        # find the most recent full snapshot and, if the number of
        # incrementals taken since then has reached the policy's
        # full-backup interval, take a full backup.
        snapshots = db_driver.snapshot_get_all_by_project_workload(
            tenantcontext, project_id, workload_id)
        jobscheduler = pickle.loads(bytes(workload.jobschedule, 'utf-8'))
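        # Note: jobschedule is stored in the DB as a pickled string;
        # re-encoding it as utf-8 bytes assumes a text-safe pickle
        # protocol (protocol 0) was used when the workload was saved.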

        #
        # if fullbackup_interval is -1, never take full backups
        # if fullbackup_interval is 0, always take full backups
        # if fullbackup_interval is positive, follow the interval
        #
        jobscheduler['fullbackup_interval'] = \
            jobscheduler.get('fullbackup_interval') or "-1"
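        # Example (hypothetical values): with fullbackup_interval = "7",
        # the scheduler takes incrementals until 7 have accumulated since
        # the last full snapshot, then promotes the next run to a full.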

        snapshot_type = "incremental"
        if int(jobscheduler['fullbackup_interval']) == 0:
            snapshot_type = "full"
        elif int(jobscheduler['fullbackup_interval']) < 0:
            snapshot_type = "incremental"
        elif int(jobscheduler['fullbackup_interval']) > 0:
            # check full backup policy here
            num_of_incr_in_current_chain = 0
            for snap in snapshots:
                if snap.status == 'available':
                    if snap.snapshot_type == 'full':
                        break;
                    else:
                        num_of_incr_in_current_chain = num_of_incr_in_current_chain + 1
            if num_of_incr_in_current_chain >= int(
                    jobscheduler['fullbackup_interval']):
                snapshot_type = "full"
            if snapshots.__len__ == 0:
                snapshot_type = 'full'

        full_snapshot = False
        if snapshot_type == 'full':
            full_snapshot = True

        client = _get_client()

        client.workloads.snapshot(workload_id, full=full_snapshot,
                                  name='jobscheduler', is_scheduled=True)

    except Exception as ex:
        LOG.exception(ex)
        LOG.error("Error creating a snapshot for workload %s",
                  kwargs.get('workload_id'))
        # record an error snapshot only if the workload was fetched before
        # the failure, so the error surfaces in the snapshot list
        if 'workload' in locals():
            options = {'user_id': user_id,
                       'project_id': project_id,
                       'workload_id': workload_id,
                       'snapshot_type': "full",
                       'display_name': "jobscheduler",
                       'display_description': "",
                       'host': '',
                       'status': 'error',
                       'error_msg': str(ex),
                       'metadata': {}, }

            context = wlm_context.get_admin_context()
            db_driver.snapshot_create(context, options)

    LOG.info("_snapshot_create_callback Exit")


def _get_client():
    try:
        wlm = wlmclient.Client(auth_url=CONF.keystone_authtoken.www_authenticate_uri,
                               username=CONF.keystone_authtoken.username,
                               password=CONF.keystone_authtoken.password,
                               tenant_id=CONF.keystone_authtoken.project_name,
                               project_id=CONF.keystone_authtoken.project_name,
                               domain_name=CONF.keystone_authtoken.user_domain_id,
                               endpoint_type=CONF.clients.endpoint_type,
                               insecure=CONF.clients.insecure,
                               cacert=CONF.clients.cafile)
        return wlm
    except Exception as ex:
        LOG.exception(ex)
        raise


class CronManager(manager.Manager):
    """Chooses a host to create snapshot."""

    RPC_API_VERSION = '2.0'

    def __init__(self, service_name=None, *args, **kwargs):

        super(CronManager, self).__init__(*args, **kwargs)

    @autolog.log_method(logger=Logger)
    def init_host(self):
        context = wlm_context.get_admin_context()
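        # Assumption from reading the code: SQLAlchemyJobStore persists
        # scheduled jobs in the workloadmgr database, so existing jobs
        # survive a service restart and only need to be re-attached here.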
        _jobstore = SQLAlchemyJobStore(self.db)
        _job_scheduler = Scheduler()
        _job_scheduler.add_jobstore(context, _jobstore, 'jobscheduler_store')

        self._scheduler = _job_scheduler
        self.workload_ensure_global_job_scheduler(context)

    @autolog.log_method(logger=Logger)
    def workload_add_scheduler_job(
            self, context, jobschedule, workload):
        try:
            LOG.info("Adding job for workload: %s" %(workload.get('id', None)))
            job = self.db.job_get(context, workload['id'])
            if job:
                msg = 'Workload %s job is already scheduled. Pause workload before changing job parameters' % job.workload_id
                raise exception.InvalidState(reason=msg)
            if jobschedule and len(jobschedule):
                if str(jobschedule.get('enabled', False)).lower() == 'true':
                    user_domain_id = getattr(context, 'user_domain_id',
                                             getattr(context, 'user_domain', 'default'))
                    kwargs = {'workload_id': workload.get('id', None),
                              'user_id': workload.get('user_id', None),
                              'project_id': workload.get('project_id', None),
                              'user_domain_id': user_domain_id,
                              'user': workload.get('user_id', None),
                              'tenant': workload.get('project_id', None),
                              }
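                    # both the 'user'/'tenant' and the '*_id' spellings are
                    # passed because the callback rebuilds a tenant context
                    # from these kwargs via nova._get_tenant_context()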
                    self._scheduler.add_workloadmgr_job(
                        context,
                        _snapshot_create_callback,
                        jobschedule,
                        jobstore='jobscheduler_store',
                        kwargs=kwargs)
        except Exception as ex:
            LOG.exception(ex)
            raise

    @autolog.log_method(logger=Logger)
    def workload_ensure_global_job_scheduler(self, context):
        try:
            global_scheduler = self.db.setting_get(
                context,
                'global-job-scheduler',
                get_hidden=True,
                cloud_setting=True)
            db_status = bool(int(global_scheduler['value']))
            if db_status:
                LOG.info("Starting global job scheduler")
                self._scheduler.start(context)
            else:
                LOG.info("Shutting down global job scheduler")
                self._scheduler.shutdown()
        except exception.SettingNotFound:
            LOG.info("Starting global job scheduler")
            self._scheduler.start(context)
        except SchedulerAlreadyRunningError:
            pass
        except Exception as ex:
            LOG.exception(ex)

    @autolog.log_method(logger=Logger)
    def workload_pause(self, ctxt, workload_id, request_spec=None, filter_properties=None):
        try:
            LOG.info("Disabling workload: %s" %(workload_id))
            job = self._scheduler.get_job_by_workload_id(ctxt, workload_id)
            if job:
                self._scheduler.unschedule_job(ctxt, job)
        except Exception as ex:
            LOG.exception(ex)

    @autolog.log_method(logger=Logger)
    def workload_get_global_job_scheduler(self, context, request_spec=None, filter_properties=None):
        """Method to return global job scheduler status"""
        try:
            try:
                global_scheduler = self.db.setting_get(context,
                        'global-job-scheduler',
                        get_hidden=True,
                        cloud_setting=True)
                db_status = bool(int(global_scheduler['value']))
            except exception.SettingNotFound:
                db_status = True

            cron_status = self._scheduler.running
            # the DB flag and the running state of the cron engine can
            # disagree; resync the engine to match the DB
            if db_status != cron_status:
                LOG.info("Global job scheduler status is out of sync between "
                         "the DB (%s) and the cron engine (%s); syncing with "
                         "the DB status", db_status, cron_status)

                self.workload_ensure_global_job_scheduler(context)
                cron_status = self._scheduler.running
                if cron_status != db_status:
                    raise Exception(
                        "Global job scheduler status is not in sync between "
                        "the DB (%s) and the cron engine (%s). Please retry "
                        "after some time" % (db_status, cron_status))
            return cron_status
        except Exception as ex:
            LOG.exception(ex)


    @autolog.log_method(logger=Logger)
    def workload_enable_global_job_scheduler(self, context, request_spec=None, filter_properties=None):
        """Method to enable global job scheduler status"""
        LOG.info("Starting global job scheduler")
        return self._scheduler.start(context)

    @autolog.log_method(logger=Logger)
    def workload_disable_global_job_scheduler(self, context, request_spec=None, filter_properties=None):
        """Method to disable global job scheduler status"""
        LOG.info("Disabling global job scheduler")
        return self._scheduler.shutdown()