// Repository URL to install this package:
// Version: 3.1.8
const fs = require('fs');
// const uuidv4 = require('uuid/v4');
const process = require('process');
/**
 * Runs a single job inside this process and mirrors the state of any child
 * jobs it queues.
 *
 * Communication happens over the Node IPC channel: incoming commands arrive
 * through the process 'message' event and every report is sent back with
 * `process.send`. The directory holding the job class files is read from
 * `process.argv[2]`.
 */
class JobProcess {
  constructor() {
    this.executingJob = false; // Set once a 'runJob' message has been received
    this.shuttingDown = false; // Set by a 'shutdown' message or SIGINT/SIGTERM
    this.childJobs = {}; // Stores the state of a child job by jobId
  }

  /**
   * Registers the IPC message handler, the idle watchdog and the signal
   * handlers. Intended to be called once per process.
   */
  startup() {
    process.on('message', (msg) => {
      // IPC payloads can be primitives or null; the property checks below
      // (`in`) would throw on those, so ignore anything that is not an object.
      if (msg === null || typeof msg !== 'object') {
        return;
      }

      switch (msg.type) {
        case 'runJob':
          this.executingJob = true;
          setTimeout(() => {
            // The process deliberately does not exit when the job settles: we
            // cannot be sure the parent has received the success/error report
            // yet, so we stay alive until a 'killJob' message arrives.
            this.executeJob({
              id: msg.jobId,
              type: msg.jobType,
              params: msg.jobParams,
            });
          }, 1);
          break;
        case 'killJob':
          if ('exitCode' in msg) {
            process.exit(msg.exitCode);
          }
          break;
        case 'startingJob':
          if ('jobId' in msg && 'processId' in msg) {
            this.handleStartingJob(msg.jobId, msg.processId);
          }
          break;
        case 'runningJob':
          if ('jobId' in msg) {
            this.handleRunningJob(msg.jobId);
          }
          break;
        case 'updateJobState':
          if ('jobId' in msg && 'state' in msg) {
            this.handleUpdateJobState(msg.jobId, msg.state);
          }
          break;
        case 'reportProgress':
          if ('jobId' in msg && 'progress' in msg) {
            this.handleReportProgress(msg.jobId, msg.progress);
          }
          break;
        case 'errorJob':
          if ('jobId' in msg && 'error' in msg) {
            this.handleErrorJob(msg.jobId, msg.error);
          }
          break;
        case 'successJob':
          if ('jobId' in msg && 'success' in msg) {
            this.handleSuccessJob(msg.jobId, msg.success);
          }
          break;
        case 'shutdown':
          this.shuttingDown = true;
          break;
        default:
          // Unknown message types are ignored.
          break;
      }
    });

    // Idle watchdog: exit with code 2 if no 'runJob' message arrived within
    // ten seconds of startup.
    setTimeout(() => {
      if (!this.executingJob) {
        process.exit(2);
      }
    }, 10 * 1000);

    // Arrow functions are required here: listeners are invoked with `this`
    // bound to `process`, so a plain `function` would set the flag on the
    // process object instead of on this instance.
    process.on('SIGINT', () => {
      this.shuttingDown = true;
    });
    process.on('SIGTERM', () => {
      this.shuttingDown = true;
    });
  }

  /** Sends a log entry for job `id` to the parent. */
  reportJobLog(id, logData) {
    process.send({ type: 'updateJobAddLog', jobId: id, logData: logData });
  }

  /** Sends a status update for job `id` to the parent. */
  reportJobStatus(id, status) {
    process.send({ type: 'updateJobStatus', jobId: id, status: status });
  }

  /** Sends a state update for job `id` to the parent. */
  reportJobState(id, state) {
    process.send({ type: 'updateJobState', jobId: id, state: state });
  }

  /** Tells the parent that job `id` is now running. */
  reportJobRunning(id) {
    process.send({ type: 'runningJob', jobId: id });
  }

  /** Reports a failure of job `id`; `duration` is in whole seconds. */
  reportJobError(id, error, duration) {
    process.send({ type: 'errorJob', jobId: id, error: error, duration: duration });
  }

  /** Reports successful completion of job `id`; `duration` is in whole seconds. */
  reportJobSuccess(id, success, duration) {
    process.send({ type: 'successJob', jobId: id, success: success, duration: duration });
  }

  /** Sends a progress value for job `id` to the parent. */
  reportJobProgress(id, progress) {
    process.send({ type: 'reportProgress', jobId: id, progress: progress });
  }

  /**
   * Returns the locally tracked record for `jobId`, or null when this process
   * did not create such a child job. Only own properties count, so ids that
   * collide with Object.prototype members (e.g. "toString") never match.
   */
  lookupChildJob(jobId) {
    if (Object.prototype.hasOwnProperty.call(this.childJobs, jobId)) {
      return this.childJobs[jobId];
    }
    return null;
  }

  /** Starts tracking a newly queued child job with status 0 and empty state. */
  handleCreatedJob(jobId, queueId, jobType, jobParams, parentJobId) {
    this.childJobs[jobId] = {
      jobId: jobId,
      queueId: queueId,
      jobType: jobType,
      jobParams: jobParams,
      parentJobId: parentJobId,
      status: 0,
      state: {},
      logs: [],
      created: new Date(),
      latestUpdate: new Date(),
      processId: null,
    };
  }

  /** Marks a tracked child job as starting (status 1) and records its process id. */
  handleStartingJob(jobId, processId) {
    const job = this.lookupChildJob(jobId);
    if (job !== null) {
      job.status = 1;
      job.latestUpdate = new Date();
      job.processId = processId;
    }
  }

  /** Marks a tracked child job as running (status 2). */
  handleRunningJob(jobId) {
    const job = this.lookupChildJob(jobId);
    if (job !== null) {
      job.status = 2;
      job.latestUpdate = new Date();
    }
  }

  /** Replaces the stored state of a tracked child job. */
  handleUpdateJobState(jobId, newState) {
    const job = this.lookupChildJob(jobId);
    if (job !== null) {
      job.state = newState;
      job.latestUpdate = new Date();
    }
  }

  /** Stores the latest progress value of a tracked child job. */
  handleReportProgress(jobId, progress) {
    const job = this.lookupChildJob(jobId);
    if (job !== null) {
      job.progress = progress;
      job.latestUpdate = new Date();
    }
  }

  /** Marks a tracked child job as failed (status 4) and stores the error on its state. */
  handleErrorJob(jobId, error) {
    const job = this.lookupChildJob(jobId);
    if (job !== null) {
      job.status = 4;
      job.latestUpdate = new Date();
      // NOTE(review): assumes job.state is still an object; a previous
      // 'updateJobState' carrying a non-object state would make this throw.
      job.state.error = error;
    }
  }

  /** Marks a tracked child job as succeeded (status 3) and stores the result on its state. */
  handleSuccessJob(jobId, success) {
    const job = this.lookupChildJob(jobId);
    if (job !== null) {
      job.status = 3;
      job.latestUpdate = new Date();
      // NOTE(review): same assumption about job.state as in handleErrorJob.
      job.state.success = success;
    }
  }

  /**
   * Runs `job` and reports the outcome to the parent.
   *
   * @param {{id: *, type: string, params: *}} job
   * @returns {Promise<number>} 0 after a success report, 1 after an error
   *   report. The value is meant as a process exit code; nothing in this
   *   class exits with it.
   */
  executeJob(job) {
    const startedAt = Date.now();
    const elapsedSeconds = () => Math.round((Date.now() - startedAt) / 1000);

    this.reportJobRunning(job.id);

    // Starting from a resolved promise turns synchronous throws inside
    // obtainJobPromise (require failures, constructor errors) into rejections
    // handled by the catch below.
    return Promise.resolve()
      .then(() => this.obtainJobPromise(job))
      .then((success) => {
        const result = (typeof success === 'undefined') ? null : success;
        this.reportJobSuccess(job.id, result, elapsedSeconds());
        return 0; // Process exit return code
      })
      .catch((error) => {
        // A job may reject with anything, including null or undefined; reading
        // .name on those would throw here and the failure would go unreported.
        const isObjectLike = error !== null &&
          (typeof error === 'object' || typeof error === 'function');
        const details = isObjectLike
          ? { name: error.name, message: error.message, stack: error.stack }
          : { name: 'Error', message: String(error), stack: undefined };
        this.reportJobError(job.id, details, elapsedSeconds());
        return 1; // Process exit return code
      });
  }

  /**
   * Loads the class for `job.type` from the jobs directory, instantiates it
   * with a reporter object and starts it.
   *
   * @param {{id: *, type: string, params: *}} job
   * @returns {Promise<*>} whatever the job instance's executeJob() returns, or
   *   a rejected promise when the class file is missing or the job type
   *   assertion fails.
   */
  obtainJobPromise(job) {
    let nextChildJobId = 1;

    // Callbacks handed to the job so it can report back and queue child jobs.
    const reporter = {
      addLog: (logData, sequenceId) => {
        // Plain strings are wrapped so log payloads are always objects.
        const data = (typeof logData === 'string') ? { str: logData } : logData;
        this.reportJobLog(job.id, {
          seq: sequenceId,
          ts: new Date(),
          data: data,
        });
      },
      status: (status) => {
        this.reportJobStatus(job.id, status);
      },
      state: (state) => {
        this.reportJobState(job.id, state);
      },
      queueChildJob: (queueId, jobType, jobParams) => {
        // Child ids are "<parent id>-<counter>", counted per obtainJobPromise call.
        const jobId = job.id + '-' + nextChildJobId++;
        this.handleCreatedJob(jobId, queueId, jobType, jobParams, job.id);
        process.send({
          type: 'queueChildJob',
          jobId: jobId,
          queueId: queueId,
          jobType: jobType,
          jobParams: jobParams,
          parentJobId: job.id,
        });
        return jobId;
      },
      getChildJob: (jobId) => this.lookupChildJob(jobId),
      progress: (progress) => {
        this.reportJobProgress(job.id, progress);
      },
    };

    // Jobs are classes in files which are stored in the jobs directory.
    // They need to be instantiated and executed.
    // NOTE(review): job.type is interpolated into the path unchanged; this
    // presumably relies on the parent only sending trusted job types.
    const jobsDirectory = process.argv[2];
    const jobClassFile = jobsDirectory + '/' + job.type + '.js';
    if (!fs.existsSync(jobClassFile)) {
      return Promise.reject(new Error('Could not find job class file at: ' + jobClassFile));
    }

    const JobClass = require(jobClassFile);
    const jobInstance = new JobClass(job.id, job.params, reporter);
    try {
      jobInstance.assertJobType(job.type);
    } catch (e) {
      return Promise.reject(new Error('Invalid job type: ' + job.type + ' when using job class: ' + jobInstance.type));
    }
    return jobInstance.executeJob();
  }
}
// Entry point: create the job process for this Node process and register its
// message, watchdog and signal handlers.
const jobProcess = new JobProcess();
jobProcess.startup();