Repository URL to install this package:
|
Version:
3.1.8 ▾
|
const MongoClient = require('mongodb').MongoClient;
const ObjectId = require('mongodb').ObjectID
const JOB_CREATED = 0;
const JOB_STARTING = 1;
const JOB_RUNNING = 2;
const JOB_SUCCESS = 3;
const JOB_ERROR = 4;
class DatabaseLayer {
constructor(mongoDbUrl) {
this.mongoDbUrl = mongoDbUrl;
this.currentConnection = null;
// To expose externally
this.JOB_CREATED = JOB_CREATED;
this.JOB_STARTING = JOB_STARTING;
this.JOB_RUNNING = JOB_RUNNING;
this.JOB_SUCCESS = JOB_SUCCESS;
this.JOB_ERROR = JOB_ERROR;
}
getConnection() {
return new Promise((resolve, reject) => {
if (this.currentConnection !== null) {
// Even if the connection is dropped, it should auto reconnect by default
resolve(this.currentConnection);
} else {
MongoClient.connect(this.mongoDbUrl, {
useUnifiedTopology: true,
useNewUrlParser: true,
}, (err, db) => {
if (err) {
reject(err);
} else {
this.currentConnection = db;
resolve(db);
}
});
}
});
}
getCollection(con, name) {
return new Promise((resolve, reject) => {
con.collection(name, (err, col) => {
if (err) {
reject(err);
} else {
resolve(col);
}
});
});
}
_genericUpdateOne(collectionName, filter, updates, options) {
return this.getConnection().then(con => {
return this.getCollection(con.db(), collectionName).then(repo => {
return repo.updateOne(filter, updates, options).then(ret => {
if (!('result' in ret)) {
console.error(ret);
throw new Error('Unexpected mongodb response');
}
if (ret.result.ok !== 1) {
throw new Error('Expected mongodb result.ok to be 1 but was: ' + ret.result.ok);
}
// console.log("Result was", ret.result);
return ret.result;
});
})
});
}
_genericInsertOne(collectionName, document) {
return this.getConnection().then(con => {
return this.getCollection(con.db(), collectionName).then(repo => {
return repo.insertOne(document).then(ret => {
if ('insertedCount' in ret && ret.insertedCount === 1 && 'insertedId' in ret) {
// OK
return ret.insertedId;
}
console.error(ret);
throw new Error('Unexpected mongodb response');
});
})
});
}
_genericPagination(collectionName, lastId = null, numResults = 5, getAfterLastId = false) {
return this.getConnection().then(con => {
return this.getCollection(con.db(), collectionName).then(repo => {
let query = {};
if (lastId !== null && lastId !== '0') {
if (getAfterLastId) {
query['_id'] = { $gt: ObjectId(lastId) };
} else {
query['_id'] = { $lt: ObjectId(lastId) };
}
}
let cursor = repo.find(query).sort({
_id: getAfterLastId ? 1 : -1,
});
return cursor.count().then(cursorSize => {
return Promise.resolve(cursor.limit(numResults).toArray()).then(results => {
// Format the results using some generic pagination format
let lastId = null;
if (results.length > 0) {
lastId = results[results.length - 1]._id;
}
return {
results: results,
lastId: lastId,
totalResults: cursorSize,
moreResults: cursorSize > numResults,
}
})
})
})
});
}
insertJob(jobId, queueId, jobType, jobParams, parentJobId) {
return this._genericInsertOne('job', {
id: jobId,
queueId: queueId,
jobType: jobType,
jobParams: jobParams,
parentJobId: parentJobId,
status: 0,
state: {},
logs: [],
created: new Date(),
latestUpdate: new Date(),
processId: null,
});
}
updateJobState(jobId, newState) {
// Overwrites the current state with new state object
return this._genericUpdateOne('job', {
id: jobId,
}, {
$set: {
state: newState,
latestUpdate: new Date(),
},
});
}
updateJobProgress(jobId, progress) {
// Overwrites the current progress with new progress data
return this._genericUpdateOne('job', {
id: jobId,
}, {
$set: {
progress: progress,
latestUpdate: new Date(),
},
});
}
addJobLogItem(jobId, logItem) {
// Appends the logItem to the logs key
return this._genericUpdateOne('job', {
id: jobId,
}, {
$push: {
logs: logItem
},
$set: {
latestUpdate: new Date(),
},
});
}
addManyJobLogItems(jobId, manyLogItems) {
// Appends ALL the logItems to the logs key
// console.log('Pushing', manyLogItems.length, 'log items at once to jobId', jobId);
return this._genericUpdateOne('job', {
id: jobId,
}, {
$push: {
logs: {
$each: manyLogItems
}
},
$set: {
latestUpdate: new Date(),
},
}).then((result) => {
// console.log('DONE');
return result;
});
}
startingJob(jobId, processId) {
// Updates the job status
return this._genericUpdateOne('job', {
id: jobId,
}, {
$set: {
status: this.JOB_STARTING,
latestUpdate: new Date(),
processId: processId,
},
});
}
runningJob(jobId) {
// Updates the job status
return this._genericUpdateOne('job', {
id: jobId,
}, {
$set: {
status: this.JOB_RUNNING,
latestUpdate: new Date(),
},
});
}
errorJob(jobId, error) {
// Updates the job status and error in the state
return this._genericUpdateOne('job', {
id: jobId,
}, {
$set: {
status: this.JOB_ERROR,
latestUpdate: new Date(),
'state.error': error,
},
});
}
successJob(jobId, success) {
// Updates the job status and success in the state
return this._genericUpdateOne('job', {
id: jobId,
}, {
$set: {
status: this.JOB_SUCCESS,
latestUpdate: new Date(),
'state.success': success,
},
});
}
/*
getAllJobs(sinceHoursAgo) {
let hoursAgo = new Date();
hoursAgo.setHours(hoursAgo.getHours() - sinceHoursAgo);
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.find({
created: {
$gt: hoursAgo,
}
}, { sort: [
['created', 1]
] }).toArray();
});
});
}
*/
getJobs(lastId = null, reverse = false) {
return this._genericPagination('job', lastId, 50, reverse);
}
// Used by queue.assignJobs
getJobsWaitingByQueue() {
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.find({
status: this.JOB_CREATED,
}, { sort: [
['created', 1]
] }).toArray();
}).then(jobs => {
let nextJobs = {};
jobs.map(job => {
if ('queueId' in job && job.queueId !== null) {
if (!(job.queueId in nextJobs)) {
nextJobs[job.queueId] = [];
}
nextJobs[job.queueId].push(job);
}
});
return nextJobs;
})
});
}
// Used by queue.assignJobs
getJobsWaitingWithNoQueue() {
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.find({
status: JOB_CREATED,
queueId: null,
}, { sort: [
['created', 1]
] }).toArray();
});
});
}
// Used by queue.start during the startup process to error any jobs marked as running or starting
getJobsStartingOrRunning() {
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.find({
status: {
$in: [this.JOB_STARTING, this.JOB_RUNNING],
}
}).toArray();
});
});
}
// Used by queue.getJobById
getJobById(jobId) {
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.find({ id: jobId }).toArray();
}).then(result => {
if (result.length === 0) {
throw new Error('Could not find job: ' + jobId);
} else if (result.length === 1) {
return result[0];
} else {
throw new Error('Multiple jobs found with id: ' + jobId);
}
});
});
}
// Used by queue.getStats
getJobCounts() {
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.aggregate(
[
{
$group: {
_id: {
status: '$status',
},
count: {
$sum: 1
}
}
}
]
).toArray();
}).then(result => {
let statusTexts = {}
statusTexts[this.JOB_CREATED] = 'JOB_CREATED';
statusTexts[this.JOB_STARTING] = 'JOB_STARTING';
statusTexts[this.JOB_RUNNING] = 'JOB_RUNNING';
statusTexts[this.JOB_SUCCESS] = 'JOB_SUCCESS';
statusTexts[this.JOB_ERROR] = 'JOB_ERROR';
let results = {
'OTHER': 0,
}
Object.keys(statusTexts).map(statusId => {
results[statusTexts[statusId]] = 0;
})
result.map(row => {
let rowStatusId = row['_id'].status;
if (rowStatusId in statusTexts) {
results[statusTexts[rowStatusId]] = row.count;
} else {
results['OTHER'] += row.count;
}
});
return results;
});
});
}
deleteAllJobs() {
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.deleteMany({});
})
})
}
deleteAllOldJobs(days = 60) {
let expiryDate = new Date();
expiryDate.setDate(expiryDate.getDate() - days);
return this.getConnection().then(con => {
return this.getCollection(con.db(), 'job').then(job => {
return job.deleteMany({
created: {
$lte: expiryDate
}
});
})
})
}
}
module.exports = DatabaseLayer;