Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

vistahigherlearning / logstash   deb

Repository URL to install this package:

/ opt / logstash / vendor / bundle / jruby / 1.9 / gems / xmpp4r-0.5 / lib / xmpp4r / stream.rb

# =XMPP4R - XMPP Library for Ruby
# License:: Ruby's license (see the LICENSE file) or GNU GPL, at your option.
# Website::http://home.gna.org/xmpp4r/

require 'xmpp4r/callbacks'
require 'socket'
require 'thread'
require 'xmpp4r/semaphore'
require 'xmpp4r/streamparser'
require 'xmpp4r/presence'
require 'xmpp4r/message'
require 'xmpp4r/iq'
require 'xmpp4r/debuglog'
require 'xmpp4r/idgenerator'

module Jabber
  ##
  # The stream class manages a connection stream (a file descriptor using which
  # XML messages are read and sent)
  #
  # You may register callbacks for the three Jabber stanzas
  # (message, presence and iq) and use the send and send_with_id
  # methods.
  #
  # To ensure the order of received stanzas, callback blocks are
  # launched in the parser thread. If further blocking operations
  # are intended in those callbacks, run your own thread there.
  class Stream
    DISCONNECTED = 1
    CONNECTED = 2

    # file descriptor used
    attr_reader :fd

    # connection status
    attr_reader :status

    # number of stanzas currently being processed
    attr_reader :processing

    ##
    # Initialize a new stream
    def initialize
      @fd = nil
      @status = DISCONNECTED
      @xmlcbs = CallbackList.new
      @stanzacbs = CallbackList.new
      @messagecbs = CallbackList.new
      @iqcbs = CallbackList.new
      @presencecbs = CallbackList.new
      @send_lock = Mutex.new
      @last_send = Time.now
      @exception_block = nil
      @tbcbmutex = Mutex.new
      @threadblocks = []
      @wakeup_thread = nil
      @streamid = nil
      @streamns = 'jabber:client'
      @features_sem = Semaphore.new
      @parser_thread = nil
      @processing = 0
    end

    ##
    # Start the XML parser on the fd
    def start(fd)
      @stream_mechanisms = []
      @stream_features = {}

      @fd = fd
      @parser = StreamParser.new(@fd, self)
      @parser_thread = Thread.new do
        Thread.current.abort_on_exception = true
        begin
          @parser.parse
          Jabber::debuglog("DISCONNECTED\n")

          if @exception_block
            Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
          else
            close!
          end
        rescue Exception => e
          Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

          if @exception_block
            Thread.new do
              Thread.current.abort_on_exception = true
              close
              @exception_block.call(e, self, :start)
            end
          else
            Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}"
            close!
            raise
          end
        end
      end

      @status = CONNECTED
    end

    def stop
      @parser_thread.kill
      @parser = nil
    end

    ##
    # Mounts a block to handle exceptions if they occur during the
    # poll send.  This will likely be the first indication that
    # the socket dropped in a Jabber Session.
    #
    # The block has to take three arguments:
    # * the Exception
    # * the Jabber::Stream object (self)
    # * a symbol where it happened, namely :start, :parser, :sending and :end
    def on_exception(&block)
      @exception_block = block
    end

    ##
    # This method is called by the parser when a failure occurs
    def parse_failure(e)
      Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      # A new thread has to be created because close will cause the thread
      # to commit suicide(???)
      if @exception_block
        # New thread, because close will kill the current thread
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(e, self, :parser)
        end
      else
        Jabber::warnlog "Stream#parse_failure was called by XML parser. Dumping " +
          "backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}"
        close
        raise
      end
    end

    ##
    # This method is called by the parser upon receiving <tt></stream:stream></tt>
    def parser_end
      if @exception_block
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(nil, self, :close)
        end
      else
        close
      end
    end

    ##
    # Returns if this connection is connected to a Jabber service
    # return:: [Boolean] Connection status
    def is_connected?
      return @status == CONNECTED
    end

    ##
    # Returns if this connection is NOT connected to a Jabber service
    #
    # return:: [Boolean] Connection status
    def is_disconnected?
      return @status == DISCONNECTED
    end

    ##
    # Processes a received REXML::Element and executes
    # registered thread blocks and filters against it.
    #
    # element:: [REXML::Element] The received element
    def receive(element)
      @tbcbmutex.synchronize { @processing += 1 }
      Jabber::debuglog("RECEIVED:\n#{element.to_s}")

      if element.namespace('').to_s == '' # REXML namespaces are always strings
        element.add_namespace(@streamns)
      end

      case element.prefix
      when 'stream'
        case element.name
          when 'stream'
            stanza = element
            @streamid = element.attributes['id']
            @streamns = element.namespace('') if element.namespace('')

            # Hack: component streams are basically client streams.
            # Someday we may want to create special stanza classes
            # for components/s2s deriving from normal stanzas but
            # posessing these namespaces
            @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'

            unless element.attributes['version']  # isn't XMPP compliant, so
              Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
              @features_sem.run                   # don't wait for <stream:features/>
            end
          when 'features'
            stanza = element
            element.each { |e|
              if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
                e.each_element('mechanism') { |mech|
                  @stream_mechanisms.push(mech.text)
                }
              else
                @stream_features[e.name] = e.namespace
              end
            }
            Jabber::debuglog("FEATURES: received")
            @features_sem.run
          else
            stanza = element
        end
      else
        # Any stanza, classes are registered by XMPPElement::name_xmlns
        begin
          stanza = XMPPStanza::import(element)
        rescue NoNameXmlnsRegistered
          stanza = element
        end
      end

      if @xmlcbs.process(stanza)
        @tbcbmutex.synchronize { @processing -= 1 }
        return true
      end

      # Iterate through blocked threads (= waiting for an answer)
      #
      # We're dup'ping the @threadblocks here, so that we won't end up in an
      # endless loop if Stream#send is being nested. That means, the nested
      # threadblock won't receive the stanza currently processed, but the next
      # one.
      threadblocks = nil
      @tbcbmutex.synchronize do
        threadblocks = @threadblocks.dup
      end
      threadblocks.each { |threadblock|
        exception = nil
        r = false
        begin
          r = threadblock.call(stanza)
        rescue Exception => e
          exception = e
        end

        if r == true
          @tbcbmutex.synchronize do
            @threadblocks.delete(threadblock)
          end
          threadblock.wakeup
          @tbcbmutex.synchronize { @processing -= 1 }
          return true
        elsif exception
          @tbcbmutex.synchronize do
            @threadblocks.delete(threadblock)
          end
          threadblock.raise(exception)
        end
      }

      Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
      Jabber::debuglog("TRYING stanzacbs...")
      if @stanzacbs.process(stanza)
          @tbcbmutex.synchronize { @processing -= 1 }
          return true
      end
      r = false
      Jabber::debuglog("TRYING message/iq/presence/cbs...")
      case stanza
      when Message
        r = @messagecbs.process(stanza)
      when Iq
        r = @iqcbs.process(stanza)
      when Presence
        r = @presencecbs.process(stanza)
      end
      @tbcbmutex.synchronize { @processing -= 1 }
      return r
    end

    ##
    # Get the list of iq callbacks.
    def iq_callbacks
      @iqcbs
    end

    ##
    # Get the list of message callbacks.
    def message_callbacks
      @messagecbs
    end

    ##
    # Get the list of presence callbacks.
    def presence_callbacks
      @presencecbs
    end

    ##
    # Get the list of stanza callbacks.
    def stanza_callbacks
      @stanzacbs
    end

    ##
    # Get the list of xml callbacks.
    def xml_callbacks
      @xmlcbs
    end

    ##
    # This is used by Jabber::Stream internally to
    # keep track of any blocks which were passed to
    # Stream#send.
    class ThreadBlock
      def initialize(block)
        @block = block
        @waiter = Semaphore.new
        @exception = nil
      end
      def call(*args)
        @block.call(*args)
      end
      def wait
        @waiter.wait
        raise @exception if @exception
      end
      def wakeup
        @waiter.run
      end
      def raise(exception)
        @exception = exception
        @waiter.run
      end
    end

    def send_data(data)
      @send_lock.synchronize do
        @last_send = Time.now
Loading ...