Repository URL to install this package:
|
Version:
0.9.2 ▾
|
class E
def stream keep_open = false, &proc
streamer EStream::Generic, keep_open, &proc
end
def chunked_stream keep_open = true, &proc
transfer_encoding 'chunked'
streamer EStream::Chunked, keep_open, &proc
end
alias chunk_stream chunked_stream
def evented_stream keep_open = true, &proc
content_type EConstants::CONTENT_TYPE__EVENT_STREAM
streamer EStream::EventSource, keep_open, &proc
end
alias event_stream evented_stream
def websocket?
# on websocket requests, Reel web-server storing the socket into ENV['rack.websocket']
# TODO: implement rack.hijack API
env[EConstants::RACK__WEBSOCKET]
end
private
# Allows to start sending data to the client even though later parts of
# the response body have not yet been generated.
#
# The close parameter specifies whether Stream#close should be called
# after the block has been executed. This is only relevant for evented
# servers like Thin or Rainbows.
def streamer streamer, keep_open = false, &proc
scheduler = env['async.callback'] ? EventMachine : EStream::Generic
actual_params = EUtils.indifferent_params(params)
response.body = streamer.new(scheduler, keep_open) do |out|
with_params actual_params do
response = catch(:__e__catch__response__) do
begin
yield(out)
rescue => e
# if a error handler defined, use it
if handler = error_handler_defined?(EConstants::STATUS__SERVER_ERROR)
meth, arity = handler
arity > 0 ? self.send(meth, e) : self.send(meth)
else
# otherwise raise rescued exception
raise e
end
end
end
if response.is_a?(EResponse)
out << response.body.join
end
end
end
end
def with_params(temp_params)
original, @__e__params = @__e__params, temp_params
yield
ensure
@__e__params = original if original
end
end
# Class of the response body in case you use #stream.
#
# Three things really matter: The front and back block (back being the
# block generating content, front the one sending it to the client) and
# the scheduler, integrating with whatever concurrency feature the Rack
# handler is using.
#
# Scheduler has to respond to defer and schedule.
class EStream
class Generic # kindly borrowed from Sinatra
def self.schedule(*) yield end
def self.defer(*) yield end
def initialize(scheduler = self.class, keep_open = false, &back)
@back, @scheduler, @keep_open = back.to_proc, scheduler, keep_open
@callbacks, @closed = [], false
end
def close
return if @closed
@closed = true
@scheduler.schedule { @callbacks.each { |c| c.call }}
end
def each(&front)
@front = front
@scheduler.defer do
begin
@back.call(self)
rescue Exception => e
@scheduler.schedule { raise e }
end
close unless @keep_open
end
end
def send data
@scheduler.schedule { @front.call(data.to_s) }
self
end
alias << send
def callback(&block)
return yield if @closed
@callbacks << block
end
alias errback callback
def closed?
@closed
end
end
class Chunked < Generic
def send data
data = data.to_s.chomp + "\n" # ensure data ends in a new line
size = data.bytesize.to_s(16)
super(size + "\r\n" + data + "\r\n")
end
alias << send
def close
@scheduler.schedule { @front.call("0\r\n\r\n") } unless closed?
super
end
end
class EventSource < Generic
@@directives = [:data, :event, :id, :retry].freeze
# sending data and/or EventSource-related directives.
# if first argument is a String it will be sent as data.
# if a Hash given as first or second argument it will be treated as directives.
#
# @example sending only data
# event_stream :keep_open do |out|
# out.send 'chunk one'
# out.send 'chunk two'
# out.send 'etc.'
# end
#
# @example sending data and directives
# event_stream :keep_open do |out|
# out.send 'chunk one', id: 'some-id', retry: 10
# out.send 'chunk two'
# out.send 'etc.'
# end
#
# @example sending only :event directive
# event_stream :keep_open do |out|
# out.send event: 'news'
# end
#
def send data = nil, directives = {}
data.is_a?(Hash) && (directives = data) && (data = nil)
directives.each_pair do |directive_name, directive_data|
@@directives.include?(directive_name) || next
# pre-data directives expects a single \n at the end
super(directive_name.to_s + ": " + normalize(directive_data))
end
return unless data
# - any single message should not contain \n except at the end.
# - EventSource expects \n\n at the end of each single message.
super("data: %s\n" % normalize(data))
end
alias << send
private
def normalize smth
smth.to_s.gsub(/\n|\r/, '') << "\n"
end
end
end