# Repository URL to install this package: (URL not present in this copy)
# Version: 0.2.8
require 'spec_helper'
# Specs for Rrq::Connection: enqueueing (push / bulk_push), dequeueing (pop),
# acknowledging (ack) and the report helpers (report / all_reports).
#
# The examples rely on the $rrq_conn and $redis globals, which are not defined
# in this file (presumably created by spec_helper -- confirm there). Several
# examples write through $rrq_conn and read back through $redis, so both are
# expected to point at the same Redis data.
describe Rrq::Connection do
  subject(:conn) { $rrq_conn }

  # Message body pushed by most examples.
  let(:payload) { { "test" => 123 } }

  # Redis key names for partition 1 of "test_queue". They are only used to
  # inspect what the connection wrote.
  # NOTE(review): memoizing them assumes the Rrq::Keys builders are pure.
  let(:queue_key) { Rrq::Keys.queue("test_queue", 1) }
  let(:working_key) { Rrq::Keys.working_queue("test_queue", 1) }
  let(:partitions_key) { Rrq::Keys.partitions_for("test_queue") }
  let(:active_partitions_key) { Rrq::Keys.active_partitions_for("test_queue") }

  describe :push do
    it "should enqueue a message" do
      conn.push("test_queue", 1, payload)

      # conn.redis returns the value of its block, here the parsed raw entry.
      message = conn.redis { |r| JSON.parse(r.lpop(queue_key)) }

      expect(message["payload"]).to eq(payload)
    end

    it "should generate a message id" do
      conn.push("test_queue", 1, payload)

      message = JSON.parse($redis.lpop(queue_key))

      expect(message["message_id"]).to_not be_empty
    end

    it "should not add duplicates to the active partitions list" do
      conn.redis do |r|
        expect(r.lrange(active_partitions_key, 0, -1)).to be_empty
      end

      # Two pushes to the same partition must still list it only once.
      2.times { conn.push("test_queue", 1, payload) }

      conn.redis do |r|
        expect(r.lrange(active_partitions_key, 0, -1)).to eq(["1"])
      end
    end
  end

  describe :bulk_push do
    it "should enqueue multiple messages" do
      messages = Array.new(3) { payload.dup }
      conn.bulk_push("test_queue", 1, messages)

      conn.redis do |r|
        expect(r.llen(queue_key)).to eq(3)

        3.times do
          message = JSON.parse(r.lpop(queue_key))
          expect(message["payload"]).to eq(payload)
        end
      end
    end

    it "should add a queue to active partitions" do
      conn.redis do |r|
        expect(r.smembers(partitions_key)).to be_empty
        expect(r.lrange(active_partitions_key, 0, -1)).to be_empty
      end

      messages = Array.new(3) { payload.dup }
      conn.bulk_push("test_queue", 1, messages)

      conn.redis do |r|
        expect(r.smembers(partitions_key)).to eq(["1"])
        expect(r.lrange(active_partitions_key, 0, -1)).to eq(["1"])
      end
    end

    it "should not add duplicates to the active partitions list" do
      conn.redis do |r|
        expect(r.lrange(active_partitions_key, 0, -1)).to be_empty
      end

      # The same batch is pushed twice on purpose.
      messages = Array.new(3) { payload.dup }
      2.times { conn.bulk_push("test_queue", 1, messages) }

      conn.redis do |r|
        expect(r.lrange(active_partitions_key, 0, -1)).to eq(["1"])
      end
    end
  end

  describe :pop do
    it "should return the message" do
      conn.push("test_queue", 1, payload)

      message = conn.pop("test_queue")

      expect(message["payload"]).to eq(payload)
    end

    it "should allow you to not have to specify a queue name" do
      conn.push("test_queue", 1, payload)

      message = conn.pop

      expect(message["payload"]).to eq(payload)
    end

    it "should add the message to the partition working queue" do
      conn.push("test_queue", 1, payload)
      expect($redis.llen(working_key)).to eq(0)

      message = conn.pop("test_queue")

      expect($redis.llen(working_key)).to eq(1)
      # The working queue entry must be the JSON dump of what pop returned.
      working_message = $redis.lpop(working_key)
      expect(working_message).to eq(JSON.dump(message))
    end

    it "should remove a partition from active_partitions if return message is blank" do
      conn.push("test_queue", 1, payload)
      expect($redis.lrange(active_partitions_key, 0, -1)).to eq(["1"])

      # Only one message was pushed, so the second pop has nothing to return.
      2.times { conn.pop("test_queue") }

      expect($redis.lrange(active_partitions_key, 0, -1)).to eq([])
    end
  end

  describe :ack do
    it "should remove a message from the working queue" do
      conn.push("test_queue", 1, payload)
      message = conn.pop("test_queue")
      expect($redis.llen(working_key)).to eq(1)

      conn.ack(message)

      expect($redis.llen(working_key)).to eq(0)
    end
  end

  describe "reports" do
    # Fixture: 20 + 41 + 88 = 149 messages across three partitions of
    # "test_queue", plus 12 in "another_queue" (161 in total).
    before do
      # One shared pool of empty messages; each push takes a prefix of it.
      empty_messages = Array.new(100) { {} }

      conn.bulk_push("test_queue", 1, empty_messages.first(20))
      conn.bulk_push("test_queue", 2, empty_messages.first(41))
      conn.bulk_push("test_queue", 3, empty_messages.first(88))
      conn.bulk_push("another_queue", 1, empty_messages.first(12))
    end

    describe :report do
      it "should return a hash containing the total count of all partitions" do
        expect(conn.report("test_queue")[:count]).to eq(149)
        expect(conn.report("another_queue")[:count]).to eq(12)
      end

      it "should return a hash containing the count of each partitions" do
        expect(conn.report("test_queue")[:partitions]).to eq({
          "1" => 20,
          "2" => 41,
          "3" => 88
        })
        expect(conn.report("another_queue")[:partitions]).to eq({
          "1" => 12
        })
      end
    end

    describe :all_reports do
      it "should return a hash containing the total count of all queues" do
        expect(conn.all_reports[:count]).to eq(161)
      end

      it "should return a hash containing the count of each queue" do
        expect(conn.all_reports[:queues]).to eq({
          "test_queue" => 149,
          "another_queue" => 12
        })
      end
    end
  end
end