Repository URL to install this package:
|
Version:
4.1.142 ▾
|
"""
Stores jobs in a database table using SQLAlchemy.
"""
import pickle
import logging
import sqlalchemy
from workloadmgr.apscheduler.jobstores.base import JobStore
from workloadmgr.apscheduler.job import Job
from workloadmgr.apscheduler.triggers.wlm import (
WorkloadMgrTrigger, deserialize_trigger
)
from workloadmgr.openstack.common import timeutils
from workloadmgr.common import context as wlm_context
try:
from sqlalchemy import *
except ImportError: # pragma: nocover
raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
LOG = logging.getLogger(__name__)
class SQLAlchemyJobStore(JobStore):
def __init__(self, db_driver, tablename='scheduled_jobs'):
self.jobs = []
self.db = db_driver
def _dump_trigger(self, data):
if isinstance(data, WorkloadMgrTrigger):
try:
result = data.serialize_trigger()
except Exception as ex:
LOG.exception("Exception occurred while dumping trigger: %s", ex)
LOG.exception("Continuing with 'cp437' encoding")
result = str(pickle.dumps(data, 0), 'cp437')
else:
LOG.info("Continuing with 'cp437' encoding")
result = str(pickle.dumps(data, 0), 'cp437')
return result
def _load_trigger(self, data):
try:
return deserialize_trigger(data)
except Exception as ex:
LOG.exception("Exception occurred while loading trigger: %s", ex)
LOG.exception("Continuing with 'cp437' decoding")
return pickle.loads(bytes(data, 'cp437'))
def add_job(self, context, job):
try:
job_dict = job.__getstate__()
job_dict['workload_id'] = job_dict['kwargs']['workload_id']
job_dict['trigger'] = self._dump_trigger(job_dict['trigger'])
job_dict['args'] = str(pickle.dumps(job_dict['args'], 0), 'utf-8')
job_dict['kwargs'] = str(pickle.dumps(job_dict['kwargs'], 0), 'utf-8')
job_obj = self.db.job_create(context, job_dict)
job.id = job_obj.id
self.jobs.append(job)
return job_obj
except Exception as ex:
LOG.exception(ex)
raise ex
def remove_job(self, context, job):
try:
self.db.job_delete(context, job.workload_id)
self.jobs.remove(job)
except Exception as ex:
LOG.exception(ex)
raise ex
def remove_multiple_jobs(self, context, jobs_to_delete):
try:
if jobs_to_delete:
self.db.job_multiple_delete(context, list(jobs_to_delete.keys()))
for job_id, job in jobs_to_delete.items():
self.jobs.remove(job)
except Exception as ex:
LOG.exception(ex)
raise ex
def load_jobs(self, context):
jobs = []
db_jobs = self.db.job_get_all(context)
for db_job in db_jobs:
try:
job = Job.__new__(Job)
job_dict = dict(db_job)
job_dict['trigger'] = self._load_trigger(job_dict['trigger'])
job_dict['args'] = pickle.loads(bytes(job_dict['args'], 'utf-8'))
job_dict['kwargs'] = pickle.loads(bytes(job_dict['kwargs'], 'utf-8'))
job.__setstate__(job_dict)
jobs.append(job)
except Exception:
job_name = job_dict.get('name', '(unknown)')
LOG.exception('Unable to restore job "%s"', job_name)
self.jobs = jobs
def update_job(self, job):
try:
job_dict = job.__getstate__()
job_dict['workload_id'] = job_dict['kwargs']['workload_id']
job_dict['trigger'] = self._dump_trigger(job_dict['trigger'])
job_dict['args'] = str(pickle.dumps(job_dict['args'], 0), 'utf-8')
job_dict['kwargs'] = str(pickle.dumps(job_dict['kwargs'], 0), 'utf-8')
admin_context = wlm_context.get_admin_context()
job_obj = self.db.job_update(admin_context, job.id, job_dict)
return job_obj
except Exception as ex:
LOG.exception(ex)
raise ex
def get_scheduler_data(self, context, job):
scheduler_data = None
try:
job_dict = job.__getstate__()
if job_dict.get('kwargs', {}).get('workload_id', None):
workload_obj = self.db.workload_get(context, job_dict['kwargs']['workload_id'])
if workload_obj and workload_obj.jobschedule:
scheduler_data = pickle.loads(bytes(workload_obj.jobschedule, 'utf-8'))
return scheduler_data
except Exception as ex:
LOG.exception(ex)
raise ex
def close(self):
pass