Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
<?php

/*
 * Copyright 2012 Johannes M. Schmitt <schmittjoh@gmail.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace JMS\JobQueueBundle\Entity\Repository;

use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Util\ClassUtils;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Types\Types;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\Query\Parameter;
use Doctrine\ORM\Query\ResultSetMappingBuilder;
use Doctrine\Persistence\ManagerRegistry;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Event\StateChangeEvent;
use JMS\JobQueueBundle\Exception\RuntimeException;
use JMS\JobQueueBundle\Retry\ExponentialRetryScheduler;
use JMS\JobQueueBundle\Retry\RetryScheduler;

use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;


class JobManager
{
    private $dispatcher;
    private $registry;
    private $retryScheduler;

    public function __construct(
        ManagerRegistry $managerRegistry,
        EventDispatcherInterface $eventDispatcher,
        RetryScheduler $retryScheduler
    ) {
        $this->registry = $managerRegistry;
        $this->dispatcher = $eventDispatcher;
        $this->retryScheduler = $retryScheduler;
    }

    public function findJob($command, array $args = array())
    {
        return $this->getJobManager()->createQuery(
            "SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args"
        )
            ->setParameter('command', $command)
            ->setParameter('args', $args, Types::JSON)
            ->setMaxResults(1)
            ->getOneOrNullResult();
    }

    public function getJob($command, array $args = array())
    {
        if (null !== $job = $this->findJob($command, $args)) {
            return $job;
        }

        throw new RuntimeException(
            sprintf('Found no job for command "%s" with args "%s".', $command, json_encode($args))
        );
    }

    public function getOrCreateIfNotExists($command, array $args = array())
    {
        if (null !== $job = $this->findJob($command, $args)) {
            return $job;
        }

        $job = new Job($command, $args, false);
        $this->getJobManager()->persist($job);
        $this->getJobManager()->flush();

        $firstJob = $this->getJobManager()->createQuery(
            "SELECT j FROM JMSJobQueueBundle:Job j WHERE j.command = :command AND j.args = :args ORDER BY j.id ASC"
        )
            ->setParameter('command', $command)
            ->setParameter('args', $args, Types::JSON)
            ->setMaxResults(1)
            ->getSingleResult();

        if ($firstJob === $job) {
            $job->setState(Job::STATE_PENDING);
            $this->getJobManager()->persist($job);
            $this->getJobManager()->flush();

            return $job;
        }

        $this->getJobManager()->remove($job);
        $this->getJobManager()->flush();

        return $firstJob;
    }

    public function findStartableJob(
        $workerName,
        array &$excludedIds = array(),
        $excludedQueues = array(),
        $restrictedQueues = array()
    ) {
        while (null !== $job = $this->findPendingJob($excludedIds, $excludedQueues, $restrictedQueues)) {
            if ($job->isStartable() && $this->acquireLock($workerName, $job)) {
                return $job;
            }

            $excludedIds[] = $job->getId();

            // We do not want to have non-startable jobs floating around in
            // cache as they might be changed by another process. So, better
            // re-fetch them when they are not excluded anymore.
            $this->detachJobFromManager($job);
        }

        return null;
    }

    private function acquireLock($workerName, Job $job)
    {
        $affectedRows = $this->getJobManager()->getConnection()->executeStatement(
            "UPDATE jms_jobs SET workerName = :worker WHERE id = :id AND workerName IS NULL",
            array(
                'worker' => $workerName,
                'id' => $job->getId(),
            )
        );

        if ($affectedRows > 0) {
            $job->setWorkerName($workerName);

            return true;
        }

        return false;
    }

    public function findAllForRelatedEntity($relatedEntity)
    {
        list($relClass, $relId) = $this->getRelatedEntityIdentifier($relatedEntity);

        $rsm = new ResultSetMappingBuilder($this->getJobManager());
        $rsm->addRootEntityFromClassMetadata(Job::class, 'j');

        return $this->getJobManager()->createNativeQuery(
            "SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId",
            $rsm
        )
            ->setParameter('relClass', $relClass)
            ->setParameter('relId', $relId)
            ->getResult();
    }

    public function findOpenJobForRelatedEntity($command, $relatedEntity)
    {
        return $this->findJobForRelatedEntity(
            $command,
            $relatedEntity,
            array(Job::STATE_RUNNING, Job::STATE_PENDING, Job::STATE_NEW)
        );
    }

    public function findJobForRelatedEntity($command, $relatedEntity, array $states = array())
    {
        list($relClass, $relId) = $this->getRelatedEntityIdentifier($relatedEntity);

        $rsm = new ResultSetMappingBuilder($this->getJobManager());
        $rsm->addRootEntityFromClassMetadata(Job::class, 'j');

        $sql = "SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId AND j.command = :command";
        $params = new ArrayCollection();
        $params->add(new Parameter('command', $command));
        $params->add(new Parameter('relClass', $relClass));
        $params->add(new Parameter('relId', $relId));

        if (!empty($states)) {
            $sql .= " AND j.state IN (:states)";
            $params->add(new Parameter('states', $states, Connection::PARAM_STR_ARRAY));
        }

        return $this->getJobManager()->createNativeQuery($sql, $rsm)
            ->setParameters($params)
            ->getOneOrNullResult();
    }

    private function getRelatedEntityIdentifier($entity)
    {
        if (!is_object($entity)) {
            throw new \RuntimeException('$entity must be an object.');
        }

        if ($entity instanceof \Doctrine\Common\Persistence\Proxy) {
            $entity->__load();
        }

        $relClass = ClassUtils::getClass($entity);
        $relId = $this->registry->getManagerForClass($relClass)->getMetadataFactory()
            ->getMetadataFor($relClass)->getIdentifierValues($entity);
        asort($relId);

        if (!$relId) {
            throw new \InvalidArgumentException(
                sprintf('The identifier for entity of class "%s" was empty.', $relClass)
            );
        }

        return array($relClass, json_encode($relId));
    }

    public function findPendingJob(
        array $excludedIds = array(),
        array $excludedQueues = array(),
        array $restrictedQueues = array()
    ) {
        $qb = $this->getJobManager()->createQueryBuilder();
        $qb->select('j')->from(Job::class, 'j')
            ->orderBy('j.priority', 'ASC')
            ->addOrderBy('j.id', 'ASC');

        $conditions = array();

        $conditions[] = $qb->expr()->isNull('j.workerName');

        $conditions[] = $qb->expr()->lt('j.executeAfter', ':now');
        $qb->setParameter(':now', new \DateTime(), 'datetime');

        $conditions[] = $qb->expr()->eq('j.state', ':state');
        $qb->setParameter('state', Job::STATE_PENDING);

        if (!empty($excludedIds)) {
            $conditions[] = $qb->expr()->notIn('j.id', ':excludedIds');
            $qb->setParameter('excludedIds', $excludedIds, Connection::PARAM_INT_ARRAY);
        }

        if (!empty($excludedQueues)) {
            $conditions[] = $qb->expr()->notIn('j.queue', ':excludedQueues');
            $qb->setParameter('excludedQueues', $excludedQueues, Connection::PARAM_STR_ARRAY);
        }

        if (!empty($restrictedQueues)) {
            $conditions[] = $qb->expr()->in('j.queue', ':restrictedQueues');
            $qb->setParameter('restrictedQueues', $restrictedQueues, Connection::PARAM_STR_ARRAY);
        }

        $qb->where(call_user_func_array(array($qb->expr(), 'andX'), $conditions));

        return $qb->getQuery()->setMaxResults(1)->getOneOrNullResult();
    }

    public function closeJob(Job $job, $finalState)
    {
        $this->getJobManager()->getConnection()->beginTransaction();
        try {
            $visited = array();
            $this->closeJobInternal($job, $finalState, $visited);
            $this->getJobManager()->flush();
            $this->getJobManager()->getConnection()->commit();

            // Clean-up entity manager to allow for garbage collection to kick in.
            foreach ($visited as $job) {
                // If the job is an original job which is now being retried, let's
                // not remove it just yet.
                if (!$job->isClosedNonSuccessful() || $job->isRetryJob()) {
                    continue;
                }

                $this->detachJobFromManager($job);
            }
        } catch (\Exception $ex) {
            $this->getJobManager()->getConnection()->rollback();

            throw $ex;
        }
    }

    /**
     * @param Job $job
     * @todo EntityManager::detach is marked as deprecated, but seems will be undeprecated https://github.com/doctrine/orm/pull/8466;
     * @todo After that, remove SYMFONY_DEPRECATIONS_HELPER env from phpunit.xml.dist
     */
    private function detachJobFromManager(Job $job): void
    {
        $this->getJobManager()->detach($job);
    }

    private function closeJobInternal(Job $job, $finalState, array &$visited = array())
    {
        if (in_array($job, $visited, true)) {
            return;
        }
        $visited[] = $job;

        if ($job->isInFinalState()) {
            return;
        }

        if (null !== $this->dispatcher && ($job->isRetryJob() || 0 === count($job->getRetryJobs()))) {
            $event = new StateChangeEvent($job, $finalState);
            $this->dispatcher->dispatch($event, 'jms_job_queue.job_state_change');
            $finalState = $event->getNewState();
        }

        switch ($finalState) {
            case Job::STATE_CANCELED:
                $job->setState(Job::STATE_CANCELED);
                $this->getJobManager()->persist($job);

                if ($job->isRetryJob()) {
                    $this->closeJobInternal($job->getOriginalJob(), Job::STATE_CANCELED, $visited);

                    return;
                }

                foreach ($this->findIncomingDependencies($job) as $dep) {
                    $this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
                }

                return;

            case Job::STATE_FAILED:
            case Job::STATE_TERMINATED:
            case Job::STATE_INCOMPLETE:
                if ($job->isRetryJob()) {
                    $job->setState($finalState);
                    $this->getJobManager()->persist($job);

                    $this->closeJobInternal($job->getOriginalJob(), $finalState);

                    return;
                }

                // The original job has failed, and we are allowed to retry it.
                if ($job->isRetryAllowed()) {
                    $retryJob = new Job(
                        $job->getCommand(), $job->getArgs(), true, $job->getQueue(), $job->getPriority()
                    );
                    $retryJob->setMaxRuntime($job->getMaxRuntime());

                    if ($this->retryScheduler === null) {
                        $this->retryScheduler = new ExponentialRetryScheduler(5);
                    }

                    $retryJob->setExecuteAfter($this->retryScheduler->scheduleNextRetry($job));

                    $job->addRetryJob($retryJob);
                    $this->getJobManager()->persist($retryJob);
                    $this->getJobManager()->persist($job);

                    return;
                }

                $job->setState($finalState);
                $this->getJobManager()->persist($job);

                // The original job has failed, and no retries are allowed.
                foreach ($this->findIncomingDependencies($job) as $dep) {
                    // This is a safe-guard to avoid blowing up if there is a database inconsistency.
                    if (!$dep->isPending() && !$dep->isNew()) {
                        continue;
                    }

                    $this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
                }

                return;

            case Job::STATE_FINISHED:
                if ($job->isRetryJob()) {
                    $job->getOriginalJob()->setState($finalState);
                    $this->getJobManager()->persist($job->getOriginalJob());
                }
                $job->setState($finalState);
                $this->getJobManager()->persist($job);

                return;

            default:
                throw new \LogicException(sprintf('Non allowed state "%s" in closeJobInternal().', $finalState));
        }
    }

    /**
     * @return Job[]
     */
    public function findIncomingDependencies(Job $job)
    {
        $jobIds = $this->getJobIdsOfIncomingDependencies($job);
        if (empty($jobIds)) {
            return array();
        }

        return $this->getJobManager()->createQuery(
            "SELECT j, d FROM JMSJobQueueBundle:Job j LEFT JOIN j.dependencies d WHERE j.id IN (:ids)"
        )
            ->setParameter('ids', $jobIds)
            ->getResult();
    }

    /**
     * @return Job[]
     */
    public function getIncomingDependencies(Job $job)
    {
        $jobIds = $this->getJobIdsOfIncomingDependencies($job);
        if (empty($jobIds)) {
            return array();
        }

        return $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.id IN (:ids)")
            ->setParameter('ids', $jobIds)
            ->getResult();
    }

    private function getJobIdsOfIncomingDependencies(Job $job)
    {
        $jobIds = $this->getJobManager()->getConnection()
            ->executeQuery(
                "SELECT source_job_id FROM jms_job_dependencies WHERE dest_job_id = :id",
                array('id' => $job->getId())
            )
            ->fetchAll(\PDO::FETCH_COLUMN);

        return $jobIds;
    }

    public function findLastJobsWithError($nbJobs = 10)
    {
        return $this->getJobManager()->createQuery(
            "SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state IN (:errorStates) AND j.originalJob IS NULL ORDER BY j.closedAt DESC"
        )
            ->setParameter('errorStates', array(Job::STATE_TERMINATED, Job::STATE_FAILED))
            ->setMaxResults($nbJobs)
            ->getResult();
    }

    public function getAvailableQueueList()
    {
        $queues = $this->getJobManager()->createQuery(
            "SELECT DISTINCT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates)  GROUP BY j.queue"
        )
            ->setParameter('availableStates', array(Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING))
            ->getResult();

        $newQueueArray = array();

        foreach ($queues as $queue) {
            $newQueue = $queue['queue'];
            $newQueueArray[] = $newQueue;
        }

        return $newQueueArray;
    }


    public function getAvailableJobsForQueueCount($jobQueue)
    {
        $result = $this->getJobManager()->createQuery(
            "SELECT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) AND j.queue = :queue"
        )
            ->setParameter('availableStates', array(Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING))
            ->setParameter('queue', $jobQueue)
            ->setMaxResults(1)
            ->getOneOrNullResult();

        return count($result);
    }

    private function getJobManager(): EntityManagerInterface
    {
        return $this->registry->getManagerForClass(Job::class);
    }
}