Repository URL to install this package:
|
Version:
2.1.0.jo1 ▾
|
#!/usr/bin/env python
#
# Copyright (C) 2009 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This module is used for version 2 of the Google Data APIs.
__author__ = 'j.s@google.com (Jeff Scudder)'
import sys
import types
import unittest
import gdata.gauth
import atom.http_core
import gdata.test_config as conf
PRIVATE_TEST_KEY = """
-----BEGIN PRIVATE KEY-----
MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBALRiMLAh9iimur8V
A7qVvdqxevEuUkW4K+2KdMXmnQbG9Aa7k7eBjK1S+0LYmVjPKlJGNXHDGuy5Fw/d
7rjVJ0BLB+ubPK8iA/Tw3hLQgXMRRGRXXCn8ikfuQfjUS1uZSatdLB81mydBETlJ
hI6GH4twrbDJCR2Bwy/XWXgqgGRzAgMBAAECgYBYWVtleUzavkbrPjy0T5FMou8H
X9u2AC2ry8vD/l7cqedtwMPp9k7TubgNFo+NGvKsl2ynyprOZR1xjQ7WgrgVB+mm
uScOM/5HVceFuGRDhYTCObE+y1kxRloNYXnx3ei1zbeYLPCHdhxRYW7T0qcynNmw
rn05/KO2RLjgQNalsQJBANeA3Q4Nugqy4QBUCEC09SqylT2K9FrrItqL2QKc9v0Z
zO2uwllCbg0dwpVuYPYXYvikNHHg+aCWF+VXsb9rpPsCQQDWR9TT4ORdzoj+Nccn
qkMsDmzt0EfNaAOwHOmVJ2RVBspPcxt5iN4HI7HNeG6U5YsFBb+/GZbgfBT3kpNG
WPTpAkBI+gFhjfJvRw38n3g/+UeAkwMI2TJQS4n8+hid0uus3/zOjDySH3XHCUno
cn1xOJAyZODBo47E+67R4jV1/gzbAkEAklJaspRPXP877NssM5nAZMU0/O/NGCZ+
3jPgDUno6WbJn5cqm8MqWhW1xGkImgRk+fkDBquiq4gPiT898jusgQJAd5Zrr6Q8
AO/0isr/3aa6O6NLQxISLKcPDk2NOccAfS/xOtfOz4sJYM3+Bs4Io9+dZGSDCA54
Lw03eHTNQghS0A==
-----END PRIVATE KEY-----"""
class AuthSubTest(unittest.TestCase):
def test_generate_request_url(self):
url = gdata.gauth.generate_auth_sub_url('http://example.com',
['http://example.net/scope1'])
self.assert_(isinstance(url, atom.http_core.Uri))
self.assertEqual(url.query['secure'], '0')
self.assertEqual(url.query['session'], '1')
self.assertEqual(url.query['scope'], 'http://example.net/scope1')
self.assertEqual(atom.http_core.Uri.parse_uri(
url.query['next']).query['auth_sub_scopes'],
'http://example.net/scope1')
self.assertEqual(atom.http_core.Uri.parse_uri(url.query['next']).path,
'/')
self.assertEqual(atom.http_core.Uri.parse_uri(url.query['next']).host,
'example.com')
def test_from_url(self):
token_str = gdata.gauth.auth_sub_string_from_url(
'http://example.com/?token=123abc')[0]
self.assertEqual(token_str, '123abc')
def test_from_http_body(self):
token_str = gdata.gauth.auth_sub_string_from_body('Something\n'
'Token=DQAA...7DCTN\n'
'Expiration=20061004T123456Z\n')
self.assertEqual(token_str, 'DQAA...7DCTN')
def test_modify_request(self):
token = gdata.gauth.AuthSubToken('tval')
request = atom.http_core.HttpRequest()
token.modify_request(request)
self.assertEqual(request.headers['Authorization'], 'AuthSub token=tval')
def test_create_and_upgrade_tokens(self):
token = gdata.gauth.AuthSubToken.from_url(
'http://example.com/?token=123abc')
self.assert_(isinstance(token, gdata.gauth.AuthSubToken))
self.assertEqual(token.token_string, '123abc')
self.assertEqual(token.scopes, [])
token._upgrade_token('Token=456def')
self.assertEqual(token.token_string, '456def')
self.assertEqual(token.scopes, [])
class SecureAuthSubTest(unittest.TestCase):
def test_build_data(self):
request = atom.http_core.HttpRequest(method='PUT')
request.uri = atom.http_core.Uri.parse_uri('http://example.com/foo?a=1')
data = gdata.gauth.build_auth_sub_data(request, 1234567890, 'mynonce')
self.assertEqual(data,
'PUT http://example.com/foo?a=1 1234567890 mynonce')
def test_generate_signature(self):
request = atom.http_core.HttpRequest(
method='GET', uri=atom.http_core.Uri(host='example.com', path='/foo',
query={'a': '1'}))
data = gdata.gauth.build_auth_sub_data(request, 1134567890, 'p234908')
self.assertEqual(data,
'GET http://example.com/foo?a=1 1134567890 p234908')
self.assertEqual(
gdata.gauth.generate_signature(data, PRIVATE_TEST_KEY),
'GeBfeIDnT41dvLquPgDB4U5D4hfxqaHk/5LX1kccNBnL4BjsHWU1djbEp7xp3BL9ab'
'QtLrK7oa/aHEHtGRUZGg87O+ND8iDPR76WFXAruuN8O8GCMqCDdPduNPY++LYO4MdJ'
'BZNY974Nn0m6Hc0/T4M1ElqvPhl61fkXMm+ElSM=')
class TokensToAndFromBlobsTest(unittest.TestCase):
def test_client_login_conversion(self):
token = gdata.gauth.ClientLoginToken('test|key')
copy = gdata.gauth.token_from_blob(gdata.gauth.token_to_blob(token))
self.assertEqual(token.token_string, copy.token_string)
self.assert_(isinstance(copy, gdata.gauth.ClientLoginToken))
def test_authsub_conversion(self):
token = gdata.gauth.AuthSubToken('test|key')
copy = gdata.gauth.token_from_blob(gdata.gauth.token_to_blob(token))
self.assertEqual(token.token_string, copy.token_string)
self.assert_(isinstance(copy, gdata.gauth.AuthSubToken))
scopes = ['http://example.com', 'http://other||test', 'thir|d']
token = gdata.gauth.AuthSubToken('key-=', scopes)
copy = gdata.gauth.token_from_blob(gdata.gauth.token_to_blob(token))
self.assertEqual(token.token_string, copy.token_string)
self.assert_(isinstance(copy, gdata.gauth.AuthSubToken))
self.assertEqual(token.scopes, scopes)
def test_join_and_split(self):
token_string = gdata.gauth._join_token_parts('1x', 'test|string', '%x%',
'', None)
self.assertEqual(token_string, '1x|test%7Cstring|%25x%25||')
token_type, a, b, c, d = gdata.gauth._split_token_parts(token_string)
self.assertEqual(token_type, '1x')
self.assertEqual(a, 'test|string')
self.assertEqual(b, '%x%')
self.assert_(c is None)
self.assert_(d is None)
def test_secure_authsub_conversion(self):
token = gdata.gauth.SecureAuthSubToken(
'%^%', 'myRsaKey', ['http://example.com', 'http://example.org'])
copy = gdata.gauth.token_from_blob(gdata.gauth.token_to_blob(token))
self.assertEqual(copy.token_string, '%^%')
self.assertEqual(copy.rsa_private_key, 'myRsaKey')
self.assertEqual(copy.scopes,
['http://example.com', 'http://example.org'])
token = gdata.gauth.SecureAuthSubToken(rsa_private_key='f',
token_string='b')
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(blob, '1s|b|f')
copy = gdata.gauth.token_from_blob(blob)
self.assertEqual(copy.token_string, 'b')
self.assertEqual(copy.rsa_private_key, 'f')
self.assertEqual(copy.scopes, [])
token = gdata.gauth.SecureAuthSubToken(None, '')
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(blob, '1s||')
copy = gdata.gauth.token_from_blob(blob)
self.assertEqual(copy.token_string, None)
self.assertEqual(copy.rsa_private_key, None)
self.assertEqual(copy.scopes, [])
token = gdata.gauth.SecureAuthSubToken('', None)
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(blob, '1s||')
copy = gdata.gauth.token_from_blob(blob)
self.assertEqual(copy.token_string, None)
self.assertEqual(copy.rsa_private_key, None)
self.assertEqual(copy.scopes, [])
token = gdata.gauth.SecureAuthSubToken(
None, None, ['http://example.net', 'http://google.com'])
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(
blob, '1s|||http%3A%2F%2Fexample.net|http%3A%2F%2Fgoogle.com')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(copy.token_string is None)
self.assert_(copy.rsa_private_key is None)
self.assertEqual(copy.scopes, ['http://example.net', 'http://google.com'])
def test_oauth_rsa_conversion(self):
token = gdata.gauth.OAuthRsaToken(
'consumerKey', 'myRsa', 't', 'secret',
gdata.gauth.AUTHORIZED_REQUEST_TOKEN, 'http://example.com/next',
'verifier')
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(
blob, '1r|consumerKey|myRsa|t|secret|2|http%3A%2F%2Fexample.com'
'%2Fnext|verifier')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(isinstance(copy, gdata.gauth.OAuthRsaToken))
self.assertEqual(copy.consumer_key, token.consumer_key)
self.assertEqual(copy.rsa_private_key, token.rsa_private_key)
self.assertEqual(copy.token, token.token)
self.assertEqual(copy.token_secret, token.token_secret)
self.assertEqual(copy.auth_state, token.auth_state)
self.assertEqual(copy.next, token.next)
self.assertEqual(copy.verifier, token.verifier)
token = gdata.gauth.OAuthRsaToken(
'', 'myRsa', 't', 'secret', 0)
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(blob, '1r||myRsa|t|secret|0||')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(isinstance(copy, gdata.gauth.OAuthRsaToken))
self.assert_(copy.consumer_key != token.consumer_key)
self.assert_(copy.consumer_key is None)
self.assertEqual(copy.rsa_private_key, token.rsa_private_key)
self.assertEqual(copy.token, token.token)
self.assertEqual(copy.token_secret, token.token_secret)
self.assertEqual(copy.auth_state, token.auth_state)
self.assertEqual(copy.next, token.next)
self.assert_(copy.next is None)
self.assertEqual(copy.verifier, token.verifier)
self.assert_(copy.verifier is None)
token = gdata.gauth.OAuthRsaToken(
rsa_private_key='myRsa', token='t', token_secret='secret',
auth_state=gdata.gauth.ACCESS_TOKEN, verifier='v', consumer_key=None)
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(blob, '1r||myRsa|t|secret|3||v')
copy = gdata.gauth.token_from_blob(blob)
self.assertEqual(copy.consumer_key, token.consumer_key)
self.assert_(copy.consumer_key is None)
self.assertEqual(copy.rsa_private_key, token.rsa_private_key)
self.assertEqual(copy.token, token.token)
self.assertEqual(copy.token_secret, token.token_secret)
self.assertEqual(copy.auth_state, token.auth_state)
self.assertEqual(copy.next, token.next)
self.assert_(copy.next is None)
self.assertEqual(copy.verifier, token.verifier)
def test_oauth_hmac_conversion(self):
token = gdata.gauth.OAuthHmacToken(
'consumerKey', 'consumerSecret', 't', 'secret',
gdata.gauth.REQUEST_TOKEN, 'http://example.com/next', 'verifier')
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(
blob, '1h|consumerKey|consumerSecret|t|secret|1|http%3A%2F%2F'
'example.com%2Fnext|verifier')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(isinstance(copy, gdata.gauth.OAuthHmacToken))
self.assertEqual(copy.consumer_key, token.consumer_key)
self.assertEqual(copy.consumer_secret, token.consumer_secret)
self.assertEqual(copy.token, token.token)
self.assertEqual(copy.token_secret, token.token_secret)
self.assertEqual(copy.auth_state, token.auth_state)
self.assertEqual(copy.next, token.next)
self.assertEqual(copy.verifier, token.verifier)
token = gdata.gauth.OAuthHmacToken(
consumer_secret='c,s', token='t', token_secret='secret',
auth_state=7, verifier='v', consumer_key=None)
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(blob, '1h||c%2Cs|t|secret|7||v')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(isinstance(copy, gdata.gauth.OAuthHmacToken))
self.assertEqual(copy.consumer_key, token.consumer_key)
self.assert_(copy.consumer_key is None)
self.assertEqual(copy.consumer_secret, token.consumer_secret)
self.assertEqual(copy.token, token.token)
self.assertEqual(copy.token_secret, token.token_secret)
self.assertEqual(copy.auth_state, token.auth_state)
self.assertEqual(copy.next, token.next)
self.assert_(copy.next is None)
self.assertEqual(copy.verifier, token.verifier)
def test_oauth2_conversion(self):
token = gdata.gauth.OAuth2Token(
'clientId', 'clientSecret', 'https://www.google.com/calendar/feeds',
'userAgent', 'https://accounts.google.com/o/oauth2/auth',
'https://accounts.google.com/o/oauth2/token',
'accessToken', 'refreshToken')
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(
blob, '2o|clientId|clientSecret|https%3A%2F%2Fwww.google.com%2F'
'calendar%2Ffeeds|userAgent|https%3A%2F%2Faccounts.google.com%2F'
'o%2Foauth2%2Fauth|https%3A%2F%2Faccounts.google.com%2Fo%2Foauth2'
'%2Ftoken|accessToken|refreshToken')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(isinstance(copy, gdata.gauth.OAuth2Token))
self.assertEqual(copy.client_id, token.client_id)
self.assertEqual(copy.client_secret, token.client_secret)
self.assertEqual(copy.scope, token.scope)
self.assertEqual(copy.user_agent, token.user_agent)
self.assertEqual(copy.auth_uri, token.auth_uri)
self.assertEqual(copy.token_uri, token.token_uri)
self.assertEqual(copy.access_token, token.access_token)
self.assertEqual(copy.refresh_token, token.refresh_token)
token = gdata.gauth.OAuth2Token(
'clientId', 'clientSecret', 'https://www.google.com/calendar/feeds',
'', 'https://accounts.google.com/o/oauth2/auth',
'https://accounts.google.com/o/oauth2/token',
'', '')
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(
blob, '2o|clientId|clientSecret|https%3A%2F%2Fwww.google.com%2F'
'calendar%2Ffeeds||https%3A%2F%2Faccounts.google.com%2F'
'o%2Foauth2%2Fauth|https%3A%2F%2Faccounts.google.com%2Fo%2Foauth2'
'%2Ftoken||')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(isinstance(copy, gdata.gauth.OAuth2Token))
self.assertEqual(copy.client_id, token.client_id)
self.assertEqual(copy.client_secret, token.client_secret)
self.assertEqual(copy.scope, token.scope)
self.assert_(copy.user_agent is None)
self.assertEqual(copy.auth_uri, token.auth_uri)
self.assertEqual(copy.token_uri, token.token_uri)
self.assert_(copy.access_token is None)
self.assert_(copy.refresh_token is None)
token = gdata.gauth.OAuth2Token(
'clientId', 'clientSecret', 'https://www.google.com/calendar/feeds',
None, 'https://accounts.google.com/o/oauth2/auth',
'https://accounts.google.com/o/oauth2/token')
blob = gdata.gauth.token_to_blob(token)
self.assertEqual(
blob, '2o|clientId|clientSecret|https%3A%2F%2Fwww.google.com%2F'
'calendar%2Ffeeds||https%3A%2F%2Faccounts.google.com%2F'
'o%2Foauth2%2Fauth|https%3A%2F%2Faccounts.google.com%2Fo%2Foauth2'
'%2Ftoken||')
copy = gdata.gauth.token_from_blob(blob)
self.assert_(isinstance(copy, gdata.gauth.OAuth2Token))
self.assertEqual(copy.client_id, token.client_id)
self.assertEqual(copy.client_secret, token.client_secret)
self.assertEqual(copy.scope, token.scope)
self.assert_(copy.user_agent is None)
self.assertEqual(copy.auth_uri, token.auth_uri)
self.assertEqual(copy.token_uri, token.token_uri)
self.assert_(copy.access_token is None)
self.assert_(copy.refresh_token is None)
def test_illegal_token_types(self):
class MyToken(object):
pass
token = MyToken()
self.assertRaises(gdata.gauth.UnsupportedTokenType,
gdata.gauth.token_to_blob, token)
blob = '~~z'
self.assertRaises(gdata.gauth.UnsupportedTokenType,
gdata.gauth.token_from_blob, blob)
class OAuthHmacTokenTests(unittest.TestCase):
def test_build_base_string(self):
request = atom.http_core.HttpRequest('http://example.com/', 'GET')
base_string = gdata.gauth.build_oauth_base_string(
request, 'example.org', '12345', gdata.gauth.HMAC_SHA1, 1246301653,
'1.0')
self.assertEqual(
base_string, 'GET&http%3A%2F%2Fexample.com%2F&oauth_callback%3Doob%2'
'6oauth_consumer_key%3Dexample.org%26oauth_nonce%3D12345%26oauth_sig'
'nature_method%3DHMAC-SHA1%26oauth_timestamp%3D1246301653%26oauth_ve'
'rsion%3D1.0')
# Test using example from documentation.
request = atom.http_core.HttpRequest(
'http://www.google.com/calendar/feeds/default/allcalendars/full'
'?orderby=starttime', 'GET')
base_string = gdata.gauth.build_oauth_base_string(
request, 'example.com', '4572616e48616d6d65724c61686176',
gdata.gauth.RSA_SHA1, 137131200, '1.0', token='1%2Fab3cd9j4ks73hf7g',
next='http://googlecodesamples.com/oauth_playground/index.php')
self.assertEqual(
base_string, 'GET&http%3A%2F%2Fwww.google.com%2Fcalendar%2Ffeeds%2Fd'
'efault%2Fallcalendars%2Ffull&oauth_callback%3Dhttp%253A%252F%252Fgo'
'oglecodesamples.com%252Foauth_playground%252Findex.php%26oauth_cons'
'umer_key%3Dexample.com%26oauth_nonce%3D4572616e48616d6d65724c616861'
'76%26oauth_signature_method%3DRSA-SHA1%26oauth_timestamp%3D13713120'
'0%26oauth_token%3D1%25252Fab3cd9j4ks73hf7g%26oauth_version%3D1.0%26'
'orderby%3Dstarttime')
# Test various defaults.
request = atom.http_core.HttpRequest('http://eXample.COM', 'get')
base_string = gdata.gauth.build_oauth_base_string(
request, 'example.org', '12345', gdata.gauth.HMAC_SHA1, 1246301653,
'1.0')
self.assertEqual(
base_string, 'GET&http%3A%2F%2Fexample.com%2F&oauth_callback%3Doob%2'
'6oauth_consumer_key%3Dexample.org%26oauth_nonce%3D12345%26oauth_sig'
'nature_method%3DHMAC-SHA1%26oauth_timestamp%3D1246301653%26oauth_ve'
'rsion%3D1.0')
request = atom.http_core.HttpRequest('https://eXample.COM:443', 'get')
base_string = gdata.gauth.build_oauth_base_string(
request, 'example.org', '12345', gdata.gauth.HMAC_SHA1, 1246301653,
'1.0', 'http://googlecodesamples.com/oauth_playground/index.php')
self.assertEqual(
base_string, 'GET&https%3A%2F%2Fexample.com%2F&oauth_callback%3Dhttp'
'%253A%252F%252Fgooglecodesamples.com%252Foauth_playground%252Findex'
'.php%26oauth_consumer_key%3Dexample.org%26oauth_nonce%3D12345%26oau'
'th_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1246301653%26oa'
'uth_version%3D1.0')
request = atom.http_core.HttpRequest('http://eXample.COM:443', 'get')
base_string = gdata.gauth.build_oauth_base_string(
request, 'example.org', '12345', gdata.gauth.HMAC_SHA1, 1246301653,
'1.0')
self.assertEqual(
base_string, 'GET&http%3A%2F%2Fexample.com%3A443%2F&oauth_callback%3'
'Doob%26oauth_consumer_key%3De'
'xample.org%26oauth_nonce%3D12345%26oauth_signature_method%3DHMAC-SH'
'A1%26oauth_timestamp%3D1246301653%26oauth_version%3D1.0')
request = atom.http_core.HttpRequest(
atom.http_core.Uri(host='eXample.COM'), 'GET')
base_string = gdata.gauth.build_oauth_base_string(
request, 'example.org', '12345', gdata.gauth.HMAC_SHA1, 1246301653,
'1.0', next='oob')
self.assertEqual(
base_string, 'GET&http%3A%2F%2Fexample.com%2F&oauth_callback%3Doob%2'
'6oauth_consumer_key%3Dexample.org%26oauth_nonce%3D12345%26oauth_sig'
'nature_method%3DHMAC-SHA1%26oauth_timestamp%3D1246301653%26oauth_ve'
'rsion%3D1.0')
request = atom.http_core.HttpRequest(
'https://www.google.com/accounts/OAuthGetRequestToken', 'GET')
request.uri.query['scope'] = ('https://docs.google.com/feeds/'
' http://docs.google.com/feeds/')
base_string = gdata.gauth.build_oauth_base_string(
request, 'anonymous', '48522759', gdata.gauth.HMAC_SHA1, 1246489532,
'1.0', 'http://googlecodesamples.com/oauth_playground/index.php')
self.assertEqual(
base_string, 'GET&https%3A%2F%2Fwww.google.com%2Faccounts%2FOAuthGet'
'RequestToken&oauth_callback%3Dhttp%253A%252F%252Fgooglecodesamples.'
'com%252Foauth_playground%252Findex.php%26oauth_consumer_key%3Danony'
'mous%26oauth_nonce%3D4852275'
'9%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D12464895'
'32%26oauth_version%3D1.0%26scope%3Dhttps%253A%252F%252Fdocs.google.'
'com%252Ffeeds%252F%2520http%253A%252F%252Fdocs.google.com%252Ffeeds'
'%252F')
def test_generate_hmac_signature(self):
# Use the example from the OAuth playground:
# http://googlecodesamples.com/oauth_playground/
request = atom.http_core.HttpRequest(
'https://www.google.com/accounts/OAuthGetRequestToken?'
'scope=http%3A%2F%2Fwww.blogger.com%2Ffeeds%2F', 'GET')
signature = gdata.gauth.generate_hmac_signature(
request, 'anonymous', 'anonymous', '1246491360',
'c0155b3f28697c029e7a62efff44bd46', '1.0',
next='http://googlecodesamples.com/oauth_playground/index.php')
self.assertEqual(signature, '5a2GPdtAY3LWYv8IdiT3wp1Coeg=')
# Try the same request but with a non escaped Uri object.
request = atom.http_core.HttpRequest(
'https://www.google.com/accounts/OAuthGetRequestToken', 'GET')
request.uri.query['scope'] = 'http://www.blogger.com/feeds/'
signature = gdata.gauth.generate_hmac_signature(
request, 'anonymous', 'anonymous', '1246491360',
'c0155b3f28697c029e7a62efff44bd46', '1.0',
'http://googlecodesamples.com/oauth_playground/index.php')
self.assertEqual(signature, '5a2GPdtAY3LWYv8IdiT3wp1Coeg=')
# A different request also checked against the OAuth playground.
request = atom.http_core.HttpRequest(
'https://www.google.com/accounts/OAuthGetRequestToken', 'GET')
request.uri.query['scope'] = ('https://www.google.com/analytics/feeds/ '
'http://www.google.com/base/feeds/ '
'http://www.google.com/calendar/feeds/')
signature = gdata.gauth.generate_hmac_signature(
request, 'anonymous', 'anonymous', 1246491797,
'33209c4d7a09be4eb1d6ff18e00f8548', '1.0',
next='http://googlecodesamples.com/oauth_playground/index.php')
self.assertEqual(signature, 'kFAgTTFDIWz4/xAabIlrcZZMTq8=')
class OAuthRsaTokenTests(unittest.TestCase):
def test_generate_rsa_signature(self):
request = atom.http_core.HttpRequest(
'https://www.google.com/accounts/OAuthGetRequestToken?'
'scope=http%3A%2F%2Fwww.blogger.com%2Ffeeds%2F', 'GET')
signature = gdata.gauth.generate_rsa_signature(
request, 'anonymous', PRIVATE_TEST_KEY, '1246491360',
'c0155b3f28697c029e7a62efff44bd46', '1.0',
next='http://googlecodesamples.com/oauth_playground/index.php')
self.assertEqual(
signature,
'bfMantdttKaTrwoxU87JiXmMeXhAiXPiq79a5XmLlOYwwlX06Pu7CafMp7hW1fPeZtL'
'4o9Sz3NvPI8GECCaZk7n5vi1EJ5/wfIQbddrC8j45joBG6gFSf4tRJct82dSyn6bd71'
'knwPZH1sKK46Y0ePJvEIDI3JDd7pRZuMM2sN8=')
class OAuth2TokenTests(unittest.TestCase):
def test_generate_authorize_url(self):
token = gdata.gauth.OAuth2Token('clientId', 'clientSecret',
'https://www.google.com/calendar/feeds',
'userAgent')
url = token.generate_authorize_url()
self.assertEqual(
url,
'https://accounts.google.com/o/oauth2/auth?access_type=offline&'
'redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&response_type=code&'
'client_id=clientId&approval_prompt=auto&'
'scope=https%3A%2F%2Fwww.google.com%2Fcalendar%2Ffeeds')
url = token.generate_authorize_url('https://www.example.com/redirect',
'token')
self.assertEqual(
url,
'https://accounts.google.com/o/oauth2/auth?access_type=offline&'
'redirect_uri=https%3A%2F%2Fwww.example.com%2Fredirect&'
'response_type=token&client_id=clientId&approval_prompt=auto&'
'scope=https%3A%2F%2Fwww.google.com%2Fcalendar%2Ffeeds')
url = token.generate_authorize_url(access_type='online')
self.assertEqual(
url,
'https://accounts.google.com/o/oauth2/auth?access_type=online&'
'redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&response_type=code&'
'client_id=clientId&approval_prompt=auto&'
'scope=https%3A%2F%2Fwww.google.com%2Fcalendar%2Ffeeds')
def test_modify_request(self):
token = gdata.gauth.OAuth2Token('clientId', 'clientSecret',
'https://www.google.com/calendar/feeds',
'userAgent', access_token='accessToken')
request = atom.http_core.HttpRequest()
token.modify_request(request)
self.assertEqual(request.headers['Authorization'], 'Bearer accessToken')
class OAuth2TokenFromCredentialsTest(unittest.TestCase):
class DummyCredentials(object):
def __init__(self, *args, **kwargs):
self.client_id = 'client_id'
self.client_secret = 'client_secret'
self.user_agent = 'user_agent'
self.token_uri = 'token_uri'
self.access_token = 'access_token'
self.refresh_token = 'refresh_token'
self.token_expiry = 'token_expiry'
self.invalid = 'invalid'
self._refresh_called = False
def _refresh(self, unused_http_request):
self._refresh_called = True
def setUp(self):
# Don't want to force the test to have a dependency on httplib2,
# which is a dependency of google-api-python-client
self._httplib2 = sys.modules.get('httplib2')
dummy_httplib2 = types.ModuleType('httplib2')
class DummyHttp(object):
def request(self, *args, **kwargs):
pass
dummy_httplib2.Http = DummyHttp
sys.modules['httplib2'] = dummy_httplib2
self.credentials = self.DummyCredentials()
self.token = gdata.gauth.OAuth2TokenFromCredentials(self.credentials)
def tearDown(self):
del sys.modules['httplib2']
if self._httplib2 is not None:
sys.modules['httplib2'] = self._httplib2
del self._httplib2
def _check_all_values(self):
self.assertEqual(self.token.client_id, self.credentials.client_id)
self.assertEqual(self.token.client_secret, self.credentials.client_secret)
self.assertEqual(self.token.user_agent, self.credentials.user_agent)
self.assertEqual(self.token.token_uri, self.credentials.token_uri)
self.assertEqual(self.token.access_token, self.credentials.access_token)
self.assertEqual(self.token.refresh_token, self.credentials.refresh_token)
self.assertEqual(self.token.token_expiry, self.credentials.token_expiry)
self.assertEqual(self.token._invalid, self.credentials.invalid)
def test_get_proxied_attributes(self):
self._check_all_values()
def test_get_proxied_values_notset(self):
proxy_keys = ['client_id', 'client_secret', 'user_agent', 'token_uri',
'access_token', 'refresh_token', 'token_expiry', '_invalid']
for key in proxy_keys:
self.assertFalse(self.token.__dict__.has_key(key))
def test_get_proxied_values_change_credentials(self):
new_value = 'NEW_VALUE'
self.credentials.access_token = new_value
self.assertEqual(self.token.access_token, new_value)
self.credentials.invalid = new_value
self.assertEqual(self.token._invalid, new_value)
def test_get_proxied_values_change_credentials(self):
# Make sure scope is not a valid attribute (ignored in this subclass)
self.assertFalse(self.token.__dict__.has_key('scope'))
self.assertFalse(self.token.__class__.__dict__.has_key('scope'))
# Make sure attribute lookup fails as it should
self.assertRaises(AttributeError, getattr, self.token, 'scope')
self.assertRaises(AttributeError, lambda: self.token.scope)
def test_set_proxied_values(self):
# Check all values after each setattr to make sure no side affects
self.token.client_id = 'value1'
self._check_all_values()
self.token.client_secret = 'value2'
self._check_all_values()
self.token.user_agent = 'value3'
self._check_all_values()
self.token.token_uri = 'value4'
self._check_all_values()
self.token.access_token = 'value5'
self._check_all_values()
self.token.refresh_token = 'value6'
self._check_all_values()
self.token.token_expiry = 'value7'
self._check_all_values()
self.token._invalid = 'value8'
self._check_all_values()
def test_set_proxied_values_nonprotected_attribute(self):
self.assertFalse(self.token.__dict__.has_key('scope'))
self.assertFalse(self.token.__class__.__dict__.has_key('scope'))
self.token.scope = 'value'
self.assertEqual(self.token.scope, 'value')
self.assertFalse(hasattr(self.credentials, 'scope'))
self._check_all_values()
def test_disallowed(self):
self.assertRaises(NotImplementedError, self.token.generate_authorize_url)
self.assertRaises(NotImplementedError, self.token.get_access_token)
self.assertRaises(NotImplementedError, self.token.revoke)
self.assertRaises(NotImplementedError, self.token._extract_tokens)
def test_refresh(self):
dummy_gdata_request_object = object()
self.assertFalse(self.credentials._refresh_called)
self.token._refresh(dummy_gdata_request_object)
self.assertTrue(self.credentials._refresh_called)
class OAuthHeaderTest(unittest.TestCase):
def test_generate_auth_header(self):
header = gdata.gauth.generate_auth_header(
'consumerkey', 1234567890, 'mynonce', 'unknown_sig_type', 'sig')
self.assert_(header.startswith('OAuth'))
self.assert_(header.find('oauth_nonce="mynonce"') > -1)
self.assert_(header.find('oauth_timestamp="1234567890"') > -1)
self.assert_(header.find('oauth_consumer_key="consumerkey"') > -1)
self.assert_(
header.find('oauth_signature_method="unknown_sig_type"') > -1)
self.assert_(header.find('oauth_version="1.0"') > -1)
self.assert_(header.find('oauth_signature="sig"') > -1)
header = gdata.gauth.generate_auth_header(
'consumer/key', 1234567890, 'ab%&33', '', 'ab/+-_=')
self.assert_(header.find('oauth_nonce="ab%25%2633"') > -1)
self.assert_(header.find('oauth_consumer_key="consumer%2Fkey"') > -1)
self.assert_(header.find('oauth_signature_method=""') > -1)
self.assert_(header.find('oauth_signature="ab%2F%2B-_%3D"') > -1)
class OAuthGetRequestToken(unittest.TestCase):
def test_request_hmac_request_token(self):
request = gdata.gauth.generate_request_for_request_token(
'anonymous', gdata.gauth.HMAC_SHA1,
['http://www.blogger.com/feeds/',
'http://www.google.com/calendar/feeds/'],
consumer_secret='anonymous')
request_uri = str(request.uri)
self.assert_('http%3A%2F%2Fwww.blogger.com%2Ffeeds%2F' in request_uri)
self.assert_(
'http%3A%2F%2Fwww.google.com%2Fcalendar%2Ffeeds%2F' in request_uri)
auth_header = request.headers['Authorization']
self.assert_('oauth_consumer_key="anonymous"' in auth_header)
self.assert_('oauth_signature_method="HMAC-SHA1"' in auth_header)
self.assert_('oauth_version="1.0"' in auth_header)
self.assert_('oauth_signature="' in auth_header)
self.assert_('oauth_nonce="' in auth_header)
self.assert_('oauth_timestamp="' in auth_header)
def test_request_rsa_request_token(self):
request = gdata.gauth.generate_request_for_request_token(
'anonymous', gdata.gauth.RSA_SHA1,
['http://www.blogger.com/feeds/',
'http://www.google.com/calendar/feeds/'],
rsa_key=PRIVATE_TEST_KEY)
request_uri = str(request.uri)
self.assert_('http%3A%2F%2Fwww.blogger.com%2Ffeeds%2F' in request_uri)
self.assert_(
'http%3A%2F%2Fwww.google.com%2Fcalendar%2Ffeeds%2F' in request_uri)
auth_header = request.headers['Authorization']
self.assert_('oauth_consumer_key="anonymous"' in auth_header)
self.assert_('oauth_signature_method="RSA-SHA1"' in auth_header)
self.assert_('oauth_version="1.0"' in auth_header)
self.assert_('oauth_signature="' in auth_header)
self.assert_('oauth_nonce="' in auth_header)
self.assert_('oauth_timestamp="' in auth_header)
def test_extract_token_from_body(self):
body = ('oauth_token=4%2F5bNFM_efIu3yN-E9RrF1KfZzOAZG&oauth_token_secret='
'%2B4O49V9WUOkjXgpOobAtgYzy&oauth_callback_confirmed=true')
token, secret = gdata.gauth.oauth_token_info_from_body(body)
self.assertEqual(token, '4/5bNFM_efIu3yN-E9RrF1KfZzOAZG')
self.assertEqual(secret, '+4O49V9WUOkjXgpOobAtgYzy')
def test_hmac_request_token_from_body(self):
body = ('oauth_token=4%2F5bNFM_efIu3yN-E9RrF1KfZzOAZG&oauth_token_secret='
'%2B4O49V9WUOkjXgpOobAtgYzy&oauth_callback_confirmed=true')
request_token = gdata.gauth.hmac_token_from_body(body, 'myKey',
'mySecret', True)
self.assertEqual(request_token.consumer_key, 'myKey')
self.assertEqual(request_token.consumer_secret, 'mySecret')
self.assertEqual(request_token.token, '4/5bNFM_efIu3yN-E9RrF1KfZzOAZG')
self.assertEqual(request_token.token_secret, '+4O49V9WUOkjXgpOobAtgYzy')
self.assertEqual(request_token.auth_state, gdata.gauth.REQUEST_TOKEN)
def test_rsa_request_token_from_body(self):
body = ('oauth_token=4%2F5bNFM_efIu3yN-E9RrF1KfZzOAZG&oauth_token_secret='
'%2B4O49V9WUOkjXgpOobAtgYzy&oauth_callback_confirmed=true')
request_token = gdata.gauth.rsa_token_from_body(body, 'myKey',
'rsaKey', True)
self.assertEqual(request_token.consumer_key, 'myKey')
self.assertEqual(request_token.rsa_private_key, 'rsaKey')
self.assertEqual(request_token.token, '4/5bNFM_efIu3yN-E9RrF1KfZzOAZG')
self.assertEqual(request_token.token_secret, '+4O49V9WUOkjXgpOobAtgYzy')
self.assertEqual(request_token.auth_state, gdata.gauth.REQUEST_TOKEN)
class OAuthAuthorizeToken(unittest.TestCase):
def test_generate_authorization_url(self):
url = gdata.gauth.generate_oauth_authorization_url('/+=aosdpikk')
self.assert_(str(url).startswith(
'https://www.google.com/accounts/OAuthAuthorizeToken'))
self.assert_('oauth_token=%2F%2B%3Daosdpikk' in str(url))
def test_extract_auth_token(self):
url = ('http://www.example.com/test?oauth_token='
'CKF50YzIHxCT85KMAg&oauth_verifier=123zzz')
token = gdata.gauth.oauth_token_info_from_url(url)
self.assertEqual(token[0], 'CKF50YzIHxCT85KMAg')
self.assertEqual(token[1], '123zzz')
class FindScopesForService(unittest.TestCase):
def test_find_all_scopes(self):
count = 0
for key, scopes in gdata.gauth.AUTH_SCOPES.iteritems():
count += len(scopes)
self.assertEqual(count, len(gdata.gauth.find_scopes_for_services()))
def test_single_service(self):
self.assertEqual(
gdata.gauth.FindScopesForServices(('codesearch',)),
['http://www.google.com/codesearch/feeds/'])
def test_multiple_services(self):
self.assertEqual(
set(gdata.gauth.find_scopes_for_services(('jotspot', 'wise'))),
set(['http://sites.google.com/feeds/',
'https://sites.google.com/feeds/',
'https://spreadsheets.google.com/feeds/']))
def suite():
return conf.build_suite([AuthSubTest, TokensToAndFromBlobsTest,
OAuthHmacTokenTests, OAuthRsaTokenTests,
OAuthHeaderTest, OAuthGetRequestToken,
OAuthAuthorizeToken, FindScopesForService])
if __name__ == '__main__':
unittest.main()