Repository URL to install this package:
|
Version:
2.0.0 ▾
|
const fs = require('fs');
// const uuidv4 = require('uuid/v4');
const process = require('process');
class JobProcess {
constructor() {
this.executingJob = false;
this.shuttingDown = false;
this.childJobs = {}; // Stores the state of a child job by jobId
}
startup() {
console.log('Child process starting up', process.pid);
process.on('message', (msg) => {
if ('type' in msg && msg.type === 'runJobThenDie') {
this.executingJob = true;
setTimeout(() => {
this.executeJob({
id: msg.jobId,
type: msg.jobType,
params: msg.jobParams,
}).then((returnCode) => {
process.exit(returnCode);
});
}, 1);
} else if ('type' in msg && msg.type === 'startingJob' && 'jobId' in msg && 'processId' in msg) {
this.handleStartingJob(msg.jobId, msg.processId);
} else if ('type' in msg && msg.type === 'runningJob' && 'jobId' in msg) {
this.handleRunningJob(msg.jobId);
} else if ('type' in msg && msg.type === 'updateJobState' && 'jobId' in msg && 'state' in msg) {
this.handleUpdateJobState(msg.jobId, msg.state);
} else if ('type' in msg && msg.type === 'errorJob' && 'jobId' in msg && 'error' in msg) {
this.handleErrorJob(msg.jobId, msg.error);
} else if ('type' in msg && msg.type === 'successJob' && 'jobId' in msg && 'success' in msg) {
this.handleSuccessJob(msg.jobId, msg.success);
} else if ('type' in msg && msg.type === 'shutdown') {
this.shuttingDown = true;
console.log('Graceful shutdown of job process...');
}
});
setTimeout(() => {
if (!this.executingJob) {
console.error('Job process has been running for 10 seconds doing nothing. Dying now.');
process.exit(2);
}
}, 10 * 1000);
// Don't really think that listening to disconnect will help
/*
process.on('disconnect', function() {
console.error('Parent exited, killing child');
clearInterval(this.tickInterval);
process.exit();
});
*/
process.on('SIGINT', function () {
this.shuttingDown = true;
// console.log("Child process received SIGINT signal");
});
process.on('SIGTERM', function () {
this.shuttingDown = true;
// console.log("Child process received SIGTERM signal");
});
}
reportJobLog(id, logData) {
process.send({ type: 'updateJobAddLog', jobId: id, logData: logData });
}
reportJobStatus(id, status) {
process.send({ type: 'updateJobStatus', jobId: id, status: status });
}
reportJobState(id, state) {
process.send({ type: 'updateJobState', jobId: id, state: state });
}
reportJobRunning(id) {
process.send({ type: 'runningJob', jobId: id });
}
reportJobError(id, error) {
process.send({ type: 'errorJob', jobId: id, error: error });
}
reportJobSuccess(id, success) {
process.send({ type: 'successJob', jobId: id, success: success });
}
handleCreatedJob(jobId, queueId, jobType, jobParams, parentJobId) {
this.childJobs[jobId] = {
jobId: jobId,
queueId: queueId,
jobType: jobType,
jobParams: jobParams,
parentJobId: parentJobId,
status: 0,
state: {},
logs: [],
created: new Date(),
latestUpdate: new Date(),
processId: null,
}
}
handleStartingJob(jobId, processId) {
if (jobId in this.childJobs) {
let job = this.childJobs[jobId];
job.status = 1;
job.latestUpdate = new Date();
job.processId = processId;
}
}
handleRunningJob(jobId) {
if (jobId in this.childJobs) {
let job = this.childJobs[jobId];
job.status = 2;
job.latestUpdate = new Date();
}
}
handleUpdateJobState(jobId, newState) {
if (jobId in this.childJobs) {
let job = this.childJobs[jobId];
job.state = newState;
job.latestUpdate = new Date();
}
}
handleErrorJob(jobId, error) {
if (jobId in this.childJobs) {
let job = this.childJobs[jobId];
job.status = 4;
job.latestUpdate = new Date();
job.state.error = error;
}
}
handleSuccessJob(jobId, success) {
if (jobId in this.childJobs) {
let job = this.childJobs[jobId];
job.status = 3;
job.latestUpdate = new Date();
job.state.success = success;
}
}
executeJob(job) {
this.reportJobRunning(job.id);
return Promise.resolve().then(() => {
return this.obtainJobPromise(job).then(success => {
if ((typeof success) === 'undefined') {
success = null;
}
this.reportJobSuccess(job.id, success);
return 0; // Process exit return code
})
}).catch(error => {
console.error(error);
this.reportJobError(job.id, {
name: error.name,
message: error.message,
stack: error.stack
});
return 1; // Process exit return code
})
}
obtainJobPromise(job) {
// Use the job type and params to obtain/create a promise which we can execute asynchronously
let nextChildJobId = 1;
let reporter = {
addLog: (logData) => {
if (typeof logData === 'string') {
logData = {
str: logData
}
}
this.reportJobLog(job.id, {
ts: new Date(),
data: logData,
});
},
status: (status) => {
this.reportJobStatus(job.id, status);
},
state: (state) => {
this.reportJobState(job.id, state);
},
queueChildJob: (queueId, jobType, jobParams) => {
let jobId = job.id + '-' + nextChildJobId++;
this.handleCreatedJob(jobId, queueId, jobType, jobParams, job.id);
process.send({ type: 'queueChildJob', jobId: jobId, queueId: queueId, jobType: jobType, jobParams: jobParams, parentJobId: job.id });
return jobId;
},
getChildJob: (jobId) => {
if (jobId in this.childJobs) {
return this.childJobs[jobId];
} else {
return null;
}
},
}
// Jobs are classes in files which are stored in the jobs directory.
// They need to be instantiated and executed
let jobsDirectory = process.argv[2];
let jobClassFile = jobsDirectory + '/' + job.type + '.js';
if (fs.existsSync(jobClassFile)) {
let JobClass = require(jobClassFile);
let jobInstance = new JobClass(job.id, job.params, reporter);
try {
jobInstance.assertJobType(job.type);
} catch (e) {
return Promise.reject(new Error('Invalid job type: ' + job.type + ' when using job class: ' + jobInstance.type));
}
return jobInstance.executeJob();
} else {
return Promise.reject(new Error('Could not find job class file at: ' + jobClassFile));
}
}
}
const jobProcess = new JobProcess();
jobProcess.startup();