# Repository URL to install this package:
# Version: 0.2.10
require "securerandom"

module Rrq
  # Entry point for pushing, popping, acknowledging and reporting on
  # partitioned queues stored in Redis.
  #
  # A Redis client must be assigned with {#redis=} before any operation that
  # talks to Redis is used; otherwise {#redis} raises.
  class Connection
    # Assigns the Redis client that is yielded by {#redis}.
    attr_writer :redis

    def initialize
      reset!
    end

    # Yields the configured Redis client to the given block.
    #
    # @yieldparam conn the object previously assigned through {#redis=}
    # @return [Object] whatever the block returns
    # @raise [RuntimeError] when no Redis client has been assigned
    def redis(&block)
      raise "You must provide a redis connection" unless @redis

      block.call(@redis)
    end

    # Rebuilds the script wrappers and clears the queue cache.
    # The assigned Redis client is left untouched.
    def reset!
      @queues = {}
      @scripts = {
        bulk_push: Rrq::Scripts::BulkPush.new(self),
        push: Rrq::Scripts::Push.new(self),
        pop: Rrq::Scripts::Pop.new(self)
      }
    end

    # Wraps +payload+ in a message envelope and hands it to the push script.
    #
    # @param queue_name [String]
    # @param partition [String]
    # @param payload [Object] any JSON-serialisable value
    # @return [Object] the result of the push script's +perform+
    def push(queue_name, partition, payload)
      json = prepare_message(queue_name, partition, payload)
      @scripts[:push].perform(queue_name, partition, json)
    end

    # Wraps every payload in a message envelope and hands the whole batch to
    # the bulk-push script.
    #
    # @param queue_name [String]
    # @param partition [String]
    # @param payloads [Array<Object>, Object] a list of payloads, or a single
    #   payload (which is treated as a one-element batch)
    # @return [Object] the result of the bulk-push script's +perform+
    def bulk_push(queue_name, partition, payloads)
      # Kernel#Array turns a Hash into [key, value] pairs, which would split a
      # single Hash payload into several bogus messages; wrap it instead.
      batch = payloads.is_a?(Hash) ? [payloads] : Array(payloads)
      jsons = batch.map do |payload|
        prepare_message(queue_name, partition, payload)
      end
      @scripts[:bulk_push].perform(queue_name, partition, jsons)
    end

    # Delegates to the pop script.
    #
    # @param queue_name [String, nil] passed through to the script unchanged
    # @return [Object] the result of the pop script's +perform+
    def pop(queue_name = nil)
      @scripts[:pop].perform(queue_name)
    end

    # Removes one occurrence of +message+ from its working queue.
    #
    # @param message [Hash] must respond to +["queue"]+ and +["partition"]+
    # @return [Object] the reply of the Redis +LREM+ call
    #
    # NOTE(review): the entry is located by its re-serialised JSON, so this
    # presumably relies on +message+ being the unmodified parsed form of what
    # was stored -- confirm against the pop script.
    def ack(message)
      redis do |r|
        key = Rrq::Keys.working_queue(message["queue"], message["partition"])
        r.lrem(key, 1, JSON.dump(message))
      end
    end

    # Summarises every known queue.
    #
    # @return [Hash] +{count: Integer, queues: {queue_name => Integer}}+ where
    #   +:count+ is the sum of all per-queue counts
    def all_reports
      summary = {count: 0, queues: {}}
      all_queues.each do |queue_name|
        queue_count = report(queue_name)[:count]
        summary[:count] += queue_count
        summary[:queues][queue_name] = queue_count
      end
      summary
    end

    # Summarises a single queue by asking Redis for the length of each of its
    # partitions in one pipelined round trip.
    #
    # @param queue_name [String]
    # @return [Hash] +{count: Integer, partitions: {partition => Integer}}+
    def report(queue_name)
      partitions = all_partitions_for_queue(queue_name)

      # Commands must be issued on the object yielded by +pipelined+: newer
      # redis-rb versions no longer queue commands sent to the outer
      # connection. The replies come back as an array in command order.
      lengths = redis do |conn|
        conn.pipelined do |pipeline|
          partitions.each do |partition|
            pipeline.llen(Rrq::Keys.queue(queue_name, partition))
          end
        end
      end

      summary = {count: 0, partitions: {}}
      partitions.zip(Array(lengths)).each do |partition, length|
        summary[:partitions][partition] = length
        summary[:count] += length
      end
      summary
    end

    private

    # @return [String] a random 64-character hexadecimal identifier
    def new_message_id
      # Random rather than timestamp-derived, so that messages built within
      # the same clock tick still receive distinct ids.
      SecureRandom.hex(32)
    end

    # Builds the JSON envelope stored in Redis for one payload.
    #
    # @return [String] JSON with message_id, queue, partition and payload
    def prepare_message(queue_name, partition, payload)
      message = {
        message_id: new_message_id,
        queue: queue_name,
        partition: partition,
        payload: payload
      }
      JSON.dump(message)
    end

    # @return [Array] distinct members of the queue's partition set
    def all_partitions_for_queue(queue_name)
      members = redis { |conn| conn.smembers Rrq::Keys.partitions_for(queue_name) }
      Array(members).uniq
    end

    # @return [Array] distinct members of the set of known queues
    def all_queues
      members = redis { |conn| conn.smembers Rrq::Keys.queues }
      Array(members).uniq
    end
  end
end