# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import datetime
import weakref
import json
import base64
import botocore
import botocore.auth
from botocore.compat import six, OrderedDict
from botocore.awsrequest import create_request_object, prepare_request_dict
from botocore.exceptions import UnknownSignatureVersionError
from botocore.exceptions import UnknownClientMethodError
from botocore.exceptions import UnsupportedSignatureVersionError
from botocore.utils import fix_s3_host, datetime2timestamp
class RequestSigner(object):
    """
    An object to sign requests before they go out over the wire using
    one of the authentication mechanisms defined in ``auth.py``. This
    class fires two events scoped to a service and operation name:

    * choose-signer: Allows overriding the auth signer name.
    * before-sign: Allows mutating the request before signing.

    Together these events allow for customization of the request
    signing pipeline, including overrides, request path manipulation,
    and disabling signing per operation.

    :type service_name: string
    :param service_name: Name of the service, e.g. ``S3``

    :type region_name: string
    :param region_name: Name of the service region, e.g. ``us-east-1``

    :type signing_name: string
    :param signing_name: Service signing name. This is usually the
                         same as the service name, but can differ. E.g.
                         ``emr`` vs. ``elasticmapreduce``.

    :type signature_version: string
    :param signature_version: Signature name like ``v4``.

    :type credentials: :py:class:`~botocore.credentials.Credentials`
    :param credentials: User credentials with which to sign requests.

    :type event_emitter: :py:class:`~botocore.hooks.BaseEventHooks`
    :param event_emitter: Extension mechanism to fire events.
    """

    def __init__(self, service_name, region_name, signing_name,
                 signature_version, credentials, event_emitter):
        self._service_name = service_name
        self._region_name = region_name
        self._signing_name = signing_name
        self._signature_version = signature_version
        self._credentials = credentials

        # We need weakref to prevent leaking memory in Python 2.6 on Linux 2.6
        self._event_emitter = weakref.proxy(event_emitter)

    @property
    def region_name(self):
        """The default region this signer was constructed with."""
        return self._region_name

    @property
    def signature_version(self):
        """The default signature version this signer was constructed with."""
        return self._signature_version

    @property
    def signing_name(self):
        """The service signing name this signer was constructed with."""
        return self._signing_name

    def handler(self, operation_name=None, request=None, **kwargs):
        """Event-handler entry point that signs ``request``.

        Any extra keyword arguments supplied by the event system are
        accepted and ignored.
        """
        # This is typically hooked up to the "request-created" event
        # from a client's event emitter.  When a new request is created
        # this method is invoked to sign the request.
        # Don't call this method directly.
        return self.sign(operation_name, request)

    def sign(self, operation_name, request, region_name=None,
             signing_type='standard', expires_in=None):
        """Sign a request before it goes out over the wire.

        :type operation_name: string
        :param operation_name: The name of the current operation, e.g.
                               ``ListBuckets``.

        :type request: AWSRequest
        :param request: The request object to be sent over the wire.

        :type region_name: str
        :param region_name: The region to sign the request for. Defaults
            to the region the signer was constructed with.

        :type signing_type: str
        :param signing_type: The type of signing to perform. This can be one of
            three possible values:

            * 'standard'     - This should be used for most requests.
            * 'presign-url'  - This should be used when pre-signing a request.
            * 'presign-post' - This should be used when pre-signing an S3 post.

        :type expires_in: int
        :param expires_in: The number of seconds the presigned url is valid
            for. This parameter is only valid for signing type 'presign-url'.

        :raises UnsupportedSignatureVersionError: If ``signing_type`` is not
            'standard' and no auth class exists for the resulting
            signature version.
        :raises UnknownSignatureVersionError: If ``signing_type`` is
            'standard' and the signature version is unknown.
        """
        if region_name is None:
            region_name = self._region_name

        signature_version = self._choose_signer(operation_name, signing_type)

        # Allow mutating request before signing.
        # Note that the event carries the signer's configured region
        # (``self._region_name``), not the per-call ``region_name``.
        self._event_emitter.emit(
            'before-sign.{0}.{1}'.format(self._service_name, operation_name),
            request=request, signing_name=self._signing_name,
            region_name=self._region_name,
            signature_version=signature_version, request_signer=self)

        if signature_version != botocore.UNSIGNED:
            kwargs = {
                'signing_name': self._signing_name,
                'region_name': region_name,
                'signature_version': signature_version
            }
            if expires_in is not None:
                kwargs['expires'] = expires_in
            try:
                auth = self.get_auth_instance(**kwargs)
            except UnknownSignatureVersionError:
                if signing_type != 'standard':
                    # The suffixed (presign) variant of this signature
                    # version does not exist, so report it as unsupported
                    # rather than unknown.
                    raise UnsupportedSignatureVersionError(
                        signature_version=signature_version)
                # Bare raise keeps the original traceback intact.
                raise

            auth.add_auth(request)

    def _choose_signer(self, operation_name, signing_type):
        """
        Allow setting the signature version via the choose-signer event.
        A value of `botocore.UNSIGNED` means no signing will be performed.

        :param operation_name: The operation to sign.
        :param signing_type: The type of signing that the signer is to be used
            for.
        :return: The signature version to sign with.
        """
        signing_type_suffix_map = {
            'presign-post': '-presign-post',
            'presign-url': '-query'
        }
        # 'standard' (or any unrecognised type) maps to an empty suffix,
        # which every string already ends with, so nothing is appended.
        suffix = signing_type_suffix_map.get(signing_type, '')

        signature_version = self._signature_version
        if (signature_version is not botocore.UNSIGNED and
                not signature_version.endswith(suffix)):
            signature_version += suffix

        # Only the response matters here; the handler that produced it
        # is not needed.
        response = self._event_emitter.emit_until_response(
            'choose-signer.{0}.{1}'.format(self._service_name, operation_name),
            signing_name=self._signing_name, region_name=self._region_name,
            signature_version=signature_version)[1]

        if response is not None:
            signature_version = response
            # The suffix needs to be checked again in case we get an improper
            # signature version from choose-signer.
            if (signature_version is not botocore.UNSIGNED and
                    not signature_version.endswith(suffix)):
                signature_version += suffix

        return signature_version

    def get_auth_instance(self, signing_name, region_name,
                          signature_version=None, **kwargs):
        """
        Get an auth instance which can be used to sign a request
        using the given signature version.

        :type signing_name: string
        :param signing_name: Service signing name. This is usually the
                             same as the service name, but can differ. E.g.
                             ``emr`` vs. ``elasticmapreduce``.

        :type region_name: string
        :param region_name: Name of the service region, e.g. ``us-east-1``.
            If ``None``, the region the signer was constructed with is
            used for auth classes that require a region.

        :type signature_version: string
        :param signature_version: Signature name like ``v4``. Defaults to
            the signature version the signer was constructed with.

        :param kwargs: Extra keyword arguments forwarded to the auth class
            constructor.

        :rtype: :py:class:`~botocore.auth.BaseSigner`
        :return: Auth instance to sign a request.

        :raises UnknownSignatureVersionError: If no auth class is
            registered for ``signature_version``.
        :raises NoRegionError: If the auth class requires a region and
            neither ``region_name`` nor the signer provides one.
        """
        if signature_version is None:
            signature_version = self._signature_version

        cls = botocore.auth.AUTH_TYPE_MAPS.get(signature_version)
        if cls is None:
            raise UnknownSignatureVersionError(
                signature_version=signature_version)

        # If there's no credentials provided (i.e credentials is None),
        # then we'll pass a value of "None" over to the auth classes,
        # which already handle the cases where no credentials have
        # been provided.
        frozen_credentials = None
        if self._credentials is not None:
            frozen_credentials = self._credentials.get_frozen_credentials()
        kwargs['credentials'] = frozen_credentials

        if cls.REQUIRES_REGION:
            # Validate the region that is actually handed to the auth
            # class: prefer the explicit argument, fall back to the
            # signer's own region, and fail only when neither is set.
            if region_name is None:
                region_name = self._region_name
            if region_name is None:
                raise botocore.exceptions.NoRegionError()
            kwargs['region_name'] = region_name
            kwargs['service_name'] = signing_name

        auth = cls(**kwargs)
        return auth

    # Alias get_auth for backwards compatibility.
    get_auth = get_auth_instance

    def generate_presigned_url(self, request_dict, operation_name,
                               expires_in=3600, region_name=None):
        """Generates a presigned url

        :type request_dict: dict
        :param request_dict: The prepared request dictionary returned by
            ``botocore.awsrequest.prepare_request_dict()``

        :type operation_name: str
        :param operation_name: The operation being signed.

        :type expires_in: int
        :param expires_in: The number of seconds the presigned url is valid
            for. By default it expires in an hour (3600 seconds)

        :type region_name: string
        :param region_name: The region name to sign the presigned url.

        :returns: The presigned url
        """
        request = create_request_object(request_dict)
        self.sign(operation_name, request, region_name,
                  'presign-url', expires_in)

        request.prepare()
        return request.url
class CloudFrontSigner(object):
'''A signer to create a signed CloudFront URL.
First you create a cloudfront signer based on a normalized RSA signer::
import rsa
def rsa_signer(message):
private_key = open('private_key.pem', 'r').read()
return rsa.sign(
message,
rsa.PrivateKey.load_pkcs1(private_key.encode('utf8')),
'SHA-1') # CloudFront requires SHA-1 hash
cf_signer = CloudFrontSigner(key_id, rsa_signer)
To sign with a canned policy::
signed_url = cf_signer.generate_presigned_url(
url, date_less_than=datetime(2015, 12, 1))
To sign with a custom policy::
signed_url = cf_signer.generate_presigned_url(url, policy=my_policy)
'''
def __init__(self, key_id, rsa_signer):
"""Create a CloudFrontSigner.
:type key_id: str
:param key_id: The CloudFront Key Pair ID
:type rsa_signer: callable
:param rsa_signer: An RSA signer.
Its only input parameter will be the message to be signed,
and its output will be the signed content as a binary string.
The hash algorithm needed by CloudFront is SHA-1.
"""
self.key_id = key_id
self.rsa_signer = rsa_signer
def generate_presigned_url(self, url, date_less_than=None, policy=None):
    """Build a signed CloudFront URL for ``url``.

    Exactly one of ``date_less_than`` and ``policy`` must be supplied.

    :type url: str
    :param url: The URL of the protected object

    :type date_less_than: datetime
    :param date_less_than: The URL will expire after that date and time

    :type policy: str
    :param policy: The custom policy, possibly built by self.build_policy()

    :rtype: str
    :return: The signed URL.

    :raises ValueError: If both or neither of ``date_less_than`` and
        ``policy`` are given.
    """
    use_canned_policy = date_less_than is not None
    # Both supplied, or both missing: the two flags agree.
    if use_canned_policy == (policy is not None):
        message = 'Need to provide either date_less_than or policy, but not both'
        raise ValueError(message)

    if use_canned_policy:
        # We still need to build a canned policy for signing purpose
        policy = self.build_policy(url, date_less_than)
    if isinstance(policy, six.text_type):
        policy = policy.encode('utf8')

    # A canned policy is conveyed by its expiry timestamp alone; a custom
    # policy has to be sent along in encoded form.
    if use_canned_policy:
        leading_param = 'Expires=%s' % int(datetime2timestamp(date_less_than))
    else:
        leading_param = (
            'Policy=%s' % self._url_b64encode(policy).decode('utf8'))

    raw_signature = self.rsa_signer(policy)
    query_params = [
        leading_param,
        'Signature=%s' % self._url_b64encode(raw_signature).decode('utf8'),
        'Key-Pair-Id=%s' % self.key_id,
    ]
    return self._build_url(url, query_params)
def _build_url(self, base_url, extra_params):
separator = '&' if '?' in base_url else '?'
return base_url + separator + '&'.join(extra_params)
def build_policy(self, resource, date_less_than,
date_greater_than=None, ip_address=None):
"""A helper to build policy.
:type resource: str
:param resource: The URL or the stream filename of the protected object
:type date_less_than: datetime
:param date_less_than: The URL will expire after the time has passed
Loading ...