Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:

const MongoClient = require('mongodb').MongoClient;
const ObjectId = require('mongodb').ObjectID

const JOB_CREATED = 0;
const JOB_STARTING = 1;
const JOB_RUNNING = 2;
const JOB_SUCCESS = 3;
const JOB_ERROR = 4;

class DatabaseLayer {
    constructor(mongoDbUrl) {
        this.mongoDbUrl = mongoDbUrl;
        this.currentConnection = null;

        // To expose externally
        this.JOB_CREATED = JOB_CREATED;
        this.JOB_STARTING = JOB_STARTING;
        this.JOB_RUNNING = JOB_RUNNING;
        this.JOB_SUCCESS = JOB_SUCCESS;
        this.JOB_ERROR = JOB_ERROR;
    }

    getConnection() {
        return new Promise((resolve, reject) => {
            if (this.currentConnection !== null) {
                // Even if the connection is dropped, it should auto reconnect by default
                resolve(this.currentConnection);
            } else {
                MongoClient.connect(this.mongoDbUrl, {
                    useUnifiedTopology: true,
                    useNewUrlParser: true,
                }, (err, db) => {
                    if (err) {
                        reject(err);
                    } else {
                        this.currentConnection = db;
                        resolve(db);
                    }
                });
            }
        });
    }

    getCollection(con, name) {
        return new Promise((resolve, reject) => {
            con.collection(name, (err, col) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(col);
                }
            });
        });
    }

    _genericUpdateOne(collectionName, filter, updates, options) {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), collectionName).then(repo => {
                return repo.updateOne(filter, updates, options).then(ret => {
                    if (!('result' in ret)) {
                        console.error(ret);
                        throw new Error('Unexpected mongodb response');
                    }
                    if (ret.result.ok !== 1) {
                        throw new Error('Expected mongodb result.ok to be 1 but was: ' + ret.result.ok);
                    }
                    // console.log("Result was", ret.result);
                    return ret.result;
                });
            })
        });
    }

    _genericInsertOne(collectionName, document) {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), collectionName).then(repo => {
                return repo.insertOne(document).then(ret => {
                    if ('insertedCount' in ret && ret.insertedCount === 1 && 'insertedId' in ret) {
                        // OK
                        return ret.insertedId;
                    }
                    console.error(ret);
                    throw new Error('Unexpected mongodb response');
                });
            })
        });
    }

    _genericPagination(collectionName, lastId = null, numResults = 5, getAfterLastId = false) {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), collectionName).then(repo => {
                let query = {};
                if (lastId !== null && lastId !== '0') {
                    if (getAfterLastId) {
                        query['_id'] = { $gt: ObjectId(lastId) };
                    } else {
                        query['_id'] = { $lt: ObjectId(lastId) };
                    }
                }
                let cursor = repo.find(query).sort({
                    _id: getAfterLastId ? 1 : -1,
                });
                return cursor.count().then(cursorSize => {
                    return Promise.resolve(cursor.limit(numResults).toArray()).then(results => {
                        // Format the results using some generic pagination format
                        let lastId = null;
                        if (results.length > 0) {
                            lastId = results[results.length - 1]._id;
                        }
                        return {
                            results: results,
                            lastId: lastId,
                            totalResults: cursorSize,
                            moreResults: cursorSize > numResults,
                        }
                    })
                })
            })
        });
    }

    insertJob(jobId, queueId, jobType, jobParams, parentJobId) {
        return this._genericInsertOne('job', {
            id: jobId,
            queueId: queueId,
            jobType: jobType,
            jobParams: jobParams,
            parentJobId: parentJobId,

            status: 0,
            state: {},
            logs: [],

            created: new Date(),
            latestUpdate: new Date(),
            processId: null,
        });
    }

    updateJobState(jobId, newState) {
        // Overwrites the current state with new state object
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $set: {
                state: newState,
                latestUpdate: new Date(),
            },
        });
    }

    updateJobProgress(jobId, progress) {
        // Overwrites the current progress with new progress data
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $set: {
                progress: progress,
                latestUpdate: new Date(),
            },
        });
    }

    addJobLogItem(jobId, logItem) {
        // Appends the logItem to the logs key
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $push: {
                logs: logItem
            },
            $set: {
                latestUpdate: new Date(),
            },
        });
    }

    addManyJobLogItems(jobId, manyLogItems) {
        // Appends ALL the logItems to the logs key
        // console.log('Pushing', manyLogItems.length, 'log items at once to jobId', jobId);
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $push: {
                logs: {
                    $each: manyLogItems
                }
            },
            $set: {
                latestUpdate: new Date(),
            },
        }).then((result) => {
            // console.log('DONE');
            return result;
        });
    }

    startingJob(jobId, processId) {
        // Updates the job status
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $set: {
                status: this.JOB_STARTING,
                latestUpdate: new Date(),
                processId: processId,
            },
        });
    }

    runningJob(jobId) {
        // Updates the job status
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $set: {
                status: this.JOB_RUNNING,
                latestUpdate: new Date(),
            },
        });
    }

    errorJob(jobId, error) {
        // Updates the job status and error in the state
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $set: {
                status: this.JOB_ERROR,
                latestUpdate: new Date(),
                'state.error': error,
            },
        });
    }

    successJob(jobId, success) {
        // Updates the job status and success in the state
        return this._genericUpdateOne('job', {
            id: jobId,
        }, {
            $set: {
                status: this.JOB_SUCCESS,
                latestUpdate: new Date(),
                'state.success': success,
            },
        });
    }

    /*
    getAllJobs(sinceHoursAgo) {
        let hoursAgo = new Date();
        hoursAgo.setHours(hoursAgo.getHours() - sinceHoursAgo);

        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.find({
                    created: {
                        $gt: hoursAgo,
                    }
                }, { sort: [
                    ['created', 1]
                ] }).toArray();
            });
        });
    }
    */

    getJobs(lastId = null, reverse = false) {
        return this._genericPagination('job', lastId, 50, reverse);
    }

    // Used by queue.assignJobs
    getJobsWaitingByQueue() {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.find({
                    status: this.JOB_CREATED,
                }, { sort: [
                    ['created', 1]
                ] }).toArray();
            }).then(jobs => {
                let nextJobs = {};
                jobs.map(job => {
                    if ('queueId' in job && job.queueId !== null) {
                        if (!(job.queueId in nextJobs)) {
                            nextJobs[job.queueId] = [];
                        }
                        nextJobs[job.queueId].push(job);
                    }
                });
                return nextJobs;
            })
        });
    }

    // Used by queue.assignJobs
    getJobsWaitingWithNoQueue() {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.find({
                    status: JOB_CREATED,
                    queueId: null,
                }, { sort: [
                    ['created', 1]
                ] }).toArray();
            });
        });
    }

    // Used by queue.start during the startup process to error any jobs marked as running or starting
    getJobsStartingOrRunning() {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.find({
                    status: {
                        $in: [this.JOB_STARTING, this.JOB_RUNNING],
                    }
                }).toArray();
            });
        });
    }

    // Used by queue.getJobById
    getJobById(jobId) {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.find({ id: jobId }).toArray();
            }).then(result => {
                if (result.length === 0) {
                    throw new Error('Could not find job: ' + jobId);
                } else if (result.length === 1) {
                    return result[0];
                } else {
                    throw new Error('Multiple jobs found with id: ' + jobId);
                }
            });
        });
    }

    // Used by queue.getStats
    getJobCounts() {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.aggregate(
                    [
                        {
                            $group: {
                                _id: {
                                    status: '$status',
                                },
                                count: {
                                    $sum: 1
                                }
                            }
                        }
                    ]
                ).toArray();
            }).then(result => {
                let statusTexts = {}
                statusTexts[this.JOB_CREATED] = 'JOB_CREATED';
                statusTexts[this.JOB_STARTING] = 'JOB_STARTING';
                statusTexts[this.JOB_RUNNING] = 'JOB_RUNNING';
                statusTexts[this.JOB_SUCCESS] = 'JOB_SUCCESS';
                statusTexts[this.JOB_ERROR] = 'JOB_ERROR';

                let results = {
                    'OTHER': 0,
                }
                Object.keys(statusTexts).map(statusId => {
                    results[statusTexts[statusId]] = 0;
                })

                result.map(row => {
                    let rowStatusId = row['_id'].status;
                    if (rowStatusId in statusTexts) {
                        results[statusTexts[rowStatusId]] = row.count;
                    } else {
                        results['OTHER'] += row.count;
                    }
                });

                return results;
            });
        });
    }

    deleteAllJobs() {
        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.deleteMany({});
            })
        })
    }

    deleteAllOldJobs(days = 60) {
        let expiryDate = new Date();
        expiryDate.setDate(expiryDate.getDate() - days);

        return this.getConnection().then(con => {
            return this.getCollection(con.db(), 'job').then(job => {
                return job.deleteMany({
                    created: {
                        $lte: expiryDate
                    }
                });
            })
        })
    }
}

module.exports = DatabaseLayer;