Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tvault_configurator / apscheduler / jobstores / sqlalchemy_store.py
Size: Mime:
"""
Stores jobs in a database table using SQLAlchemy.
"""
import pickle
import logging

import sqlalchemy

from workloadmgr.apscheduler.jobstores.base import JobStore
from workloadmgr.apscheduler.job import Job
from workloadmgr.apscheduler.triggers.wlm import (
    WorkloadMgrTrigger, deserialize_trigger
)
from workloadmgr.openstack.common import timeutils
from workloadmgr.common import context as wlm_context

# Fail with an explicit message when SQLAlchemy is not installed.
# NOTE(review): `import sqlalchemy` earlier in this module is not guarded, so
# a missing SQLAlchemy would presumably fail there first -- confirm whether
# this guard is still reachable.
# NOTE(review): the star import pulls SQLAlchemy's public names into this
# module's namespace; nothing in the visible class uses them -- verify that no
# other module relies on them before narrowing it.
try:
    from sqlalchemy import *
except ImportError:  # pragma: nocover
    raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')

# Module-level logger named after this module.
LOG = logging.getLogger(__name__)


class SQLAlchemyJobStore(JobStore):
    """Job store that persists jobs through a database driver object.

    The store keeps an in-memory list of jobs in ``self.jobs`` and delegates
    every persistence operation to ``self.db`` (``job_create``,
    ``job_update``, ``job_delete``, ``job_multiple_delete``, ``job_get_all``
    and ``workload_get``).

    Triggers, ``args`` and ``kwargs`` are stored as text: triggers through
    their own ``serialize_trigger`` when they are ``WorkloadMgrTrigger``
    instances, everything else as protocol-0 pickles decoded to ``str``.
    """

    def __init__(self, db_driver, tablename='scheduled_jobs'):
        """Create an empty store bound to ``db_driver``.

        :param db_driver: object providing the ``job_*`` / ``workload_get``
            calls used by this store.
        :param tablename: accepted for interface compatibility; it is not
            used anywhere in this class.
        """
        self.jobs = []
        self.db = db_driver

    def _dump_trigger(self, data):
        """Serialize a trigger to text for storage.

        ``WorkloadMgrTrigger`` instances are serialized with their own
        ``serialize_trigger``. If that fails, or if ``data`` is any other
        object, the trigger is pickled with protocol 0 and decoded as
        'cp437'.

        :param data: the trigger object to serialize.
        :returns: whatever ``serialize_trigger`` returns, or the pickled
            trigger as ``str``.
        """
        if isinstance(data, WorkloadMgrTrigger):
            try:
                return data.serialize_trigger()
            except Exception as ex:
                LOG.exception("Exception occurred while dumping trigger: %s", ex)
                # The traceback was already logged above; do not repeat it.
                LOG.warning("Continuing with 'cp437' encoding")
        else:
            LOG.info("Continuing with 'cp437' encoding")
        return str(pickle.dumps(data, 0), 'cp437')

    def _load_trigger(self, data):
        """Rebuild a trigger from its stored text form.

        ``deserialize_trigger`` is tried first; on any failure the text is
        treated as a 'cp437'-decoded pickle (the fallback format written by
        ``_dump_trigger``).

        :param data: the stored trigger text.
        :returns: the reconstructed trigger object.
        """
        try:
            return deserialize_trigger(data)
        except Exception as ex:
            LOG.exception("Exception occurred while loading trigger: %s", ex)
            # The traceback was already logged above; do not repeat it.
            LOG.warning("Continuing with 'cp437' decoding")
            # NOTE(review): pickle.loads can execute arbitrary code on crafted
            # input; this assumes the stored value was written by this store
            # only -- confirm.
            return pickle.loads(bytes(data, 'cp437'))

    def _serialize_job(self, job):
        """Return ``job``'s state dict in the form handed to the db driver.

        Adds a top-level 'workload_id' copied from the job's kwargs and
        converts 'trigger', 'args' and 'kwargs' to text.

        :param job: the job whose ``__getstate__`` supplies the state.
        :returns: the converted state dict.
        :raises KeyError: if the job's kwargs have no 'workload_id'.
        """
        job_dict = job.__getstate__()
        job_dict['workload_id'] = job_dict['kwargs']['workload_id']
        job_dict['trigger'] = self._dump_trigger(job_dict['trigger'])
        # NOTE(review): a protocol-0 pickle is not guaranteed to be valid
        # UTF-8 (text containing U+0080..U+00FF is emitted as raw Latin-1
        # bytes), so these decodes can raise UnicodeDecodeError for such
        # values. Left unchanged to keep the stored format compatible.
        job_dict['args'] = str(pickle.dumps(job_dict['args'], 0), 'utf-8')
        job_dict['kwargs'] = str(pickle.dumps(job_dict['kwargs'], 0), 'utf-8')
        return job_dict

    def add_job(self, context, job):
        """Persist ``job`` and add it to the in-memory job list.

        The id of the created database object is assigned to ``job.id``.

        :param context: request context passed to the db driver.
        :param job: the job to store.
        :returns: the object returned by ``db.job_create``.
        :raises Exception: any failure is logged and re-raised.
        """
        try:
            job_obj = self.db.job_create(context, self._serialize_job(job))
            job.id = job_obj.id
            self.jobs.append(job)
            return job_obj
        except Exception as ex:
            LOG.exception(ex)
            raise

    def remove_job(self, context, job):
        """Delete ``job`` from the database and from the in-memory list.

        The database delete is keyed on ``job.workload_id``.

        :param context: request context passed to the db driver.
        :param job: the job to remove.
        :raises Exception: any failure is logged and re-raised.
        """
        try:
            self.db.job_delete(context, job.workload_id)
            self.jobs.remove(job)
        except Exception as ex:
            LOG.exception(ex)
            raise

    def remove_multiple_jobs(self, context, jobs_to_delete):
        """Delete several jobs in one database call.

        :param context: request context passed to the db driver.
        :param jobs_to_delete: mapping whose keys are passed to
            ``db.job_multiple_delete`` and whose values are the job objects
            to drop from the in-memory list. An empty mapping is a no-op.
        :raises Exception: any failure is logged and re-raised.
        """
        try:
            if jobs_to_delete:
                self.db.job_multiple_delete(context, list(jobs_to_delete.keys()))
                for job in jobs_to_delete.values():
                    self.jobs.remove(job)
        except Exception as ex:
            LOG.exception(ex)
            raise

    def load_jobs(self, context):
        """Replace the in-memory job list with the jobs stored in the database.

        Rows that cannot be restored are logged and skipped.

        :param context: request context passed to the db driver.
        """
        jobs = []
        db_jobs = self.db.job_get_all(context)
        for db_job in db_jobs:
            # Reset per row so the error handler never sees an unbound name
            # or the previous row's data.
            job_dict = {}
            try:
                job = Job.__new__(Job)
                job_dict = dict(db_job)
                job_dict['trigger'] = self._load_trigger(job_dict['trigger'])
                # NOTE(review): pickle.loads can execute arbitrary code on
                # crafted input; this assumes the rows were written by this
                # store only -- confirm.
                job_dict['args'] = pickle.loads(bytes(job_dict['args'], 'utf-8'))
                job_dict['kwargs'] = pickle.loads(bytes(job_dict['kwargs'], 'utf-8'))

                job.__setstate__(job_dict)
                jobs.append(job)
            except Exception:
                job_name = job_dict.get('name', '(unknown)')
                LOG.exception('Unable to restore job "%s"', job_name)
        self.jobs = jobs

    def update_job(self, job):
        """Write the current state of ``job`` back to the database.

        The update is performed with an admin context obtained from
        ``wlm_context.get_admin_context()``. The in-memory job list is not
        modified.

        :param job: the job to update; ``job.id`` selects the row.
        :returns: the object returned by ``db.job_update``.
        :raises Exception: any failure is logged and re-raised.
        """
        try:
            job_dict = self._serialize_job(job)
            admin_context = wlm_context.get_admin_context()
            return self.db.job_update(admin_context, job.id, job_dict)
        except Exception as ex:
            LOG.exception(ex)
            raise

    def get_scheduler_data(self, context, job):
        """Return the unpickled ``jobschedule`` of the workload behind ``job``.

        :param context: request context passed to the db driver.
        :param job: the job whose kwargs carry the 'workload_id'.
        :returns: the unpickled schedule, or ``None`` when the job has no
            workload id, the workload is not found, or it has no
            ``jobschedule``.
        :raises Exception: any failure is logged and re-raised.
        """
        scheduler_data = None
        try:
            job_dict = job.__getstate__()
            if job_dict.get('kwargs', {}).get('workload_id', None):
                workload_obj = self.db.workload_get(context, job_dict['kwargs']['workload_id'])
                if workload_obj and workload_obj.jobschedule:
                    # NOTE(review): pickle.loads can execute arbitrary code on
                    # crafted input; confirm who writes `jobschedule`.
                    scheduler_data = pickle.loads(bytes(workload_obj.jobschedule, 'utf-8'))
            return scheduler_data
        except Exception as ex:
            LOG.exception(ex)
            raise

    def close(self):
        """Release store resources; this store has nothing to release."""
        pass