# botocore/retryhandler.py

# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
# http://aws.amazon.com/apache2.0/
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import random
import functools
import logging
from binascii import crc32

from botocore.vendored.requests import ConnectionError, Timeout
from botocore.vendored.requests.packages.urllib3.exceptions import ClosedPoolError

from botocore.exceptions import ChecksumError, EndpointConnectionError

logger = logging.getLogger(__name__)
# The only supported error for now is GENERAL_CONNECTION_ERROR
# which maps to requests generic ConnectionError.  If we're able
# to get more specific exceptions from requests we can update
# this mapping with more specific exceptions.
# NOTE(review): only the first line of the exception list survived in the
# file; the ``EXCEPTION_MAP`` name and the ``EndpointConnectionError`` entry
# are reconstructed from the comment above and this module's imports --
# confirm against version control.
EXCEPTION_MAP = {
    'GENERAL_CONNECTION_ERROR': [
        ConnectionError, ClosedPoolError, Timeout,
        EndpointConnectionError,
    ],
}

def delay_exponential(base, growth_factor, attempts):
    """Calculate time to sleep based on exponential function.

    The format is::

        base * growth_factor ^ (attempts - 1)

    If ``base`` is set to 'rand' then a random number between
    0 and 1 will be used as the base.
    Base must be greater than 0, otherwise a ValueError will be
    raised.

    :param base: The multiplier applied to the exponential term, or the
        string ``'rand'`` to draw the multiplier from ``random.random()``.

    :param growth_factor: The number raised to the ``attempts - 1`` power.

    :param attempts: The attempt count; the first attempt (``1``) yields
        a delay equal to ``base``.

    :return: ``base * growth_factor ** (attempts - 1)``.

    :raises ValueError: If ``base`` is not ``'rand'`` and is not
        greater than 0.

    """
    if base == 'rand':
        base = random.random()
    elif base <= 0:
        raise ValueError("The 'base' param must be greater than 0, "
                         "got: %s" % base)
    return base * (growth_factor ** (attempts - 1))

def create_exponential_delay_function(base, growth_factor):
    """Create an exponential delay function based on the attempts.

    This is used so that you only have to pass it the attempts
    parameter to calculate the delay.

    :param base: Bound as the ``base`` argument of ``delay_exponential``.

    :param growth_factor: Bound as the ``growth_factor`` argument of
        ``delay_exponential``.

    :return: A ``functools.partial`` of ``delay_exponential`` with ``base``
        and ``growth_factor`` pre-bound as keyword arguments.

    """
    return functools.partial(
        delay_exponential, base=base, growth_factor=growth_factor)

def create_retry_handler(config, operation_name=None):
    """Assemble a ``RetryHandler`` from a retry config.

    The checker is built first, then the action; both builders receive
    the same ``config`` and ``operation_name``.

    :param config: The retry config handed to both builder functions.

    :param operation_name: Optional operation name forwarded to both
        builder functions.

    :return: A ``RetryHandler`` wrapping the resulting checker and action.

    """
    return RetryHandler(
        checker=create_checker_from_retry_config(
            config, operation_name=operation_name),
        action=create_retry_action_from_config(
            config, operation_name=operation_name),
    )

def create_retry_action_from_config(config, operation_name=None):
    """Create the retry action (delay function) from a retry config.

    :param config: The retry config.  ``config['__default__']['delay']``
        is read; its ``'type'`` entry selects the kind of delay.

    :param operation_name: Accepted but not used by this function.

    :return: An exponential delay function when the delay type is
        ``'exponential'``, otherwise ``None``.

    """
    # The spec has the possibility of supporting per policy
    # actions, but right now, we assume this comes from the
    # default section, which means that delay functions apply
    # for every policy in the retry config (per service).
    delay_config = config['__default__']['delay']
    if delay_config['type'] == 'exponential':
        # NOTE(review): the arguments of this call were lost; they are
        # restored from the callee's (base, growth_factor) signature and
        # assume identically named keys in the delay config -- confirm.
        return create_exponential_delay_function(
            base=delay_config['base'],
            growth_factor=delay_config['growth_factor'])
    return None
def create_checker_from_retry_config(config, operation_name=None):
    """Create the retry checker described by a retry config.

    One checker is created per policy found under
    ``config['__default__']['policies']`` and, when ``operation_name`` is
    given and present in ``config``, under
    ``config[operation_name]['policies']``.

    :param config: The retry config.

    :param operation_name: Optional key into ``config`` whose policies are
        added on top of the default ones.

    :return: A ``MaxAttemptsDecorator`` wrapping either the single checker
        or a ``MultiChecker`` over all of them.  ``max_attempts`` comes
        from the ``__default__`` section and stays ``None`` when that
        section is absent.

    """
    checkers = []
    max_attempts = None
    retryable_exceptions = []
    if '__default__' in config:
        policies = config['__default__'].get('policies', [])
        max_attempts = config['__default__']['max_attempts']
        for key in policies:
            current_config = policies[key]
            checkers.append(_create_single_checker(current_config))
            retry_exception = _extract_retryable_exception(current_config)
            if retry_exception is not None:
                retryable_exceptions.extend(retry_exception)
    if operation_name is not None and config.get(operation_name) is not None:
        operation_policies = config[operation_name]['policies']
        for key in operation_policies:
            current_config = operation_policies[key]
            checkers.append(_create_single_checker(current_config))
            retry_exception = _extract_retryable_exception(current_config)
            if retry_exception is not None:
                retryable_exceptions.extend(retry_exception)
    if len(checkers) == 1:
        # Don't need to use a MultiChecker
        # NOTE(review): this branch does not forward retryable_exceptions;
        # confirm that is intentional.
        return MaxAttemptsDecorator(checkers[0], max_attempts=max_attempts)
    else:
        multi_checker = MultiChecker(checkers)
        # ``except`` needs a tuple of exception classes, not a list.
        return MaxAttemptsDecorator(
            multi_checker, max_attempts=max_attempts,
            retryable_exceptions=tuple(retryable_exceptions))
def _create_single_checker(config):
    if 'response' in config['applies_when']:
        return _create_single_response_checker(
    elif 'socket_errors' in config['applies_when']:
        return ExceptionRaiser()

def _create_single_response_checker(response):
    if 'service_error_code' in response:
        checker = ServiceErrorCodeChecker(
    elif 'http_status_code' in response:
        checker = HTTPStatusCodeChecker(
    elif 'crc32body' in response:
        checker = CRC32Checker(header=response['crc32body'])
        # TODO: send a signal.
        raise ValueError("Unknown retry policy: %s" % config)
    return checker

def _extract_retryable_exception(config):
    applies_when = config['applies_when']
    if 'crc32body' in applies_when.get('response', {}):
        return [ChecksumError]
    elif 'socket_errors' in applies_when:
        exceptions = []
        for name in applies_when['socket_errors']:
        return exceptions

class RetryHandler(object):
    """Retry handler.

    The retry handler takes two params, ``checker`` object
    and an ``action`` object.

    The ``checker`` object must be a callable object and based on a response
    and an attempt number, determines whether or not sufficient criteria for
    a retry has been met.  If this is the case then the ``action`` object
    (which also is a callable) determines what needs to happen in the event
    of a retry.

    """

    def __init__(self, checker, action):
        self._checker = checker
        self._action = action

    def __call__(self, attempts, response, caught_exception, **kwargs):
        """Handler for a retry.

        Intended to be hooked up to an event handler (hence the **kwargs),
        this will process retries appropriately.

        :param attempts: Passed to the checker and, as the ``attempts``
            keyword argument, to the action.

        :param response: Passed through to the checker.

        :param caught_exception: Passed through to the checker.

        :return: The action's result when the checker asks for a retry,
            otherwise ``None``.

        """
        if self._checker(attempts, response, caught_exception):
            result = self._action(attempts=attempts)
            logger.debug("Retry needed, action of: %s", result)
            return result
        logger.debug("No retry needed.")

class BaseChecker(object):
    """Base class for retry checkers.

    Each class is responsible for checking a single criteria that determines
    whether or not a retry should happen.

    """
    def __call__(self, attempt_number, response, caught_exception):
        """Determine if retry criteria matches.

        Note that either ``response`` is not None and ``caught_exception`` is
        None or ``response`` is None and ``caught_exception`` is not None.

        :type attempt_number: int
        :param attempt_number: The total number of times we've attempted
            to send the request.

        :param response: The HTTP response (if one was received).

        :type caught_exception: Exception
        :param caught_exception: Any exception that was caught while trying to
            send the HTTP request.

        :return: True, if the retry criteria matches (and therefore a retry
            should occur).  False if the criteria does not match.

        :raises ValueError: If both ``response`` and ``caught_exception``
            are None.

        """
        # The default implementation allows subclasses to not have to check
        # whether or not response is None or not.
        if response is not None:
            return self._check_response(attempt_number, response)
        elif caught_exception is not None:
            return self._check_caught_exception(
                attempt_number, caught_exception)
        else:
            raise ValueError("Both response and caught_exception are None.")

    # NOTE(review): the bodies of the two hooks below were lost; raising
    # NotImplementedError is an assumption -- confirm against version
    # control.
    def _check_response(self, attempt_number, response):
        """Hook called by ``__call__`` when a response was received."""
        raise NotImplementedError()

    def _check_caught_exception(self, attempt_number, caught_exception):
        """Hook called by ``__call__`` when an exception was caught."""
        raise NotImplementedError()

class MaxAttemptsDecorator(BaseChecker):
    """Allow retries up to a maximum number of attempts.

    This will pass through calls to the decorated retry checker, provided
    that the number of attempts does not exceed max_attempts.  It will
    also catch any retryable_exceptions passed in.  Once max_attempts has
    been exceeded, then False will be returned or the retryable_exceptions
    that was previously being caught will be raised.

    """
    def __init__(self, checker, max_attempts, retryable_exceptions=None):
        self._checker = checker
        self._max_attempts = max_attempts
        # Used directly in an ``except`` clause, so when set this must be
        # an exception class or a tuple of exception classes.
        self._retryable_exceptions = retryable_exceptions

    def __call__(self, attempt_number, response, caught_exception):
        """Ask the wrapped checker, capped at ``max_attempts``.

        :return: The wrapped checker's truthy result while
            ``attempt_number`` is below ``max_attempts``, otherwise False.

        """
        should_retry = self._should_retry(attempt_number, response,
                                          caught_exception)
        if not should_retry:
            return False
        if attempt_number >= self._max_attempts:
            # explicitly set MaxAttemptsReached
            if response is not None and \
                    'ResponseMetadata' in response[1]:
                metadata = response[1]['ResponseMetadata']
                metadata['MaxAttemptsReached'] = True
            logger.debug("Reached the maximum number of retry "
                         "attempts: %s", attempt_number)
            return False
        return should_retry

    def _should_retry(self, attempt_number, response, caught_exception):
        """Call the wrapped checker, treating retryable exceptions as True."""
        if self._retryable_exceptions and \
                attempt_number < self._max_attempts:
            try:
                return self._checker(
                    attempt_number, response, caught_exception)
            except self._retryable_exceptions as e:
                logger.debug("retry needed, retryable exception caught: %s",
                             e, exc_info=True)
                return True
        else:
            # If we've exceeded the max attempts we just let the exception
            # propagate if one has occurred.
            return self._checker(attempt_number, response, caught_exception)

class HTTPStatusCodeChecker(BaseChecker):
    """Retry when the HTTP response has a specific status code.

    ``response`` is indexed: ``response[0]`` must expose ``status_code``.

    """
    def __init__(self, status_code):
        self._status_code = status_code

    def _check_response(self, attempt_number, response):
        if response[0].status_code == self._status_code:
            logger.debug(
                "retry needed: retryable HTTP status code received: %s",
                self._status_code)
            return True
        else:
            return False

class ServiceErrorCodeChecker(BaseChecker):
    """Retry when both the HTTP status code and the error code match.

    ``response`` is indexed: ``response[0]`` must expose ``status_code``
    and ``response[1]`` is a mapping whose ``['Error']['Code']`` entry,
    when present, is compared against ``error_code``.

    """
    def __init__(self, status_code, error_code):
        self._status_code = status_code
        self._error_code = error_code

    def _check_response(self, attempt_number, response):
        if response[0].status_code == self._status_code:
            actual_error_code = response[1].get('Error', {}).get('Code')
            if actual_error_code == self._error_code:
                logger.debug(
                    "retry needed: matching HTTP status and error code seen: "
                    "%s, %s", self._status_code, self._error_code)
                return True
        return False

class MultiChecker(BaseChecker):
    """Combine several checkers; the first truthy result wins.

    Checkers are called in the order given.  Any exception raised by a
    checker propagates to the caller.

    """
    def __init__(self, checkers):
        self._checkers = checkers

    def __call__(self, attempt_number, response, caught_exception):
        for checker in self._checkers:
            checker_response = checker(attempt_number, response,
                                       caught_exception)
            if checker_response:
                return checker_response
        return False

class CRC32Checker(BaseChecker):
    """Verify the response body against a crc32 value sent in a header.

    ``response`` is indexed: ``response[0]`` must expose ``headers`` and
    ``content``.  A mismatch is reported by raising ``ChecksumError``
    rather than by returning True.

    """
    def __init__(self, header):
        # The header where the expected crc32 is located.
        self._header_name = header

    def _check_response(self, attempt_number, response):
        http_response = response[0]
        expected_crc = http_response.headers.get(self._header_name)
        if expected_crc is None:
            logger.debug("crc32 check skipped, the %s header is not "
                         "in the http response.", self._header_name)
        else:
            # Mask to an unsigned 32-bit value so the comparison with the
            # header does not depend on the sign of crc32()'s result.
            actual_crc32 = crc32(http_response.content) & 0xffffffff
            if actual_crc32 != int(expected_crc):
                logger.debug(
                    "retry needed: crc32 check failed, expected != actual: "
                    "%s != %s", int(expected_crc), actual_crc32)
                # NOTE(review): only ``checksum_type`` survived in the
                # file; the remaining keyword names are assumed -- confirm
                # against the ChecksumError definition.
                raise ChecksumError(checksum_type='crc32',
                                    expected_checksum=int(expected_crc),
                                    actual_checksum=actual_crc32)
        # Nothing to retry: the header is absent or the checksum matches.
        return False

class ExceptionRaiser(BaseChecker):
