Repository URL to install this package:
|
Version:
6.2.3 ▾
|
<?php
/*
* Copyright 2012 Johannes M. Schmitt <schmittjoh@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace JMS\JobQueueBundle\Entity\Repository;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Util\ClassUtils;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Types\Types;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\Query\Parameter;
use Doctrine\ORM\Query\ResultSetMappingBuilder;
use Doctrine\Persistence\ManagerRegistry;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Event\StateChangeEvent;
use JMS\JobQueueBundle\Exception\RuntimeException;
use JMS\JobQueueBundle\Retry\ExponentialRetryScheduler;
use JMS\JobQueueBundle\Retry\RetryScheduler;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
class JobManager
{
private $dispatcher;
private $registry;
private $retryScheduler;
public function __construct(
ManagerRegistry $managerRegistry,
EventDispatcherInterface $eventDispatcher,
RetryScheduler $retryScheduler
) {
$this->registry = $managerRegistry;
$this->dispatcher = $eventDispatcher;
$this->retryScheduler = $retryScheduler;
}
public function findJob($command, array $args = array())
{
return $this->getJobManager()->createQuery(
"SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args"
)
->setParameter('command', $command)
->setParameter('args', $args, Types::JSON)
->setMaxResults(1)
->getOneOrNullResult();
}
public function getJob($command, array $args = array())
{
if (null !== $job = $this->findJob($command, $args)) {
return $job;
}
throw new RuntimeException(
sprintf('Found no job for command "%s" with args "%s".', $command, json_encode($args))
);
}
public function getOrCreateIfNotExists($command, array $args = array())
{
if (null !== $job = $this->findJob($command, $args)) {
return $job;
}
$job = new Job($command, $args, false);
$this->getJobManager()->persist($job);
$this->getJobManager()->flush();
$firstJob = $this->getJobManager()->createQuery(
"SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args ORDER BY j.id ASC"
)
->setParameter('command', $command)
->setParameter('args', $args, Types::JSON)
->setMaxResults(1)
->getSingleResult();
if ($firstJob === $job) {
$job->setState(Job::STATE_PENDING);
$this->getJobManager()->persist($job);
$this->getJobManager()->flush();
return $job;
}
$this->getJobManager()->remove($job);
$this->getJobManager()->flush();
return $firstJob;
}
public function findStartableJob(
$workerName,
array &$excludedIds = array(),
$excludedQueues = array(),
$restrictedQueues = array()
) {
while (null !== $job = $this->findPendingJob($excludedIds, $excludedQueues, $restrictedQueues)) {
if ($job->isStartable() && $this->acquireLock($workerName, $job)) {
return $job;
}
$excludedIds[] = $job->getId();
// We do not want to have non-startable jobs floating around in
// cache as they might be changed by another process. So, better
// re-fetch them when they are not excluded anymore.
$this->detachJobFromManager($job);
}
return null;
}
private function acquireLock($workerName, Job $job)
{
$affectedRows = $this->getJobManager()->getConnection()->executeStatement(
"UPDATE jms_jobs SET workerName = :worker WHERE id = :id AND workerName IS NULL",
array(
'worker' => $workerName,
'id' => $job->getId(),
)
);
if ($affectedRows > 0) {
$job->setWorkerName($workerName);
return true;
}
return false;
}
public function findAllForRelatedEntity($relatedEntity)
{
list($relClass, $relId) = $this->getRelatedEntityIdentifier($relatedEntity);
$rsm = new ResultSetMappingBuilder($this->getJobManager());
$rsm->addRootEntityFromClassMetadata(Job::class, 'j');
return $this->getJobManager()->createNativeQuery(
"SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId",
$rsm
)
->setParameter('relClass', $relClass)
->setParameter('relId', $relId)
->getResult();
}
public function findOpenJobForRelatedEntity($command, $relatedEntity)
{
return $this->findJobForRelatedEntity(
$command,
$relatedEntity,
array(Job::STATE_RUNNING, Job::STATE_PENDING, Job::STATE_NEW)
);
}
public function findJobForRelatedEntity($command, $relatedEntity, array $states = array())
{
list($relClass, $relId) = $this->getRelatedEntityIdentifier($relatedEntity);
$rsm = new ResultSetMappingBuilder($this->getJobManager());
$rsm->addRootEntityFromClassMetadata(Job::class, 'j');
$sql = "SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId AND j.command = :command";
$params = new ArrayCollection();
$params->add(new Parameter('command', $command));
$params->add(new Parameter('relClass', $relClass));
$params->add(new Parameter('relId', $relId));
if (!empty($states)) {
$sql .= " AND j.state IN (:states)";
$params->add(new Parameter('states', $states, Connection::PARAM_STR_ARRAY));
}
return $this->getJobManager()->createNativeQuery($sql, $rsm)
->setParameters($params)
->getOneOrNullResult();
}
private function getRelatedEntityIdentifier($entity)
{
if (!is_object($entity)) {
throw new \RuntimeException('$entity must be an object.');
}
if ($entity instanceof \Doctrine\Common\Persistence\Proxy) {
$entity->__load();
}
$relClass = ClassUtils::getClass($entity);
$relId = $this->registry->getManagerForClass($relClass)->getMetadataFactory()
->getMetadataFor($relClass)->getIdentifierValues($entity);
asort($relId);
if (!$relId) {
throw new \InvalidArgumentException(
sprintf('The identifier for entity of class "%s" was empty.', $relClass)
);
}
return array($relClass, json_encode($relId));
}
public function findPendingJob(
array $excludedIds = array(),
array $excludedQueues = array(),
array $restrictedQueues = array()
) {
$qb = $this->getJobManager()->createQueryBuilder();
$qb->select('j')->from(Job::class, 'j')
->orderBy('j.priority', 'ASC')
->addOrderBy('j.id', 'ASC');
$conditions = array();
$conditions[] = $qb->expr()->isNull('j.workerName');
$conditions[] = $qb->expr()->lt('j.executeAfter', ':now');
$qb->setParameter(':now', new \DateTime(), 'datetime');
$conditions[] = $qb->expr()->eq('j.state', ':state');
$qb->setParameter('state', Job::STATE_PENDING);
if (!empty($excludedIds)) {
$conditions[] = $qb->expr()->notIn('j.id', ':excludedIds');
$qb->setParameter('excludedIds', $excludedIds, Connection::PARAM_INT_ARRAY);
}
if (!empty($excludedQueues)) {
$conditions[] = $qb->expr()->notIn('j.queue', ':excludedQueues');
$qb->setParameter('excludedQueues', $excludedQueues, Connection::PARAM_STR_ARRAY);
}
if (!empty($restrictedQueues)) {
$conditions[] = $qb->expr()->in('j.queue', ':restrictedQueues');
$qb->setParameter('restrictedQueues', $restrictedQueues, Connection::PARAM_STR_ARRAY);
}
$qb->where(call_user_func_array(array($qb->expr(), 'andX'), $conditions));
return $qb->getQuery()->setMaxResults(1)->getOneOrNullResult();
}
public function closeJob(Job $job, $finalState)
{
$this->getJobManager()->getConnection()->beginTransaction();
try {
$visited = array();
$this->closeJobInternal($job, $finalState, $visited);
$this->getJobManager()->flush();
$this->getJobManager()->getConnection()->commit();
// Clean-up entity manager to allow for garbage collection to kick in.
foreach ($visited as $job) {
// If the job is an original job which is now being retried, let's
// not remove it just yet.
if (!$job->isClosedNonSuccessful() || $job->isRetryJob()) {
continue;
}
$this->detachJobFromManager($job);
}
} catch (\Exception $ex) {
$this->getJobManager()->getConnection()->rollback();
throw $ex;
}
}
/**
* @param Job $job
* @todo EntityManager::detach is marked as deprecated, but seems will be undeprecated https://github.com/doctrine/orm/pull/8466;
* @todo After that, remove SYMFONY_DEPRECATIONS_HELPER env from phpunit.xml.dist
*/
private function detachJobFromManager(Job $job): void
{
$this->getJobManager()->detach($job);
}
private function closeJobInternal(Job $job, $finalState, array &$visited = array())
{
if (in_array($job, $visited, true)) {
return;
}
$visited[] = $job;
if ($job->isInFinalState()) {
return;
}
if (null !== $this->dispatcher && ($job->isRetryJob() || 0 === count($job->getRetryJobs()))) {
$event = new StateChangeEvent($job, $finalState);
$this->dispatcher->dispatch($event, 'jms_job_queue.job_state_change');
$finalState = $event->getNewState();
}
switch ($finalState) {
case Job::STATE_CANCELED:
$job->setState(Job::STATE_CANCELED);
$this->getJobManager()->persist($job);
if ($job->isRetryJob()) {
$this->closeJobInternal($job->getOriginalJob(), Job::STATE_CANCELED, $visited);
return;
}
foreach ($this->findIncomingDependencies($job) as $dep) {
$this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
}
return;
case Job::STATE_FAILED:
case Job::STATE_TERMINATED:
case Job::STATE_INCOMPLETE:
if ($job->isRetryJob()) {
$job->setState($finalState);
$this->getJobManager()->persist($job);
$this->closeJobInternal($job->getOriginalJob(), $finalState);
return;
}
// The original job has failed, and we are allowed to retry it.
if ($job->isRetryAllowed()) {
$retryJob = new Job(
$job->getCommand(), $job->getArgs(), true, $job->getQueue(), $job->getPriority()
);
$retryJob->setMaxRuntime($job->getMaxRuntime());
if ($this->retryScheduler === null) {
$this->retryScheduler = new ExponentialRetryScheduler(5);
}
$retryJob->setExecuteAfter($this->retryScheduler->scheduleNextRetry($job));
$job->addRetryJob($retryJob);
$this->getJobManager()->persist($retryJob);
$this->getJobManager()->persist($job);
return;
}
$job->setState($finalState);
$this->getJobManager()->persist($job);
// The original job has failed, and no retries are allowed.
foreach ($this->findIncomingDependencies($job) as $dep) {
// This is a safe-guard to avoid blowing up if there is a database inconsistency.
if (!$dep->isPending() && !$dep->isNew()) {
continue;
}
$this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
}
return;
case Job::STATE_FINISHED:
if ($job->isRetryJob()) {
$job->getOriginalJob()->setState($finalState);
$this->getJobManager()->persist($job->getOriginalJob());
}
$job->setState($finalState);
$this->getJobManager()->persist($job);
return;
default:
throw new \LogicException(sprintf('Non allowed state "%s" in closeJobInternal().', $finalState));
}
}
/**
* @return Job[]
*/
public function findIncomingDependencies(Job $job)
{
$jobIds = $this->getJobIdsOfIncomingDependencies($job);
if (empty($jobIds)) {
return array();
}
return $this->getJobManager()->createQuery(
"SELECT j, d FROM JMSJobQueueBundle:Job j LEFT JOIN j.dependencies d WHERE j.id IN (:ids)"
)
->setParameter('ids', $jobIds)
->getResult();
}
/**
* @return Job[]
*/
public function getIncomingDependencies(Job $job)
{
$jobIds = $this->getJobIdsOfIncomingDependencies($job);
if (empty($jobIds)) {
return array();
}
return $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.id IN (:ids)")
->setParameter('ids', $jobIds)
->getResult();
}
private function getJobIdsOfIncomingDependencies(Job $job)
{
$jobIds = $this->getJobManager()->getConnection()
->executeQuery(
"SELECT source_job_id FROM jms_job_dependencies WHERE dest_job_id = :id",
array('id' => $job->getId())
)
->fetchAll(\PDO::FETCH_COLUMN);
return $jobIds;
}
public function findLastJobsWithError($nbJobs = 10)
{
return $this->getJobManager()->createQuery(
"SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state IN (:errorStates) AND j.originalJob IS NULL ORDER BY j.closedAt DESC"
)
->setParameter('errorStates', array(Job::STATE_TERMINATED, Job::STATE_FAILED))
->setMaxResults($nbJobs)
->getResult();
}
public function getAvailableQueueList()
{
$queues = $this->getJobManager()->createQuery(
"SELECT DISTINCT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) GROUP BY j.queue"
)
->setParameter('availableStates', array(Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING))
->getResult();
$newQueueArray = array();
foreach ($queues as $queue) {
$newQueue = $queue['queue'];
$newQueueArray[] = $newQueue;
}
return $newQueueArray;
}
public function getAvailableJobsForQueueCount($jobQueue)
{
$result = $this->getJobManager()->createQuery(
"SELECT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) AND j.queue = :queue"
)
->setParameter('availableStates', array(Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING))
->setParameter('queue', $jobQueue)
->setMaxResults(1)
->getOneOrNullResult();
return count($result);
}
private function getJobManager(): EntityManagerInterface
{
return $this->registry->getManagerForClass(Job::class);
}
}