Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
rrq / lib / rrq / connection.rb
Size: Mime:
module Rrq
  class Connection
    def initialize
      reset!
    end

    def redis(&block)
      raise "You must provide a redis connection" unless @redis
      block.call(@redis)
    end

    def redis=(conn)
      @redis = conn
    end

    def reset!
      @queues = {}
      @scripts = {
        bulk_push: Rrq::Scripts::BulkPush.new(self),
        push: Rrq::Scripts::Push.new(self),
        pop: Rrq::Scripts::Pop.new(self)
      }
    end

    def push(queue_name, partition, payload)
      json = prepare_message(queue_name, partition, payload)
      @scripts[:push].perform(queue_name, partition, json)
    end

    def bulk_push(queue_name, partition, payloads)
      jsons = Array(payloads).map do |payload|
        prepare_message(queue_name, partition, payload)
      end
      @scripts[:bulk_push].perform(queue_name, partition, jsons)
    end

    def pop(queue_name = nil)
      @scripts[:pop].perform(queue_name)
    end

    def ack(message)
      redis do |r|
        key = Rrq::Keys.working_queue(message["queue"], message["partition"])
        r.lrem(key, 1, JSON.dump(message))
      end
    end

    def all_reports
      hash = {count: 0, queues: {}}

      all_queues.each do |queue_name|
        queue_hash = report(queue_name)

        hash[:count] += queue_hash[:count]
        hash[:queues][queue_name] = queue_hash[:count]
      end

      hash
    end

    def report(queue_name)
      hash = {count: 0, partitions: {}}
      all_partitions = all_partitions_for_queue(queue_name)

      redis do |conn|
        conn.pipelined do 
          all_partitions.each do |partition|
            hash[:partitions][partition] = conn.llen(Rrq::Keys.queue(queue_name, partition))
          end
        end
      end

      hash[:partitions].each do |partition, count|
        count = count.value

        hash[:partitions][partition] = count
        hash[:count] += count
      end

      hash
    end

    private

    def new_message_id
      Digest::SHA256.hexdigest(Time.now.to_f.to_s)
    end

    def prepare_message(queue_name, partition, payload)
      message = {
        message_id: new_message_id,
        queue: queue_name,
        partition: partition,
        payload: payload
      }
      JSON.dump(message)
    end

    def all_partitions_for_queue(queue_name)
      all_partitions = redis { |conn| conn.smembers Rrq::Keys.partitions_for(queue_name) }
      Array(all_partitions).uniq
    end

    def all_queues
      all_queues = redis { |conn| conn.smembers Rrq::Keys.queues }
      Array(all_queues).uniq
    end
  end
end