Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
resque-status / lib / resque / plugins / status.rb
Size: Mime:
module Resque
  module Plugins

    # Resque::Plugins::Status is a module your jobs will include.
    # It provides helper methods for updating the status/etc from within an
    # instance as well as class methods for creating and queuing the jobs.
    #
    # All you have to do to get this functionality is include Resque::Plugins::Status
    # and then implement a <tt>perform</tt> method.
    #
    # For example
    #
    #       class ExampleJob
    #         include Resque::Plugins::Status
    #
    #         def perform
    #           num = options['num']
    #           i = 0
    #           while i < num
    #             i += 1
    #             at(i, num)
    #           end
    #           completed("Finished!")
    #         end
    #
    #       end
    #
    # This job would iterate num times updating the status as it goes. At the end
    # we update the status telling anyone listening to this job that it's complete.
    module Status
      VERSION = '0.4.1'.freeze

      autoload :Hash, 'resque/plugins/status/hash'

      # The error class raised when a job is killed
      class Killed < RuntimeError; end

      # The error class raised by <tt>at</tt> when <tt>total</tt> is not a
      # positive number
      class NotANumber < RuntimeError; end

      attr_reader :uuid, :options

      # Hook run when the module is included: adds the class-level API
      # (<tt>create</tt>, <tt>enqueue</tt>, <tt>perform</tt>, ...) to the job class.
      def self.included(base)
        base.extend(ClassMethods)
      end

      module ClassMethods

        # The default queue is :statused, this can be overridden in the specific job
        # class to put the jobs on a specific worker queue
        def queue
          :statused
        end

        # Used when displaying the Job in the resque-web UI and identifying the job
        # type by status. By default this is the name of the job class, but can be
        # overridden in the specific job class to present a more user friendly job
        # name
        def name
          to_s
        end

        # Create is the primary method for adding jobs to the queue. This would be
        # called on the job class to create a job of that type. Any options passed are
        # passed to the Job instance as a hash of options. It returns the UUID of the
        # job.
        #
        # == Example:
        #
        #       class ExampleJob
        #         include Resque::Plugins::Status
        #
        #         def perform
        #           set_status "Hey I'm a job num #{options['num']}"
        #         end
        #
        #       end
        #
        #       job_id = ExampleJob.create(:num => 100)
        #
        def create(options = {})
          enqueue(self, options)
        end

        # Adds a job of type <tt>klass</tt> to the queue with <tt>options</tt>.
        # The queue is the one Resque resolves for <tt>klass</tt>, falling back
        # to this class's <tt>queue</tt>.
        #
        # Returns the UUID of the job if the job was queued, or nil if the job was
        # rejected by a before_enqueue hook.
        def enqueue(klass, options = {})
          enqueue_to(Resque.queue_from_class(klass) || queue, klass, options)
        end

        # Adds a job of type <tt>klass</tt> to a specified queue with <tt>options</tt>.
        #
        # Returns the UUID of the job if the job was queued, or nil if the job was
        # rejected by a before_enqueue hook.
        def enqueue_to(queue, klass, options = {})
          uuid = Resque::Plugins::Status::Hash.generate_uuid
          # The status record is written before the job is pushed so that it
          # already exists by the time a worker picks the job up.
          Resque::Plugins::Status::Hash.create uuid, :options => options

          if Resque.enqueue_to(queue, klass, uuid, options)
            uuid
          else
            # The job never made it onto the queue: drop the status record again.
            Resque::Plugins::Status::Hash.remove(uuid)
            nil
          end
        end

        # Removes a job of type <tt>klass</tt> from the queue.
        #
        # The initially given options are retrieved from the status hash.
        # (Resque needs the options to find the correct queue entry)
        #
        # Returns whatever <tt>Resque.dequeue</tt> returns, or 0 when no status
        # is stored for <tt>uuid</tt>: without the original options the queue
        # entry cannot be identified, so nothing is removed.
        def dequeue(klass, uuid)
          status = Resque::Plugins::Status::Hash.get(uuid)
          return 0 unless status

          Resque.dequeue(klass, uuid, status.options)
        end

        # This is the method called by Resque::Worker when processing jobs. It
        # creates a new instance of the job class and populates it with the uuid and
        # options.
        #
        # Returns the job instance after it has run.
        #
        # You should not override this method, rather the <tt>perform</tt> instance method.
        def perform(uuid = nil, options = {})
          uuid ||= Resque::Plugins::Status::Hash.generate_uuid
          instance = new(uuid, options)
          instance.safe_perform!
          instance
        end

        # Wrapper API to forward a Resque::Job creation API call into a Resque::Plugins::Status call.
        # This is needed to be used with resque scheduler
        # http://github.com/bvandenbos/resque-scheduler
        #
        # <tt>klass</tt> is accepted for signature compatibility but ignored; the
        # job is always enqueued as the receiving class.
        def scheduled(queue, klass, *args)
          enqueue_to(queue, self, *args)
        end
      end

      # Create a new instance with <tt>uuid</tt> and <tt>options</tt>
      def initialize(uuid, options = {})
        @uuid    = uuid
        @options = options
      end

      # Run by the Resque::Worker when processing this job. It wraps the <tt>perform</tt>
      # method ensuring that the final status of the job is set regardless of error.
      #
      # After <tt>perform</tt> returns:
      # * if the status is 'failed', <tt>on_failure</tt> is called with the status
      #   message (when defined) and <tt>on_success</tt> is skipped;
      # * otherwise the status is set to completed unless it already is, and
      #   <tt>on_success</tt> is called (when defined).
      #
      # If <tt>Killed</tt> is raised the status is marked as killed and
      # <tt>on_killed</tt> is called (when defined). Any other error sets the
      # status to failed and is passed to <tt>on_failure</tt>, or re-raised when
      # the job does not define that hook.
      def safe_perform!
        set_status({'status' => 'working'})
        perform
        # Take a single snapshot so every check below sees the same record,
        # even if it is modified or removed in the meantime.
        current = status
        if current && current.failed?
          on_failure(current.message) if respond_to?(:on_failure)
          return
        elsif current && !current.completed?
          completed
        end
        on_success if respond_to?(:on_success)
      rescue Killed
        Resque::Plugins::Status::Hash.killed(uuid)
        on_killed if respond_to?(:on_killed)
      rescue => e
        failed("The task failed because of an error: #{e}")
        if respond_to?(:on_failure)
          on_failure(e)
        else
          raise
        end
      end

      # Set the jobs status. Can take an array of strings or hashes that are merged
      # (in order) into a final status hash.
      def status=(new_status)
        Resque::Plugins::Status::Hash.set(uuid, *new_status)
      end

      # Get the Resque::Plugins::Status::Hash object for the current uuid
      def status
        Resque::Plugins::Status::Hash.get(uuid)
      end

      # Display name of this job instance: the class-level name followed by the
      # inspected options in parentheses (empty parentheses when there are no
      # options).
      def name
        "#{self.class.name}(#{options.inspect unless options.empty?})"
      end

      # Checks against the kill list if this specific job instance should be killed
      # on the next iteration
      def should_kill?
        Resque::Plugins::Status::Hash.should_kill?(uuid)
      end

      # Set the status of the job for the current iteration. <tt>num</tt> and
      # <tt>total</tt> are passed to the status as well as any messages.
      # This will kill the job if it has been added to the kill list with
      # <tt>Resque::Plugins::Status::Hash.kill()</tt>
      #
      # Raises <tt>NotANumber</tt> when <tt>total.to_f</tt> is zero or negative.
      def at(num, total, *messages)
        if total.to_f <= 0.0
          raise(NotANumber, "Called at() with total=#{total} which is not a number")
        end
        tick({
          'num' => num,
          'total' => total
        }, *messages)
      end

      # Sets the status of the job for the current iteration. You should use
      # the <tt>at</tt> method if you have actual numbers to track the iteration count.
      # This will kill the job if it has been added to the kill list with
      # <tt>Resque::Plugins::Status::Hash.kill()</tt>
      def tick(*messages)
        kill! if should_kill?
        set_status({'status' => 'working'}, *messages)
      end

      # Set the status to 'failed' passing along any additional messages
      def failed(*messages)
        set_status({'status' => 'failed'}, *messages)
      end

      # Set the status to 'completed' passing along any additional messages
      def completed(*messages)
        set_status({
          'status' => 'completed',
          'message' => "Completed at #{Time.now}"
        }, *messages)
      end

      # Kill the current job, setting the status to 'killed' and raising <tt>Killed</tt>
      def kill!
        set_status({
          'status' => 'killed',
          'message' => "Killed at #{Time.now}"
        })
        raise Killed
      end

      private

      # Builds the new status from the currently stored status, this job's
      # name and the given updates (in that order) and stores it.
      def set_status(*args)
        self.status = [status, {'name' => name}, args].flatten
      end

    end
  end
end