Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
crazyfactory/jobs / src / JobManager.php
Size: Mime:
<?php

namespace CrazyFactory\Jobs;

use Traversable;

/**
 * Registry and runner for named jobs.
 *
 * Holds a dictionary of JobConfig objects keyed by job name, an optional default
 * configuration, result/runtime processors and an optional lock provider. A job is
 * executed as a child process via proc_open(); its stdout/stderr are echoed and
 * collected into the resulting JobResult.
 *
 * The manager can be iterated (yields the registered configs) and accessed like an
 * array (keys are job names, values are JobConfig instances).
 */
class JobManager implements \IteratorAggregate, \ArrayAccess
{
    /** @var JobConfig[] Registered job configurations, keyed by job name. */
    protected $jobsDictionary = [];

    /** @var JobConfig|null Custom default configuration merged with a job config on demand. */
    protected $defaultConfig = null;

    /** @var IJobResultProcessor[] Processors called after a job has been executed. */
    protected $resultProcessors = [];

    /** @var IJobRuntimeProcessor[] Processors called periodically while a job is running. */
    protected $runtimeProcessors = [];

    /** @var IJobLockProvider|null Lock provider used for jobs flagged as singleton. */
    protected $lockProvider = null;

    /**
     * Add a processor, which will be called after execution of any job.
     *
     * @param IJobResultProcessor $processor
     *
     * @return self
     */
    public function withResultProcessor(IJobResultProcessor $processor)
    {
        // The type hint already rejects null, so the processor can be appended directly.
        $this->resultProcessors[] = $processor;

        return $this;
    }

    /**
     * Add a processor, which will be called regularly during execution of any job.
     *
     * @param IJobRuntimeProcessor $processor
     *
     * @return self
     */
    public function withRuntimeProcessor(IJobRuntimeProcessor $processor)
    {
        // The type hint already rejects null, so the processor can be appended directly.
        $this->runtimeProcessors[] = $processor;

        return $this;
    }

    /**
     * Set the single lock provider, which will be called for jobs configured to run as singleton.
     * Passing null removes a previously set provider.
     *
     * @param IJobLockProvider|null $lockProvider
     *
     * @return self
     */
    public function withLockProvider(IJobLockProvider $lockProvider = null)
    {
        $this->lockProvider = $lockProvider;

        return $this;
    }

    /**
     * Sets the default config to be merged with any job config before execution.
     * Passing null removes a previously set default.
     *
     * @param JobConfig|null $defaultConfig
     *
     * @return self
     */
    public function withDefaultConfig(JobConfig $defaultConfig = null)
    {
        $this->defaultConfig = $defaultConfig;

        return $this;
    }

    /**
     * Add multiple job configurations at once. These can be plain arrays and will be converted automatically.
     * If $treatKeyAsDefault is non-empty and a non-empty entry exists under that key, the entry
     * becomes the default configuration and is not registered as a job.
     *
     * Array entries without a 'name' get their dictionary key as name. Every job is stored
     * under its config's name, which for JobConfig instances may differ from the key it was passed under.
     *
     * @param array[]|JobConfig[] $jobDictionary
     * @param string              $treatKeyAsDefault
     *
     * @return self
     */
    public function withJobs(array $jobDictionary, $treatKeyAsDefault = "default")
    {
        // a default config may be passed in with these jobs
        if (!empty($treatKeyAsDefault) && !empty($jobDictionary[$treatKeyAsDefault])) {
            $defaultConfig = $jobDictionary[$treatKeyAsDefault];
            $this->defaultConfig = $defaultConfig instanceof JobConfig
                ? $defaultConfig
                : new JobConfig($defaultConfig);

            unset($jobDictionary[$treatKeyAsDefault]);
        }

        // add jobs
        foreach ($jobDictionary as $key => $jobConfig) {
            // convert array literals on the fly
            if (!($jobConfig instanceof JobConfig)) {
                if (empty($jobConfig['name'])) {
                    $jobConfig['name'] = $key;
                }
                $jobConfig = new JobConfig($jobConfig);
            }

            // add to dictionary
            $this->jobsDictionary[$jobConfig->name] = $jobConfig;
        }

        return $this;
    }

    /**
     * Register a single job configuration under its own name, replacing any job of that name.
     *
     * @param JobConfig $jobConfig
     *
     * @return self
     */
    public function withJob(JobConfig $jobConfig)
    {
        $this->jobsDictionary[$jobConfig->name] = $jobConfig;

        return $this;
    }

    /**
     * Return true if a named job exists.
     *
     * @param string $name
     *
     * @return bool
     */
    public function has($name)
    {
        return !empty($this->jobsDictionary[$name]);
    }

    /**
     * Runs a configured job and, by default, exits the script with the job's exit code.
     *
     * An unknown job yields exit code 1, a disabled job is skipped with exit code 0.
     * Result processors are only called when the job was actually executed.
     *
     * @param string         $name
     * @param bool           $exitAfterExecution Set to false if you want the exit code returned instead of exiting
     * @param JobResult|null $jobResult          Receives the job result (by reference) when the job was executed
     *
     * @return int the exit code of the job. will only be returned if $exitAfterExecution is set to false
     */
    public function run($name, $exitAfterExecution = true, JobResult &$jobResult = null)
    {
        $jobConfig = $this->get($name, true);

        // No config
        if (!$jobConfig) {
            echo "[MANAGER] ERR: Job not found!";
            $exitCode = 1;
        }
        // Job disabled
        else if (!$jobConfig->enabled) {
            echo "[MANAGER] job execution skipped";
            $exitCode = 0;
        }
        // Execute!
        else {
            $jobResult = $this->executeJobConfig($jobConfig);
            $exitCode = $jobResult->getCode();

            // Process results
            $this->callResultProcessor($jobConfig, $jobResult);
        }

        // Return or exit with the same exit code as the job
        if (!$exitAfterExecution) {
            return $exitCode;
        }
        exit($exitCode);
    }

    /**
     * Gets a config back from the dictionary.
     *
     * With $mergeWithDefaults the job config is first merged into the custom default
     * (if one is set) and the outcome is merged into JobConfig::getDefault().
     *
     * @param string $name
     * @param bool   $mergeWithDefaults
     *
     * @return JobConfig|null null if no job is registered under $name
     */
    public function get($name, $mergeWithDefaults = false)
    {
        $jobConfig = isset($this->jobsDictionary[$name]) ? $this->jobsDictionary[$name] : null;

        // Null OR no merge intended? return directly
        if (!$jobConfig || !$mergeWithDefaults) {
            return $jobConfig;
        }

        // merge jobConfig into customized default if existing
        if ($this->defaultConfig) {
            $jobConfig = $this->defaultConfig->merge($jobConfig);
        }

        // merge into absolute default
        return JobConfig::getDefault()->merge($jobConfig);
    }

    /**
     * Gets all registered configs as an array keyed by job name.
     *
     * @return JobConfig[]
     */
    public function getAll()
    {
        // PHP arrays have value semantics, so returning the property hands out a copy.
        return $this->jobsDictionary;
    }

    /**
     * Gets the custom default config.
     *
     * @return JobConfig|null
     */
    public function getDefault()
    {
        return $this->defaultConfig;
    }

    /**
     * Executes one job: prints a header, acquires the lock for singleton jobs, runs the
     * process, releases the lock again and prints a footer with status and runtime.
     *
     * A singleton job whose lock cannot be acquired (or for which no lock provider is
     * configured) is not executed; a fatal JobResult is returned instead.
     *
     * @param JobConfig $jobConfig
     *
     * @return JobResult
     */
    protected function executeJobConfig(JobConfig $jobConfig)
    {
        // Prefix output
        echo PHP_EOL
            . 'job: ' . $jobConfig->name . PHP_EOL
            . 'cmd: ' . $jobConfig->cmd . PHP_EOL
            . '--------------------------------' . PHP_EOL;

        // Try acquire lock for singleton
        $lockAcquired = false;
        if ($jobConfig->singleton && $this->lockProvider) {
            $lockAcquired = (bool)$this->lockProvider->acquire($jobConfig);
        }

        if ($jobConfig->singleton && !$lockAcquired) {
            if (!$this->lockProvider) {
                // Without a provider there is nothing to ask for a blocking duration.
                $message = 'lock could not be acquired, no lock provider configured';
            }
            else {
                $message = 'lock could not be acquired';
                if ($duration = $this->lockProvider->getDuration()) {
                    $message .= ', blocked since ' . $duration . ' min(s)';
                }
            }

            $jobResult = JobResult::fatal($message);
        }
        else {
            try {
                $jobResult = $this->executeProcess($jobConfig);
            } finally {
                // Release lock for singleton, even if the execution threw.
                if ($lockAcquired) {
                    $this->lockProvider->release($jobConfig);
                }
            }
        }

        $code = $jobResult->getCode();

        // suffix output
        echo (empty($jobResult->getOutput())
                ? '[MANAGER] job did not emit output' . PHP_EOL
                : PHP_EOL)
            . '--------------------------------' . PHP_EOL
            . 'status:   ' . $code . PHP_EOL
            . 'runtime: ' . $jobResult->getDurationFormatted() . PHP_EOL;

        return $jobResult;
    }

    /**
     * Runs the job's command as a child process and collects its output.
     *
     * stdout lines are echoed as-is, stderr lines are echoed with an "ERR: " prefix; everything
     * echoed during one polling iteration (including output of runtime processors) is appended
     * to the result's output as one chunk. Runtime processors are called whenever the configured
     * interval has elapsed and can abort the job by returning false. The job is also aborted
     * once its maximum runtime is exceeded. An aborted, still running process is terminated.
     *
     * @param JobConfig $jobConfig
     *
     * @return JobResult
     */
    protected function executeProcess(JobConfig $jobConfig)
    {
        $start = microtime(true);

        $fd_write = 0; // stdin
        $fd_read = 1; // stdout
        $fd_err = 2; // stderr
        $buf_size = 1024; // max buffer size
        $output = [];

        $descriptorspec = [
            0 => ["pipe", "r"],
            1 => ["pipe", "w"],
            2 => ["pipe", "w"],
        ];

        // $_ENV is empty when variables_order does not contain "E". Passing an empty array would
        // start the child without any environment (no PATH), so inherit the parent's instead.
        $environment = empty($_ENV) ? null : $_ENV;

        $ptr = proc_open($jobConfig->cmd, $descriptorspec, $pipes, null, $environment);
        if (!is_resource($ptr)) {
            return JobResult::fatal("not enough FD or out of memory.");
        }

        stream_set_blocking($pipes[$fd_read], false);
        stream_set_blocking($pipes[$fd_err], false);

        // The exit code must be remembered at the moment termination is first observed:
        // before PHP 8.3 only the first proc_get_status() call after the exit reports it,
        // later calls (and proc_close()) return -1.
        $status = proc_get_status($ptr);
        $terminated = !$status["running"];
        $ret = $terminated ? $status["exitcode"] : null;

        $nextProcessRuntime = $processRuntimeEvery = empty($this->runtimeProcessors)
            ? 0
            : $jobConfig->processRuntimeEvery;
        $maximumRuntime = $jobConfig->maxRuntime;

        // NOTE(review): the second feof() checks the stdin pipe, probably stderr was intended.
        // Left unchanged on purpose, as it affects when the loop ends; in practice the loop is
        // left through one of the breaks below.
        while (!feof($pipes[$fd_read]) || !feof($pipes[$fd_write])) {

            ob_start();

            $buffer = fgets($pipes[$fd_read], $buf_size);
            if ($buffer !== false && $buffer !== '') {
                echo $buffer;
            }
            else {
                $errorBuffer = fgets($pipes[$fd_err], $buf_size);
                if ($errorBuffer !== false && $errorBuffer !== '') {
                    echo "ERR: " . $errorBuffer;
                }
                else if ($terminated) {
                    // Process is gone and both pipes are drained. Nothing was echoed in this
                    // iteration, so just close the buffer opened above instead of leaking it.
                    ob_end_clean();
                    break;
                }
            }

            // Runtime calculation
            $runtime = microtime(true) - $start;

            // Call runtime processors
            $breakProcess = false;
            if ($nextProcessRuntime > 0 && $nextProcessRuntime < $runtime) {
                foreach ($this->runtimeProcessors as $runtimeProcessor) {
                    try {
                        $continue = $runtimeProcessor
                            ->process($runtime, $runtime - $nextProcessRuntime, $jobConfig);
                        if ($continue === false) {
                            $breakProcess = true;

                            // Processors are not required to be string-convertible.
                            $processorLabel = method_exists($runtimeProcessor, '__toString')
                                ? (string)$runtimeProcessor
                                : get_class($runtimeProcessor);
                            echo "[RUNTIME] runtime processor " . $processorLabel . " forces exit.";
                            break;
                        }
                    } catch (\Throwable $exception) {
                        echo "\n[RUNTIME] Exception caught: {$exception->getMessage()}\n";
                    }
                }

                // Calculate next due date
                while ($nextProcessRuntime < $runtime) {
                    $nextProcessRuntime += $processRuntimeEvery;
                }
            }

            // Terminate using max runtime config value
            if (!$breakProcess && $maximumRuntime > 0 && $maximumRuntime < $runtime) {
                echo "\n[RUNTIME] maximum runtime of {$maximumRuntime} seconds exceeded. Terminating process.\n";
                $breakProcess = true;
            }

            // Pass the chunk on to the outer output and keep a copy for the result.
            $bufferedOutput = ob_get_flush();
            if ($bufferedOutput !== false && $bufferedOutput !== '') {
                $output[] = $bufferedOutput;
            }
            else {
                // Nothing to read right now, yield briefly before polling again.
                usleep(200);
            }

            if ($breakProcess) {
                break;
            }

            if (!$terminated) {
                $status = proc_get_status($ptr);
                if (!$status["running"]) {
                    // Also covers a child killed by a signal, which reports exitcode -1.
                    $terminated = true;
                    $ret = $status["exitcode"];
                }
            }
        }

        foreach ($pipes as $pipe) {
            fclose($pipe);
        }

        if ($terminated) {
            // Exit code was captured when the termination was observed.
            proc_close($ptr);
        }
        else {
            $status = proc_get_status($ptr);
            if ($status["running"]) {
                // Aborted while still running: terminate and trust the retval of proc_close().
                proc_terminate($ptr);
                $ret = proc_close($ptr);
            }
            else {
                // Finished on its own between the last poll and the abort.
                $ret = $status["exitcode"];
                proc_close($ptr);
            }
        }

        // Normalize into 0..255; an unknown code (-1) becomes 255.
        $exitCode = ($ret + 256) % 256;
        $end = microtime(true);

        // special case
        if ($exitCode == 127) {
            return JobResult::fatal("Command not found (returned by sh)", 127);
        }

        return new JobResult($exitCode, $start, $end, $output);
    }

    /**
     * Passes the result of an executed job to all result processors, in registration order.
     * A processor returning exactly false stops the chain.
     *
     * @param JobConfig $jobConfig
     * @param JobResult $jobResult
     */
    protected function callResultProcessor(JobConfig $jobConfig, JobResult $jobResult)
    {
        // Process results
        foreach ($this->resultProcessors as $resultProcessor) {
            try {
                // run result processor.
                if ($resultProcessor->process($jobResult, $jobConfig) === false) {
                    // break out for explicit false returns.
                    break;
                }
            } catch (\Throwable $exception) {
                // We catch and hide all exceptions to guarantee execution of all processors
                echo "\n[RESULT] Exception caught: {$exception->getMessage()}\n";
            }
        }
    }

    /**
     * Retrieve an external iterator over the registered job configs (keyed by job name).
     *
     * @link  http://php.net/manual/en/iteratoraggregate.getiterator.php
     * @return Traversable An instance of an object implementing <b>Iterator</b> or
     * <b>Traversable</b>
     * @since 5.0.0
     */
    #[\ReturnTypeWillChange]
    public function getIterator()
    {
        return new \ArrayIterator($this->jobsDictionary);
    }

    /**
     * Exports the manager's configuration.
     *
     * NOTE(review): 'default' holds the JobConfig object itself (or null), whereas every
     * entry of 'jobs' is converted with JobConfig::toArray(). Kept as is, since callers may
     * rely on receiving the object; confirm whether an array was intended.
     *
     * @return array
     */
    public function toArray()
    {
        return [
            'default' => $this->defaultConfig,
            'jobs' => array_map(function(JobConfig $jobConfig) {
                return $jobConfig->toArray();
            }, $this->jobsDictionary),
        ];
    }

    /**
     * Whether a job is registered under the given offset (job name).
     *
     * @link  http://php.net/manual/en/arrayaccess.offsetexists.php
     *
     * @param mixed $offset The offset to check for.
     *
     * @return boolean true if a job exists under $offset, false otherwise.
     * @since 5.0.0
     */
    #[\ReturnTypeWillChange]
    public function offsetExists($offset)
    {
        return isset($this->jobsDictionary[$offset]);
    }

    /**
     * Retrieve the job config registered under the given offset (job name).
     *
     * @link  http://php.net/manual/en/arrayaccess.offsetget.php
     *
     * @param mixed $offset The offset to retrieve.
     *
     * @return JobConfig|null null if nothing is registered under $offset.
     * @since 5.0.0
     */
    #[\ReturnTypeWillChange]
    public function offsetGet($offset)
    {
        return isset($this->jobsDictionary[$offset]) ? $this->jobsDictionary[$offset] : null;
    }

    /**
     * Register a job config under the given offset.
     *
     * Assigning null removes the entry, so the dictionary never contains null values
     * (toArray() and iteration expect JobConfig instances only). When no offset is
     * given ($manager[] = $config) the config's own name is used as key.
     *
     * @link  http://php.net/manual/en/arrayaccess.offsetset.php
     *
     * @param mixed          $offset The offset to assign the value to.
     * @param JobConfig|null $value  The value to set.
     *
     * @return void
     * @throws \TypeError if $value is neither a JobConfig nor null
     * @since 5.0.0
     */
    #[\ReturnTypeWillChange]
    public function offsetSet($offset, $value)
    {
        if ($value !== null && !($value instanceof JobConfig)) {
            throw new \TypeError("expected JobConfig or null");
        }

        if ($value === null) {
            if ($offset !== null) {
                unset($this->jobsDictionary[$offset]);
            }

            return;
        }

        if ($offset === null) {
            $offset = $value->name;
        }

        $this->jobsDictionary[$offset] = $value;
    }

    /**
     * Remove the job config registered under the given offset (job name).
     *
     * @link  http://php.net/manual/en/arrayaccess.offsetunset.php
     *
     * @param mixed $offset The offset to unset.
     *
     * @return void
     * @since 5.0.0
     */
    #[\ReturnTypeWillChange]
    public function offsetUnset($offset)
    {
        unset($this->jobsDictionary[$offset]);
    }
}