Repository URL to install this package:
Version:
16.3.0 ▾
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.words.protocols.jabber.client}
"""
from hashlib import sha1
from twisted.internet import defer
from twisted.trial import unittest
from twisted.words.protocols.jabber import client, error, jid, xmlstream
from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer
from twisted.words.xish import utility
IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]'
IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]'
NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND
NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'
IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION
class CheckVersionInitializerTests(unittest.TestCase):
def setUp(self):
a = xmlstream.Authenticator()
xs = xmlstream.XmlStream(a)
self.init = client.CheckVersionInitializer(xs)
def testSupported(self):
"""
Test supported version number 1.0
"""
self.init.xmlstream.version = (1, 0)
self.init.initialize()
def testNotSupported(self):
"""
Test unsupported version number 0.0, and check exception.
"""
self.init.xmlstream.version = (0, 0)
exc = self.assertRaises(error.StreamError, self.init.initialize)
self.assertEqual('unsupported-version', exc.condition)
class InitiatingInitializerHarness(object):
"""
Testing harness for interacting with XML stream initializers.
This sets up an L{utility.XmlPipe} to create a communication channel between
the initializer and the stubbed receiving entity. It features a sink and
source side that both act similarly to a real L{xmlstream.XmlStream}. The
sink is augmented with an authenticator to which initializers can be added.
The harness also provides some utility methods to work with event observers
and deferreds.
"""
def setUp(self):
self.output = []
self.pipe = utility.XmlPipe()
self.xmlstream = self.pipe.sink
self.authenticator = xmlstream.ConnectAuthenticator('example.org')
self.xmlstream.authenticator = self.authenticator
def waitFor(self, event, handler):
"""
Observe an output event, returning a deferred.
The returned deferred will be fired when the given event has been
observed on the source end of the L{XmlPipe} tied to the protocol
under test. The handler is added as the first callback.
@param event: The event to be observed. See
L{utility.EventDispatcher.addOnetimeObserver}.
@param handler: The handler to be called with the observed event object.
@rtype: L{defer.Deferred}.
"""
d = defer.Deferred()
d.addCallback(handler)
self.pipe.source.addOnetimeObserver(event, d.callback)
return d
class IQAuthInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
"""
Tests for L{client.IQAuthInitializer}.
"""
def setUp(self):
super(IQAuthInitializerTests, self).setUp()
self.init = client.IQAuthInitializer(self.xmlstream)
self.authenticator.jid = jid.JID('user@example.com/resource')
self.authenticator.password = 'secret'
def testPlainText(self):
"""
Test plain-text authentication.
Act as a server supporting plain-text authentication and expect the
C{password} field to be filled with the password. Then act as if
authentication succeeds.
"""
def onAuthGet(iq):
"""
Called when the initializer sent a query for authentication methods.
The response informs the client that plain-text authentication
is supported.
"""
# Create server response
response = xmlstream.toResponse(iq, 'result')
response.addElement(('jabber:iq:auth', 'query'))
response.query.addElement('username')
response.query.addElement('password')
response.query.addElement('resource')
# Set up an observer for the next request we expect.
d = self.waitFor(IQ_AUTH_SET, onAuthSet)
# Send server response
self.pipe.source.send(response)
return d
def onAuthSet(iq):
"""
Called when the initializer sent the authentication request.
The server checks the credentials and responds with an empty result
signalling success.
"""
self.assertEqual('user', unicode(iq.query.username))
self.assertEqual('secret', unicode(iq.query.password))
self.assertEqual('resource', unicode(iq.query.resource))
# Send server response
response = xmlstream.toResponse(iq, 'result')
self.pipe.source.send(response)
# Set up an observer for the request for authentication fields
d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
# Start the initializer
d2 = self.init.initialize()
return defer.gatherResults([d1, d2])
def testDigest(self):
"""
Test digest authentication.
Act as a server supporting digest authentication and expect the
C{digest} field to be filled with a sha1 digest of the concatenated
stream session identifier and password. Then act as if authentication
succeeds.
"""
def onAuthGet(iq):
"""
Called when the initializer sent a query for authentication methods.
The response informs the client that digest authentication is
supported.
"""
# Create server response
response = xmlstream.toResponse(iq, 'result')
response.addElement(('jabber:iq:auth', 'query'))
response.query.addElement('username')
response.query.addElement('digest')
response.query.addElement('resource')
# Set up an observer for the next request we expect.
d = self.waitFor(IQ_AUTH_SET, onAuthSet)
# Send server response
self.pipe.source.send(response)
return d
def onAuthSet(iq):
"""
Called when the initializer sent the authentication request.
The server checks the credentials and responds with an empty result
signalling success.
"""
self.assertEqual('user', unicode(iq.query.username))
self.assertEqual(sha1('12345secret').hexdigest(),
unicode(iq.query.digest).encode('utf-8'))
self.assertEqual('resource', unicode(iq.query.resource))
# Send server response
response = xmlstream.toResponse(iq, 'result')
self.pipe.source.send(response)
# Digest authentication relies on the stream session identifier. Set it.
self.xmlstream.sid = u'12345'
# Set up an observer for the request for authentication fields
d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
# Start the initializer
d2 = self.init.initialize()
return defer.gatherResults([d1, d2])
def testFailRequestFields(self):
"""
Test initializer failure of request for fields for authentication.
"""
def onAuthGet(iq):
"""
Called when the initializer sent a query for authentication methods.
The server responds that the client is not authorized to authenticate.
"""
response = error.StanzaError('not-authorized').toResponse(iq)
self.pipe.source.send(response)
# Set up an observer for the request for authentication fields
d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
# Start the initializer
d2 = self.init.initialize()
# The initialized should fail with a stanza error.
self.assertFailure(d2, error.StanzaError)
return defer.gatherResults([d1, d2])
def testFailAuth(self):
"""
Test initializer failure to authenticate.
"""
def onAuthGet(iq):
"""
Called when the initializer sent a query for authentication methods.
The response informs the client that plain-text authentication
is supported.
"""
# Send server response
response = xmlstream.toResponse(iq, 'result')
response.addElement(('jabber:iq:auth', 'query'))
response.query.addElement('username')
response.query.addElement('password')
response.query.addElement('resource')
# Set up an observer for the next request we expect.
d = self.waitFor(IQ_AUTH_SET, onAuthSet)
# Send server response
self.pipe.source.send(response)
return d
def onAuthSet(iq):
"""
Called when the initializer sent the authentication request.
The server checks the credentials and responds with a not-authorized
stanza error.
"""
response = error.StanzaError('not-authorized').toResponse(iq)
self.pipe.source.send(response)
# Set up an observer for the request for authentication fields
d1 = self.waitFor(IQ_AUTH_GET, onAuthGet)
# Start the initializer
d2 = self.init.initialize()
# The initializer should fail with a stanza error.
self.assertFailure(d2, error.StanzaError)
return defer.gatherResults([d1, d2])
class BindInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
"""
Tests for L{client.BindInitializer}.
"""
def setUp(self):
super(BindInitializerTests, self).setUp()
self.init = client.BindInitializer(self.xmlstream)
self.authenticator.jid = jid.JID('user@example.com/resource')
def testBasic(self):
"""
Set up a stream, and act as if resource binding succeeds.
"""
def onBind(iq):
response = xmlstream.toResponse(iq, 'result')
response.addElement((NS_BIND, 'bind'))
response.bind.addElement('jid',
content='user@example.com/other resource')
self.pipe.source.send(response)
def cb(result):
self.assertEqual(jid.JID('user@example.com/other resource'),
self.authenticator.jid)
d1 = self.waitFor(IQ_BIND_SET, onBind)
d2 = self.init.start()
d2.addCallback(cb)
return defer.gatherResults([d1, d2])
def testFailure(self):
"""
Set up a stream, and act as if resource binding fails.
"""
def onBind(iq):
response = error.StanzaError('conflict').toResponse(iq)
self.pipe.source.send(response)
d1 = self.waitFor(IQ_BIND_SET, onBind)
d2 = self.init.start()
self.assertFailure(d2, error.StanzaError)
return defer.gatherResults([d1, d2])
class SessionInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
"""
Tests for L{client.SessionInitializer}.
"""
def setUp(self):
super(SessionInitializerTests, self).setUp()
self.init = client.SessionInitializer(self.xmlstream)
def testSuccess(self):
"""
Set up a stream, and act as if session establishment succeeds.
"""
def onSession(iq):
response = xmlstream.toResponse(iq, 'result')
self.pipe.source.send(response)
d1 = self.waitFor(IQ_SESSION_SET, onSession)
d2 = self.init.start()
return defer.gatherResults([d1, d2])
def testFailure(self):
"""
Set up a stream, and act as if session establishment fails.
"""
def onSession(iq):
response = error.StanzaError('forbidden').toResponse(iq)
self.pipe.source.send(response)
d1 = self.waitFor(IQ_SESSION_SET, onSession)
d2 = self.init.start()
self.assertFailure(d2, error.StanzaError)
return defer.gatherResults([d1, d2])
class XMPPAuthenticatorTests(unittest.TestCase):
"""
Test for both XMPPAuthenticator and XMPPClientFactory.
"""
def testBasic(self):
"""
Test basic operations.
Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
it produce a protocol instance. Then inspect the instance variables of
the authenticator and XML stream objects.
"""
self.client_jid = jid.JID('user@example.com/resource')
# Get an XmlStream instance. Note that it gets initialized with the
# XMPPAuthenticator (that has its associateWithXmlStream called) that
# is in turn initialized with the arguments to the factory.
xs = client.XMPPClientFactory(self.client_jid,
'secret').buildProtocol(None)
# test authenticator's instance variables
self.assertEqual('example.com', xs.authenticator.otherHost)
self.assertEqual(self.client_jid, xs.authenticator.jid)
self.assertEqual('secret', xs.authenticator.password)
# test list of initializers
version, tls, sasl, bind, session = xs.initializers
self.assert_(isinstance(tls, xmlstream.TLSInitiatingInitializer))
self.assert_(isinstance(sasl, SASLInitiatingInitializer))
self.assert_(isinstance(bind, client.BindInitializer))
self.assert_(isinstance(session, client.SessionInitializer))
self.assertFalse(tls.required)
self.assertTrue(sasl.required)
self.assertFalse(bind.required)
self.assertFalse(session.required)