Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
e / lib / e-core / instance / stream.rb
Size: Mime:
class E

  def stream keep_open = false, &proc
    streamer EStream::Generic, keep_open, &proc
  end

  def chunked_stream keep_open = true, &proc
    transfer_encoding 'chunked'
    streamer EStream::Chunked, keep_open, &proc
  end
  alias chunk_stream chunked_stream

  def evented_stream keep_open = true, &proc
    content_type EConstants::CONTENT_TYPE__EVENT_STREAM
    streamer EStream::EventSource, keep_open, &proc
  end
  alias event_stream evented_stream

  def websocket?
    # on websocket requests, Reel web-server storing the socket into ENV['rack.websocket']
    # TODO: implement rack.hijack API
    env[EConstants::RACK__WEBSOCKET]
  end

  private
  # Allows to start sending data to the client even though later parts of
  # the response body have not yet been generated.
  #
  # The close parameter specifies whether Stream#close should be called
  # after the block has been executed. This is only relevant for evented
  # servers like Thin or Rainbows.
  def streamer streamer, keep_open = false, &proc
    scheduler = env['async.callback'] ? EventMachine : EStream::Generic
    actual_params = EUtils.indifferent_params(params)
    response.body = streamer.new(scheduler, keep_open) do |out|
      with_params actual_params do
        response = catch(:__e__catch__response__) do
          begin
            yield(out)
          rescue => e
            # if a error handler defined, use it
            if handler = error_handler_defined?(EConstants::STATUS__SERVER_ERROR)
              meth, arity = handler
              arity > 0 ? self.send(meth, e) : self.send(meth)
            else
              # otherwise raise rescued exception
              raise e
            end
          end
        end
        if response.is_a?(EResponse)
          out << response.body.join
        end
      end
    end
  end

  def with_params(temp_params)
    original, @__e__params = @__e__params, temp_params
    yield
  ensure
    @__e__params = original if original
  end
end

# Class of the response body in case you use #stream.
#
# Three things really matter: The front and back block (back being the
# block generating content, front the one sending it to the client) and
# the scheduler, integrating with whatever concurrency feature the Rack
# handler is using.
#
# Scheduler has to respond to defer and schedule.
class EStream
  class Generic # kindly borrowed from Sinatra
    def self.schedule(*) yield end
    def self.defer(*)    yield end

    def initialize(scheduler = self.class, keep_open = false, &back)
      @back, @scheduler, @keep_open = back.to_proc, scheduler, keep_open
      @callbacks, @closed = [], false
    end

    def close
      return if @closed
      @closed = true
      @scheduler.schedule { @callbacks.each { |c| c.call }}
    end

    def each(&front)
      @front = front
      @scheduler.defer do
        begin
          @back.call(self)
        rescue Exception => e
          @scheduler.schedule { raise e }
        end
        close unless @keep_open
      end
    end

    def send data
      @scheduler.schedule { @front.call(data.to_s) }
      self
    end
    alias << send

    def callback(&block)
      return yield if @closed
      @callbacks << block
    end
    alias errback callback

    def closed?
      @closed
    end
  end

  class Chunked < Generic
    def send data
      data = data.to_s.chomp + "\n" # ensure data ends in a new line
      size = data.bytesize.to_s(16)
      super(size + "\r\n" + data + "\r\n")
    end
    alias << send

    def close
      @scheduler.schedule { @front.call("0\r\n\r\n") } unless closed?
      super
    end
  end

  class EventSource < Generic
    @@directives = [:data, :event, :id, :retry].freeze

    # sending data and/or EventSource-related directives.
    # if first argument is a String it will be sent as data.
    # if a Hash given as first or second argument it will be treated as directives.
    #
    # @example sending only data
    #   event_stream :keep_open do |out|
    #     out.send 'chunk one'
    #     out.send 'chunk two'
    #     out.send 'etc.'
    #   end
    #
    # @example sending data and directives
    #   event_stream :keep_open do |out|
    #     out.send 'chunk one', id: 'some-id', retry: 10
    #     out.send 'chunk two'
    #     out.send 'etc.'
    #   end
    #
    # @example sending only :event directive
    #   event_stream :keep_open do |out|
    #     out.send event: 'news'
    #   end
    #
    def send data = nil, directives = {}
      data.is_a?(Hash) && (directives = data) && (data = nil)
      directives.each_pair do |directive_name, directive_data|
        @@directives.include?(directive_name) || next
        # pre-data directives expects a single \n at the end
        super(directive_name.to_s + ": " + normalize(directive_data))
      end
      return unless data
      # - any single message should not contain \n except at the end.
      # - EventSource expects \n\n at the end of each single message.
      super("data: %s\n" % normalize(data))
    end
    alias << send

    private
    def normalize smth
      smth.to_s.gsub(/\n|\r/, '') << "\n"
    end
  end
end