Repository URL to install this package:
|
Version:
3.16.7 ▾
|
# pylint: disable=missing-docstring
import os
import time
import jwt
import pytest
import responses
from ibm_cloud_sdk_core import IAMTokenManager, ApiException, get_authenticator_from_environment
def get_access_token() -> str:
access_token_layout = {
"username": "dummy",
"role": "Admin",
"permissions": ["administrator", "manage_catalog"],
"sub": "admin",
"iss": "sss",
"aud": "sss",
"uid": "sss",
"iat": 3600,
"exp": int(time.time()),
}
access_token = jwt.encode(
access_token_layout, 'secret', algorithm='HS256', headers={'kid': '230498151c214b788dd97f22b85410a5'}
)
return access_token
@responses.activate
def test_request_token_auth_default():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("apikey")
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
@responses.activate
def test_request_token_auth_in_ctor():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
default_auth_header = 'Basic Yng6Yng='
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("apikey", url=iam_url, client_id='foo', client_secret='bar')
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body
@responses.activate
def test_request_token_auth_in_ctor_with_scope():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
default_auth_header = 'Basic Yng6Yng='
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("apikey", url=iam_url, client_id='foo', client_secret='bar', scope='john snow')
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
assert responses.calls[0].response.text == response
assert 'scope=john+snow' in responses.calls[0].response.request.body
@responses.activate
def test_request_token_unsuccessful():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"context": {
"requestId": "38a0e9c226d94764820d92aa623eb0f6",
"requestType": "incoming.Identity_Token",
"userAgent": "ibm-python-sdk-core-1.0.0",
"url": "https://iam.cloud.ibm.com",
"instanceId": "iamid-4.5-6788-90b137c-75f48695b5-kl4wx",
"threadId": "169de5",
"host": "iamid-4.5-6788-90b137c-75f48695b5-kl4wx",
"startTime": "29.10.2019 12:31:00:300 GMT",
"endTime": "29.10.2019 12:31:00:381 GMT",
"elapsedTime": "81",
"locale": "en_US",
"clusterName": "iam-id-prdal12-8brn"
},
"errorCode": "BXNIM0415E",
"errorMessage": "Provided API key could not be found"
}
"""
responses.add(responses.POST, url=iam_url, body=response, status=400)
token_manager = IAMTokenManager("apikey")
with pytest.raises(ApiException):
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].response.text == response
@responses.activate
def test_request_token_auth_in_ctor_client_id_only():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("iam_apikey", url=iam_url, client_id='foo')
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body
@responses.activate
def test_request_token_auth_in_ctor_secret_only():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("iam_apikey", url=iam_url, client_id=None, client_secret='bar')
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body
@responses.activate
def test_request_token_auth_in_setter():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
default_auth_header = 'Basic Yng6Yng='
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("iam_apikey")
token_manager.set_client_id_and_secret('foo', 'bar')
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers['Authorization'] != default_auth_header
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body
@responses.activate
def test_request_token_auth_in_setter_client_id_only():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("iam_apikey")
token_manager.set_client_id_and_secret('foo', None)
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body
@responses.activate
def test_request_token_auth_in_setter_secret_only():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("iam_apikey")
token_manager.set_client_id_and_secret(None, 'bar')
token_manager.set_headers({'user': 'header'})
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope' not in responses.calls[0].response.request.body
@responses.activate
def test_request_token_auth_in_setter_scope():
iam_url = "https://iam.cloud.ibm.com/identity/token"
response = """{
"access_token": "oAeisG8yqPY7sFR_x66Z15",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}"""
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("iam_apikey")
token_manager.set_client_id_and_secret(None, 'bar')
token_manager.set_headers({'user': 'header'})
token_manager.set_scope('john snow')
token_manager.request_token()
assert len(responses.calls) == 1
assert responses.calls[0].request.url == iam_url
assert responses.calls[0].request.headers.get('Authorization') is None
assert responses.calls[0].response.text == response
assert 'scope=john+snow' in responses.calls[0].response.request.body
@responses.activate
def test_get_refresh_token():
iam_url = "https://iam.cloud.ibm.com/identity/token"
access_token_str = get_access_token()
response = """{
"access_token": "%s",
"token_type": "Bearer",
"expires_in": 3600,
"expiration": 1524167011,
"refresh_token": "jy4gl91BQ"
}""" % (
access_token_str
)
responses.add(responses.POST, url=iam_url, body=response, status=200)
token_manager = IAMTokenManager("iam_apikey")
token_manager.get_token()
assert len(responses.calls) == 2
assert token_manager.refresh_token == "jy4gl91BQ"
#
# In order to run the following integration test with a live IAM server:
#
# 1. Create file "iamtest.env" in the project root.
# It should look like this:
# IAMTEST1_AUTH_URL=<url> e.g. https://iam.cloud.ibm.com
# IAMTEST1_AUTH_TYPE=iam
# IAMTEST1_APIKEY=<apikey>
# IAMTEST2_AUTH_URL=<url> e.g. https://iam.test.cloud.ibm.com
# IAMTEST2_AUTH_TYPE=iam
# IAMTEST2_APIKEY=<apikey>
# IAMTEST2_CLIENT_ID=<client id>
# IAMTEST2_CLIENT_SECRET=<client secret>
#
# 2. Comment out the "@pytest.mark.skip" decorator below.
#
# 3. Run this command:
# python3 -m pytest -s test -k "test_iam_live_token_server"
# (or just run tests like normal and this test function will be invoked)
#
@pytest.mark.skip(reason="avoid integration test in automated builds")
def test_iam_live_token_server():
# Get two iam authenticators from the environment.
# "iamtest1" uses the production IAM token server
# "iamtest2" uses the staging IAM token server
os.environ['IBM_CREDENTIALS_FILE'] = "iamtest.env"
# Test "iamtest1" service
auth1 = get_authenticator_from_environment("iamtest1")
assert auth1 is not None
assert auth1.token_manager is not None
assert auth1.token_manager.url is not None
request = {'method': "GET"}
request["url"] = ""
request["headers"] = {}
assert auth1.token_manager.refresh_token is None
auth1.authenticate(request)
assert request.get("headers") is not None
assert request["headers"].get("Authorization") is not None
assert "Bearer " in request["headers"].get("Authorization")
# Test "iamtest2" service
auth2 = get_authenticator_from_environment("iamtest2")
assert auth2 is not None
assert auth2.token_manager is not None
assert auth2.token_manager.url is not None
request = {'method': "GET"}
request["url"] = ""
request["headers"] = {}
assert auth2.token_manager.refresh_token is None
auth2.authenticate(request)
assert auth2.token_manager.refresh_token is not None
assert request.get("headers") is not None
assert request["headers"].get("Authorization") is not None
assert "Bearer " in request["headers"].get("Authorization")
# print('Refresh token: ', auth2.token_manager.refresh_token)