Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

vistahigherlearning / punjab   python

Repository URL to install this package:

/ punjab / session.py

"""
Session handling for Jabber (XMPP) client connections.

"""
from twisted.internet import defer,  reactor
from twisted.python import failure, log
from twisted.web import server
from twisted.names.srvconnect import SRVConnector

try:
    from twisted.words.xish import domish, xmlstream
    from twisted.words.protocols import jabber as jabber_protocol
except ImportError:
    from twisted.xish import domish, xmlstream


import traceback
import os
import warnings
from punjab import jabber
from punjab.xmpp import ns

import time
import error

# TLS support is optional: ``ssl`` ends up as None when Twisted's ssl
# module cannot be imported or reports itself as unsupported.
try:
    from twisted.internet import ssl
except ImportError:
    ssl = None
else:
    if not ssl.supported:
        ssl = None

if ssl is None:
    log.msg("SSL ERROR: You do not have ssl support this may cause problems with tls client connections.")



class XMPPClientConnector(SRVConnector):
    """
    SRV-aware connector for xmpp client connections.

    Looks up the ``xmpp-client`` service through SRVConnector and
    switches the connection to SSL when port 5223 is picked and ssl
    support is available.
    """

    def __init__(self, client_reactor, domain, factory):
        """
        Prepare the SRV lookup for ``domain``.

        client_reactor - reactor passed on to SRVConnector
        domain         - domain to look up; a unicode value triggers a
                         warning and is encoded to ASCII bytes
        factory        - factory passed on to SRVConnector
        """
        if isinstance(domain, unicode):
            message = ("Domain argument to XMPPClientConnector should be bytes, "
                       "not unicode")
            # stacklevel=2 attributes the warning to the caller of __init__
            warnings.warn(message, stacklevel=2)
            domain = domain.encode('ascii')

        SRVConnector.__init__(self, client_reactor, 'xmpp-client', domain, factory)
        # NOTE(review): presumably timeout values consumed by the connector
        # machinery -- confirm against SRVConnector.
        self.timeout = [1,3]

    def pickServer(self):
        """
        Choose the host and port for the connection.

        The choice itself is delegated to SRVConnector.pickServer.  If the
        chosen port is 5223 and ssl is available, the connector is set up
        to use ``connectSSL`` with an SSLv23 client context.
        """
        host, port = SRVConnector.pickServer(self)

        wants_ssl = port == 5223 and ssl
        if wants_ssl:
            ssl_context = ssl.ClientContextFactory()
            ssl_context.method = ssl.SSL.SSLv23_METHOD

            self.connectFuncArgs = (ssl_context,)
            self.connectFuncName = 'connectSSL'

        return host, port

def make_session(pint, attrs, session_type='BOSH'):
    """
    Create a Session, start its connection and register it with pint.

    pint         - punjab session interface class; ``v``, ``bindAddress``
                   and the ``sessions`` mapping are used here
    attrs        - attributes sent from the body tag
    session_type - accepted for interface compatibility; not used here

    Returns a tuple of the new session and the deferred of its first
    waiting request.
    """
    s = Session(pint, attrs)

    if pint.v:
        log.msg('================================== %s connect to %s:%s ==================================' % (str(time.time()),s.hostname,s.port))

    # An explicit route or a loopback host is connected to directly,
    # skipping the SRV lookup.
    connect_srv = (s.connect_srv
                   and 'route' not in attrs
                   and s.hostname not in ('localhost', '127.0.0.1'))

    if connect_srv:
        connector = XMPPClientConnector(reactor, s.hostname, s)
        connector.connect()
    else:
        reactor.connectTCP(s.hostname, s.port, s, bindAddress=pint.bindAddress)

    # run the session's expiry check once the inactivity period has passed
    reactor.callLater(s.inactivity, s.checkExpired)

    pint.sessions[s.sid] = s

    return s, s.waiting_requests[0].deferred


class WaitingRequest(object):
    """Record describing one request that is waiting for a response."""

    def __init__(self, deferred, delayedcall, timeout=30, startup=False, rid=None):
        """
        Store the request details and note when the wait started.

        deferred    - object whose callback/errback deliver the result
        delayedcall - stored as given
        timeout     - stored as given, default 30
        startup     - stored as given, default False
        rid         - stored as given, default None
        """
        # time at which this request started waiting
        self.wait_start = time.time()

        self.rid = rid
        self.timeout = timeout
        self.startup = startup
        self.delayedcall = delayedcall
        self.deferred = deferred

    def doCallback(self, data):
        """Fire the stored deferred's callback with ``data``."""
        target = self.deferred
        target.callback(data)

    def doErrback(self, data):
        """Fire the stored deferred's errback with ``data``."""
        target = self.deferred
        target.errback(data)


class Session(jabber.JabberClientFactory, server.Session):
    """ Jabber Client Session class for client XMPP connections. """
    def __init__(self, pint, attrs):
        """
        Initialize the session from the attributes of the initial body tag.

        pint  - punjab session interface; ``v`` is read as the verbosity
                flag, ``use_raw`` and ``connect_srv`` are read when present
        attrs - mapping of the attributes sent on the body tag; ``to`` and
                ``rid`` are required, everything else is optional

        Raises error.Error('internal-server-error') when a ``route``
        attribute is present but does not start with "xmpp:".
        """
        if 'charset' in attrs:
            self.charset = str(attrs['charset'])
        else:
            self.charset = 'utf-8'

        self.to    = attrs['to']
        self.port  = 5222
        self.inactivity = 900
        if ':' in self.to:
            # Check if port is in the 'to' string
            to, port = self.to.split(':')

            # an empty port ("host:") keeps the default port and leaves
            # self.to untouched
            if port:
                self.to   = to
                self.port = int(port)

        # 20 random bytes rendered as 40 hex characters
        self.sid = "".join("%02x" % ord(i) for i in os.urandom(20))

        jabber.JabberClientFactory.__init__(self, self.to, pint.v)
        server.Session.__init__(self, pint, self.sid)
        self.pint  = pint

        self.attrs = attrs
        self.s     = None

        self.elems = []
        rid        = int(attrs['rid'])

        self.waiting_requests = []
        # NOTE(review): this value is overwritten further down by the
        # pint-level 'use_raw' setting -- confirm which one should win.
        self.use_raw = attrs.get('raw', False)

        self.raw_buffer = u""
        self.xmpp_node  = ''
        self.success    = 0
        self.mechanisms = []
        self.xmlstream  = None
        self.features   = None
        self.session    = None

        self.cache_data = {}
        self.verbose    = self.pint.v
        self.noisy      = self.verbose

        self.version = attrs.get('version', 0.0)

        self.key = attrs.get('newkey')

        self.wait  = int(attrs.get('wait', 0))

        self.hold  = int(attrs.get('hold', 0))
        self.inactivity = int(attrs.get('inactivity', 900)) # default inactivity 15 mins

        if 'window' in attrs:
            self.window  = int(attrs['window'])
        else:
            self.window  = self.hold + 2

        self.polling = int(attrs.get('polling', 0))

        # an explicit 'port' attribute overrides a port given in 'to'
        if 'port' in attrs:
            self.port = int(attrs['port'])

        # connect to 'hostname' when given, otherwise to the 'to' domain
        self.hostname = attrs.get('hostname', self.to)

        self.use_raw = getattr(pint, 'use_raw', False) # use raw buffers

        self.connect_srv = getattr(pint, 'connect_srv', True)

        self.secure = attrs.get('secure') == 'true'
        # NOTE(review): self.authenticator is presumably created by
        # JabberClientFactory.__init__ above -- confirm.
        self.authenticator.useTls = self.secure

        if 'route' in attrs:
            route = attrs['route']
            if not route.startswith("xmpp:"):
                raise error.Error('internal-server-error')

            self.route = route[5:]
            if self.route.startswith("//"):
                self.route = self.route[2:]

            # route format change, see http://www.xmpp.org/extensions/xep-0124.html#session-request
            rhostname, rport = self.route.split(":")
            self.port = int(rport)
            self.hostname = rhostname
            self.resource = ''


        self.authid      = 0
        self.rid         = rid + 1
        self.connected   = 0 # number of clients connected on this session

        self.notifyOnExpire(self.onExpire)
        self.stream_error = None
        if pint.v:
            log.msg('Session Created : %s %s' % (str(self.sid),str(time.time()), ))
        self.stream_error_called = False
        self.addBootstrap(xmlstream.STREAM_START_EVENT, self.streamStart)
        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connectEvent)
        self.addBootstrap(xmlstream.STREAM_ERROR_EVENT, self.streamError)
        self.addBootstrap(xmlstream.STREAM_END_EVENT, self.connectError)

        # create the first waiting request; it is filed under the rid of
        # the initial request (self.rid is already one past it)
        d = defer.Deferred()
        self.appendWaitingRequest(d, rid,
                                  timeout=30,
                                  poll=self._startup_timeout,
                                  startup=True,
                                  )

    def rawDataIn(self, buf):
        """
        Log incoming data on the xmlstream and, in raw mode, buffer it.

        buf - data received on the stream

        The data is logged only when the session interface is verbose.
        It is appended to ``self.raw_buffer`` only when ``use_raw`` is set
        and ``authid`` is truthy; byte strings are decoded as UTF-8 first.
        """
        if self.pint and self.pint.v:
            try:
                log.msg("SID: %s => RECV: %r" % (self.sid, buf,))
            except Exception:
                # logging is best effort: report the failure and carry on
                log.err()
        if self.use_raw and self.authid:
            if isinstance(buf, str):
                buf = unicode(buf, 'utf-8')
            # add some raw data
            self.raw_buffer = self.raw_buffer + buf


    def rawDataOut(self, buf):
        """
        Log outgoing data on the xmlstream.

        buf - data being sent on the stream
        """
        try:
            log.msg("SID: %s => SEND: %r" % (self.sid, buf,))
        except Exception:
            # logging is best effort: report the failure and carry on
            log.err()

    def _wrPop(self, data, i=0):
        """Remove the waiting request at index ``i``, fire its callback
        with ``data``, then cache ``data`` under that request's rid.
        """
        request = self.waiting_requests.pop(i)
        request.doCallback(data)
        self._cacheData(request.rid, data)

    def clearWaitingRequests(self, hold = 0):
        """Answer waiting requests with an empty list, oldest first,
        until no more than ``hold`` of them remain.

           hold - number of waiting requests to leave queued; the
                  default of 0 clears them all
        """
        # the queue length is re-read each pass because _wrPop shrinks it
        while hold < len(self.waiting_requests):
            self._wrPop([])

    def _wrError(self, err, i = 0):
        """Remove the waiting request at index ``i`` and fire its errback
        with ``err``.
        """
        self.waiting_requests.pop(i).doErrback(err)


    def appendWaitingRequest(self, d, rid, timeout=None, poll=None, startup=False):
        """Queue a new WaitingRequest for deferred ``d`` and request id
        ``rid``.

        ``timeout`` falls back to ``self.wait`` and ``poll`` falls back to
        ``self._pollTimeout`` when left as None.
        """
        chosen_timeout = self.wait if timeout is None else timeout
        chosen_poll = self._pollTimeout if poll is None else poll

        request = WaitingRequest(d,
                                 chosen_poll,
                                 timeout=chosen_timeout,
                                 rid=rid,
                                 startup=startup)
        self.waiting_requests.append(request)

    def returnWaitingRequests(self):
        """Deliver the queued elements to the oldest waiting request.

        Repeats for as long as there are both queued elements and waiting
        requests; the element queue is reset before each delivery.
        """
        while self.elems and self.waiting_requests:
            payload, self.elems = self.elems, []
            self._wrPop(payload)


    def onExpire(self):
        """
        Handle expiry of the session.

        Notifies the session interface through its ``onExpire`` hook when
        it has one, logs the expiry when verbose (unless the session was
        explicitly terminated), and disconnects.
        """
        if hasattr(self.pint, 'onExpire'):
            self.pint.onExpire(self.sid)
        # 'terminated' is only set by terminate(), hence the getattr default
        if self.verbose and not getattr(self, 'terminated', False):
            log.msg('SESSION -> We have expired', self.sid, self.rid, self.waiting_requests)
        self.disconnect()

    def terminate(self):
        """
        Terminate the session.

        Hands any queued elements to a waiting request, answers the
        remaining waiting requests with an empty list, then expires the
        session.

        Returns a deferred that has already fired with ``self.elems``.
        """
        self.wait = 0
        self.terminated = True
        if self.verbose:
            log.msg('SESSION -> Terminate')

        # if there are any elements hanging around and waiting
        # requests, send those off
        self.returnWaitingRequests()

        self.clearWaitingRequests()

        try:
            self.expire()
        except Exception:
            # expire() failed; run the expiry handler directly so the
            # session is still disconnected
            self.onExpire()

        return defer.succeed(self.elems)

    def poll(self, d = None, rid = None):
        """Handles the responses to requests.

        This function is called for every request except session setup
        and session termination.  It handles the reply portion of the
        request by returning a deferred which will get called back
        when there is data or when the wait timeout expires.
        """
        # queue this request
        if d is None:
            d = defer.Deferred()
        if self.pint.error:
            d.addErrback(self.pint.error)
Loading ...