# =XMPP4R - XMPP Library for Ruby
# License:: Ruby's license (see the LICENSE file) or GNU GPL, at your option.
# Website::http://home.gna.org/xmpp4r/
require 'xmpp4r/callbacks'
require 'socket'
require 'thread'
require 'xmpp4r/semaphore'
require 'xmpp4r/streamparser'
require 'xmpp4r/presence'
require 'xmpp4r/message'
require 'xmpp4r/iq'
require 'xmpp4r/debuglog'
require 'xmpp4r/idgenerator'
module Jabber
##
# The stream class manages a connection stream (a file descriptor using which
# XML messages are read and sent)
#
# You may register callbacks for the three Jabber stanzas
# (message, presence and iq) and use the send and send_with_id
# methods.
#
# To ensure the order of received stanzas, callback blocks are
# launched in the parser thread. If further blocking operations
# are intended in those callbacks, run your own thread there.
class Stream
DISCONNECTED = 1
CONNECTED = 2
# file descriptor used
attr_reader :fd
# connection status
attr_reader :status
# number of stanzas currently being processed
attr_reader :processing
##
# Initialize a new stream
def initialize
@fd = nil
@status = DISCONNECTED
@xmlcbs = CallbackList.new
@stanzacbs = CallbackList.new
@messagecbs = CallbackList.new
@iqcbs = CallbackList.new
@presencecbs = CallbackList.new
@send_lock = Mutex.new
@last_send = Time.now
@exception_block = nil
@tbcbmutex = Mutex.new
@threadblocks = []
@wakeup_thread = nil
@streamid = nil
@streamns = 'jabber:client'
@features_sem = Semaphore.new
@parser_thread = nil
@processing = 0
end
##
# Start the XML parser on the fd
def start(fd)
@stream_mechanisms = []
@stream_features = {}
@fd = fd
@parser = StreamParser.new(@fd, self)
@parser_thread = Thread.new do
Thread.current.abort_on_exception = true
begin
@parser.parse
Jabber::debuglog("DISCONNECTED\n")
if @exception_block
Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
else
close!
end
rescue Exception => e
Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
if @exception_block
Thread.new do
Thread.current.abort_on_exception = true
close
@exception_block.call(e, self, :start)
end
else
Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}"
close!
raise
end
end
end
@status = CONNECTED
end
def stop
@parser_thread.kill
@parser = nil
end
##
# Mounts a block to handle exceptions if they occur during the
# poll send. This will likely be the first indication that
# the socket dropped in a Jabber Session.
#
# The block has to take three arguments:
# * the Exception
# * the Jabber::Stream object (self)
# * a symbol where it happened, namely :start, :parser, :sending and :end
def on_exception(&block)
@exception_block = block
end
##
# This method is called by the parser when a failure occurs
def parse_failure(e)
Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")
# A new thread has to be created because close will cause the thread
# to commit suicide(???)
if @exception_block
# New thread, because close will kill the current thread
Thread.new do
Thread.current.abort_on_exception = true
close
@exception_block.call(e, self, :parser)
end
else
Jabber::warnlog "Stream#parse_failure was called by XML parser. Dumping " +
"backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}"
close
raise
end
end
##
# This method is called by the parser upon receiving <tt></stream:stream></tt>
def parser_end
if @exception_block
Thread.new do
Thread.current.abort_on_exception = true
close
@exception_block.call(nil, self, :close)
end
else
close
end
end
##
# Returns if this connection is connected to a Jabber service
# return:: [Boolean] Connection status
def is_connected?
return @status == CONNECTED
end
##
# Returns if this connection is NOT connected to a Jabber service
#
# return:: [Boolean] Connection status
def is_disconnected?
return @status == DISCONNECTED
end
##
# Processes a received REXML::Element and executes
# registered thread blocks and filters against it.
#
# element:: [REXML::Element] The received element
def receive(element)
@tbcbmutex.synchronize { @processing += 1 }
Jabber::debuglog("RECEIVED:\n#{element.to_s}")
if element.namespace('').to_s == '' # REXML namespaces are always strings
element.add_namespace(@streamns)
end
case element.prefix
when 'stream'
case element.name
when 'stream'
stanza = element
@streamid = element.attributes['id']
@streamns = element.namespace('') if element.namespace('')
# Hack: component streams are basically client streams.
# Someday we may want to create special stanza classes
# for components/s2s deriving from normal stanzas but
# posessing these namespaces
@streamns = 'jabber:client' if @streamns == 'jabber:component:accept'
unless element.attributes['version'] # isn't XMPP compliant, so
Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
@features_sem.run # don't wait for <stream:features/>
end
when 'features'
stanza = element
element.each { |e|
if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
e.each_element('mechanism') { |mech|
@stream_mechanisms.push(mech.text)
}
else
@stream_features[e.name] = e.namespace
end
}
Jabber::debuglog("FEATURES: received")
@features_sem.run
else
stanza = element
end
else
# Any stanza, classes are registered by XMPPElement::name_xmlns
begin
stanza = XMPPStanza::import(element)
rescue NoNameXmlnsRegistered
stanza = element
end
end
if @xmlcbs.process(stanza)
@tbcbmutex.synchronize { @processing -= 1 }
return true
end
# Iterate through blocked threads (= waiting for an answer)
#
# We're dup'ping the @threadblocks here, so that we won't end up in an
# endless loop if Stream#send is being nested. That means, the nested
# threadblock won't receive the stanza currently processed, but the next
# one.
threadblocks = nil
@tbcbmutex.synchronize do
threadblocks = @threadblocks.dup
end
threadblocks.each { |threadblock|
exception = nil
r = false
begin
r = threadblock.call(stanza)
rescue Exception => e
exception = e
end
if r == true
@tbcbmutex.synchronize do
@threadblocks.delete(threadblock)
end
threadblock.wakeup
@tbcbmutex.synchronize { @processing -= 1 }
return true
elsif exception
@tbcbmutex.synchronize do
@threadblocks.delete(threadblock)
end
threadblock.raise(exception)
end
}
Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
Jabber::debuglog("TRYING stanzacbs...")
if @stanzacbs.process(stanza)
@tbcbmutex.synchronize { @processing -= 1 }
return true
end
r = false
Jabber::debuglog("TRYING message/iq/presence/cbs...")
case stanza
when Message
r = @messagecbs.process(stanza)
when Iq
r = @iqcbs.process(stanza)
when Presence
r = @presencecbs.process(stanza)
end
@tbcbmutex.synchronize { @processing -= 1 }
return r
end
##
# Get the list of iq callbacks.
def iq_callbacks
@iqcbs
end
##
# Get the list of message callbacks.
def message_callbacks
@messagecbs
end
##
# Get the list of presence callbacks.
def presence_callbacks
@presencecbs
end
##
# Get the list of stanza callbacks.
def stanza_callbacks
@stanzacbs
end
##
# Get the list of xml callbacks.
def xml_callbacks
@xmlcbs
end
##
# This is used by Jabber::Stream internally to
# keep track of any blocks which were passed to
# Stream#send.
class ThreadBlock
def initialize(block)
@block = block
@waiter = Semaphore.new
@exception = nil
end
def call(*args)
@block.call(*args)
end
def wait
@waiter.wait
raise @exception if @exception
end
def wakeup
@waiter.run
end
def raise(exception)
@exception = exception
@waiter.run
end
end
def send_data(data)
@send_lock.synchronize do
@last_send = Time.now
Loading ...