// Repository URL to install this package: (not included in this copy)
// Version: 2.0.0
const MongoClient = require('mongodb').MongoClient;
// Numeric status codes stored in the `status` field of a job document.
// DatabaseLayer also copies each of them onto its instances.
// Job has been inserted but not picked up yet (insertJob stores status 0).
const JOB_CREATED = 0;
// Set by startingJob(), together with the processId handling the job.
const JOB_STARTING = 1;
// Set by runningJob().
const JOB_RUNNING = 2;
// Set by successJob(); the success payload is stored under `state.success`.
const JOB_SUCCESS = 3;
// Set by errorJob(); the error payload is stored under `state.error`.
const JOB_ERROR = 4;
/**
 * Promise-based data-access layer for the `job` collection of a MongoDB
 * database.
 *
 * A job document is created by insertJob() with the fields `_id`, `jobId`,
 * `queueId`, `jobType`, `jobParams`, `parentJobId`, `status`, `state`, `logs`,
 * `created`, `latestUpdate` and `processId`. The remaining methods update or
 * query those documents. Every method returns a Promise and rejects with the
 * underlying driver error when the connection or the operation fails.
 */
class DatabaseLayer {
  /**
   * @param {string} mongoDbUrl - Connection string passed to MongoClient.connect.
   */
  constructor(mongoDbUrl) {
    this.mongoDbUrl = mongoDbUrl;
    this.currentConnection = null;
    // In-flight connect() promise, shared by concurrent getConnection() calls.
    this._pendingConnection = null;
    // Status codes are mirrored on the instance so callers can use them
    // without importing the module-level constants.
    this.JOB_CREATED = JOB_CREATED;
    this.JOB_STARTING = JOB_STARTING;
    this.JOB_RUNNING = JOB_RUNNING;
    this.JOB_SUCCESS = JOB_SUCCESS;
    this.JOB_ERROR = JOB_ERROR;
  }

  /**
   * Returns the cached client, connecting on first use.
   *
   * Calls made while a connection attempt is still in flight share that
   * attempt instead of each opening (and leaking) their own client. A failed
   * attempt is not cached, so the next call tries again.
   *
   * @returns {Promise<Object>} Resolves with the connected client.
   */
  getConnection() {
    if (this.currentConnection !== null) {
      // Even if the connection is dropped, it should auto reconnect by default
      return Promise.resolve(this.currentConnection);
    }
    if (this._pendingConnection === null) {
      const pending = new Promise((resolve, reject) => {
        MongoClient.connect(this.mongoDbUrl, { useNewUrlParser: true }, (err, client) => {
          if (err) {
            reject(err);
            return;
          }
          this.currentConnection = client;
          resolve(client);
        });
      });
      this._pendingConnection = pending;
      // Forget the attempt once it settles; the identity check keeps a newer
      // attempt from being cleared by an older one.
      const forget = () => {
        if (this._pendingConnection === pending) {
          this._pendingConnection = null;
        }
      };
      pending.then(forget, forget);
    }
    return this._pendingConnection;
  }

  /**
   * Looks up a collection through the driver's callback API.
   *
   * @param {Object} con - Database handle exposing `collection(name, callback)`.
   * @param {string} name - Collection name.
   * @returns {Promise<Object>} Resolves with the collection.
   */
  getCollection(con, name) {
    return new Promise((resolve, reject) => {
      con.collection(name, (err, col) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(col);
      });
    });
  }

  /**
   * Resolves with the `job` collection of the client's default database.
   *
   * @private
   * @returns {Promise<Object>}
   */
  _getJobCollection() {
    return this.getConnection().then(client => this.getCollection(client.db(), 'job'));
  }

  /**
   * Applies `$set` fields (plus a fresh `latestUpdate`) to one job document.
   *
   * @private
   * @param {*} jobId - `_id` of the job to update.
   * @param {Object} setFields - Fields to `$set`; `latestUpdate` is added here.
   * @param {Object} [extraOperators] - Additional update operators, e.g. `$push`.
   * @returns {Promise<Object>} Resolves with the driver's updateOne result.
   */
  _updateJob(jobId, setFields, extraOperators) {
    return this._getJobCollection().then(jobs => {
      const update = Object.assign({}, extraOperators, {
        $set: Object.assign({}, setFields, { latestUpdate: new Date() }),
      });
      return jobs.updateOne({ _id: jobId }, update);
    });
  }

  /**
   * Runs a find() on the job collection and materialises the result.
   *
   * @private
   * @param {Object} filter - Query filter.
   * @param {Object} [options] - find() options; omitted from the call when undefined.
   * @returns {Promise<Object[]>}
   */
  _findJobs(filter, options) {
    return this._getJobCollection().then(jobs => {
      const cursor = options === undefined ? jobs.find(filter) : jobs.find(filter, options);
      return cursor.toArray();
    });
  }

  /**
   * Returns the instant `hours` hours before now.
   *
   * Uses epoch milliseconds rather than Date#setHours, which works on local
   * wall-clock time: it truncates fractional hours and is off by an hour
   * across daylight-saving transitions.
   *
   * @private
   * @param {number} hours
   * @returns {Date}
   */
  _hoursAgo(hours) {
    return new Date(Date.now() - hours * 60 * 60 * 1000);
  }

  /**
   * Inserts a new job document with status JOB_CREATED.
   *
   * @param {*} jobId - Used as both `_id` and `jobId`.
   * @param {*} queueId - Queue the job belongs to, or null for none.
   * @param {*} jobType
   * @param {*} jobParams
   * @param {*} parentJobId
   * @returns {Promise<*>} Resolves with `jobId`.
   * @throws {Error} (as rejection) when the driver response has no `result`
   *   or `result.ok` is not 1.
   */
  insertJob(jobId, queueId, jobType, jobParams, parentJobId) {
    return this._getJobCollection()
      .then(jobs => jobs.insertOne({
        _id: jobId,
        jobId: jobId,
        queueId: queueId,
        jobType: jobType,
        jobParams: jobParams,
        parentJobId: parentJobId,
        status: this.JOB_CREATED,
        state: {},
        logs: [],
        created: new Date(),
        latestUpdate: new Date(),
        processId: null,
      }))
      .then(ret => {
        if (!('result' in ret)) {
          console.error(ret);
          throw new Error('Unexpected mongodb response');
        }
        if (ret.result.ok !== 1) {
          throw new Error('Expected mongodb result.ok to be 1 but was: ' + ret.result.ok);
        }
        return jobId;
      });
  }

  /**
   * Overwrites the job's `state` with `newState`.
   *
   * @returns {Promise<Object>} Resolves with the driver's updateOne result.
   */
  updateJobState(jobId, newState) {
    return this._updateJob(jobId, { state: newState });
  }

  /**
   * Appends `logItem` to the job's `logs` array.
   *
   * @returns {Promise<Object>} Resolves with the driver's updateOne result.
   */
  addJobLogItem(jobId, logItem) {
    return this._updateJob(jobId, {}, { $push: { logs: logItem } });
  }

  /**
   * Marks the job as JOB_STARTING and records the handling process.
   *
   * @returns {Promise<Object>} Resolves with the driver's updateOne result.
   */
  startingJob(jobId, processId) {
    return this._updateJob(jobId, { status: this.JOB_STARTING, processId: processId });
  }

  /**
   * Marks the job as JOB_RUNNING.
   *
   * @returns {Promise<Object>} Resolves with the driver's updateOne result.
   */
  runningJob(jobId) {
    return this._updateJob(jobId, { status: this.JOB_RUNNING });
  }

  /**
   * Marks the job as JOB_ERROR and stores `error` under `state.error`.
   *
   * @returns {Promise<Object>} Resolves with the driver's updateOne result.
   */
  errorJob(jobId, error) {
    return this._updateJob(jobId, { status: this.JOB_ERROR, 'state.error': error });
  }

  /**
   * Marks the job as JOB_SUCCESS and stores `success` under `state.success`.
   *
   * @returns {Promise<Object>} Resolves with the driver's updateOne result.
   */
  successJob(jobId, success) {
    return this._updateJob(jobId, { status: this.JOB_SUCCESS, 'state.success': success });
  }

  /**
   * Lists jobs created within the last `sinceHoursAgo` hours, oldest first.
   *
   * @param {number} sinceHoursAgo - Look-back window in hours (may be fractional).
   * @returns {Promise<Object[]>}
   */
  getAllJobs(sinceHoursAgo) {
    const cutoff = this._hoursAgo(sinceHoursAgo);
    return this._findJobs({ created: { $gt: cutoff } }, { sort: [['created', 1]] });
  }

  /**
   * Groups JOB_CREATED jobs that have a non-null `queueId` by that id.
   *
   * @returns {Promise<Object>} Plain object mapping queueId to an array of
   *   job documents, each array ordered oldest first.
   */
  getJobsWaitingByQueue() {
    return this._findJobs({ status: this.JOB_CREATED }, { sort: [['created', 1]] })
      .then(jobs => {
        const byQueue = {};
        const hasOwn = Object.prototype.hasOwnProperty;
        jobs.forEach(job => {
          if (!('queueId' in job) || job.queueId === null) {
            return;
          }
          if (!hasOwn.call(byQueue, job.queueId)) {
            // defineProperty (not assignment) so ids such as "constructor" or
            // "__proto__" become ordinary own keys instead of colliding with
            // members inherited from Object.prototype.
            Object.defineProperty(byQueue, job.queueId, {
              value: [],
              writable: true,
              enumerable: true,
              configurable: true,
            });
          }
          byQueue[job.queueId].push(job);
        });
        return byQueue;
      });
  }

  /**
   * Lists JOB_CREATED jobs whose `queueId` is null, oldest first.
   *
   * @returns {Promise<Object[]>}
   */
  getJobsWaitingWithNoQueue() {
    return this._findJobs(
      { status: this.JOB_CREATED, queueId: null },
      { sort: [['created', 1]] }
    );
  }

  /**
   * Lists jobs whose status is JOB_STARTING or JOB_RUNNING.
   *
   * @returns {Promise<Object[]>} Each document has `jobId` set to its `_id`.
   */
  getJobsStartingOrRunning() {
    return this._findJobs({ status: { $in: [this.JOB_STARTING, this.JOB_RUNNING] } })
      .then(jobs => {
        jobs.forEach(job => {
          job.jobId = job._id;
        });
        return jobs;
      });
  }

  /**
   * Fetches exactly one job by `_id`.
   *
   * @returns {Promise<Object>} Resolves with the job document.
   * @throws {Error} (as rejection) when no job, or more than one job, matches.
   */
  getJobById(jobId) {
    return this._findJobs({ _id: jobId }).then(result => {
      if (result.length === 0) {
        throw new Error('Could not find job: ' + jobId);
      }
      if (result.length > 1) {
        throw new Error('Multiple jobs found with id: ' + jobId);
      }
      return result[0];
    });
  }

  /**
   * Counts jobs created within the last `hours` hours, per status.
   *
   * @param {number} hours - Look-back window in hours (may be fractional).
   * @returns {Promise<Object>} Counters keyed `JOB_CREATED`, `JOB_STARTING`,
   *   `JOB_RUNNING`, `JOB_SUCCESS`, `JOB_ERROR` and `OTHER` (any other status).
   */
  getJobCounts(hours) {
    const cutoff = this._hoursAgo(hours);
    return this._findJobs({ created: { $gt: cutoff } }).then(jobs => {
      const counts = {
        JOB_CREATED: 0,
        JOB_STARTING: 0,
        JOB_RUNNING: 0,
        JOB_SUCCESS: 0,
        JOB_ERROR: 0,
        OTHER: 0,
      };
      // Checked in order with strict equality; the first matching status wins.
      const statusNames = [
        [this.JOB_CREATED, 'JOB_CREATED'],
        [this.JOB_STARTING, 'JOB_STARTING'],
        [this.JOB_RUNNING, 'JOB_RUNNING'],
        [this.JOB_SUCCESS, 'JOB_SUCCESS'],
        [this.JOB_ERROR, 'JOB_ERROR'],
      ];
      jobs.forEach(job => {
        const match = statusNames.find(pair => pair[0] === job.status);
        counts[match ? match[1] : 'OTHER'] += 1;
      });
      return counts;
    });
  }
}
// Export the class itself as the module's value (CommonJS).
module.exports = DatabaseLayer;