Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
@fbinhouse/devops-job-queue / src / forkedProcess.js
Size: Mime:
const fs = require('fs');
// const uuidv4 = require('uuid/v4');
const process = require('process');

class JobProcess {
    constructor() {
        this.executingJob = false;
        this.shuttingDown = false;

        this.childJobs = {}; // Stores the state of a child job by jobId
    }

    startup() {
        // console.log('Child process starting up', process.pid);
        process.on('message', (msg) => {
            if ('type' in msg && msg.type === 'runJob') {
                this.executingJob = true;
                setTimeout(() => {
                    this.executeJob({
                        id: msg.jobId,
                        type: msg.jobType,
                        params: msg.jobParams,
                    }).then((returnCode) => {
                        // We cannot auto kill the process here because we can't be sure that the master process has
                        // received the job success/error data yet.
                        // So now we don't die until the assigner says so.
                        /*
                         setTimeout(() => {
                         process.exit(returnCode);
                         }, 5000);
                         */
                    });
                }, 1);
            } else if ('type' in msg && msg.type === 'killJob' && 'exitCode' in msg) {
                process.exit(msg.exitCode);
            } else if ('type' in msg && msg.type === 'startingJob' && 'jobId' in msg && 'processId' in msg) {
                this.handleStartingJob(msg.jobId, msg.processId);
            } else if ('type' in msg && msg.type === 'runningJob' && 'jobId' in msg) {
                this.handleRunningJob(msg.jobId);
            } else if ('type' in msg && msg.type === 'updateJobState' && 'jobId' in msg && 'state' in msg) {
                this.handleUpdateJobState(msg.jobId, msg.state);
            } else if ('type' in msg && msg.type === 'reportProgress' && 'jobId' in msg && 'progress' in msg) {
                this.handleReportProgress(msg.jobId, msg.progress);
            } else if ('type' in msg && msg.type === 'errorJob' && 'jobId' in msg && 'error' in msg) {
                this.handleErrorJob(msg.jobId, msg.error);
            } else if ('type' in msg && msg.type === 'successJob' && 'jobId' in msg && 'success' in msg) {
                this.handleSuccessJob(msg.jobId, msg.success);
            } else if ('type' in msg && msg.type === 'shutdown') {
                this.shuttingDown = true;
                // console.log('Graceful shutdown of job process...');
            }
        });

        setTimeout(() => {
            if (!this.executingJob) {
                // console.error('Job process has been running for 10 seconds doing nothing. Dying now.');
                process.exit(2);
            }
        }, 10 * 1000);

        // Don't really think that listening to disconnect will help
        /*
        process.on('disconnect', function() {
            console.error('Parent exited, killing child');
            clearInterval(this.tickInterval);
            process.exit();
        });
        */

        process.on('SIGINT', function () {
            this.shuttingDown = true;
            // console.log("Child process received SIGINT signal");
        });

        process.on('SIGTERM', function () {
            this.shuttingDown = true;
            // console.log("Child process received SIGTERM signal");
        });
    }

    reportJobLog(id, logData) {
        process.send({ type: 'updateJobAddLog', jobId: id, logData: logData });
    }
    reportJobStatus(id, status) {
        process.send({ type: 'updateJobStatus', jobId: id, status: status });
    }
    reportJobState(id, state) {
        process.send({ type: 'updateJobState', jobId: id, state: state });
    }
    reportJobRunning(id) {
        process.send({ type: 'runningJob', jobId: id });
    }
    reportJobError(id, error, duration) {
        process.send({ type: 'errorJob', jobId: id, error: error, duration: duration });
    }
    reportJobSuccess(id, success, duration) {
        process.send({ type: 'successJob', jobId: id, success: success, duration: duration });
    }
    reportJobProgress(id, progress) {
        process.send({ type: 'reportProgress', jobId: id, progress: progress });
    }

    handleCreatedJob(jobId, queueId, jobType, jobParams, parentJobId) {
        this.childJobs[jobId] = {
            jobId: jobId,
            queueId: queueId,
            jobType: jobType,
            jobParams: jobParams,
            parentJobId: parentJobId,

            status: 0,
            state: {},
            logs: [],

            created: new Date(),
            latestUpdate: new Date(),
            processId: null,
        }
    }
    handleStartingJob(jobId, processId) {
        if (jobId in this.childJobs) {
            let job = this.childJobs[jobId];
            job.status = 1;
            job.latestUpdate = new Date();
            job.processId = processId;
        }
    }
    handleRunningJob(jobId) {
        if (jobId in this.childJobs) {
            let job = this.childJobs[jobId];
            job.status = 2;
            job.latestUpdate = new Date();
        }
    }
    handleUpdateJobState(jobId, newState) {
        if (jobId in this.childJobs) {
            let job = this.childJobs[jobId];
            job.state = newState;
            job.latestUpdate = new Date();
        }
    }
    handleReportProgress(jobId, progress) {
        if (jobId in this.childJobs) {
            let job = this.childJobs[jobId];
            job.progress = progress;
            job.latestUpdate = new Date();
        }
    }
    handleErrorJob(jobId, error) {
        if (jobId in this.childJobs) {
            let job = this.childJobs[jobId];
            job.status = 4;
            job.latestUpdate = new Date();
            job.state.error = error;
        }
    }
    handleSuccessJob(jobId, success) {
        if (jobId in this.childJobs) {
            let job = this.childJobs[jobId];
            job.status = 3;
            job.latestUpdate = new Date();
            job.state.success = success;
        }
    }

    executeJob(job) {
        let startDate = (new Date()).getTime();
        this.reportJobRunning(job.id);
        return Promise.resolve().then(() => {
            return this.obtainJobPromise(job).then(success => {
                if ((typeof success) === 'undefined') {
                    success = null;
                }
                let endDate = (new Date()).getTime();
                let totalSeconds = Math.round((endDate - startDate) / 1000);
                this.reportJobSuccess(job.id, success, totalSeconds);
                return 0; // Process exit return code
            })
        }).catch(error => {
            // console.error(error); // Reported in the assigner anyway
            let endDate = (new Date()).getTime();
            let totalSeconds = Math.round((endDate - startDate) / 1000);

            this.reportJobError(job.id, {
                name: error.name,
                message: error.message,
                stack: error.stack
            }, totalSeconds);
            return 1; // Process exit return code
        })
    }

    obtainJobPromise(job) {
        // Use the job type and params to obtain/create a promise which we can execute asynchronously

        let nextChildJobId = 1;
        let reporter = {
            addLog: (logData, sequenceId) => {
                if (typeof logData === 'string') {
                    logData = {
                        str: logData
                    }
                }
                this.reportJobLog(job.id, {
                    seq: sequenceId,
                    ts: new Date(),
                    data: logData,
                });
            },
            status: (status) => {
                this.reportJobStatus(job.id, status);
            },
            state: (state) => {
                this.reportJobState(job.id, state);
            },
            queueChildJob: (queueId, jobType, jobParams) => {
                let jobId = job.id + '-' + nextChildJobId++;
                this.handleCreatedJob(jobId, queueId, jobType, jobParams, job.id);
                process.send({ type: 'queueChildJob', jobId: jobId, queueId: queueId, jobType: jobType, jobParams: jobParams, parentJobId: job.id });
                return jobId;
            },
            getChildJob: (jobId) => {
                if (jobId in this.childJobs) {
                    return this.childJobs[jobId];
                } else {
                    return null;
                }
            },
            progress: (progress) => {
                this.reportJobProgress(job.id, progress);
            }
        }

        // Jobs are classes in files which are stored in the jobs directory.
        // They need to be instantiated and executed
        let jobsDirectory = process.argv[2];
        let jobClassFile = jobsDirectory + '/' + job.type + '.js';
        if (fs.existsSync(jobClassFile)) {
            let JobClass = require(jobClassFile);
            let jobInstance = new JobClass(job.id, job.params, reporter);
            try {
                jobInstance.assertJobType(job.type);
            } catch (e) {
                return Promise.reject(new Error('Invalid job type: ' + job.type + ' when using job class: ' + jobInstance.type));
            }
            return jobInstance.executeJob();
        } else {
            return Promise.reject(new Error('Could not find job class file at: ' + jobClassFile));
        }
    }
}

const jobProcess = new JobProcess();
jobProcess.startup();