"""
Session management for Jabber (XMPP) client connections.
"""
from twisted.internet import defer, reactor
from twisted.python import failure, log
from twisted.web import server
from twisted.names.srvconnect import SRVConnector
try:
from twisted.words.xish import domish, xmlstream
from twisted.words.protocols import jabber as jabber_protocol
except ImportError:
from twisted.xish import domish, xmlstream
import traceback
import os
import warnings
from punjab import jabber
from punjab.xmpp import ns
import time
import error
# TLS support is optional. ``ssl`` is left as None when Twisted's ssl module
# cannot be imported or reports itself as unsupported, so later code can
# simply test ``if ssl``.
try:
    from twisted.internet import ssl
except ImportError:
    ssl = None
else:
    if not ssl.supported:
        ssl = None

if ssl is None:
    log.msg("SSL ERROR: You do not have ssl support this may cause problems with tls client connections.")
class XMPPClientConnector(SRVConnector):
    """
    SRV-aware connector for xmpp client connections.

    Looks up the ``xmpp-client`` service of a domain and selects
    ``connectSSL`` when the picked server uses port 5223.
    """

    def __init__(self, client_reactor, domain, factory):
        """
        Prepare the SRV lookup for ``domain``.

        client_reactor - reactor handed on to SRVConnector
        domain - domain to look up; a unicode value triggers a warning and
                 is encoded to ASCII before use
        factory - factory handed on to SRVConnector
        """
        domain_is_unicode = isinstance(domain, unicode)
        if domain_is_unicode:
            message = ("Domain argument to XMPPClientConnector should be bytes, "
                       "not unicode")
            warnings.warn(message, stacklevel=2)
            domain = domain.encode('ascii')

        SRVConnector.__init__(self, client_reactor, 'xmpp-client', domain, factory)
        self.timeout = [1, 3]

    def pickServer(self):
        """
        Choose the host and port to connect to.

        Returns the ``(host, port)`` pair picked by SRVConnector. When the
        port is 5223 and ssl support is available, the connect function is
        set to ``connectSSL`` with an SSLv23 client context as its argument.
        """
        picked = SRVConnector.pickServer(self)
        host, port = picked

        wants_ssl = ssl and port == 5223
        if wants_ssl:
            ssl_context = ssl.ClientContextFactory()
            ssl_context.method = ssl.SSL.SSLv23_METHOD
            self.connectFuncName = 'connectSSL'
            self.connectFuncArgs = (ssl_context,)

        return host, port
def make_session(pint, attrs, session_type='BOSH'):
    """
    Create a Session, start its outgoing connection and register it.

    pint - punjab session interface class
    attrs - attributes sent from the body tag
    session_type - accepted for interface compatibility; not used here

    Returns a ``(session, deferred)`` tuple, where ``deferred`` belongs to
    the first waiting request of the new session.
    """
    s = Session(pint, attrs)

    if pint.v:
        log.msg('================================== %s connect to %s:%s ==================================' % (str(time.time()),s.hostname,s.port))

    # SRV lookup is skipped when the client supplied an explicit route or
    # when the target host is the local machine.
    connect_srv = s.connect_srv
    if 'route' in attrs:
        connect_srv = False
    if s.hostname in ('localhost', '127.0.0.1'):
        connect_srv = False

    if connect_srv:
        connector = XMPPClientConnector(reactor, s.hostname, s)
        connector.connect()
    else:
        reactor.connectTCP(s.hostname, s.port, s, bindAddress=pint.bindAddress)

    # timeout
    reactor.callLater(s.inactivity, s.checkExpired)

    pint.sessions[s.sid] = s

    return s, s.waiting_requests[0].deferred
class WaitingRequest(object):
    """Bookkeeping record for one request that is waiting for a response."""

    def __init__(self, deferred, delayedcall, timeout = 30, startup = False, rid = None):
        """Store the request details and note when the wait began.

        deferred - object whose callback/errback answers the request
        delayedcall - stored as-is on the instance
        timeout - stored as-is, defaults to 30
        startup - stored as-is, defaults to False
        rid - request id, defaults to None
        """
        # Timestamp first so it reflects the moment the wait started.
        self.wait_start = time.time()
        self.rid = rid
        self.timeout = timeout
        self.startup = startup
        self.delayedcall = delayedcall
        self.deferred = deferred

    def doCallback(self, data):
        """Fire the stored deferred's callback with ``data``."""
        answer = self.deferred.callback
        answer(data)

    def doErrback(self, data):
        """Fire the stored deferred's errback with ``data``."""
        fail = self.deferred.errback
        fail(data)
class Session(jabber.JabberClientFactory, server.Session):
""" Jabber Client Session class for client XMPP connections. """
def __init__(self, pint, attrs):
    """
    Initialize the session from the attributes of the initial body tag.

    pint - punjab session interface; its ``v`` flag enables verbose logging
    attrs - attributes sent from the body tag. ``to`` and ``rid`` are read
            unconditionally; the other attributes fall back to defaults.

    Raises error.Error('internal-server-error') when a ``route`` attribute
    is present but does not start with ``xmpp:``.
    """
    if 'charset' in attrs:
        self.charset = str(attrs['charset'])
    else:
        self.charset = 'utf-8'

    self.to = attrs['to']
    self.port = 5222
    self.inactivity = 900
    if ':' in self.to:
        # Check if port is in the 'to' string
        to, port = self.to.split(':')
        # NOTE(review): with an empty port ("host:") self.to keeps its
        # trailing colon and the default port 5222 stays in effect.
        if port:
            self.to = to
            self.port = int(port)

    # session id: 20 random bytes rendered as 40 hex characters
    self.sid = "".join("%02x" % ord(i) for i in os.urandom(20))

    jabber.JabberClientFactory.__init__(self, self.to, pint.v)
    server.Session.__init__(self, pint, self.sid)
    self.pint = pint

    self.attrs = attrs
    self.s = None

    self.elems = []
    rid = int(attrs['rid'])

    self.waiting_requests = []
    # NOTE(review): this value is overwritten below from ``pint.use_raw``,
    # so the ``raw`` attribute currently has no effect.
    self.use_raw = attrs.get('raw', False)

    self.raw_buffer = u""
    self.xmpp_node = ''
    self.success = 0
    self.mechanisms = []
    self.xmlstream = None
    self.features = None
    self.session = None

    self.cache_data = {}
    self.verbose = self.pint.v
    self.noisy = self.verbose

    self.version = attrs.get('version', 0.0)

    self.key = attrs.get('newkey')

    self.wait = int(attrs.get('wait', 0))

    self.hold = int(attrs.get('hold', 0))
    self.inactivity = int(attrs.get('inactivity', 900))  # default inactivity 15 mins

    if 'window' in attrs:
        self.window = int(attrs['window'])
    else:
        self.window = self.hold + 2

    if 'polling' in attrs:
        self.polling = int(attrs['polling'])
    else:
        self.polling = 0

    # An explicit port attribute overrides a port parsed from 'to'.
    if 'port' in attrs:
        self.port = int(attrs['port'])

    if 'hostname' in attrs:
        self.hostname = attrs['hostname']
    else:
        self.hostname = self.to

    self.use_raw = getattr(pint, 'use_raw', False)  # use raw buffers

    self.connect_srv = getattr(pint, 'connect_srv', True)

    self.secure = 'secure' in attrs and attrs['secure'] == 'true'
    self.authenticator.useTls = self.secure

    if 'route' in attrs:
        if attrs['route'].startswith("xmpp:"):
            self.route = attrs['route'][5:]
            if self.route.startswith("//"):
                self.route = self.route[2:]

            # route format change, see http://www.xmpp.org/extensions/xep-0124.html#session-request
            # The route overrides both the hostname and the port.
            rhostname, rport = self.route.split(":")
            self.port = int(rport)
            self.hostname = rhostname
            self.resource = ''
        else:
            raise error.Error('internal-server-error')

    self.authid = 0
    self.rid = rid + 1
    self.connected = 0  # number of clients connected on this session

    self.notifyOnExpire(self.onExpire)
    self.stream_error = None
    if pint.v:
        log.msg('Session Created : %s %s' % (str(self.sid),str(time.time()), ))
    self.stream_error_called = False
    self.addBootstrap(xmlstream.STREAM_START_EVENT, self.streamStart)
    self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connectEvent)
    self.addBootstrap(xmlstream.STREAM_ERROR_EVENT, self.streamError)
    self.addBootstrap(xmlstream.STREAM_END_EVENT, self.connectError)

    # create the first waiting request; it is registered under the rid of
    # the session creation request (self.rid was already advanced above)
    d = defer.Deferred()
    timeout = 30
    rid = self.rid - 1
    self.appendWaitingRequest(d, rid,
                              timeout=timeout,
                              poll=self._startup_timeout,
                              startup=True,
                              )
def rawDataIn(self, buf):
    """
    Log incoming data on the xmlstream and optionally buffer it.

    buf - raw data received on the xmlstream

    The data is logged only when the punjab interface is verbose. When raw
    mode is enabled and an authid has been set, the data is also appended
    to ``self.raw_buffer``; byte strings are decoded as UTF-8 first.
    """
    if self.pint and self.pint.v:
        try:
            log.msg("SID: %s => RECV: %r" % (self.sid, buf,))
        except Exception:
            # Logging is best effort: record the failure and carry on,
            # but let KeyboardInterrupt/SystemExit propagate.
            log.err()
    if self.use_raw and self.authid:
        if isinstance(buf, str):
            buf = unicode(buf, 'utf-8')
        # add some raw data
        self.raw_buffer = self.raw_buffer + buf
def rawDataOut(self, buf):
    """
    Log outgoing data on the xmlstream.

    buf - raw data about to be sent on the xmlstream
    """
    # NOTE(review): unlike rawDataIn, this logs regardless of the verbose
    # flag -- confirm that is intended.
    try:
        log.msg("SID: %s => SEND: %r" % (self.sid, buf,))
    except Exception:
        # Logging is best effort: record the failure and carry on,
        # but let KeyboardInterrupt/SystemExit propagate.
        log.err()
def _wrPop(self, data, i=0):
    """Answer the waiting request at index ``i`` with ``data``.

    The request is removed from the queue, its callback is fired, and the
    data is handed to ``_cacheData`` together with the request's rid.
    """
    request = self.waiting_requests.pop(i)
    request.doCallback(data)
    self._cacheData(request.rid, data)
def clearWaitingRequests(self, hold = 0):
    """Answer queued waiting requests with an empty payload.

    hold - number of waiting requests to leave in the queue; with the
           default of 0 every waiting request is answered
    """
    # Re-check the queue length on every pass; answering a request removes
    # it from the front of the queue.
    while hold < len(self.waiting_requests):
        self._wrPop([])
def _wrError(self, err, i = 0):
    """Remove the waiting request at index ``i`` and errback it with ``err``."""
    failed_request = self.waiting_requests.pop(i)
    failed_request.doErrback(err)
def appendWaitingRequest(self, d, rid, timeout=None, poll=None, startup=False):
    """Queue a new waiting request at the end of ``waiting_requests``.

    d - deferred stored on the waiting request
    rid - request id stored on the waiting request
    timeout - stored on the request; defaults to ``self.wait``
    poll - stored as the request's delayedcall; defaults to
           ``self._pollTimeout``
    startup - flag stored on the request
    """
    effective_timeout = self.wait if timeout is None else timeout
    delayed = self._pollTimeout if poll is None else poll

    request = WaitingRequest(d,
                             delayed,
                             timeout=effective_timeout,
                             rid=rid,
                             startup=startup)
    self.waiting_requests.append(request)
def returnWaitingRequests(self):
    """Hand queued elements to waiting requests.

    While there are both queued elements and waiting requests, all queued
    elements are delivered to the oldest waiting request and the element
    queue is reset.
    """
    while self.elems and self.waiting_requests:
        # Swap in a fresh list before delivering the queued elements.
        queued, self.elems = self.elems, []
        self._wrPop(queued)
def onExpire(self):
    """
    When the session expires call this.

    Notifies the punjab interface when it provides an ``onExpire`` hook,
    logs the expiry when verbose and the session was not terminated, and
    then disconnects.
    """
    if hasattr(self.pint, 'onExpire'):
        self.pint.onExpire(self.sid)
    # 'terminated' is only set by terminate(), hence the getattr default.
    if self.verbose and not getattr(self, 'terminated', False):
        log.msg('SESSION -> We have expired', self.sid, self.rid, self.waiting_requests)
    self.disconnect()
def terminate(self):
    """
    Terminates the session.

    Flushes queued elements to waiting requests, answers any remaining
    waiting requests with an empty payload and expires the session.

    Returns a deferred that has already fired with ``self.elems``.
    """
    self.wait = 0
    self.terminated = True
    if self.verbose:
        log.msg('SESSION -> Terminate')

    # if there are any elements hanging around and waiting
    # requests, send those off
    self.returnWaitingRequests()

    self.clearWaitingRequests()

    try:
        self.expire()
    except Exception:
        # expire() failed; run the expiry handler directly so the session
        # is still cleaned up. KeyboardInterrupt/SystemExit propagate.
        self.onExpire()

    return defer.succeed(self.elems)
def poll(self, d = None, rid = None):
"""Handles the responses to requests.
This function is called for every request except session setup
and session termination. It handles the reply portion of the
request by returning a deferred which will get called back
when there is data or when the wait timeout expires.
"""
# queue this request
if d is None:
d = defer.Deferred()
if self.pint.error:
d.addErrback(self.pint.error)
Loading ...