Repository URL to install this package:
|
Version:
2.6.1 ▾
|
module Sneakers
class Publisher
def initialize(opts = {})
@mutex = Mutex.new
@opts = Sneakers::CONFIG.merge(opts)
end
def publish(msg, options = {})
@mutex.synchronize do
ensure_connection! unless connected?
end
to_queue = options.delete(:to_queue)
options[:routing_key] ||= to_queue
Sneakers.logger.info {"publishing <#{msg}> to [#{options[:routing_key]}]"}
@exchange.publish(ContentType.serialize(msg, options[:content_type]), options)
end
attr_reader :exchange
private
def ensure_connection!
# If we've already got a bunny object, use it. This allows people to
# specify all kinds of options we don't need to know about (e.g. for ssl).
@bunny = @opts[:connection]
@bunny ||= create_bunny_connection
@bunny.start
@channel = @bunny.create_channel
@exchange = @channel.exchange(@opts[:exchange], @opts[:exchange_options])
end
def connected?
@bunny && @bunny.connected?
end
def create_bunny_connection
Bunny.new(@opts[:amqp], :vhost => @opts[:vhost],
:heartbeat => @opts[:heartbeat],
:properties => @opts.fetch(:properties, {}),
:logger => Sneakers::logger)
end
end
end