Repository URL to install this package:
|
Version:
0.13.0.0 ▾
|
module EventSource
module Read
def self.included(cls)
cls.class_exec do
include Log::Dependency
cls.extend Build
cls.extend Call
cls.extend Configure
dependency :iterator, Iterator
initializer :stream_name
attr_accessor :get
abstract :configure
end
end
class Error < RuntimeError; end
module Build
def build(stream_name, position: nil, batch_size: nil, precedence: nil, delay_milliseconds: nil, timeout_milliseconds: nil, cycle: nil, session: nil)
cycle ||= Cycle.build(delay_milliseconds: delay_milliseconds, timeout_milliseconds: timeout_milliseconds)
new(stream_name).tap do |instance|
instance.configure(batch_size: batch_size, precedence: precedence, session: session)
Iterator.configure instance, instance.get, stream_name, position: position, cycle: cycle
end
end
end
module Call
def call(stream_name, position: nil, batch_size: nil, precedence: nil, delay_milliseconds: nil, timeout_milliseconds: nil, cycle: nil, session: nil, &action)
instance = build(stream_name, position: position, batch_size: batch_size, precedence: precedence, delay_milliseconds: delay_milliseconds, timeout_milliseconds: timeout_milliseconds, cycle: cycle, session: session)
instance.(&action)
end
end
module Configure
def configure(receiver, stream_name, attr_name: nil, position: nil, batch_size: nil, precedence: nil, delay_milliseconds: nil, timeout_milliseconds: nil, cycle: nil, session: nil)
attr_name ||= :reader
instance = build(stream_name, position: position, batch_size: batch_size, precedence: precedence, delay_milliseconds: delay_milliseconds, timeout_milliseconds: timeout_milliseconds, cycle: cycle, session: session)
receiver.public_send "#{attr_name}=", instance
end
end
def call(&action)
logger.trace { "Reading (Stream Name: #{stream_name})" }
if action.nil?
error_message = "Reader must be actuated with a block"
logger.error error_message
raise Error, error_message
end
enumerate_event_data(&action)
logger.info { "Reading completed (Stream Name: #{stream_name})" }
return AsyncInvocation::Incorrect
end
def enumerate_event_data(&action)
logger.trace { "Enumerating (Stream Name: #{stream_name})" }
event_data = nil
loop do
event_data = iterator.next
break if event_data.nil?
action.(event_data)
end
logger.debug { "Enumerated (Stream Name: #{stream_name})" }
end
end
end