Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""
This module is the main part of the library. It houses the Scheduler class
and related exceptions.
"""

import collections
from threading import Thread, Event, Lock
from datetime import datetime, timedelta
from logging import getLogger
from oslo_config import cfg
import os
import sys
import time
import pickle
import uuid

from workloadmgr.apscheduler.util import *
from workloadmgr.apscheduler.triggers import SimpleTrigger, IntervalTrigger, CronTrigger, WorkloadMgrTrigger
from workloadmgr.apscheduler.jobstores.sqlalchemy_store import SQLAlchemyJobStore
from workloadmgr.apscheduler.job import Job, MaxInstancesReachedError
from workloadmgr.apscheduler.events import *
from workloadmgr.apscheduler.threadpool import ThreadPool
from workloadmgr.common import context as wlm_context

logger = getLogger(__name__)

# Options registered under the ``global_job_scheduler`` configuration group.
global_job_scheduler_opts = [
    cfg.IntOpt(
        'misfire_grace_time',
        default=600,
        help="Grace period during which the job scheduler allows jobs to run",
    ),
]

# Module-wide handle on the global configuration object; the options above
# are registered on it at import time.
CONF = cfg.CONF
CONF.register_opts(global_job_scheduler_opts, group='global_job_scheduler')


class SchedulerAlreadyRunningError(Exception):
    """
    Signals an attempt to start or configure a scheduler that is already
    running.
    """

    def __str__(self):
        # The text is fixed; constructor arguments are not reflected in it.
        message = 'Scheduler is already running'
        return message


class Scheduler(object):
    """
    This class is responsible for scheduling jobs and triggering
    their execution.
    """

    _stopped = True
    _thread = None

    def __init__(self, gconfig={}, **options):
        self._wakeup = Event()
        self._jobstores = {}
        self._jobstores_lock = Lock()
        self._listeners = []
        self._listeners_lock = Lock()
        self._pending_jobs = []
        self.configure(gconfig, **options)

    def configure(self, gconfig=None, **options):
        """
        Reconfigures the scheduler with the given options. Can only be done
        when the scheduler isn't running.

        :param gconfig: mapping of global options; combined with ``options``
            through ``combine_opts`` using the ``apscheduler.`` prefix
            (``None`` is treated as an empty mapping)
        :param options: keyword options combined with ``gconfig``
        :raises SchedulerAlreadyRunningError: if the scheduler is running
        """
        logger.info('scheduler configure')
        if self.running:
            raise SchedulerAlreadyRunningError

        # Set general options
        config = combine_opts({} if gconfig is None else gconfig,
                              'apscheduler.', options)
        # The grace time always comes from the oslo configuration, not from
        # the options passed in here.
        self.misfire_grace_time = int(
            CONF.global_job_scheduler.misfire_grace_time)
        self.coalesce = asbool(config.pop('coalesce', True))
        self.daemonic = asbool(config.pop('daemonic', True))
        self.standalone = asbool(config.pop('standalone', False))

        # Configure the thread pool
        if 'threadpool' in config:
            self._threadpool = maybe_ref(config['threadpool'])
        else:
            logger.info('Creating thread pool')
            threadpool_opts = combine_opts(config, 'threadpool.')
            self._threadpool = ThreadPool(**threadpool_opts)
            # Remembered so that start() can build a new pool later
            self.threadpool_opts = threadpool_opts

        # Configure job stores: group "jobstore.<alias>.<option>" entries
        # into one option dict per alias
        jobstore_opts = combine_opts(config, 'jobstore.')
        jobstores = {}
        for key, value in list(jobstore_opts.items()):
            store_name, option = key.split('.', 1)
            opts_dict = jobstores.setdefault(store_name, {})
            opts_dict[option] = value

        for alias, opts in list(jobstores.items()):
            classname = opts.pop('class')
            cls = maybe_ref(classname)
            jobstore = cls(**opts)
            # add_jobstore() takes the request context as its first
            # argument; omitting it shifted every argument by one. An admin
            # context is used here, as elsewhere in this class when no
            # caller context is available.
            self.add_jobstore(wlm_context.get_admin_context(), jobstore,
                              alias, True)

    def start(self, context):
        """
        Starts the scheduler in a new thread.
        In threaded mode (the default), this method will return immediately
        after starting the scheduler thread.
        In standalone mode, this method will block until there are no more
        scheduled jobs.

        :param context: request context passed on when the pending jobs are
            added to their job stores
        :raises SchedulerAlreadyRunningError: if the scheduler is running
        """
        logger.info('scheduler start')
        if self.running:
            raise SchedulerAlreadyRunningError

        # Schedule all pending jobs
        for job, jobstore in self._pending_jobs:
            self._real_add_job(context, job, jobstore, False)
        del self._pending_jobs[:]

        # shutdown() discards the thread pool, so build a new one before the
        # main loop runs -- in standalone mode as well as threaded mode.
        # ``threadpool_opts`` is only set when configure() created the pool
        # itself, hence the getattr() fallback.
        if not self._threadpool:
            self._threadpool = ThreadPool(
                **getattr(self, 'threadpool_opts', {}))

        self._stopped = False
        if self.standalone:
            self._main_loop()
        else:
            self._thread = Thread(target=self._main_loop, name='APScheduler')
            # Thread.setDaemon() is deprecated; set the attribute instead
            self._thread.daemon = self.daemonic
            self._thread.start()

        # A shutdown() that ran in the meantime may have dropped the pool
        threadpool = self._threadpool
        if threadpool is not None:
            logger.info('thread pool shutdown flag %s',
                        int(threadpool._shutdown))

    def shutdown(self, wait=True, shutdown_threadpool=True,
                 close_jobstores=True):
        """
        Stops the scheduler and joins its thread. Jobs that are currently
        executing are not interrupted. Does nothing when the scheduler is
        not running.

        :param wait: passed to the thread pool's ``shutdown()``; ``True`` to
                     wait until all currently executing jobs have finished
                     (only relevant if ``shutdown_threadpool`` is ``True``)
        :param shutdown_threadpool: ``True`` to shut down the thread pool
        :param close_jobstores: ``True`` to close all job stores after shutdown
        """
        if not self.running:
            return

        # Raise the stop flag, then set the wakeup event
        self._stopped = True
        self._wakeup.set()

        scheduler_thread = self._thread
        if scheduler_thread:
            logger.info("Shutting down scheduler thread")

            scheduler_thread.join()
            logger.info("Scheduler thread shutdown complete.")

        if shutdown_threadpool:
            logger.info("Shutting down threadpool")
            self._threadpool.shutdown(wait)
            # The pool reference is dropped once it has been shut down
            self._threadpool = None
            logger.info("Threadpool shutdown complete.")

        if close_jobstores:
            logger.info("Shutting down JobStore.")
            for store in itervalues(self._jobstores):
                store.close()
            logger.info("JobStore shutdown complete.")

    @property
    def running(self):
        thread_alive = self._thread and self._thread.is_alive()
        standalone = getattr(self, 'standalone', False)
        return not self._stopped and (standalone or thread_alive)

    def add_jobstore(self, context, jobstore, alias, quiet=False):
        """
        Registers a job store under ``alias`` and asks it to load its jobs.

        Failures, including an alias that is already in use, are logged
        rather than raised; listeners are notified in either case.

        :param context: request context passed to the store's ``load_jobs``
        :param jobstore: job store to be added
        :param alias: alias for the job store
        :param quiet: True to suppress scheduler thread wakeup
        :type jobstore: instance of
            :class:`~apscheduler.jobstores.base.JobStore`
        :type alias: str
        """
        with self._jobstores_lock:
            try:
                if alias in self._jobstores:
                    raise KeyError('Alias "%s" is already in use' % alias)
                # The store is registered before its jobs are loaded
                self._jobstores[alias] = jobstore
                jobstore.load_jobs(context)
            except Exception as ex:
                logger.exception(ex)

        # Tell listeners about the new job store
        self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_ADDED, alias))

        # Set the wakeup event so the new job store gets scanned for jobs
        if not quiet:
            self._wakeup.set()

    def remove_jobstore(self, alias, close=True):
        """
        Removes the job store by the given alias from this scheduler.

        An unknown alias is logged and otherwise ignored: nothing is closed
        and no removal event is sent.

        :param alias: alias of the job store to remove
        :param close: ``True`` to close the job store after removing it
        :type alias: str
        """
        with self._jobstores_lock:
            # pop() with a default avoids leaving ``jobstore`` unbound when
            # the alias is not registered.
            jobstore = self._jobstores.pop(alias, None)

        if jobstore is None:
            logger.error('No such job store: %s', alias)
            return

        # Close the job store if requested
        if close:
            jobstore.close()

        # Notify listeners that a job store has been removed
        self._notify_listeners(JobStoreEvent(EVENT_JOBSTORE_REMOVED, alias))

    def add_listener(self, callback, mask=EVENT_ALL):
        """
        Registers ``callback`` as a listener for scheduler events. The
        callback is invoked with the event object as its only argument for
        every event whose code matches ``mask``; without an explicit
        ``mask`` it receives events of all types.

        :param callback: any callable that takes one argument
        :param mask: bitmask that indicates which events should be listened to
        """
        registration = (callback, mask)
        with self._listeners_lock:
            try:
                self._listeners.append(registration)
            except Exception as ex:
                logger.exception(ex)

    def remove_listener(self, callback):
        """
        Removes a previously added event listener.
        """
        self._listeners_lock.acquire()
        try:
            for i, (cb, _) in enumerate(self._listeners):
                if callback == cb:
                    del self._listeners[i]
        except Exception as ex:
            logger.exception(ex)
        finally:
            self._listeners_lock.release()

    def _notify_listeners(self, event):
        self._listeners_lock.acquire()
        try:
            listeners = tuple(self._listeners)
        except Exception as ex:
            logger.exception(ex)
        finally:
            self._listeners_lock.release()

        for cb, mask in listeners:
            if event.code & mask:
                try:
                    cb(event)
                except BaseException:
                    logger.exception('Error notifying listener')

    def _real_add_job(self, context, job, jobstore, wakeup):
        """
        Computes the job's next run time and stores the job in the job
        store registered under the alias ``jobstore``.

        Errors raised while storing (including an unknown alias) are logged
        rather than propagated; the listener notification and the "Added
        job" log line are emitted regardless.

        :param context: request context passed to the store's ``add_job``
        :param job: the job to store
        :param jobstore: alias of the target job store
        :param wakeup: truthy to set the wakeup event afterwards
        :raises ValueError: if the job has no next run time
        """
        job.compute_next_run_time(datetime.utcnow())
        if not job.next_run_time:
            raise ValueError('Not adding job since it would never be run')

        # This method's own name, handed to log_thread_lock()
        method_name = sys._getframe().f_code.co_name
        try:
            self.log_thread_lock(method_name, state='acquiring')
            self._jobstores_lock.acquire()
            self.log_thread_lock(method_name)
            try:
                target_store = self._jobstores[jobstore]
            except KeyError:
                raise KeyError('No such job store: %s' % jobstore)
            try:
                target_store.add_job(context, job)
            except BaseException:
                # Retry Mysql going away
                target_store.add_job(context, job)
        except Exception as ex:
            logger.exception(ex)
        finally:
            self._jobstores_lock.release()
            self.log_thread_lock(method_name, state='released')

        # Tell listeners that a job was added
        self._notify_listeners(
            JobStoreEvent(EVENT_JOBSTORE_JOB_ADDED, jobstore, job))

        logger.info('Added job "%s" to job store "%s"', job, jobstore)

        # Set the wakeup event when requested
        if wakeup:
            self._wakeup.set()

    def add_job(self, context, trigger, func, args, kwargs, jobstore='default',
                **options):
        """
        Builds a :class:`~apscheduler.job.Job` from the arguments and stores
        it, setting the wakeup event afterwards. Duplicate jobs are
        processed first. Remaining keyword arguments go to the ``Job``
        constructor (see :ref:`job_options`).

        :param context: request context passed on to the job store
        :param trigger: trigger that determines when ``func`` is called
        :param func: callable to run at the given time
        :param args: list of positional arguments to call func with
        :param kwargs: dict of keyword arguments to call func with
        :param jobstore: alias of the job store to store the job in
        :rtype: :class:`~apscheduler.job.Job`
        """
        # The returned list is not needed here
        self._process_duplicate_jobs(datetime.utcnow())

        # Per-job overrides fall back to the scheduler-wide settings
        grace_time = options.pop('misfire_grace_time', self.misfire_grace_time)
        coalesce = options.pop('coalesce', self.coalesce)
        new_job = Job(trigger, func, args or [], kwargs or {}, grace_time,
                      coalesce, **options)
        self._real_add_job(context, new_job, jobstore, True)
        return new_job

    def _update_or_remove_job(self, job, alias, jobstore, now):
        """
        Refreshes the job's trigger and grace time, then either persists the
        job (when it still has a next run time) or removes it from the
        store. Any error is logged and swallowed.

        :param job: the job to refresh
        :param alias: alias of ``jobstore``, used for the removal event
        :param jobstore: the store holding ``job``
        :param now: reference time for computing the next run
        """
        try:
            admin_ctx = wlm_context.get_admin_context()
            schedule = jobstore.get_scheduler_data(admin_ctx, job)
            if schedule:
                job.trigger = WorkloadMgrTrigger(schedule)

            # Pick up the currently configured grace time
            job.misfire_grace_time = int(
                CONF.global_job_scheduler.misfire_grace_time)

            has_next_run = job.compute_next_run_time(
                now + timedelta(microseconds=1))
            if not has_next_run:
                # Finished jobs are not kept around
                self._remove_job(admin_ctx, job, alias, jobstore)
            else:
                jobstore.update_job(job)
        except Exception as ex:
            logger.exception(ex)

    def _process_duplicate_jobs(self, now):
        """
        Looks for workload ids that occur more than once in a job store,
        unschedules all jobs of such a workload and re-adds the first one.

        :param now: not used by this method
        :return: list of workload ids for which duplicates were handled, or
            ``None`` if an exception was caught (it is logged, not raised)
        """
        try:
            wl_ids_with_duplicate_trigger = []
            for alias, jobstore in iteritems(self._jobstores):
                # NOTE(review): other methods of this class iterate
                # ``jobstore.jobs.items()``; if ``jobs`` is a plain dict,
                # iterating it directly yields unique keys and no duplicate
                # would ever be counted here -- confirm against the job store.
                workload_ids = [workload_id for workload_id in jobstore.jobs]
                duplicate_workload_ids = [
                    item for item, count in collections.Counter(workload_ids).items() if count > 1
                ]
                duplicate_workload_ids = set(duplicate_workload_ids)
                # workload id -> every job seen for that workload id
                duplicate_jobs_by_workload_id = collections.defaultdict(list)
                if duplicate_workload_ids:
                    for job in jobstore.jobs:
                        if job.workload_id in duplicate_workload_ids:
                            duplicate_jobs_by_workload_id[job.workload_id].append(job)

                if duplicate_jobs_by_workload_id:
                    context = wlm_context.get_admin_context()
                    for workload_id, jobs in duplicate_jobs_by_workload_id.items():
                        # The first job in the list is remembered for
                        # re-adding; every job, including that one, is
                        # unscheduled.
                        processed = False
                        job_to_be_upadted = None
                        for job in jobs:
                            if not processed:
                                job_to_be_upadted = job
                                processed = True
                            try:
                                self.unschedule_job(context, job)
                            except Exception as ex:
                                logger.exception("Failed to remove job: {}".format(job))
                        if job_to_be_upadted:
                            wl_ids_with_duplicate_trigger.append(workload_id)
                            # The job is re-added under this fixed alias,
                            # not under ``alias`` from the enclosing loop.
                            jobstore_name = "jobscheduler_store"
                            scheduler = jobstore.get_scheduler_data(context, job_to_be_upadted)
                            if scheduler:
                                job_to_be_upadted.trigger = WorkloadMgrTrigger(scheduler)
                            if not self.running:
                                # Queued; start() adds pending jobs for real
                                self._pending_jobs.append((job_to_be_upadted, jobstore_name))
                                logger.info('Adding job tentatively -- it will be properly '
                                            'scheduled when the scheduler starts')
                            else:
                                self._real_add_job(context, job_to_be_upadted, jobstore_name, True)
            return wl_ids_with_duplicate_trigger
        except Exception as ex:
            logger.exception(ex)

    def _remove_job(self, context, job, alias, jobstore):
        """
        Removes ``job`` from ``jobstore``, then notifies listeners and logs
        the removal.

        :param context: request context passed to the store's ``remove_job``
        :param job: the job to remove
        :param alias: alias of ``jobstore``, reported in the event
        :param jobstore: the store to remove the job from
        """
        jobstore.remove_job(context, job)

        # Let listeners know the job is gone
        self._notify_listeners(
            JobStoreEvent(EVENT_JOBSTORE_JOB_REMOVED, alias, job))

        logger.info('Removed job "%s"', job)

    def add_date_job(self, func, date, args=None, kwargs=None, context=None,
                     **options):
        """
        Schedules a job to be completed on a specific date and time.
        Any extra keyword arguments are passed along to the constructor of the
        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
        :param func: callable to run at the given time
        :param date: the date/time to run the job at
        :param args: list of positional arguments to call func with
        :param kwargs: dict of keyword arguments to call func with
        :param context: request context handed to :meth:`add_job`; an admin
            context is used when omitted
        :param name: name of the job
        :param jobstore: stored the job in the named (or given) job store
        :param misfire_grace_time: seconds after the designated run time that
            the job is still allowed to be run
        :type date: :class:`datetime.date`
        :rtype: :class:`~apscheduler.job.Job`
        """
        # add_job() takes the context as its first argument; calling it
        # without one shifted every argument and raised TypeError.
        if context is None:
            context = wlm_context.get_admin_context()
        trigger = SimpleTrigger(date)
        return self.add_job(context, trigger, func, args, kwargs, **options)

    def add_interval_job(
            self,
            func,
            start_time,
            weeks=0,
            days=0,
            hours=0,
            minutes=0,
            seconds=0,
            start_date=None,
            args=None,
            kwargs=None,
            context=None,
            **options):
        """
        Schedules a job to be completed on specified intervals.
        Any extra keyword arguments are passed along to the constructor of the
        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
        :param func: callable to run
        :param start_time: passed to ``IntervalTrigger`` as its second
            argument
        :param weeks: number of weeks to wait
        :param days: number of days to wait
        :param hours: number of hours to wait
        :param minutes: number of minutes to wait
        :param seconds: number of seconds to wait
        :param start_date: when to first execute the job and start the
            counter (default is after the given interval)
        :param args: list of positional arguments to call func with
        :param kwargs: dict of keyword arguments to call func with
        :param context: request context handed to :meth:`add_job`; an admin
            context is used when omitted
        :param name: name of the job
        :param jobstore: alias of the job store to add the job to
        :param misfire_grace_time: seconds after the designated run time that
            the job is still allowed to be run
        :rtype: :class:`~apscheduler.job.Job`
        """
        # add_job() takes the context as its first argument; calling it
        # without one shifted every argument and raised TypeError.
        if context is None:
            context = wlm_context.get_admin_context()
        interval = timedelta(weeks=weeks, days=days, hours=hours,
                             minutes=minutes, seconds=seconds)
        trigger = IntervalTrigger(interval, start_time, start_date)
        return self.add_job(context, trigger, func, args, kwargs, **options)

    def add_workloadmgr_job(self, context, func, jobschedule, args=None,
                            kwargs=None, **options):
        """
        Schedules a job driven by a ``WorkloadMgrTrigger`` built from
        ``jobschedule``.
        Any extra keyword arguments are passed along to the constructor of the
        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
        :param context: request context handed to :meth:`add_job`
        :param func: callable to run
        :param jobschedule: schedule description the trigger is built from
        :param args: list of positional arguments to call func with
        :param kwargs: dict of keyword arguments to call func with
        :param name: name of the job
        :param jobstore: alias of the job store to add the job to
        :param misfire_grace_time: seconds after the designated run time that
            the job is still allowed to be run
        :rtype: :class:`~apscheduler.job.Job`
        """
        return self.add_job(context, WorkloadMgrTrigger(jobschedule), func,
                            args, kwargs, **options)

    def add_cron_job(self, func, year=None, month=None, day=None, week=None,
                     day_of_week=None, hour=None, minute=None, second=None,
                     start_date=None, args=None, kwargs=None, context=None,
                     **options):
        """
        Schedules a job to be completed on times that match the given
        expressions.
        Any extra keyword arguments are passed along to the constructor of the
        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
        :param func: callable to run
        :param year: year to run on
        :param month: month to run on
        :param day: day of month to run on
        :param week: week of the year to run on
        :param day_of_week: weekday to run on (0 = Monday)
        :param hour: hour to run on
        :param minute: minute to run on
        :param second: second to run on
        :param start_date: passed to ``CronTrigger`` as ``start_date``
        :param args: list of positional arguments to call func with
        :param kwargs: dict of keyword arguments to call func with
        :param context: request context handed to :meth:`add_job`; an admin
            context is used when omitted
        :param name: name of the job
        :param jobstore: alias of the job store to add the job to
        :param misfire_grace_time: seconds after the designated run time that
            the job is still allowed to be run
        :return: the scheduled job
        :rtype: :class:`~apscheduler.job.Job`
        """
        # add_job() takes the context as its first argument; calling it
        # without one shifted every argument and raised TypeError.
        if context is None:
            context = wlm_context.get_admin_context()
        trigger = CronTrigger(year=year, month=month, day=day, week=week,
                              day_of_week=day_of_week, hour=hour,
                              minute=minute, second=second,
                              start_date=start_date)
        return self.add_job(context, trigger, func, args, kwargs, **options)

    def cron_schedule(self, **options):
        """
        Decorator version of :meth:`add_cron_job`.
        This decorator does not wrap its host function.
        Unscheduling decorated functions is possible by passing the ``job``
        attribute of the scheduled function to :meth:`unschedule_job`.
        Any extra keyword arguments are passed along to the constructor of the
        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
        """
        def inner(func):
            func.job = self.add_cron_job(func, **options)
            return func
        return inner

    def interval_schedule(self, **options):
        """
        Decorator version of :meth:`add_interval_job`.
        This decorator does not wrap its host function.
        Unscheduling decorated functions is possible by passing the ``job``
        attribute of the scheduled function to :meth:`unschedule_job`.
        Any extra keyword arguments are passed along to the constructor of the
        :class:`~apscheduler.job.Job` class (see :ref:`job_options`).
        """
        def inner(func):
            func.job = self.add_interval_job(func, **options)
            return func
        return inner

    def get_jobs(self, context):
        """
        Collects the contents of every job store's ``jobs`` into one list,
        while holding the job store lock.

        :param context: not used by this method
        :return: list of whatever iterating each store's ``jobs`` yields;
            ``None`` if an error occurred (it is logged, not raised)
        """
        # This method's own name, handed to log_thread_lock()
        method_name = sys._getframe().f_code.co_name
        try:
            self.log_thread_lock(method_name, state='acquiring')
            self._jobstores_lock.acquire()
            self.log_thread_lock(method_name)
            return [
                entry
                for store in itervalues(self._jobstores)
                for entry in store.jobs
            ]
        except Exception as ex:
            logger.exception(ex)
        finally:
            self._jobstores_lock.release()
            self.log_thread_lock(method_name, state='released')

    def get_job_by_workload_id(self, context, workload_id):
        """
        Asks each job store in turn for the job belonging to
        ``workload_id`` and returns the first truthy answer.

        :param context: request context passed to the job stores
        :param workload_id: id of the workload to look up
        :return: :class:`~apscheduler.job.Job` object, or ``None`` when no
            store has one or an error occurred (it is logged, not raised)
        """
        # This method's own name, handed to log_thread_lock()
        method_name = sys._getframe().f_code.co_name
        try:
            self.log_thread_lock(method_name, state='acquiring')
            self._jobstores_lock.acquire()
            self.log_thread_lock(method_name)
            for store in itervalues(self._jobstores):
                match = store.get_job_by_workload_id(context, workload_id)
                if match:
                    return match
            return None
        except Exception as ex:
            logger.exception(ex)
        finally:
            self._jobstores_lock.release()
            self.log_thread_lock(method_name, state='released')

    def unschedule_job(self, context, job):
        """
        Removes a job from the first job store whose ``jobs`` contains the
        job's workload id, preventing it from being run any more. If the
        first attempt raises, the removal is tried once more.

        :param context: request context passed on to the job store
        :param job: the job to remove
        :raises KeyError: if no job store holds the job's workload id
        """
        def remove_from_first_match():
            # True once the job was found in a store and removed
            for alias, store in iteritems(self._jobstores):
                if job.workload_id in store.jobs:
                    self._remove_job(context, job, alias, store)
                    return True
            return False

        # This method's own name, handed to log_thread_lock()
        method_name = sys._getframe().f_code.co_name
        try:
            self.log_thread_lock(method_name, state='acquiring')
            self._jobstores_lock.acquire()
            self.log_thread_lock(method_name)
            if remove_from_first_match():
                return
        except Exception as ex:
            # retry for OperationalError: (OperationalError)
            # (2006, 'MySQL server has gone away')
            logger.exception(
                "could not remove job from jobstore due to error: {}, retrying once more".format(
                    ex)
            )
            if remove_from_first_match():
                return
        finally:
            self._jobstores_lock.release()
            self.log_thread_lock(method_name, state='released')

        raise KeyError('Job "%s" is not scheduled in any job store' % job)

    def unschedule_func(self, func):
        """
        Removes all jobs that would execute the given function.

        Errors raised while scanning or removing are logged, not raised.

        :param func: the callable whose jobs should be removed
        :raises KeyError: if no job running ``func`` was removed
        """
        found = False
        # This method's own name, handed to log_thread_lock()
        method_name = sys._getframe().f_code.co_name
        try:
            self.log_thread_lock(method_name, state='acquiring')
            self._jobstores_lock.acquire()
            self.log_thread_lock(method_name)
            # No caller context is available here, so an admin context is
            # used, as elsewhere in this class.
            context = wlm_context.get_admin_context()
            for alias, jobstore in iteritems(self._jobstores):
                # Iterate the (workload id, job) pairs, as the other methods
                # of this class do. The old loop target ``job.workload_id``
                # referenced an undefined name, so nothing was ever removed.
                # A snapshot is taken in case removal alters the mapping.
                for _wlid, job in list(jobstore.jobs.items()):
                    if job.func == func:
                        self._remove_job(context, job, alias, jobstore)
                        found = True
        except Exception as ex:
            logger.exception(ex)
        finally:
            self._jobstores_lock.release()
            self.log_thread_lock(method_name, state='released')

        if not found:
            raise KeyError('The given function is not scheduled in this '
                           'scheduler')

    def print_jobs(self, out=None):
        """
        Writes a textual listing of the jobs in every job store, grouped by
        job store alias. Errors raised while collecting the listing are
        logged; whatever was collected so far is still written.

        :param out: a file-like object to print to (defaults to **sys.stdout**
                    if nothing is given)
        """
        out = out or sys.stdout
        lines = []
        # This method's own name, handed to log_thread_lock()
        method_name = sys._getframe().f_code.co_name
        try:
            self.log_thread_lock(method_name, state='acquiring')
            self._jobstores_lock.acquire()
            self.log_thread_lock(method_name)
            for alias, store in iteritems(self._jobstores):
                lines.append('Jobstore %s:' % alias)
                if not store.jobs:
                    lines.append('    No scheduled jobs')
                    continue
                for _wlid, job in store.jobs.items():
                    lines.append('    %s' % job)
        except Exception as ex:
            logger.exception(ex)
        finally:
            self._jobstores_lock.release()
            self.log_thread_lock(method_name, state='released')

        out.write(os.linesep.join(lines) + os.linesep)

    def _run_job(self, job, run_times):
        """
        Harness that executes the job's callable once per due run time and
        reports the outcome of each run to the listeners.

        :param job: the job to execute
        :param run_times: the run times that are due for this job
        """
        for run_time in run_times:
            # How late are we, and how late are we allowed to be?
            lateness = datetime.utcnow() - run_time
            allowed = timedelta(
                seconds=CONF.global_job_scheduler.misfire_grace_time)
            if lateness > allowed:
                # Outside the grace window: report a missed run and move on
                self._notify_listeners(
                    JobEvent(EVENT_JOB_MISSED, job, run_time))
                logger.warning('Run time of job "%s" was missed by %s',
                               job, lateness)
                continue

            try:
                job.add_instance()
            except MaxInstancesReachedError:
                self._notify_listeners(
                    JobEvent(EVENT_JOB_MISSED, job, run_time))
                logger.warning('Execution of job "%s" skipped: '
                               'maximum number of running instances '
                               'reached (%d)', job, job.max_instances)
                break
            except Exception as ex:
                # Any other failure is logged and the job still runs
                logger.exception(ex)

            logger.info('Running job "%s" (scheduled at %s)', job,
                        run_time)

            try:
                retval = job.func(*job.args, **job.kwargs)
            except BaseException:
                # Report the failure to the listeners
                exc, tb = sys.exc_info()[1:]
                self._notify_listeners(
                    JobEvent(EVENT_JOB_ERROR, job, run_time,
                             exception=exc, traceback=tb))

                logger.exception('Job "%s" raised an exception', job)
            else:
                # Report the successful run to the listeners
                self._notify_listeners(
                    JobEvent(EVENT_JOB_EXECUTED, job, run_time,
                             retval=retval))

                logger.info('Job "%s" executed successfully', job)

            job.remove_instance()

            # With coalescing, one execution covers all due run times
            if job.coalesce:
                break

    def _process_jobs(self, now):
        """
        Iterates through jobs in every jobstore, starts pending jobs
        and figures out the next wakeup time.

        :param now: reference time used to find the run times that are due
        :return: the earliest ``next_run_time`` seen among the processed
            jobs, or ``None`` if none was seen
        """
        next_wakeup_time = None
        try:
            # processing duplicate jobs with same workload if present
            # (done before the job store lock is taken)
            self._process_duplicate_jobs(now)

            self.log_thread_lock(sys._getframe().f_code.co_name, state='acquiring')
            self._jobstores_lock.acquire()
            self.log_thread_lock(sys._getframe().f_code.co_name)

            for alias, jobstore in iteritems(self._jobstores):
                for wlid, job in jobstore.jobs.items():
                    try:
                        # Leaves only the inner loop; the outer loop then
                        # moves on to the next job store
                        if self._stopped:
                            break
                        run_times = job.get_run_times(now)
                        if run_times:
                            # A failed submit is logged; the bookkeeping
                            # below still runs
                            try:
                                self._threadpool.submit(self._run_job, job, run_times)
                            except Exception as ex:
                                logger.exception(ex)

                            # Update misfire_grace_time as
                            # per the configured value
                            job.misfire_grace_time = int(
                                  CONF.global_job_scheduler.misfire_grace_time)
                            # Increase the job's run count
                            if job.coalesce:
                                job.runs += 1
                            else:
                                job.runs += len(run_times)
                            if job.compute_next_run_time(
                                    now + timedelta(microseconds=1)):
                                # Persist the job only when it has a next
                                # run time; there is no removal in this branch
                                jobstore.update_job(job)
                        else:
                            # Nothing due: refresh the job, or remove it if
                            # it has no next run time
                            self._update_or_remove_job(job, alias, jobstore, now)

                        # Track the earliest upcoming run time
                        if not next_wakeup_time:
                            next_wakeup_time = job.next_run_time
                        elif job.next_run_time:
                            next_wakeup_time = min(next_wakeup_time,
                                                   job.next_run_time)
                    except Exception as ex:
                        logger.exception(ex)
        except Exception as ex:
            logger.exception(ex)
        finally:
            # NOTE(review): this release also runs when an exception was
            # raised before acquire() succeeded -- confirm that cannot happen
            self._jobstores_lock.release()
            self.log_thread_lock(sys._getframe().f_code.co_name, state='released')
        return next_wakeup_time

    def _main_loop(self):
        """Run the scheduler loop until the scheduler is stopped.

        The method works in two phases:

        1. Startup cleanup: while holding ``self._jobstores_lock``, every
           jobstore is scanned and, for each ``workload_id`` that has more
           than one job, all but one job are removed through
           ``jobstore.remove_multiple_jobs``.  Any exception raised during
           this phase is logged and does not prevent the loop from starting.
        2. Scheduling loop: ``self._process_jobs`` is called with the current
           UTC time and the thread then waits on ``self._wakeup`` until the
           returned wakeup time, until it is woken up explicitly, or forever
           when there is no wakeup time (unless ``self.standalone`` is set,
           in which case the scheduler shuts itself down).

        An exception escaping an iteration of the scheduling loop is logged
        and re-raised, so it propagates out of this method.
        """
        # Seconds to back off after being woken up while still running, so
        # that a client adding several jobs in a row is not disturbed.  The
        # value has to stay below the misfire grace period.
        settle_seconds = 180

        logger.info('Scheduler started')
        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_START))

        self._wakeup.clear()

        # Acquire the lock *before* entering the try block: the finally
        # clause must only release a lock that is actually held.
        func_name = sys._getframe().f_code.co_name
        self.log_thread_lock(func_name, state='acquiring')
        self._jobstores_lock.acquire()
        try:
            self.log_thread_lock(func_name)

            for _alias, jobstore in iteritems(self._jobstores):
                stale_jobs = self._find_stale_duplicate_jobs(jobstore)
                if stale_jobs:
                    context = wlm_context.get_admin_context()
                    jobstore.remove_multiple_jobs(context, stale_jobs)
        except Exception as ex:
            # Best effort: a failed cleanup must not keep the loop from running.
            logger.exception(ex)
        finally:
            self._jobstores_lock.release()
            self.log_thread_lock(func_name, state='released')

        while not self._stopped:
            try:
                logger.info('Looking for jobs to run')
                now = datetime.utcnow()
                next_wakeup_time = self._process_jobs(now)

                # Sleep until the next job is scheduled to be run,
                # a new job is added or the scheduler is stopped
                if next_wakeup_time is not None:
                    wait_seconds = time_difference(next_wakeup_time, now)
                    logger.info('Next wakeup is due at %s (in %f seconds)',
                                next_wakeup_time, wait_seconds)
                    try:
                        woken_up = self._wakeup.wait(wait_seconds)
                        if woken_up and not self._stopped:
                            # We may have been woken up by an add-job call.
                            # Wait a few minutes before rushing to calculate
                            # the next run time, so that a client adding more
                            # jobs is not slowed down by us.  As long as we
                            # sleep less than the grace period, we are fine.
                            time.sleep(settle_seconds)
                    except IOError:  # Catch errno 514 on some Linux kernels
                        pass
                    except Exception as ex:
                        logger.exception(ex)
                    # Cleared only after the back-off sleep, so wakeups
                    # signalled during the sleep are discarded as well.
                    self._wakeup.clear()
                elif self.standalone:
                    logger.info('No jobs left; shutting down scheduler')
                    self.shutdown()
                    break
                else:
                    logger.info('No jobs; waiting until a job is added')
                    try:
                        self._wakeup.wait()
                    except IOError:  # Catch errno 514 on some Linux kernels
                        pass
                    except Exception as ex:
                        logger.exception(ex)
                    self._wakeup.clear()
            except Exception as ex:
                # Log and re-raise: the exception leaves this method, so the
                # shutdown message and event below are not emitted.
                logger.exception(ex)
                logger.info('main_loop of global job scheduler encounted exeption')
                raise

        logger.info('Scheduler has been shut down')
        self._notify_listeners(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))

    def _find_stale_duplicate_jobs(self, jobstore):
        """Return the jobs of *jobstore* that duplicate another job's workload.

        For every ``workload_id`` exactly one job is kept: the first one
        encountered, unless a later job has an ``updated_at`` value and the
        currently kept job either has none or has an older one.  Scanning
        stops early once the scheduler is flagged as stopped; only the jobs
        seen up to that point are considered.

        :param jobstore: jobstore whose ``jobs`` mapping is scanned
        :return: dict mapping job id to job for every job that is not kept
        """
        kept_by_workload = {}
        stale_by_id = {}
        for _wlid, job in jobstore.jobs.items():
            if self._stopped:
                break
            # Treat every job as stale until it is known to be the kept one.
            stale_by_id[job.id] = job
            if job.workload_id not in kept_by_workload:
                kept_by_workload[job.workload_id] = job
                continue
            kept = kept_by_workload[job.workload_id]
            if job.updated_at and (not kept.updated_at or
                                   kept.updated_at < job.updated_at):
                kept_by_workload[job.workload_id] = job

        for kept in kept_by_workload.values():
            stale_by_id.pop(kept.id, None)
        return stale_by_id

    def log_thread_lock(self, func_name, state='acquired'):
        """Write a debug log record describing a lock state change.

        The thread id in the message is the identifier of ``self._thread``
        when that thread exists and is alive, and ``None`` otherwise.

        :param func_name: name of the function performing the lock operation
        :param state: text describing the lock state; defaults to 'acquired'
        """
        thread = getattr(self, '_thread', None)
        # Use the public ``ident`` property rather than the private ``_ident``.
        thread_id = thread.ident if thread and thread.is_alive() else None
        # Lazy %-style arguments: the message is only built if DEBUG is enabled.
        logger.debug("Thread %s  %s lock inside %s", thread_id, state, func_name)