Repository URL to install this package:
|
Version:
2.6.1 ▾
|
require 'sneakers/queue'
require 'sneakers/support/utils'
require 'timeout'
module Sneakers
module Worker
attr_reader :queue, :id, :opts
# For now, a worker depends heavily on these concerns
# (because it calls their methods directly).
include Concerns::Logging
include Concerns::Metrics
include Sneakers::ErrorReporter
# Builds a worker bound to a queue and to an executor that runs its jobs.
#
# @param queue [Object, nil] queue to consume from; when nil, a
#   Sneakers::Queue is created from the class-level queue name and the
#   merged options.
# @param pool [Object, nil] executor that jobs are posted to; when nil, a
#   Concurrent::FixedThreadPool is created, sized by opts[:threads] or the
#   default thread count from Sneakers::Configuration::DEFAULTS.
# @param opts [Hash] per-instance options. The class-level options recorded
#   by `from_queue` are merged over them, and the result is merged into
#   Sneakers::CONFIG.
def initialize(queue = nil, pool = nil, opts = {})
  class_level_opts = self.class.queue_opts || {}
  name = self.class.queue_name
  # Class-level options win over the per-instance ones; the combined hash is
  # then handed to Sneakers::CONFIG#merge.
  opts = Sneakers::CONFIG.merge(opts.merge(class_level_opts))

  @should_ack = opts[:ack]
  @timeout_after = opts[:timeout_job_after]

  default_threads = opts[:threads] || Sneakers::Configuration::DEFAULTS[:threads]
  @pool = pool || Concurrent::FixedThreadPool.new(default_threads)

  # Decide once whether the including class implements the richer entry point.
  @call_with_params = respond_to?(:work_with_params)
  @content_type = opts[:content_type]

  @queue = queue || Sneakers::Queue.new(name, opts)
  @opts = opts
  @id = Utils.make_worker_id(name)
end
# Result markers a job can return; `do_work` compares the job's result with
# these symbols to choose which handler callback to invoke.

# @return [Symbol] :ack — handled via the handler's `acknowledge`
def ack!
  :ack
end

# @return [Symbol] :reject — handled via the handler's `reject`
def reject!
  :reject
end

# @return [Symbol] :requeue — handled via the handler's `reject` with the
#   extra `true` argument
def requeue!
  :requeue
end
# Publishes a message through the exchange of this worker's queue.
#
# @param msg [Object] payload; serialized by Sneakers::ContentType using
#   opts[:content_type]
# @param opts [Hash] publish options. :to_queue is removed before publishing
#   and used as :routing_key when no routing key was given. The hash passed
#   in is not modified.
# @return [Object, nil] the result of the exchange's `publish`, or nil when
#   no routing key could be determined (nothing is published in that case)
def publish(msg, opts)
  # Edit a copy so the caller's hash is left intact (and frozen hashes work).
  publish_opts = opts.dup
  to_queue = publish_opts.delete(:to_queue)
  publish_opts[:routing_key] ||= to_queue
  return unless publish_opts[:routing_key]

  @queue.exchange.publish(
    Sneakers::ContentType.serialize(msg, publish_opts[:content_type]),
    publish_opts
  )
end
# Hands one delivered message to the thread pool for processing.
#
# Inside the pooled block the message is deserialized and passed to
# `work_with_params` (when the worker responds to it) or to `work`. When
# acking is enabled, the job's result selects the handler callback:
# :ack, :timeout, :error, :reject, :requeue, anything else -> noop.
# Diagnostic "SNEAKERS LOG" lines are written to stdout along the way.
#
# @param delivery_info [Object] passed through to the job and the handler
# @param metadata [Object, nil] passed through; its :content_type is used
#   when the worker has no content type of its own
# @param msg [Object] raw message payload
# @param handler [Object] receives acknowledge/timeout/error/reject/noop
def do_work(delivery_info, metadata, msg, handler)
  worker_trace "Working off: #{msg.inspect}"

  metric_prefix = "work.#{self.class.name}"

  pool_stats = [
    "SNEAKERS LOG: Thread pool stats",
    "Idle Time Before Thread Reclaim: #{@pool.idletime}",
    "Number of Threads: #{@pool.length}",
    "Largest Number of Threads Since Start-up: #{@pool.largest_length}",
    "Fallback Policy: #{@pool.fallback_policy}",
    "Number of Tasks in Thread Pool Queue: #{@pool.queue_length}",
    "Remaining Queue Capacity: #{@pool.remaining_capacity}"
  ]
  puts pool_stats.join("\n")
  puts "SNEAKERS LOG: Worker received message for worker #{self.class}, about to request thread from thread pool"

  # Deferred so that log_msg is evaluated on the thread that hit the error.
  report_failure = lambda do |exception|
    worker_error(exception, log_msg: log_msg(msg), class: self.class.name,
                            message: msg, delivery_info: delivery_info, metadata: metadata)
  end

  @pool.post do
    puts "SNEAKERS LOG: Started work on thread from thread pool"
    outcome = nil
    failure = nil

    begin
      metrics.increment("#{metric_prefix}.started")
      Timeout.timeout(@timeout_after, WorkerTimeout) do
        puts "SNEAKERS LOG: Inside Timeout block"
        metrics.timing("#{metric_prefix}.time") do
          puts "SNEAKERS LOG: Inside metrics timing block"
          # The worker's own content type wins over the one in the metadata.
          content_type = @content_type || (metadata && metadata[:content_type])
          payload = ContentType.deserialize(msg, content_type)
          if @call_with_params
            puts "SNEAKERS LOG: calling work_with_params"
            outcome = work_with_params(payload, delivery_info, metadata)
          else
            puts "SNEAKERS LOG: calling work"
            outcome = work(payload)
          end
        end
      end
    rescue WorkerTimeout => timed_out
      puts "SNEAKERS LOG: WorkerTimeout Error"
      outcome = :timeout
      report_failure.call(timed_out)
    rescue => raised
      puts "SNEAKERS LOG: #{raised.class} Error"
      outcome = :error
      failure = raised
      report_failure.call(raised)
    end

    if @should_ack
      puts "SNEAKERS LOG: inside should ack conditional, response is #{outcome}"
      case outcome
      when :ack
        # note to future-self. never acknowledge multiple (multiple=true) messages under threads.
        handler.acknowledge(delivery_info, metadata, msg)
      when :timeout
        handler.timeout(delivery_info, metadata, msg)
      when :error
        handler.error(delivery_info, metadata, msg, failure)
      when :reject
        handler.reject(delivery_info, metadata, msg)
      when :requeue
        # Same callback as :reject, with the extra `true` argument.
        handler.reject(delivery_info, metadata, msg, true)
      else
        handler.noop(delivery_info, metadata, msg)
      end
      metrics.increment("#{metric_prefix}.handled.#{outcome || 'noop'}")
    end

    metrics.increment("#{metric_prefix}.ended")
    puts "SNEAKERS LOG: made it to the end of job"
  end
end
# Shuts the worker down in three traced steps: unsubscribe from the queue,
# ask the thread pool to shut down, then call wait_for_termination on it.
def stop
  worker_trace('Stopping worker: unsubscribing.')
  @queue.unsubscribe

  worker_trace('Stopping worker: shutting down thread pool.')
  @pool.shutdown
  @pool.wait_for_termination

  worker_trace("Stopping worker: I'm gone.")
end
# Starts the worker by subscribing it to its queue, tracing before and
# after the subscription.
def run
  worker_trace('New worker: subscribing.')
  @queue.subscribe(self)
  worker_trace("New worker: I'm alive.")
end
# Prefixes a message with this worker's standard log context: worker id,
# current thread, queue name and queue options, each in square brackets.
#
# @param msg [Object] text appended after the prefix, separated by a space
# @return [String] the prefixed log line
def log_msg(msg)
  context = [@id, Thread.current, @queue.name, @queue.opts]
  prefix = context.map { |part| "[#{part}]" }.join
  "#{prefix} #{msg}"
end
# Logs a message at debug level after decorating it with `log_msg`.
#
# @param msg [Object] message to trace
# @return [Object] whatever the logger's `debug` returns
def worker_trace(msg)
  debug_log = logger
  debug_log.debug(log_msg(msg))
end
# Registry of the classes that have included this module; appended to by
# the `included` hook below.
Classes = []

# Hook invoked with the including class or module: extends it with
# ClassMethods and, when it is a Class, records it in the registry.
#
# @param base [Module] the class or module doing the include
def self.included(base)
  base.extend(ClassMethods)
  return unless base.is_a?(Class)

  Classes << base
end
# Class-level DSL added to whatever includes the worker module.
module ClassMethods
  # Options recorded by `from_queue` (nil until it has been called).
  attr_reader :queue_opts
  # Queue name recorded by `from_queue`, as a String (nil until it has been called).
  attr_reader :queue_name

  # Declares the queue this worker class is bound to.
  #
  # @param q [#to_s] queue name; stored as a String
  # @param opts [Hash] queue options; stored as given
  def from_queue(q, opts={})
    @queue_name = q.to_s
    @queue_opts = opts
  end

  # Publishes a message aimed at this worker class's queue.
  #
  # :routing_key and :content_type default to the values recorded by
  # `from_queue`; :to_queue defaults to the recorded queue name.
  #
  # @param msg [Object] payload handed to the publisher
  # @param opts [Hash] publish options; the hash passed in is not modified
  # @return [Object] the result of the publisher's `publish`
  def enqueue(msg, opts={})
    # `from_queue` may not have been called; fall back to an empty hash the
    # same way the worker's `initialize` does for `queue_opts`.
    defaults = @queue_opts || {}
    # Fill in defaults on a copy so the caller's hash is left intact.
    publish_opts = opts.dup
    publish_opts[:routing_key] ||= defaults[:routing_key]
    publish_opts[:content_type] ||= defaults[:content_type]
    publish_opts[:to_queue] ||= @queue_name
    publisher.publish(msg, publish_opts)
  end

  private

  # Publisher built on first use from the recorded queue options and then
  # reused for later calls.
  def publisher
    @publisher ||= Sneakers::Publisher.new(queue_opts || {})
  end
end
end
end