Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:

const { fork } = require('child_process');

const uuidv4 = require('uuid/v4');
const moment = require('moment-timezone');
const path = require('path');
const LogWriter = require('./logWriter');

/**
 * Runs jobs stored in a database by forking one child process per job.
 *
 * Jobs that carry a queue id run one at a time per queue; jobs with a null
 * queue id are started as soon as the assigner sees them. Progress, state and
 * results are reported back by the child over the IPC channel and are written
 * to the database, forwarded to the optional event handler and relayed to the
 * parent job's process when there is one.
 */
class SimpleJobQueues {
    /**
     * @param {object} db - Storage backend; only the methods called in this class are required.
     * @param {string} jobsDirectory - Passed as the single argument to every forked job process.
     * @param {?function(string, object)} eventHandler - Optional sink for (eventType, eventMeta).
     */
    constructor(db, jobsDirectory, eventHandler = null) {
        this.db = db;
        this.jobsDirectory = jobsDirectory;
        this.eventHandler = eventHandler;

        this.queueProcesses = {}; // queueId -> running child process, or null when the queue is idle
        this.otherProcesses = []; // Child processes of jobs without a queue (queueId = null)
        this.processByJobId = {}; // jobId -> child process, for every running job
        this.logWriters = {}; // jobId -> LogWriter

        // Id structure: <date>-<instance>-<incrementing number>
        this.instanceId = uuidv4().substring(0, 8);
        this.idCounter = 1; // Reset to 1 whenever the date changes
        this.latestDate = null;

        this.jobAssigner = null; // Handle of the assignment interval created by start()
        this.jobAssignerCount = 0;
        this.triggerJobAssignment = true;
        this.jobAssignerActive = false; // Whether the queue is started
        this.shuttingDown = false; // Whether the queue is stopping
    }

    /**
     * Forwards an event to the event handler, if one was supplied.
     *
     * @param {string} eventType
     * @param {object} eventMeta - Merged over a default `ts` timestamp.
     */
    fireEvent(eventType, eventMeta = {}) {
        if (typeof this.eventHandler !== 'function') {
            return;
        }
        this.eventHandler(eventType, Object.assign({ ts: new Date() }, eventMeta));
    }

    /**
     * Reports a message from a job process that could not be interpreted.
     *
     * @param {object} meta - Event payload describing the rejected message.
     */
    fireBadJobMessage(meta) {
        this.fireEvent('badJobMessageReceived', meta);
    }

    /**
     * Marks jobs left in the starting/running state as errored, then starts
     * the once-per-second assignment timer.
     *
     * @returns {Promise<void>} Resolves once the timer has been installed.
     */
    start() {
        // This container is designed to only run 1 instance, so anything still
        // marked as starting/running belongs to a process that no longer exists.
        return this.db.getJobsStartingOrRunning().then(jobs => {
            // Chain the updates so they are applied one after another.
            return jobs.reduce((chain, job) => chain.then(() => {
                this.fireEvent('startupJobCleanupWarning', {
                    jobId: job.id,
                });
                return this.db.errorJob(job.id, 'Found job in starting/running state upon startup. Something violently SIGKILLed the process running this job.');
            }), Promise.resolve());
        }).then(() => {
            this.fireEvent('startup');

            // A repeated start() must not leave the previous timer running.
            if (this.jobAssigner !== null) {
                clearInterval(this.jobAssigner);
            }

            this.jobAssigner = setInterval(() => {
                // Assign a few ticks after something changed, or after 30 idle ticks regardless.
                const dueToTrigger = this.triggerJobAssignment && this.jobAssignerCount > 2;
                if (dueToTrigger || this.jobAssignerCount > 30) {
                    this.jobAssignerCount = 0;
                    this.triggerJobAssignment = false;
                    this.assignJobs();
                } else {
                    this.jobAssignerCount++;
                }
            }, 1000);
            this.jobAssignerActive = true;
        });
    }

    /**
     * Starts every waiting job that is currently allowed to run.
     *
     * The assigner keeps working during shutdown because a running job may
     * still spawn child jobs; in that mode only jobs with a parent are started.
     * Failures of either lookup are reported as 'warning' events.
     */
    assignJobs() {
        const mayStart = job => !this.shuttingDown || job.parentJobId !== null;
        const reportFailure = message => error => {
            this.fireEvent('warning', { message, error });
        };

        // Jobs without a queue are not serialised: start all of them.
        this.db.getJobsWaitingWithNoQueue().then(jobs => {
            for (const job of jobs) {
                if (mayStart(job)) {
                    this.startJob(job);
                }
            }
        }).catch(reportFailure('Could not assign jobs without a queue'));

        // Queued jobs: at most one running process per queue id.
        this.db.getJobsWaitingByQueue().then(jobsByQueue => {
            for (const queueId of Object.keys(jobsByQueue)) {
                if (!(queueId in this.queueProcesses)) {
                    this.queueProcesses[queueId] = null;
                }

                // A non-null entry means the queue is busy; the 'disconnect'
                // handler installed by startJob() resets it when the process ends.
                if (this.queueProcesses[queueId] !== null) {
                    continue;
                }

                // Only the first appropriate job per queue is executed.
                const jobToExecute = jobsByQueue[queueId].find(mayStart);
                if (jobToExecute !== undefined) {
                    this.startJob(jobToExecute);
                }
            }
        }).catch(reportFailure('Could not assign queued jobs'));
    }

    /**
     * Forks a process for the job and tells it to run.
     *
     * A job whose parent is no longer executing is failed instead of started.
     *
     * @param {object} job - Record with id, queueId, parentJobId, jobType and jobParams.
     */
    startJob(job) {
        const parentJobId = job.parentJobId;
        if (parentJobId !== null && !(parentJobId in this.processByJobId)) {
            // We cannot run this job because the parent job is dead
            this.fireJobError(job.id, 'We cannot start this job because its parent job (' + job.parentJobId + ') is dead.');
            this.triggerJobAssignment = true;
            return;
        }

        const childProcess = fork(path.join(__dirname, 'forkedProcess.js'), [this.jobsDirectory]);

        this.fireJobStarting(job, childProcess);

        childProcess.on('message', (msg) => {
            this.responseFromJobProcess(msg, childProcess);
        });

        childProcess.on('disconnect', () => {
            this.fireEvent('jobProcessDied', {
                jobId: job.id,
                queueId: job.queueId,
                processId: childProcess.pid,
            });

            if (job.queueId === null) {
                const idx = this.otherProcesses.indexOf(childProcess);
                if (idx !== -1) {
                    this.otherProcesses.splice(idx, 1);
                } else {
                    this.fireEvent('warning', {
                        message: 'Could not find process in otherProcesses array',
                        processId: childProcess.pid,
                    });
                }
            } else {
                // Free the queue and let the assigner pick its next job soon.
                this.queueProcesses[job.queueId] = null;
                this.triggerJobAssignment = true;
            }

            delete this.processByJobId[job.id];
        });

        childProcess.send({
            type: 'runJob',
            jobId: job.id,
            jobType: job.jobType,
            jobParams: job.jobParams,
        });
    }

    /**
     * Counts the job processes that are still tracked as running.
     *
     * @returns {number}
     */
    countRunningProcesses() {
        const busyQueues = Object.keys(this.queueProcesses)
            .filter(queueId => this.queueProcesses[queueId] !== null)
            .length;
        return this.otherProcesses.length + busyQueues;
    }

    /**
     * Enters shutdown mode and waits until no job process is left.
     *
     * @returns {Promise<string>} Resolves with 'OK' once every tracked process has
     *     gone; rejects immediately when a shutdown is already in progress.
     */
    stop() {
        if (this.shuttingDown) {
            return Promise.reject(new Error('Queue is already shutting down, please wait...'));
        }
        this.shuttingDown = true;
        this.fireEvent('shutdownStarted');

        return new Promise((resolve) => {
            let checkCount = 0;
            const checkInterval = setInterval(() => {
                checkCount++;
                const runningJobs = this.countRunningProcesses();

                // Polling every 500 ms, so this reports every tenth poll.
                if (checkCount % 10 === 0) {
                    this.fireEvent('shutdownWaiting', {
                        jobsLeft: runningJobs,
                    });
                }

                if (runningJobs === 0) {
                    clearInterval(this.jobAssigner);
                    this.jobAssignerActive = false;
                    clearInterval(checkInterval);
                    this.fireEvent('shutdownFinished');
                    this.shuttingDown = false;
                    resolve('OK');
                }
            }, 500);
        });
    }

    /**
     * Builds the next job id: <YYYYMMDD in Europe/Stockholm>-<instanceId>-<counter>.
     * The counter restarts at 1 whenever the date part changes.
     *
     * @returns {string}
     */
    generateJobId() {
        const currentDate = moment().tz('Europe/Stockholm').format('YYYYMMDD');

        if (this.latestDate !== currentDate) {
            this.latestDate = currentDate;
            this.idCounter = 1;
        }

        const id = this.latestDate + '-' + this.instanceId + '-' + this.idCounter;
        this.idCounter++;
        return id;
    }

    /**
     * Queues a job under a freshly generated id.
     *
     * @returns {Promise<string>} Resolves with the new job id.
     */
    queueJob(queueId, jobType, jobParams, parentJobId = null) {
        return this.queueJobAsId(this.generateJobId(), queueId, jobType, jobParams, parentJobId);
    }

    /**
     * Stores a job under the given id and asks the assigner to run soon.
     *
     * @returns {Promise<string>} Resolves with jobId once the job is inserted.
     */
    queueJobAsId(jobId, queueId, jobType, jobParams, parentJobId = null) {
        this.fireEvent('addedJob', {
            jobId,
            queueId,
            jobType,
            parentJobId,
        });
        return this.db.insertJob(jobId, queueId, jobType, jobParams, parentJobId).then(() => {
            this.triggerJobAssignment = true;
            return jobId;
        });
    }

    /**
     * Queues a copy of a failed top-level job under a new id.
     *
     * @param {string} jobId - Id of the failed job.
     * @returns {Promise<string>} Resolves with the id of the new job.
     * @throws {Error} When the job is missing, is not in the error state, or has a parent.
     */
    async retryJob(jobId) {
        const job = await this.db.getJobById(jobId);

        // Without this guard a missing job surfaces as an opaque TypeError below.
        if (job === null || job === undefined) {
            throw new Error('Cannot retry this job, it was not found: ' + jobId);
        }

        // The job must be in the error state
        if (job.status !== 4) { // 4 = JOB_ERROR
            throw new Error('Cannot retry this job, it is not in a failed state: ' + job.status);
        }

        // The job must not be a child of some other job
        if (job.parentJobId !== null) {
            throw new Error('Cannot retry a child job individually since unknown things could happen.  You need to retry the whole parent job');
        }

        // Create a new job using the params of this failed job
        return this.queueJobAsId(this.generateJobId(), job.queueId, job.jobType, job.jobParams, null);
    }

    /** Delegates to db.getJobs(lastId, reverse). */
    getJobs(lastId = null, reverse = false) {
        return this.db.getJobs(lastId, reverse);
    }

    /** Delegates to db.getJobById(id). */
    getJobById(id) {
        return this.db.getJobById(id);
    }

    /**
     * Combines the queue's running flags with the counts from db.getJobCounts().
     *
     * @returns {Promise<{jobAssignerActive: boolean, shuttingDown: boolean, status: *}>}
     */
    getStats() {
        return this.db.getJobCounts().then(statusStats => ({
            jobAssignerActive: this.jobAssignerActive,
            shuttingDown: this.shuttingDown,
            status: statusStats,
        }));
    }

    /**
     * Deletes every job; refused while the queue is started or stopping.
     *
     * @returns {Promise<*>} Result of db.deleteAllJobs(), or a rejection when the queue is active.
     */
    deleteAllJobs() {
        if (this.jobAssignerActive || this.shuttingDown) {
            return Promise.reject(new Error('You must stop the queue first before you delete all the jobs'));
        }
        return this.db.deleteAllJobs();
    }

    /**
     * Hands a log item to the job's LogWriter, creating and starting the
     * writer on first use. Writing each item straight to the database was
     * too inefficient, hence the writer.
     *
     * @param {string} jobId
     * @param {object} logData
     */
    addJobLogItemAsync(jobId, logData) {
        if (!(jobId in this.logWriters)) {
            const writer = new LogWriter(this.db, jobId);
            this.logWriters[jobId] = writer;
            writer.start();
        }
        this.logWriters[jobId].addLogItem(logData);
    }

    /**
     * Dispatches a message received from a job process.
     *
     * Anything that is not an object with `type` and `jobId`, has an unknown
     * type, or lacks the fields its type requires is reported as
     * 'badJobMessageReceived' rather than acted upon.
     *
     * @param {*} msg - Message as delivered by the child's 'message' event.
     * @param {object} childProcess - Sending process (currently unused).
     */
    responseFromJobProcess(msg, childProcess) {
        // The `in` operator throws on primitives and null, so check the shape first.
        const isObject = value => value !== null && typeof value === 'object';

        if (!isObject(msg) || !('type' in msg) || !('jobId' in msg)) {
            this.fireBadJobMessage({
                message: 'Bad response from child process',
                data: msg,
            });
            return;
        }

        const jobId = msg.jobId;
        const hasFields = (...fields) => fields.every(field => field in msg);
        const rejectAs = type => this.fireBadJobMessage({ jobId: jobId, type: type, data: msg });

        switch (msg.type) {
            case 'updateJobAddLog': {
                const logData = msg.logData;
                if (!isObject(logData) || !isObject(logData.data)) {
                    rejectAs('updateJobAddLog');
                    break;
                }

                this.addJobLogItemAsync(jobId, logData);

                if ('str' in logData.data) {
                    this.fireEvent('logItemString', {
                        jobId: jobId,
                        message: logData.data.str
                    });
                } else {
                    this.fireEvent('logItemOther', {
                        jobId: jobId,
                        data: logData.data
                    });
                }
                break;
            }

            case 'runningJob':
                this.fireJobRunning(jobId);
                break;

            case 'updateJobState':
                if (hasFields('state')) {
                    this.fireJobStateUpdated(jobId, msg.state);
                } else {
                    rejectAs('updateJobState');
                }
                break;

            case 'errorJob':
                if (hasFields('error', 'duration')) {
                    this.fireJobError(jobId, msg.error, msg.duration);
                } else {
                    rejectAs('errorJob');
                }
                break;

            case 'successJob':
                if (hasFields('success', 'duration')) {
                    this.fireJobSuccess(jobId, msg.success, msg.duration);
                } else {
                    rejectAs('successJob');
                }
                break;

            case 'queueChildJob':
                if (hasFields('queueId', 'jobType', 'jobParams', 'parentJobId')) {
                    // Nobody awaits this promise, so report a failed insert here.
                    this.queueJobAsId(msg.jobId, msg.queueId, msg.jobType, msg.jobParams, msg.parentJobId)
                        .catch(error => {
                            this.fireEvent('warning', {
                                message: 'Could not queue child job',
                                jobId: jobId,
                                error: error,
                            });
                        });
                } else {
                    rejectAs('queueChildJob');
                }
                break;

            case 'reportProgress':
                if (hasFields('progress')) {
                    this.fireJobProgressReport(jobId, msg.progress);
                } else {
                    rejectAs('reportProgress');
                }
                break;

            default:
                rejectAs('unknown');
        }
    }

    /**
     * Derives the parent job id from a job id.
     *
     * An id with 2 dashes has no parent; with more, the parent id is
     * everything before the last dash.
     *
     * @param {*} jobId
     * @returns {?string} Parent id, or null when there is none or jobId is not a string.
     */
    getParentJobId(jobId) {
        // Ids echoed back by a child process are not guaranteed to be strings.
        if (typeof jobId !== 'string') {
            return null;
        }

        const parts = jobId.split('-');
        if (parts.length < 4) {
            return null;
        }
        return parts.slice(0, -1).join('-');
    }

    /**
     * Sends a message to the process of the job's parent, if that process is running.
     *
     * @param {string} jobId - Id of the child job the message is about.
     * @param {object} message - Payload for the parent process.
     */
    notifyParentJob(jobId, message) {
        const parentJobId = this.getParentJobId(jobId);
        if (parentJobId !== null && parentJobId in this.processByJobId) {
            this.processByJobId[parentJobId].send(message);
        }
    }

    /**
     * Registers a freshly forked job process and records the job as starting.
     *
     * @param {object} job - Job record (id and queueId are read).
     * @param {object} childProcess - Process forked for the job.
     */
    fireJobStarting(job, childProcess) {
        this.fireEvent('jobStarting', {
            jobId: job.id,
            queueId: job.queueId,
            processId: childProcess.pid,
        });

        // Track the process by queue, or in the queue-less list
        if (job.queueId === null) {
            this.otherProcesses.push(childProcess);
        } else {
            this.queueProcesses[job.queueId] = childProcess;
        }

        // Also track the process by jobId
        this.processByJobId[job.id] = childProcess;

        // Store in database
        this.db.startingJob(job.id, childProcess.pid);

        this.notifyParentJob(job.id, { type: 'startingJob', jobId: job.id, processId: childProcess.pid });
    }

    /**
     * Records that the job is running and relays that to its parent.
     *
     * @param {string} jobId
     */
    fireJobRunning(jobId) {
        this.fireEvent('jobRunning', {
            jobId: jobId,
        });

        this.db.runningJob(jobId);

        this.notifyParentJob(jobId, { type: 'runningJob', jobId: jobId });
    }

    /**
     * Records a new job state and relays it to the parent.
     *
     * @param {string} jobId
     * @param {*} newState
     */
    fireJobStateUpdated(jobId, newState) {
        this.fireEvent('jobStateUpdated', {
            jobId: jobId,
            newState: newState,
        });

        this.db.updateJobState(jobId, newState);

        this.notifyParentJob(jobId, { type: 'updateJobState', jobId: jobId, state: newState });
    }

    /**
     * Records a progress report and relays it to the parent.
     *
     * @param {string} jobId
     * @param {*} progress
     */
    fireJobProgressReport(jobId, progress) {
        this.fireEvent('jobProgressReport', {
            jobId: jobId,
            progress: progress,
        });

        this.db.updateJobProgress(jobId, progress);

        this.notifyParentJob(jobId, { type: 'reportProgress', jobId: jobId, progress: progress });
    }

    /**
     * Records a job failure, relays it to the parent and tells the job process to exit.
     *
     * @param {string} jobId
     * @param {*} error
     * @param {*} [duration] - Only included in the emitted event.
     */
    fireJobError(jobId, error, duration) {
        this.fireEvent('jobError', {
            jobId: jobId,
            error: error,
            duration: duration,
        });

        this.db.errorJob(jobId, error);

        this.notifyParentJob(jobId, { type: 'errorJob', jobId: jobId, error: error });

        // Job processes don't die on their own; the queue has to tell them to (exit code 1).
        if (jobId in this.processByJobId) {
            this.processByJobId[jobId].send({ type: 'killJob', exitCode: 1 });
        }
    }

    /**
     * Records a job success, relays it to the parent and tells the job process to exit.
     *
     * @param {string} jobId
     * @param {*} success
     * @param {*} [duration] - Only included in the emitted event.
     */
    fireJobSuccess(jobId, success, duration) {
        this.fireEvent('jobSuccess', {
            jobId: jobId,
            success: success,
            duration: duration,
        });

        this.db.successJob(jobId, success);

        this.notifyParentJob(jobId, { type: 'successJob', jobId: jobId, success: success });

        // Job processes don't die on their own; the queue has to tell them to (exit code 0).
        if (jobId in this.processByJobId) {
            this.processByJobId[jobId].send({ type: 'killJob', exitCode: 0 });
        }
    }
}

module.exports = SimpleJobQueues;