Repository URL to install this package:
|
Version:
2.0.0 ▾
|
const { fork } = require('child_process');
const uuidv4 = require('uuid/v4');
const moment = require('moment-timezone');
const path = require('path');
class SimpleJobQueues {
constructor(db, jobsDirectory, dumpLogsToConsole = true) {
this.db = db;
this.jobsDirectory = jobsDirectory;
this.dumpLogsToConsole = dumpLogsToConsole;
this.queueProcesses = {}; // Keeps track of which processes are running for each queue id
this.otherProcesses = []; // Keeps track of processes for jobs without a queue (queueId = null)
this.processByJobId = {}; // Keeps track of the process for each running jobId
// New Id structure. <date>-<instance>-<incrementing number>
this.instanceId = uuidv4().substring(0, 8);
this.idCounter = 1; // This gets reset to 1 when it is a new date
this.latestDate = null;
this.jobAssignerCount = 0;
this.triggerJobAssignment = true;
this.jobAssignerActive = false;
this.shuttingDown = false;
}
start() {
// Get the jobs which are in running state and log that they errored. This container is designed to only run 1 instance.
return this.db.getJobsStartingOrRunning().then(jobs => {
let promise = Promise.resolve();
jobs.map(job => {
promise = promise.then(() => {
console.warn('Detected job at starting/running status', job.jobId);
return this.db.errorJob(job.jobId, 'Found job in starting/running state upon startup. Something violently SIGKILLed the process running this job.');
});
});
return promise;
}).then(() => {
console.log('Starting up job assigner');
this.jobAssigner = setInterval(() => {
if ((this.triggerJobAssignment && this.jobAssignerCount > 2) || this.jobAssignerCount > 30) {
this.jobAssignerCount = 0;
this.triggerJobAssignment = false;
this.assignJobs();
} else {
// console.log("Not assigning any jobs yet, waiting: " + this.jobAssignerCount);
this.jobAssignerCount++;
}
}, 1000);
this.jobAssignerActive = true;
})
}
assignJobs() {
// We cannot just shutdown the job assigner now since a job might spawn more child jobs.
// The trick is that during shutdown mode, the assigner should only assign processes to child jobs
// Get all jobs with null queueId with status JOB_CREATED (0) and just execute them now
this.db.getJobsWaitingWithNoQueue().then(jobs => {
jobs.map(job => {
if (!this.shuttingDown || job.parentJobId !== null) {
this.startJob(job);
}
})
});
// Get the oldest job per queueId from all jobs with status JOB_CREATED (0)
this.db.getJobsWaitingByQueue().then(jobs => {
// console.log("JOBS IN QUEUE", jobs);
Object.keys(jobs).map(queueId => {
if (!(queueId in this.queueProcesses)) {
this.queueProcesses[queueId] = null;
}
if (this.queueProcesses[queueId] === null) {
let queueJobs = jobs[queueId];
let jobToExecute = null;
queueJobs.map(job => {
// Only execute first appropriate job per queue
if (jobToExecute === null && (!this.shuttingDown || job.parentJobId !== null)) {
jobToExecute = job;
}
});
if (jobToExecute !== null) {
this.startJob(jobToExecute);
}
} else {
// Process exists
// We detect the disconnect event on a child process so we can be sure that it is still running
}
})
})
}
startJob(job) {
// We cannot start a job which is a child of some other job which is no longer executing
let parentJobId = job.parentJobId;
if (parentJobId !== null) {
if (!(parentJobId in this.processByJobId)) {
// We cannot run this job because the parent job is dead
this.fireJobError(job.jobId, 'We cannot start this job because its parent job (' + job.parentJobId + ') is dead.');
this.triggerJobAssignment = true;
return;
}
}
console.log('Starting job', job.jobId, 'for queue', job.queueId);
let childProcess = fork(path.join(__dirname, 'forkedProcess.js'), [this.jobsDirectory]);
this.fireJobStarting(job, childProcess);
childProcess.on('message', (msg) => {
this.responseFromJobProcess(msg, childProcess);
});
childProcess.on('disconnect', () => {
console.log('Job process died', job.jobId, 'for queue', job.queueId, 'process', childProcess.pid);
if (job.queueId === null) {
var idx = this.otherProcesses.indexOf(childProcess);
if (idx !== -1) {
this.otherProcesses.splice(idx, 1);
} else {
console.warn('Could not find process in otherProcesses array', childProcess.pid);
}
} else {
this.queueProcesses[job.queueId] = null;
this.triggerJobAssignment = true;
}
delete this.processByJobId[job.jobId];
});
childProcess.send({
type: 'runJobThenDie',
jobId: job.jobId,
jobType: job.jobType,
jobParams: job.jobParams,
});
}
stop() {
this.shuttingDown = true;
return new Promise((resolve, reject) => {
let checkCount = 0;
let checkInterval = setInterval(() => {
checkCount++;
let runningJobs = 0
this.otherProcesses.map(process => {
runningJobs++;
})
Object.keys(this.queueProcesses).map(queueId => {
if (this.queueProcesses[queueId] !== null) {
runningJobs++;
}
});
if (checkCount % 10 === 0) {
console.log('Job processes still running: ' + runningJobs);
}
if (runningJobs === 0) {
console.log('Stopping the job assigner....');
clearInterval(this.jobAssigner);
this.jobAssignerActive = false;
clearInterval(checkInterval);
resolve('OK');
}
}, 500);
})
}
generateJobId() {
let currentDate = moment().tz('Europe/Stockholm').format('YYYYMMDD');
if (this.latestDate !== currentDate) {
this.latestDate = currentDate;
this.idCounter = 1;
}
let id = this.latestDate + '-' + this.instanceId + '-' + this.idCounter;
this.idCounter++;
return id;
}
queueJob(queueId, jobType, jobParams, parentJobId = null) {
let jobId = this.generateJobId();
return this.queueJobAsId(jobId, queueId, jobType, jobParams, parentJobId);
}
queueJobAsId(jobId, queueId, jobType, jobParams, parentJobId = null) {
console.log('Adding job', jobId, queueId, jobType, parentJobId);
return this.db.insertJob(jobId, queueId, jobType, jobParams, parentJobId).then(() => {
this.triggerJobAssignment = true;
return jobId;
})
}
getAllJobs() {
// Gets all jobs in the last week
return this.db.getAllJobs(24 * 7);
}
getJobById(id) {
return this.db.getJobById(id);
}
getStats() {
// Gets the number of jobs at each status over the past 8 hours
return this.db.getJobCounts(8).then(statusStats => {
return {
jobAssignerActive: this.jobAssignerActive,
shuttingDown: this.shuttingDown,
status: statusStats,
}
});
}
responseFromJobProcess(msg, childProcess) {
if ('type' in msg && 'jobId' in msg) {
let jobId = msg.jobId;
if (msg.type === 'updateJobAddLog') {
if ('logData' in msg) {
if (!('data' in msg.logData) || ((typeof msg.logData.data) !== 'object')) {
console.warn('Expecting data object to exist in - updateJobAddLog', msg);
} else {
this.db.addJobLogItem(jobId, msg.logData);
if (this.dumpLogsToConsole && 'str' in msg.logData.data) {
console.log(new Date().toISOString(), jobId, msg.logData.data.str);
}
}
} else {
console.warn('Bad response from child process - updateJobAddLog', msg);
}
} else if (msg.type === 'runningJob') {
this.fireJobRunning(jobId);
} else if (msg.type === 'updateJobState') {
if ('state' in msg) {
this.fireJobStateUpdated(jobId, msg.state);
} else {
console.warn('Bad response from child process - updateJobState', msg);
}
} else if (msg.type === 'errorJob') {
if ('error' in msg) {
this.fireJobError(jobId, msg.error);
} else {
console.warn('Bad response from child process - errorJob', msg);
}
} else if (msg.type === 'successJob') {
if ('success' in msg) {
this.fireJobSuccess(jobId, msg.success);
} else {
console.warn('Bad response from child process - successJob', msg);
}
} else if (msg.type === 'queueChildJob') {
if ('queueId' in msg && 'jobType' in msg && 'jobParams' in msg && 'parentJobId' in msg) {
this.queueJobAsId(msg.jobId, msg.queueId, msg.jobType, msg.jobParams, msg.parentJobId);
}
} else {
console.warn('Bad response from child process', msg);
}
} else {
console.warn('Bad response from child process', msg);
}
}
getParentJobId(jobId) {
// 2 dashes mean no parent, more than that and we can remove everything after the last dash
let parts = jobId.split('-');
if (parts.length < 4) {
return null;
} else {
parts.pop();
return parts.join('-');
}
}
fireJobStarting(job, childProcess) {
// Track the process
let queueId = job.queueId;
if (queueId === null) {
this.otherProcesses.push(childProcess);
} else {
this.queueProcesses[queueId] = childProcess;
}
// Also track the process by jobId
this.processByJobId[job.jobId] = childProcess;
// Store in database
this.db.startingJob(job.jobId, childProcess.pid);
// Notify the parent job process
let parentJobId = this.getParentJobId(job.jobId);
if (parentJobId !== null) {
if (parentJobId in this.processByJobId) {
this.processByJobId[parentJobId].send({ type: 'startingJob', jobId: job.jobId, processId: childProcess.pid });
}
}
}
fireJobRunning(jobId) {
this.db.runningJob(jobId);
// Notify the parent job process
let parentJobId = this.getParentJobId(jobId);
if (parentJobId !== null) {
if (parentJobId in this.processByJobId) {
this.processByJobId[parentJobId].send({ type: 'runningJob', jobId: jobId });
}
}
}
fireJobStateUpdated(jobId, newState) {
this.db.updateJobState(jobId, newState);
// Notify the parent job process
let parentJobId = this.getParentJobId(jobId);
if (parentJobId !== null) {
if (parentJobId in this.processByJobId) {
this.processByJobId[parentJobId].send({ type: 'updateJobState', jobId: jobId, state: newState });
}
}
}
fireJobError(jobId, error) {
this.db.errorJob(jobId, error);
// Notify the parent job process
let parentJobId = this.getParentJobId(jobId);
if (parentJobId !== null) {
if (parentJobId in this.processByJobId) {
this.processByJobId[parentJobId].send({ type: 'errorJob', jobId: jobId, error: error });
}
}
}
fireJobSuccess(jobId, success) {
this.db.successJob(jobId, success);
// Notify the parent job process
let parentJobId = this.getParentJobId(jobId);
if (parentJobId !== null) {
if (parentJobId in this.processByJobId) {
this.processByJobId[parentJobId].send({ type: 'successJob', jobId: jobId, success: success });
}
}
}
}
module.exports = SimpleJobQueues;