Repository URL to install this package:
|
Version:
0.6.0 ▾
|
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from botocore.credentials import CredentialResolver, ReadOnlyCredentials
from botocore.session import Session
from s3transfer.exceptions import TransferNotDoneError
from s3transfer.utils import CallArgs
from tests import HAS_CRT, FileCreator, mock, requires_crt, unittest
if HAS_CRT:
import awscrt.s3
import s3transfer.crt
class CustomFutureException(Exception):
pass
@requires_crt
class TestBotocoreCRTRequestSerializer(unittest.TestCase):
def setUp(self):
self.region = 'us-west-2'
self.session = Session()
self.session.set_config_variable('region', self.region)
self.request_serializer = s3transfer.crt.BotocoreCRTRequestSerializer(
self.session
)
self.bucket = "test_bucket"
self.key = "test_key"
self.files = FileCreator()
self.filename = self.files.create_file('myfile', 'my content')
self.expected_path = "/" + self.bucket + "/" + self.key
self.expected_host = "s3.%s.amazonaws.com" % (self.region)
def tearDown(self):
self.files.remove_all()
def test_upload_request(self):
callargs = CallArgs(
bucket=self.bucket,
key=self.key,
fileobj=self.filename,
extra_args={},
subscribers=[],
)
coordinator = s3transfer.crt.CRTTransferCoordinator()
future = s3transfer.crt.CRTTransferFuture(
s3transfer.crt.CRTTransferMeta(call_args=callargs), coordinator
)
crt_request = self.request_serializer.serialize_http_request(
"put_object", future
)
self.assertEqual("PUT", crt_request.method)
self.assertEqual(self.expected_path, crt_request.path)
self.assertEqual(self.expected_host, crt_request.headers.get("host"))
self.assertIsNone(crt_request.headers.get("Authorization"))
def test_download_request(self):
callargs = CallArgs(
bucket=self.bucket,
key=self.key,
fileobj=self.filename,
extra_args={},
subscribers=[],
)
coordinator = s3transfer.crt.CRTTransferCoordinator()
future = s3transfer.crt.CRTTransferFuture(
s3transfer.crt.CRTTransferMeta(call_args=callargs), coordinator
)
crt_request = self.request_serializer.serialize_http_request(
"get_object", future
)
self.assertEqual("GET", crt_request.method)
self.assertEqual(self.expected_path, crt_request.path)
self.assertEqual(self.expected_host, crt_request.headers.get("host"))
self.assertIsNone(crt_request.headers.get("Authorization"))
def test_delete_request(self):
callargs = CallArgs(
bucket=self.bucket, key=self.key, extra_args={}, subscribers=[]
)
coordinator = s3transfer.crt.CRTTransferCoordinator()
future = s3transfer.crt.CRTTransferFuture(
s3transfer.crt.CRTTransferMeta(call_args=callargs), coordinator
)
crt_request = self.request_serializer.serialize_http_request(
"delete_object", future
)
self.assertEqual("DELETE", crt_request.method)
self.assertEqual(self.expected_path, crt_request.path)
self.assertEqual(self.expected_host, crt_request.headers.get("host"))
self.assertIsNone(crt_request.headers.get("Authorization"))
@requires_crt
class TestCRTCredentialProviderAdapter(unittest.TestCase):
def setUp(self):
self.botocore_credential_provider = mock.Mock(CredentialResolver)
self.access_key = "access_key"
self.secret_key = "secret_key"
self.token = "token"
self.botocore_credential_provider.load_credentials.return_value.get_frozen_credentials.return_value = ReadOnlyCredentials(
self.access_key, self.secret_key, self.token
)
def _call_adapter_and_check(self, credentails_provider_adapter):
credentials = credentails_provider_adapter()
self.assertEqual(credentials.access_key_id, self.access_key)
self.assertEqual(credentials.secret_access_key, self.secret_key)
self.assertEqual(credentials.session_token, self.token)
def test_fetch_crt_credentials_successfully(self):
credentails_provider_adapter = (
s3transfer.crt.CRTCredentialProviderAdapter(
self.botocore_credential_provider
)
)
self._call_adapter_and_check(credentails_provider_adapter)
def test_load_credentials_once(self):
credentails_provider_adapter = (
s3transfer.crt.CRTCredentialProviderAdapter(
self.botocore_credential_provider
)
)
called_times = 5
for i in range(called_times):
self._call_adapter_and_check(credentails_provider_adapter)
# Assert that the load_credentails of botocore credential provider
# will only be called once
self.assertEqual(
self.botocore_credential_provider.load_credentials.call_count, 1
)
@requires_crt
class TestCRTTransferFuture(unittest.TestCase):
def setUp(self):
self.mock_s3_request = mock.Mock(awscrt.s3.S3RequestType)
self.mock_crt_future = mock.Mock(awscrt.s3.Future)
self.mock_s3_request.finished_future = self.mock_crt_future
self.coordinator = s3transfer.crt.CRTTransferCoordinator()
self.coordinator.set_s3_request(self.mock_s3_request)
self.future = s3transfer.crt.CRTTransferFuture(
coordinator=self.coordinator
)
def test_set_exception(self):
self.future.set_exception(CustomFutureException())
with self.assertRaises(CustomFutureException):
self.future.result()
def test_set_exception_raises_error_when_not_done(self):
self.mock_crt_future.done.return_value = False
with self.assertRaises(TransferNotDoneError):
self.future.set_exception(CustomFutureException())
def test_set_exception_can_override_previous_exception(self):
self.future.set_exception(Exception())
self.future.set_exception(CustomFutureException())
with self.assertRaises(CustomFutureException):
self.future.result()