// Repository URL to install this package:
// Version: 3.1.8
const { fork } = require('child_process');
const uuidv4 = require('uuid/v4');
const moment = require('moment-timezone');
const path = require('path');
const LogWriter = require('./logWriter');
/**
 * Runs jobs stored in a database, each one in its own forked child process.
 *
 * Jobs either belong to a queue (at most one process per queue id is tracked
 * in `queueProcesses`) or have `queueId === null` (all tracked in
 * `otherProcesses`). Child processes talk back over the IPC channel; their
 * messages are handled by `responseFromJobProcess`.
 *
 * Child job ids are expected to be the parent id plus one extra
 * dash-separated segment (see `getParentJobId`).
 */
class SimpleJobQueues {
  /**
   * @param {object} db - Job store providing the query/update methods called by this class.
   * @param {string} jobsDirectory - Passed as the only argument to every forked job process.
   * @param {?Function} eventHandler - Optional callback invoked as `(eventType, eventMeta)`.
   */
  constructor(db, jobsDirectory, eventHandler = null) {
    this.db = db;
    this.jobsDirectory = jobsDirectory;
    this.eventHandler = eventHandler;

    // The lookup tables have no prototype so that ids such as "constructor"
    // or "toString" can never match an inherited property in the `in` checks.
    this.queueProcesses = Object.create(null); // Running process (or null) for each queue id
    this.otherProcesses = []; // Processes for jobs without a queue (queueId = null)
    this.processByJobId = Object.create(null); // Process for each running jobId
    this.logWriters = Object.create(null); // LogWriter for each jobId that has logged

    // Id structure: <date>-<instance>-<incrementing number>
    this.instanceId = uuidv4().substring(0, 8);
    this.idCounter = 1; // Reset to 1 whenever the date changes
    this.latestDate = null;

    this.jobAssignerCount = 0;
    this.triggerJobAssignment = true;
    this.jobAssignerActive = false; // Whether the queue is started
    this.shuttingDown = false; // Whether the queue is stopping
  }

  /**
   * Forwards an event to the configured handler, if there is one.
   * A `ts` timestamp is added unless `eventMeta` supplies its own.
   *
   * @param {string} eventType
   * @param {object} [eventMeta]
   */
  fireEvent(eventType, eventMeta = {}) {
    if (typeof this.eventHandler !== 'function') {
      return;
    }
    this.eventHandler(eventType, Object.assign({ ts: new Date() }, eventMeta));
  }

  /**
   * Marks jobs left in the starting/running state as errored, then starts the
   * one-second ticker that drives `assignJobs`.
   *
   * NOTE(review): calling start() twice overwrites `this.jobAssigner` and the
   * first interval can no longer be cleared.
   *
   * @returns {Promise<void>}
   */
  async start() {
    // This container is designed to only run 1 instance, so anything still
    // marked as starting/running cannot have a live process behind it.
    const staleJobs = await this.db.getJobsStartingOrRunning();
    for (const job of staleJobs) {
      this.fireEvent('startupJobCleanupWarning', {
        jobId: job.id,
      });
      await this.db.errorJob(job.id, 'Found job in starting/running state upon startup. Something violently SIGKILLed the process running this job.');
    }

    this.fireEvent('startup');
    this.jobAssigner = setInterval(() => {
      // Assign once the counter has passed 2 ticks when something requested
      // it, or once it has passed 30 ticks as a fallback poll.
      const triggered = this.triggerJobAssignment && this.jobAssignerCount > 2;
      if (triggered || this.jobAssignerCount > 30) {
        this.jobAssignerCount = 0;
        this.triggerJobAssignment = false;
        this.assignJobs();
      } else {
        this.jobAssignerCount++;
      }
    }, 1000);
    this.jobAssignerActive = true;
  }

  /**
   * Starts every waiting job that has no queue, and the first eligible waiting
   * job of every queue that has no running process.
   *
   * The assigner keeps running during shutdown because a running job may still
   * spawn child jobs; in that mode only jobs with a parent are started.
   *
   * Database failures are reported as 'warning' events instead of being left
   * as unhandled promise rejections.
   *
   * @returns {Promise<void>} Settles once both lookups have been processed; never rejects.
   */
  assignJobs() {
    const mayStart = (job) => !this.shuttingDown || job.parentJobId !== null;

    const withoutQueue = this.db.getJobsWaitingWithNoQueue()
      .then((jobs) => {
        for (const job of jobs) {
          if (mayStart(job)) {
            this.startJob(job);
          }
        }
      })
      .catch((err) => this._reportAsyncFailure('Failed to assign jobs without a queue', err));

    const byQueue = this.db.getJobsWaitingByQueue()
      .then((jobsByQueue) => {
        for (const queueId of Object.keys(jobsByQueue)) {
          if (!(queueId in this.queueProcesses)) {
            this.queueProcesses[queueId] = null;
          }
          if (this.queueProcesses[queueId] !== null) {
            // The queue is busy. The slot is cleared by the child's
            // 'disconnect' handler, so a non-null entry is still running.
            continue;
          }
          // Only execute the first appropriate job per queue
          const jobToExecute = jobsByQueue[queueId].find(mayStart);
          if (jobToExecute !== undefined) {
            this.startJob(jobToExecute);
          }
        }
      })
      .catch((err) => this._reportAsyncFailure('Failed to assign queued jobs', err));

    return Promise.all([withoutQueue, byQueue]).then(() => undefined);
  }

  /**
   * Forks a process for `job`, registers it and sends it the 'runJob' message.
   * A job whose parent has no tracked process is failed instead of started.
   *
   * @param {object} job - Job record; reads id, queueId, parentJobId, jobType, jobParams.
   */
  startJob(job) {
    // We cannot start a job which is a child of some other job which is no longer executing
    if (job.parentJobId !== null && !(job.parentJobId in this.processByJobId)) {
      this.fireJobError(job.id, 'We cannot start this job because its parent job (' + job.parentJobId + ') is dead.');
      this.triggerJobAssignment = true;
      return;
    }

    const childProcess = fork(path.join(__dirname, 'forkedProcess.js'), [this.jobsDirectory]);
    this.fireJobStarting(job, childProcess);

    childProcess.on('message', (msg) => {
      this.responseFromJobProcess(msg, childProcess);
    });

    childProcess.on('disconnect', () => {
      this.fireEvent('jobProcessDied', {
        jobId: job.id,
        queueId: job.queueId,
        processId: childProcess.pid,
      });
      if (job.queueId === null) {
        const idx = this.otherProcesses.indexOf(childProcess);
        if (idx !== -1) {
          this.otherProcesses.splice(idx, 1);
        } else {
          this.fireEvent('warning', {
            message: 'Could not find process in otherProcesses array',
            processId: childProcess.pid,
          });
        }
      } else {
        // Free the queue slot and ask for the next job of this queue
        this.queueProcesses[job.queueId] = null;
        this.triggerJobAssignment = true;
      }
      delete this.processByJobId[job.id];
    });

    childProcess.send({
      type: 'runJob',
      jobId: job.id,
      jobType: job.jobType,
      jobParams: job.jobParams,
    });
  }

  /**
   * Enters shutdown mode and polls every 500 ms until no tracked process is
   * left, then stops the assigner.
   *
   * @returns {Promise<string>} Resolves with 'OK'; rejects if a shutdown is already in progress.
   */
  stop() {
    if (this.shuttingDown) {
      return Promise.reject(new Error('Queue is already shutting down, please wait...'));
    }
    this.shuttingDown = true;
    this.fireEvent('shutdownStarted');

    return new Promise((resolve) => {
      let checkCount = 0;
      const checkInterval = setInterval(() => {
        checkCount++;
        const busyQueues = Object.keys(this.queueProcesses)
          .filter((queueId) => this.queueProcesses[queueId] !== null);
        const runningJobs = this.otherProcesses.length + busyQueues.length;

        // Report progress on every tenth check
        if (checkCount % 10 === 0) {
          this.fireEvent('shutdownWaiting', {
            jobsLeft: runningJobs,
          });
        }
        if (runningJobs === 0) {
          clearInterval(this.jobAssigner);
          this.jobAssignerActive = false;
          clearInterval(checkInterval);
          this.fireEvent('shutdownFinished');
          this.shuttingDown = false;
          resolve('OK');
        }
      }, 500);
    });
  }

  /**
   * Builds the next id as `<YYYYMMDD>-<instanceId>-<counter>`. The date is
   * taken in the Europe/Stockholm timezone and the counter restarts at 1 when
   * the date changes.
   *
   * @returns {string}
   */
  generateJobId() {
    const currentDate = moment().tz('Europe/Stockholm').format('YYYYMMDD');
    if (this.latestDate !== currentDate) {
      this.latestDate = currentDate;
      this.idCounter = 1;
    }
    const id = this.latestDate + '-' + this.instanceId + '-' + this.idCounter;
    this.idCounter++;
    return id;
  }

  /**
   * Queues a job under a freshly generated id.
   *
   * @returns {Promise<string>} Resolves with the new job id.
   */
  queueJob(queueId, jobType, jobParams, parentJobId = null) {
    const jobId = this.generateJobId();
    return this.queueJobAsId(jobId, queueId, jobType, jobParams, parentJobId);
  }

  /**
   * Fires 'addedJob', inserts the job under the given id and requests a job
   * assignment once the insert has completed.
   *
   * @returns {Promise<string>} Resolves with `jobId`.
   */
  queueJobAsId(jobId, queueId, jobType, jobParams, parentJobId = null) {
    this.fireEvent('addedJob', {
      jobId,
      queueId,
      jobType,
      parentJobId,
    });
    return this.db.insertJob(jobId, queueId, jobType, jobParams, parentJobId).then(() => {
      this.triggerJobAssignment = true;
      return jobId;
    });
  }

  /**
   * Queues a copy of a failed top-level job under a new id.
   *
   * @param {string} jobId - Id of the failed job.
   * @returns {Promise<string>} Resolves with the id of the new job.
   * @throws {Error} If the job does not exist, is not in the error state, or is a child job.
   */
  async retryJob(jobId) {
    const job = await this.db.getJobById(jobId);
    if (job === null || job === undefined) {
      throw new Error('Cannot retry this job, it does not exist: ' + jobId);
    }
    // The job must be in the error state
    if (job.status !== 4) { // 4 = JOB_ERROR
      throw new Error('Cannot retry this job, it is not in a failed state: ' + job.status);
    }
    // The job must not be a child of some other job
    if (job.parentJobId !== null) {
      throw new Error('Cannot retry a child job individually since unknown things could happen. You need to retry the whole parent job');
    }
    // Create a new job using the params of this failed job
    const newJobId = this.generateJobId();
    return this.queueJobAsId(newJobId, job.queueId, job.jobType, job.jobParams, null);
  }

  /*
  getAllJobs() {
    // Gets all jobs in the last week
    return this.db.getAllJobs(24 * 7);
  }
  */

  /** Passes straight through to `db.getJobs(lastId, reverse)`. */
  getJobs(lastId = null, reverse = false) {
    return this.db.getJobs(lastId, reverse);
  }

  /** Passes straight through to `db.getJobById(id)`. */
  getJobById(id) {
    return this.db.getJobById(id);
  }

  /**
   * Combines the queue's own flags with the counts from `db.getJobCounts()`.
   *
   * @returns {Promise<{jobAssignerActive: boolean, shuttingDown: boolean, status: *}>}
   */
  getStats() {
    return this.db.getJobCounts().then((statusStats) => ({
      jobAssignerActive: this.jobAssignerActive,
      shuttingDown: this.shuttingDown,
      status: statusStats,
    }));
  }

  /**
   * Deletes all jobs. Refused while the queue is started or shutting down.
   *
   * @returns {Promise<*>} Result of `db.deleteAllJobs()`, or a rejection if the queue is active.
   */
  deleteAllJobs() {
    if (this.jobAssignerActive || this.shuttingDown) {
      return Promise.reject(new Error('You must stop the queue first before you delete all the jobs'));
    }
    return this.db.deleteAllJobs();
  }

  /**
   * Hands a log item to the job's LogWriter, creating and starting the writer
   * on first use, so log lines are not written to the database one by one.
   *
   * NOTE(review): writers are never removed from `logWriters` here; confirm
   * whether LogWriter stops itself or whether finished jobs leak an entry.
   */
  addJobLogItemAsync(jobId, logData) {
    if (!(jobId in this.logWriters)) {
      this.logWriters[jobId] = new LogWriter(this.db, jobId);
      this.logWriters[jobId].start();
    }
    this.logWriters[jobId].addLogItem(logData);
  }

  /**
   * Validates and dispatches one IPC message from a job process.
   * Anything malformed is reported as a 'badJobMessageReceived' event; a bad
   * message never throws out of the 'message' listener.
   *
   * @param {*} msg - Message received from the child process.
   * @param {object} childProcess - The sending process (currently unused).
   */
  responseFromJobProcess(msg, childProcess) {
    // typeof null is 'object', so null has to be excluded explicitly before
    // the `in` operator is used on a value.
    const isObject = (value) => typeof value === 'object' && value !== null;

    if (!isObject(msg) || !('type' in msg) || !('jobId' in msg)) {
      this.fireEvent('badJobMessageReceived', {
        message: 'Bad response from child process',
        data: msg,
      });
      return;
    }

    const jobId = msg.jobId;
    const reportBadMessage = (type) => {
      this.fireEvent('badJobMessageReceived', {
        jobId: jobId,
        type: type,
        data: msg,
      });
    };

    switch (msg.type) {
      case 'updateJobAddLog':
        if (!isObject(msg.logData) || !isObject(msg.logData.data)) {
          reportBadMessage('updateJobAddLog');
          break;
        }
        // Writing every item directly to the database is too inefficient
        this.addJobLogItemAsync(jobId, msg.logData);
        if ('str' in msg.logData.data) {
          this.fireEvent('logItemString', {
            jobId: jobId,
            message: msg.logData.data.str,
          });
        } else {
          this.fireEvent('logItemOther', {
            jobId: jobId,
            data: msg.logData.data,
          });
        }
        break;

      case 'runningJob':
        this.fireJobRunning(jobId);
        break;

      case 'updateJobState':
        if ('state' in msg) {
          this.fireJobStateUpdated(jobId, msg.state);
        } else {
          reportBadMessage('updateJobState');
        }
        break;

      case 'errorJob':
        if ('error' in msg && 'duration' in msg) {
          this.fireJobError(jobId, msg.error, msg.duration);
        } else {
          reportBadMessage('errorJob');
        }
        break;

      case 'successJob':
        if ('success' in msg && 'duration' in msg) {
          this.fireJobSuccess(jobId, msg.success, msg.duration);
        } else {
          reportBadMessage('successJob');
        }
        break;

      case 'queueChildJob':
        if ('queueId' in msg && 'jobType' in msg && 'jobParams' in msg && 'parentJobId' in msg) {
          this.queueJobAsId(msg.jobId, msg.queueId, msg.jobType, msg.jobParams, msg.parentJobId)
            .catch((err) => this._reportAsyncFailure('Failed to queue child job ' + jobId, err));
        } else {
          reportBadMessage('queueChildJob');
        }
        break;

      case 'reportProgress':
        if ('progress' in msg) {
          this.fireJobProgressReport(jobId, msg.progress);
        } else {
          reportBadMessage('reportProgress');
        }
        break;

      default:
        reportBadMessage('unknown');
    }
  }

  /**
   * Derives the parent id from a job id. An id with fewer than four
   * dash-separated parts has no parent; otherwise the parent id is everything
   * before the last dash.
   *
   * @param {string} jobId
   * @returns {?string} Parent job id, or null (also for non-string ids).
   */
  getParentJobId(jobId) {
    if (typeof jobId !== 'string') {
      return null;
    }
    const parts = jobId.split('-');
    if (parts.length < 4) {
      return null;
    }
    parts.pop();
    return parts.join('-');
  }

  /**
   * Registers a freshly forked process for `job`, records it in the database
   * and informs the parent job's process if that is running.
   */
  fireJobStarting(job, childProcess) {
    this.fireEvent('jobStarting', {
      jobId: job.id,
      queueId: job.queueId,
      processId: childProcess.pid,
    });
    // Track the process, both by queue and by jobId
    if (job.queueId === null) {
      this.otherProcesses.push(childProcess);
    } else {
      this.queueProcesses[job.queueId] = childProcess;
    }
    this.processByJobId[job.id] = childProcess;
    // Store in database
    this.db.startingJob(job.id, childProcess.pid);
    this._notifyParentProcess(job.id, { type: 'startingJob', jobId: job.id, processId: childProcess.pid });
  }

  /** Records that the job is running and informs the parent job's process. */
  fireJobRunning(jobId) {
    this.fireEvent('jobRunning', {
      jobId: jobId,
    });
    this.db.runningJob(jobId);
    this._notifyParentProcess(jobId, { type: 'runningJob', jobId: jobId });
  }

  /** Stores the job's new state and informs the parent job's process. */
  fireJobStateUpdated(jobId, newState) {
    this.fireEvent('jobStateUpdated', {
      jobId: jobId,
      newState: newState,
    });
    this.db.updateJobState(jobId, newState);
    this._notifyParentProcess(jobId, { type: 'updateJobState', jobId: jobId, state: newState });
  }

  /** Stores the job's progress and informs the parent job's process. */
  fireJobProgressReport(jobId, progress) {
    this.fireEvent('jobProgressReport', {
      jobId: jobId,
      progress: progress,
    });
    this.db.updateJobProgress(jobId, progress);
    this._notifyParentProcess(jobId, { type: 'reportProgress', jobId: jobId, progress: progress });
  }

  /**
   * Records the job as failed, informs the parent job's process and tells the
   * job's own process (if tracked) to exit with code 1.
   */
  fireJobError(jobId, error, duration) {
    this.fireEvent('jobError', {
      jobId: jobId,
      error: error,
      duration: duration,
    });
    this.db.errorJob(jobId, error);
    this._notifyParentProcess(jobId, { type: 'errorJob', jobId: jobId, error: error });
    this._sendKillJob(jobId, 1);
  }

  /**
   * Records the job as succeeded, informs the parent job's process and tells
   * the job's own process (if tracked) to exit with code 0.
   */
  fireJobSuccess(jobId, success, duration) {
    this.fireEvent('jobSuccess', {
      jobId: jobId,
      success: success,
      duration: duration,
    });
    this.db.successJob(jobId, success);
    this._notifyParentProcess(jobId, { type: 'successJob', jobId: jobId, success: success });
    this._sendKillJob(jobId, 0);
  }

  /** Internal: sends `message` to the parent job's process when one is tracked. */
  _notifyParentProcess(jobId, message) {
    const parentJobId = this.getParentJobId(jobId);
    if (parentJobId !== null && parentJobId in this.processByJobId) {
      this.processByJobId[parentJobId].send(message);
    }
  }

  /**
   * Internal: asks the job's own process to exit. This is necessary because
   * job processes don't automatically die unless told to by the queue.
   */
  _sendKillJob(jobId, exitCode) {
    if (jobId in this.processByJobId) {
      this.processByJobId[jobId].send({ type: 'killJob', exitCode: exitCode });
    }
  }

  /** Internal: reports a failed background operation as a 'warning' event. */
  _reportAsyncFailure(context, err) {
    const reason = err instanceof Error ? err.message : String(err);
    this.fireEvent('warning', {
      message: context + ': ' + reason,
      error: err,
    });
  }
}
module.exports = SimpleJobQueues;