Repository URL to install this package:
|
Version:
2.2.0 ▾
|
<?php
namespace CrazyFactory\Jobs;
use Traversable;
class JobManager implements \IteratorAggregate, \ArrayAccess
{
/** @var JobConfig[] $jobsDictionary Registered job configurations; withJob()/withJobs() key them by job name. */
protected $jobsDictionary = [];
/** @var JobConfig|null $defaultConfig Optional default configuration that get() merges job configs into. */
protected $defaultConfig = null;
/** @var IJobResultProcessor[] $resultProcessors Processors invoked by run() after a job has been executed. */
protected $resultProcessors = [];
/** @var IJobRuntimeProcessor[] $runtimeProcessors Processors invoked periodically while a job process is running. */
protected $runtimeProcessors = [];
/** @var IJobLockProvider|null $lockProvider Lock provider consulted for jobs whose config is flagged as singleton. */
protected $lockProvider = null;
/**
 * Registers a result processor. run() hands the result of every executed
 * job to the registered processors, in registration order.
 *
 * @param IJobResultProcessor $processor
 *
 * @return self
 */
public function withResultProcessor(IJobResultProcessor $processor)
{
    // The non-nullable type hint already rules out null, so append directly.
    $this->resultProcessors[] = $processor;

    return $this;
}
/**
 * Registers a runtime processor. Registered processors are invoked at
 * regular intervals while a job process is running.
 *
 * @param IJobRuntimeProcessor $processor
 *
 * @return self
 */
public function withRuntimeProcessor(IJobRuntimeProcessor $processor)
{
    // The non-nullable type hint already rules out null, so append directly.
    $this->runtimeProcessors[] = $processor;

    return $this;
}
/**
 * Sets (or, when null is passed, clears) the lock provider that is asked
 * for a lock before a job configured as singleton is executed.
 *
 * @param IJobLockProvider|null $lockProvider
 *
 * @return self
 */
public function withLockProvider(IJobLockProvider $lockProvider = null)
{
    $this->lockProvider = $lockProvider;

    return $this;
}
/**
 * Sets (or, when null is passed, clears) the default configuration that
 * get() merges a job's own configuration into when merging is requested.
 *
 * @param JobConfig|null $defaultConfig
 *
 * @return self
 */
public function withDefaultConfig(JobConfig $defaultConfig = null)
{
    $this->defaultConfig = $defaultConfig;

    return $this;
}
/**
 * Registers several job configurations at once. Entries may be JobConfig
 * instances or plain arrays; arrays are converted to JobConfig on the fly
 * and receive their dictionary key as name when they do not carry one.
 *
 * When $treatKeyAsDefault is non-empty and a non-empty entry exists under
 * that key, the entry becomes the default configuration and is not
 * registered as a job.
 *
 * @param array[]|JobConfig[] $jobDictionary
 * @param string              $treatKeyAsDefault
 *
 * @return self
 */
public function withJobs(array $jobDictionary, $treatKeyAsDefault = "default")
{
    // Pull an embedded default configuration out of the dictionary first.
    $carriesDefault = !empty($treatKeyAsDefault) && !empty($jobDictionary[$treatKeyAsDefault]);
    if ($carriesDefault) {
        $candidate = $jobDictionary[$treatKeyAsDefault];
        if (!($candidate instanceof JobConfig)) {
            $candidate = new JobConfig($candidate);
        }
        $this->defaultConfig = $candidate;
        unset($jobDictionary[$treatKeyAsDefault]);
    }

    // Everything that is left is a job.
    foreach ($jobDictionary as $key => $entry) {
        if ($entry instanceof JobConfig) {
            $config = $entry;
        } else {
            // Array literal: fall back to the dictionary key as job name.
            if (empty($entry['name'])) {
                $entry['name'] = $key;
            }
            $config = new JobConfig($entry);
        }

        $this->jobsDictionary[$config->name] = $config;
    }

    return $this;
}
/**
 * Registers a single job configuration under its own name, replacing any
 * job already registered under that name.
 *
 * @param JobConfig $jobConfig
 *
 * @return self
 */
public function withJob(JobConfig $jobConfig)
{
    $jobName = $jobConfig->name;
    $this->jobsDictionary[$jobName] = $jobConfig;

    return $this;
}
/**
 * Tells whether a job is registered under the given name.
 *
 * @param string $name
 *
 * @return bool true when a non-empty entry exists for $name
 */
public function has($name)
{
    return isset($this->jobsDictionary[$name]) && (bool)$this->jobsDictionary[$name];
}
/**
 * Runs a configured job and, by default, ends the script with the job's exit code.
 *
 * Exit code 1 is used when no job is registered under $name, 0 when the job
 * is disabled; otherwise the code reported by the job result is used and the
 * registered result processors are called.
 *
 * @param string         $name
 * @param bool           $exitAfterExecution Set to false if you want the exit code returned instead of exiting
 * @param JobResult|null $jobResult          Receives the job result (by reference) when the job was executed
 *
 * @return int the exit code of the job. will only be returned if $exitAfterExecution is set to false
 */
public function run($name, $exitAfterExecution = true, JobResult &$jobResult = null)
{
    $jobConfig = $this->get($name, true);

    if ($jobConfig && $jobConfig->enabled) {
        // Execute, then hand the result to the result processors.
        $jobResult = $this->executeJobConfig($jobConfig);
        $exitCode = $jobResult->getCode();
        $this->callResultProcessor($jobConfig, $jobResult);
    } elseif ($jobConfig) {
        // Known but disabled job.
        echo "[MANAGER] job execution skipped";
        $exitCode = 0;
    } else {
        // Unknown job.
        echo "[MANAGER] ERR: Job not found!";
        $exitCode = 1;
    }

    // Exit with the same code as the job unless the caller wants it returned.
    if ($exitAfterExecution) {
        exit($exitCode);
    }

    return $exitCode;
}
/**
 * Looks up a job configuration by name.
 *
 * With $mergeWithDefaults enabled the stored configuration is first merged
 * into the manager's default configuration (if one is set) and the outcome
 * is then merged into JobConfig::getDefault().
 *
 * @param string $name
 * @param bool   $mergeWithDefaults
 *
 * @return JobConfig|null null when no job is registered under $name
 */
public function get($name, $mergeWithDefaults = false)
{
    $jobConfig = null;
    if (isset($this->jobsDictionary[$name])) {
        $jobConfig = $this->jobsDictionary[$name];
    }

    // Nothing to merge, or merging not requested: hand back what was found.
    if (!$mergeWithDefaults || !$jobConfig) {
        return $jobConfig;
    }

    $merged = $jobConfig;

    // Layer 1: the manager's customized default, when present.
    if ($this->defaultConfig) {
        $merged = $this->defaultConfig->merge($merged);
    }

    // Layer 2: the absolute default.
    return JobConfig::getDefault()->merge($merged);
}
/**
 * Returns every registered configuration, with the dictionary keys preserved.
 *
 * @return JobConfig[]
 */
public function getAll()
{
    // PHP arrays are returned by value, so the caller receives its own copy.
    return $this->jobsDictionary;
}
/**
 * Returns the manager's default configuration.
 *
 * @return JobConfig|null null when no default configuration has been set
 */
public function getDefault()
{
    return $this->defaultConfig;
}
/**
 * Executes a job: prints a header, takes the singleton lock when the job
 * requires one, runs the command and prints a footer with status and runtime.
 *
 * A singleton job yields a fatal JobResult (without running the command)
 * when no lock provider is configured or when the lock cannot be acquired.
 *
 * @param JobConfig $jobConfig
 *
 * @return JobResult
 */
protected function executeJobConfig(JobConfig $jobConfig)
{
    // Prefix output
    echo '' . PHP_EOL;
    echo 'job: ' . $jobConfig->name . PHP_EOL;
    echo 'cmd: ' . $jobConfig->cmd . PHP_EOL;
    echo '--------------------------------' . PHP_EOL;

    $jobResult = null;
    $lockHeld = false;

    // Try to acquire the lock for singleton jobs
    if ($jobConfig->singleton) {
        if (!$this->lockProvider) {
            // Without a provider there is nobody to ask for the blocking duration.
            $jobResult = JobResult::fatal('lock could not be acquired (no lock provider configured)');
        } elseif (!$this->lockProvider->acquire($jobConfig)) {
            $message = 'lock could not be acquired';
            if ($duration = $this->lockProvider->getDuration()) {
                $message .= ', blocked since ' . $duration . ' min(s)';
            }
            $jobResult = JobResult::fatal($message);
        } else {
            $lockHeld = true;
        }
    }

    if ($jobResult === null) {
        try {
            $jobResult = $this->executeProcess($jobConfig);
        } finally {
            // Release the lock even if the execution throws, so the job is not blocked forever.
            if ($lockHeld) {
                $this->lockProvider->release($jobConfig);
            }
        }
    }

    $code = $jobResult->getCode();

    // Suffix output
    echo empty($jobResult->getOutput())
        ? '[MANAGER] job did not emit output' . PHP_EOL
        : PHP_EOL;
    echo '--------------------------------' . PHP_EOL;
    echo 'status: ' . $code . PHP_EOL;
    echo 'runtime: ' . $jobResult->getDurationFormatted() . PHP_EOL;

    return $jobResult;
}
/**
 * Spawns the job's command, relays its output and waits for it to finish.
 *
 * stdout lines are echoed as-is, stderr lines are echoed with an "ERR: " prefix.
 * Everything echoed while the process is supervised (including output of the
 * runtime processors) is also collected and passed to the JobResult.
 *
 * The process is terminated early when a runtime processor returns false or
 * when the configured maximum runtime is exceeded.
 *
 * Exit code handling: the status snapshot taken at the moment the process is
 * first seen as "not running" is kept, because on PHP versions before 8.3
 * proc_get_status() reports the real exit code only on the first call after
 * the process ended and -1 afterwards. A process killed by a signal is
 * reported as 128 + signal number.
 *
 * @param JobConfig $jobConfig
 *
 * @return JobResult
 */
protected function executeProcess(JobConfig $jobConfig)
{
    $start = microtime(true);
    $fd_read = 1; // child's stdout
    $fd_err = 2; // child's stderr
    $buf_size = 1024; // max buffer size
    $output = [];
    $descriptorspec = [
        0 => ["pipe", "r"], // child's stdin (opened, never written to)
        1 => ["pipe", "w"],
        2 => ["pipe", "w"],
    ];

    // NOTE(review): $_ENV can be empty depending on the variables_order ini setting, in which
    // case the child starts with an empty environment - confirm this is intended.
    $ptr = proc_open($jobConfig->cmd, $descriptorspec, $pipes, null, $_ENV);
    if (!is_resource($ptr)) {
        return JobResult::fatal("not enough FD or out of memory.");
    }

    stream_set_blocking($pipes[$fd_read], 0);
    stream_set_blocking($pipes[$fd_err], 0);

    // Runtime processors are only scheduled when at least one is registered.
    $processRuntimeEvery = empty($this->runtimeProcessors)
        ? 0
        : $jobConfig->processRuntimeEvery;
    $nextProcessRuntime = $processRuntimeEvery;
    $maximumRuntime = $jobConfig->maxRuntime;

    $terminated = false; // true once the process has been seen as not running
    $exitStatus = null;  // proc_get_status() snapshot taken at that moment

    while (true) {
        // Poll until the process is gone. Testing "running" (instead of the exit code) also
        // detects processes that were killed by a signal and therefore never report an exit code.
        if (!$terminated) {
            $status = proc_get_status($ptr);
            if (!$status["running"]) {
                $terminated = true;
                $exitStatus = $status;
            }
        }

        // Everything echoed during this iteration is captured and flushed below.
        ob_start();

        $buffer = fgets($pipes[$fd_read], $buf_size);
        if ($buffer !== false && $buffer !== '') {
            echo $buffer;
        } else {
            $errorBuffer = fgets($pipes[$fd_err], $buf_size);
            if ($errorBuffer !== false && $errorBuffer !== '') {
                echo "ERR: " . $errorBuffer;
            } elseif ($terminated) {
                // Process ended and both pipes are drained. Close this iteration's (empty)
                // buffer before leaving so no output buffer level is leaked.
                ob_end_clean();
                break;
            }
        }

        // Runtime calculation
        $runtime = microtime(true) - $start;

        // Call runtime processors
        $breakProcess = false;
        if ($nextProcessRuntime > 0 && $nextProcessRuntime < $runtime) {
            foreach ($this->runtimeProcessors as $runtimeProcessor) {
                try {
                    $breakProcess = $runtimeProcessor
                            ->process($runtime, $runtime - $nextProcessRuntime, $jobConfig) === false;
                    if ($breakProcess) {
                        // Not every processor is guaranteed to be string-convertible.
                        $label = method_exists($runtimeProcessor, '__toString')
                            ? (string)$runtimeProcessor
                            : get_class($runtimeProcessor);
                        echo "[RUNTIME] runtime processor " . $label . " forces exit.";
                    }
                } catch (\Throwable $exception) {
                    echo "\n[RUNTIME] Exception caught: {$exception->getMessage()}\n";
                }
                // Honor a forced exit even if building the message above failed.
                if ($breakProcess) {
                    break;
                }
            }
            // Calculate next due date
            while ($nextProcessRuntime < $runtime) {
                $nextProcessRuntime += $processRuntimeEvery;
            }
        }

        // Terminate using max runtime config value
        if (!$breakProcess && $maximumRuntime > 0 && $maximumRuntime < $runtime) {
            echo "\n[RUNTIME] maximum runtime of {$maximumRuntime} seconds exceeded. Terminating process.\n";
            $breakProcess = true;
        }

        $bufferedOutput = ob_get_flush();
        if ($bufferedOutput !== false && $bufferedOutput !== '') {
            // Explicit comparison: a chunk consisting of "0" is output as well.
            $output[] = $bufferedOutput;
        } else {
            // Nothing to relay right now; yield briefly before polling again.
            usleep(200);
        }

        if ($breakProcess) {
            break;
        }
    }

    foreach ($pipes as $pipe) {
        fclose($pipe);
    }

    // The loop may have been left early while the process was still believed to be running.
    if (!$terminated) {
        $status = proc_get_status($ptr);
        if (!$status["running"]) {
            $exitStatus = $status;
        }
    }

    if ($exitStatus === null) {
        // Still running: terminate it; the return value of proc_close() is all we have.
        proc_terminate($ptr);
        $ret = proc_close($ptr);
    } else {
        if ($exitStatus["exitcode"] !== -1) {
            $ret = $exitStatus["exitcode"];
        } elseif (!empty($exitStatus["signaled"])) {
            // Killed by a signal: follow the shell convention of 128 + signal number.
            $ret = 128 + $exitStatus["termsig"];
        } else {
            // No exit information available at all: assume success.
            $ret = 0;
        }
        proc_close($ptr);
    }

    // Normalize into the 0..255 range (e.g. -1 becomes 255).
    $exitCode = ($ret + 256) % 256;
    $end = microtime(true);

    // special case
    if ($exitCode == 127) {
        return JobResult::fatal("Command not found (returned by sh)", 127);
    }

    return new JobResult($exitCode, $start, $end, $output);
}
/**
 * Passes a job result to the registered result processors, in order.
 *
 * A processor that returns exactly false stops the chain. A processor that
 * throws is reported on the output and the chain continues with the next one.
 *
 * @param JobConfig $jobConfig
 * @param JobResult $jobResult
 */
protected function callResultProcessor(JobConfig $jobConfig, JobResult $jobResult)
{
    foreach ($this->resultProcessors as $processor) {
        try {
            $continueChain = $processor->process($jobResult, $jobConfig) !== false;
        } catch (\Throwable $error) {
            // A failing processor must not keep the remaining processors from running.
            echo "\n[RESULT] Exception caught: {$error->getMessage()}\n";
            continue;
        }

        // Explicit false return: stop here.
        if (!$continueChain) {
            break;
        }
    }
}
/**
 * Retrieve an external iterator over the registered job configurations.
 *
 * The attribute below suppresses the PHP 8.1+ deprecation notice about the
 * missing return type without changing the signature for subclasses; older
 * PHP versions read the attribute line as a comment.
 *
 * @link http://php.net/manual/en/iteratoraggregate.getiterator.php
 * @return Traversable An \ArrayIterator over the jobs dictionary
 * @since 5.0.0
 */
#[\ReturnTypeWillChange]
public function getIterator()
{
    return new \ArrayIterator($this->jobsDictionary);
}
/**
 * Exports the manager's configuration: the default configuration under
 * 'default' and every job, converted via JobConfig::toArray(), under 'jobs'.
 *
 * NOTE(review): the default configuration is returned as stored (object or
 * null), not converted like the jobs - confirm this is intended.
 *
 * @return array
 */
public function toArray()
{
    $jobs = array_map(
        function (JobConfig $config) {
            return $config->toArray();
        },
        $this->jobsDictionary
    );

    return [
        'default' => $this->defaultConfig,
        'jobs' => $jobs,
    ];
}
/**
 * Whether a job is registered under the given offset.
 *
 * The attribute below suppresses the PHP 8.1+ deprecation notice about the
 * missing return type; older PHP versions read the attribute line as a comment.
 *
 * @link http://php.net/manual/en/arrayaccess.offsetexists.php
 *
 * @param mixed $offset The offset (job name) to check for.
 *
 * @return boolean true when a non-null entry exists for the offset, false otherwise.
 * @since 5.0.0
 */
#[\ReturnTypeWillChange]
public function offsetExists($offset)
{
    return isset($this->jobsDictionary[$offset]);
}
/**
 * Retrieves the job configuration stored under the given offset.
 *
 * The attribute below suppresses the PHP 8.1+ deprecation notice about the
 * missing return type; older PHP versions read the attribute line as a comment.
 *
 * @link http://php.net/manual/en/arrayaccess.offsetget.php
 *
 * @param mixed $offset The offset (job name) to retrieve.
 *
 * @return JobConfig|null The stored configuration, or null when the offset is unknown.
 * @since 5.0.0
 */
#[\ReturnTypeWillChange]
public function offsetGet($offset)
{
    return isset($this->jobsDictionary[$offset]) ? $this->jobsDictionary[$offset] : null;
}
/**
 * Stores a job configuration under the given offset.
 *
 * - `$manager[] = $config` (null offset) registers the configuration under its own name.
 * - Assigning null removes the entry, so the dictionary never holds null
 *   values that toArray() and iteration cannot handle.
 *
 * The attribute below suppresses the PHP 8.1+ deprecation notice about the
 * missing return type; older PHP versions read the attribute line as a comment.
 *
 * @link http://php.net/manual/en/arrayaccess.offsetset.php
 *
 * @param mixed          $offset The offset (job name) to assign the value to, or null to append.
 * @param JobConfig|null $value  The configuration to store, or null to remove the entry.
 *
 * @return void
 * @throws \TypeError when $value is neither a JobConfig nor null
 * @since 5.0.0
 */
#[\ReturnTypeWillChange]
public function offsetSet($offset, $value)
{
    if ($value !== null && !($value instanceof JobConfig)) {
        throw new \TypeError("expected JobConfig or null");
    }

    if ($value === null) {
        // Nothing to remove when appending null.
        if ($offset !== null) {
            unset($this->jobsDictionary[$offset]);
        }
        return;
    }

    // Append syntax carries no key; fall back to the configuration's own name.
    if ($offset === null) {
        $offset = $value->name;
    }

    $this->jobsDictionary[$offset] = $value;
}
/**
 * Removes the job configuration stored under the given offset, if any.
 *
 * The attribute below suppresses the PHP 8.1+ deprecation notice about the
 * missing return type; older PHP versions read the attribute line as a comment.
 *
 * @link http://php.net/manual/en/arrayaccess.offsetunset.php
 *
 * @param mixed $offset The offset (job name) to unset.
 *
 * @return void
 * @since 5.0.0
 */
#[\ReturnTypeWillChange]
public function offsetUnset($offset)
{
    unset($this->jobsDictionary[$offset]);
}
}