Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
Size: Mime:

const MongoClient = require('mongodb').MongoClient;

// Numeric status codes written to the `status` field of a job document.
const [
    JOB_CREATED,
    JOB_STARTING,
    JOB_RUNNING,
    JOB_SUCCESS,
    JOB_ERROR,
] = [0, 1, 2, 3, 4];

/**
 * Promise-based data-access layer for the `job` collection.
 *
 * One MongoClient is opened lazily on first use and reused by every method.
 * The status codes are also exposed on each instance (`this.JOB_CREATED`, ...)
 * so callers can compare against them without importing the constants.
 */
class DatabaseLayer {
    /**
     * @param mongoDbUrl Connection URL passed to `MongoClient.connect`.
     */
    constructor(mongoDbUrl) {
        this.mongoDbUrl = mongoDbUrl;
        this.currentConnection = null;
        // Connect attempt currently in flight (if any); shared by concurrent
        // getConnection() callers so only one client is ever opened.
        this.pendingConnection = null;

        this.JOB_CREATED = JOB_CREATED;
        this.JOB_STARTING = JOB_STARTING;
        this.JOB_RUNNING = JOB_RUNNING;
        this.JOB_SUCCESS = JOB_SUCCESS;
        this.JOB_ERROR = JOB_ERROR;
    }

    /**
     * Builds the Date lying `hours` hours before now.
     *
     * Uses millisecond arithmetic instead of `Date#setHours`, which truncates
     * fractional hours and operates on local wall-clock time.
     *
     * @param hours Number of hours to look back (may be fractional).
     * @returns {Date}
     */
    static hoursAgo(hours) {
        return new Date(Date.now() - hours * 60 * 60 * 1000);
    }

    /**
     * Resolves with the shared client, connecting first if necessary.
     *
     * Calls made while a connect is still in flight all receive the same
     * promise. A failed attempt is not cached, so the next call retries.
     *
     * @returns {Promise} Resolves with the connected client; rejects with the
     *     connect error.
     */
    getConnection() {
        if (this.currentConnection !== null) {
            // Even if the connection is dropped, it should auto reconnect by default
            return Promise.resolve(this.currentConnection);
        }

        if (!this.pendingConnection) {
            this.pendingConnection = new Promise((resolve, reject) => {
                MongoClient.connect(this.mongoDbUrl, { useNewUrlParser: true }, (err, client) => {
                    if (err) {
                        reject(err);
                    } else {
                        this.currentConnection = client;
                        resolve(client);
                    }
                });
            }).then(client => {
                this.pendingConnection = null;
                return client;
            }, err => {
                // Forget the failed attempt so a later call can try again.
                this.pendingConnection = null;
                throw err;
            });
        }

        return this.pendingConnection;
    }

    /**
     * Promise wrapper around the callback form of `con.collection(name, cb)`.
     *
     * @param con Object exposing `collection(name, callback)`.
     * @param name Collection name.
     * @returns {Promise} Resolves with the collection handle.
     */
    getCollection(con, name) {
        return new Promise((resolve, reject) => {
            con.collection(name, (err, col) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(col);
                }
            });
        });
    }

    /**
     * Resolves with the `job` collection of the connection's default database.
     *
     * @returns {Promise}
     */
    getJobCollection() {
        return this.getConnection().then(con => this.getCollection(con.db(), 'job'));
    }

    /**
     * Applies an update to the job whose `_id` is `jobId`, always refreshing
     * `latestUpdate`.
     *
     * @param jobId `_id` of the job to update.
     * @param fields Fields to `$set` (in addition to `latestUpdate`).
     * @param extraOperators Optional additional update operators (e.g. `$push`).
     * @returns {Promise} Resolves with the `updateOne` result.
     */
    updateJobDocument(jobId, fields, extraOperators) {
        return this.getJobCollection().then(jobs => {
            const update = Object.assign({}, extraOperators, {
                // Timestamp is taken when the update is issued, not when it was requested.
                $set: Object.assign({}, fields, { latestUpdate: new Date() }),
            });
            return jobs.updateOne({ _id: jobId }, update);
        });
    }

    /**
     * Inserts a new job document in the JOB_CREATED state with an empty
     * state object and an empty log list.
     *
     * @returns {Promise} Resolves with `jobId`; rejects if the insert result
     *     has no `result` key or `result.ok` is not 1.
     */
    insertJob(jobId, queueId, jobType, jobParams, parentJobId) {
        return this.getJobCollection().then(jobs => {
            return jobs.insertOne({
                _id: jobId,
                jobId: jobId,
                queueId: queueId,
                jobType: jobType,
                jobParams: jobParams,
                parentJobId: parentJobId,

                status: this.JOB_CREATED,
                state: {},
                logs: [],

                created: new Date(),
                latestUpdate: new Date(),
                processId: null,
            });
        }).then(ret => {
            if (!('result' in ret)) {
                console.error(ret);
                throw new Error('Unexpected mongodb response');
            }
            if (ret.result.ok !== 1) {
                throw new Error('Expected mongodb result.ok to be 1 but was: ' + ret.result.ok);
            }
            return jobId;
        });
    }

    /**
     * Overwrites the job's `state` with `newState`.
     *
     * @returns {Promise} Resolves with the `updateOne` result.
     */
    updateJobState(jobId, newState) {
        return this.updateJobDocument(jobId, { state: newState });
    }

    /**
     * Appends `logItem` to the job's `logs` array.
     *
     * @returns {Promise} Resolves with the `updateOne` result.
     */
    addJobLogItem(jobId, logItem) {
        return this.updateJobDocument(jobId, {}, { $push: { logs: logItem } });
    }

    /**
     * Sets the job's status to JOB_STARTING and records `processId`.
     *
     * @returns {Promise} Resolves with the `updateOne` result.
     */
    startingJob(jobId, processId) {
        return this.updateJobDocument(jobId, {
            status: this.JOB_STARTING,
            processId: processId,
        });
    }

    /**
     * Sets the job's status to JOB_RUNNING.
     *
     * @returns {Promise} Resolves with the `updateOne` result.
     */
    runningJob(jobId) {
        return this.updateJobDocument(jobId, { status: this.JOB_RUNNING });
    }

    /**
     * Sets the job's status to JOB_ERROR and stores `error` under `state.error`.
     *
     * @returns {Promise} Resolves with the `updateOne` result.
     */
    errorJob(jobId, error) {
        return this.updateJobDocument(jobId, {
            status: this.JOB_ERROR,
            'state.error': error,
        });
    }

    /**
     * Sets the job's status to JOB_SUCCESS and stores `success` under
     * `state.success`.
     *
     * @returns {Promise} Resolves with the `updateOne` result.
     */
    successJob(jobId, success) {
        return this.updateJobDocument(jobId, {
            status: this.JOB_SUCCESS,
            'state.success': success,
        });
    }

    /**
     * Lists jobs created within the last `sinceHoursAgo` hours, sorted by
     * `created` ascending.
     *
     * @returns {Promise<Array>}
     */
    getAllJobs(sinceHoursAgo) {
        const cutoff = DatabaseLayer.hoursAgo(sinceHoursAgo);

        return this.getJobCollection().then(jobs => {
            return jobs.find({
                created: { $gt: cutoff },
            }, {
                sort: [['created', 1]],
            }).toArray();
        });
    }

    /**
     * Groups JOB_CREATED jobs that have a non-null `queueId` by that id.
     * Within each group jobs keep the query order (`created` ascending).
     *
     * @returns {Promise<Object>} Plain object mapping queueId -> array of jobs.
     */
    getJobsWaitingByQueue() {
        return this.getJobCollection().then(jobs => {
            return jobs.find({
                status: this.JOB_CREATED,
            }, {
                sort: [['created', 1]],
            }).toArray();
        }).then(waiting => {
            const nextJobs = {};
            waiting.forEach(job => {
                if (!('queueId' in job) || job.queueId === null) {
                    return;
                }
                // Own-property check + defineProperty: a queue id such as
                // "constructor" or "__proto__" must create a real bucket
                // instead of hitting something inherited from Object.prototype.
                if (!Object.prototype.hasOwnProperty.call(nextJobs, job.queueId)) {
                    Object.defineProperty(nextJobs, job.queueId, {
                        value: [],
                        writable: true,
                        enumerable: true,
                        configurable: true,
                    });
                }
                nextJobs[job.queueId].push(job);
            });
            return nextJobs;
        });
    }

    /**
     * Lists JOB_CREATED jobs whose `queueId` is null, sorted by `created`
     * ascending.
     *
     * @returns {Promise<Array>}
     */
    getJobsWaitingWithNoQueue() {
        return this.getJobCollection().then(jobs => {
            return jobs.find({
                status: this.JOB_CREATED,
                queueId: null,
            }, {
                sort: [['created', 1]],
            }).toArray();
        });
    }

    /**
     * Lists jobs whose status is JOB_STARTING or JOB_RUNNING. Each returned
     * document has its `jobId` set to its `_id`.
     *
     * @returns {Promise<Array>}
     */
    getJobsStartingOrRunning() {
        return this.getJobCollection().then(jobs => {
            return jobs.find({
                status: {
                    $in: [this.JOB_STARTING, this.JOB_RUNNING],
                },
            }).toArray();
        }).then(active => {
            active.forEach(job => {
                job.jobId = job._id;
            });
            return active;
        });
    }

    /**
     * Fetches the single job whose `_id` is `jobId`.
     *
     * @returns {Promise<Object>} Resolves with the job document; rejects when
     *     zero or more than one document matches.
     */
    getJobById(jobId) {
        return this.getJobCollection().then(jobs => {
            return jobs.find({ _id: jobId }).toArray();
        }).then(result => {
            if (result.length === 0) {
                throw new Error('Could not find job: ' + jobId);
            }
            if (result.length > 1) {
                throw new Error('Multiple jobs found with id: ' + jobId);
            }
            return result[0];
        });
    }

    /**
     * Counts jobs created within the last `hours` hours, per status.
     *
     * @returns {Promise<Object>} Object with keys JOB_CREATED, JOB_STARTING,
     *     JOB_RUNNING, JOB_SUCCESS, JOB_ERROR and OTHER (any other status).
     */
    getJobCounts(hours) {
        const cutoff = DatabaseLayer.hoursAgo(hours);

        return this.getJobCollection().then(jobs => {
            return jobs.find({
                created: { $gt: cutoff },
            }, {
                // Only `status` is needed for counting; skip logs/state/params.
                projection: { status: 1 },
            }).toArray();
        }).then(recent => {
            const counts = {
                JOB_CREATED: 0,
                JOB_STARTING: 0,
                JOB_RUNNING: 0,
                JOB_SUCCESS: 0,
                JOB_ERROR: 0,
                OTHER: 0,
            };
            const labelByStatus = new Map([
                [this.JOB_CREATED, 'JOB_CREATED'],
                [this.JOB_STARTING, 'JOB_STARTING'],
                [this.JOB_RUNNING, 'JOB_RUNNING'],
                [this.JOB_SUCCESS, 'JOB_SUCCESS'],
                [this.JOB_ERROR, 'JOB_ERROR'],
            ]);

            recent.forEach(job => {
                const label = labelByStatus.has(job.status)
                    ? labelByStatus.get(job.status)
                    : 'OTHER';
                counts[label]++;
            });

            return counts;
        });
    }
}

// Expose the DatabaseLayer class as this module's CommonJS export.
module.exports = DatabaseLayer;