Repository URL to install this package:
|
Version:
0.2.8 ▾
|
require 'spec_helper'
describe Rrq::Manager do
subject { Rrq::Manager.new($rrq_conn) }
before { subject }
describe :recover! do
it "should copy jobs from working partition over to queue" do
$rrq_conn.push("test_queue", 1, @payload)
$rrq_conn.push("test_queue", 2, @payload)
$rrq_conn.pop("test_queue")
$rrq_conn.pop("test_queue")
key1 = Rrq::Keys.queue("test_queue", 1)
key2 = Rrq::Keys.queue("test_queue", 2)
expect($redis.llen(key1)).to eq(0)
expect($redis.llen(key2)).to eq(0)
subject.recover!
expect($redis.llen(key1)).to eq(1)
expect($redis.llen(key2)).to eq(1)
end
it "should add parition back to all and active_partitions" do
$rrq_conn.push("test_queue", 1, @payload)
$rrq_conn.pop("test_queue")
$rrq_conn.redis do |r|
r.del("rrq:queues")
r.del("rrq:queue:test_queue:all_partitions")
r.del("rrq:queue:test_queue:active_partitions")
end
subject.recover!
$rrq_conn.redis do |r|
expect(r.smembers("rrq:queues")).to eq(["test_queue"])
expect(r.smembers("rrq:queue:test_queue:all_partitions")).to eq(["1"])
expect(r.lrange("rrq:queue:test_queue:active_partitions", 0, -1)).to eq(["1"])
end
end
end
describe :working_queues do
it "should return an array of hashs describing the working queues" do
$rrq_conn.push("test_queue", 1, @payload)
$rrq_conn.pop("test_queue")
expect(subject.working_queues).to eq([
{key: "rrq:queue:test_queue:1:working", queue: "test_queue", partition: "1"}
])
end
it "should find orphaned working queues" do
$rrq_conn.push("test_queue", 1, @payload)
$rrq_conn.pop("test_queue")
$rrq_conn.redis do |r|
r.del("rrq:queues")
r.del("rrq:queue:test_queue:all_partitions")
r.del("rrq:queue:test_queue:active_partitions")
end
expect(subject.working_queues).to eq([
{key: "rrq:queue:test_queue:1:working", queue: "test_queue", partition: "1"}
])
end
end
end