Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
rrq / spec / lib / connection_spec.rb
Size: Mime:
require 'spec_helper'

describe Rrq::Connection do
  subject { $rrq_conn }

  before do
    @payload = {
      "test" => 123
    }
  end

  describe :push do
    it "should enqueue a message" do 
      $rrq_conn.push("test_queue", 1, @payload)

      message = $rrq_conn.redis do |r|
        key = Rrq::Keys.queue("test_queue", 1)        
        JSON.parse(r.lpop(key)) 
      end

      expect(message["payload"]).to eq(@payload)
    end

    it "should generate a message id" do 
      $rrq_conn.push("test_queue", 1, @payload)

      key = Rrq::Keys.queue("test_queue", 1)
      message = JSON.parse($redis.lpop(key))
      
      expect(message["message_id"]).to_not be_empty
    end

    it "should not add duplicates to the active partitions list" do 
      $rrq_conn.redis do |r|
        expect(r.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to be_empty
      end
      2.times { $rrq_conn.push("test_queue", 1, @payload) }

      $rrq_conn.redis do |r|
        expect(r.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to eq(["1"])
      end
    end
  end

  describe :bulk_push do 
    it "should enqueue multiple messages" do 
      ar = []
      3.times { ar << @payload.dup }
      $rrq_conn.bulk_push("test_queue", 1, ar)

      $rrq_conn.redis do |r|
        key = Rrq::Keys.queue("test_queue", 1)
        r.llen(key).should == 3

        3.times do 
          message = JSON.parse(r.lpop(key))
          expect(message["payload"]).to eq(@payload)
        end
      end
    end

    it "should add a queue to active paritions" do 
      $rrq_conn.redis do |r|
        expect(r.smembers(Rrq::Keys.partitions_for("test_queue"))).to be_empty
        expect(r.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to be_empty
      end

      ar = 3.times.to_a.map { @payload.dup }
      $rrq_conn.bulk_push("test_queue", 1, ar)

      $rrq_conn.redis do |r|
        expect(r.smembers(Rrq::Keys.partitions_for("test_queue"))).to eq(["1"])
        expect(r.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to eq(["1"])
      end
    end

    it "should not add duplicates to the active partitions list" do 
      $rrq_conn.redis do |r|
        expect(r.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to be_empty
      end

      ar = 3.times.to_a.map { @payload.dup }
      2.times { $rrq_conn.bulk_push("test_queue", 1, ar) }

      $rrq_conn.redis do |r|
        expect(r.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to eq(["1"])
      end
    end
  end

  describe :pop do
    it "should return the message" do 
      $rrq_conn.push("test_queue", 1, @payload)

      message = $rrq_conn.pop("test_queue")
      expect(message["payload"]).to eq(@payload)
    end

    it "should allow you to not have to specify a queue name" do 
      $rrq_conn.push("test_queue", 1, @payload)
      message = $rrq_conn.pop

      expect(message["payload"]).to eq(@payload)
    end

    it "should add the message to the partition working queue" do 
      $rrq_conn.push("test_queue", 1, @payload)

      working_key = Rrq::Keys.working_queue("test_queue", 1)

      expect($redis.llen(working_key)).to eq(0)

      message = $rrq_conn.pop("test_queue")

      expect($redis.llen(working_key)).to eq(1)

      working_message = $redis.lpop(working_key)
      expect(working_message).to eq(JSON.dump(message))
    end

    it "should remove a partition from active_partitions if return message is blank" do 
      $rrq_conn.push("test_queue", 1, @payload)
      expect($redis.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to eq(["1"])

      2.times { $rrq_conn.pop("test_queue") }
      expect($redis.lrange(Rrq::Keys.active_partitions_for("test_queue"), 0, -1)).to eq([])
    end
  end

  describe :ack do 
    it "should remove a message from the working queue" do 
      $rrq_conn.push("test_queue", 1, @payload)
      message = $rrq_conn.pop("test_queue")

      working_key = Rrq::Keys.working_queue("test_queue", 1)

      expect($redis.llen(working_key)).to eq(1)

      $rrq_conn.ack(message)
      expect($redis.llen(working_key)).to eq(0)
    end
  end


  describe "reports" do 
    before do 
      one_hundred_items = 100.times.to_a.map { {} }

      $rrq_conn.bulk_push("test_queue", 1, one_hundred_items[0...20])
      $rrq_conn.bulk_push("test_queue", 2, one_hundred_items[0...41])
      $rrq_conn.bulk_push("test_queue", 3, one_hundred_items[0...88])
      $rrq_conn.bulk_push("another_queue", 1, one_hundred_items[0...12])
    end

    describe :report do 
      it "should return a hash containing the total count of all partitions" do 
        expect(subject.report("test_queue")[:count]).to eq(149)
        expect(subject.report("another_queue")[:count]).to eq(12)
      end

      it "should return a hash containing the count of each partitions" do 
        expect(subject.report("test_queue")[:partitions]).to eq({
          "1" => 20,
          "2" => 41,
          "3" => 88
        })

        expect(subject.report("another_queue")[:partitions]).to eq({
          "1" => 12
        })
      end
    end

    describe :all_report do 
      it "should return a hash containing the total count of all queues" do 
        expect(subject.all_reports[:count]).to eq(161)
      end

      it "should return a hash containing the count of each queue" do 
        expect(subject.all_reports[:queues]).to eq({
          "test_queue" => 149,
          "another_queue" => 12
        })
      end
    end
  end
end