Repository URL to install this package:
|
Version:
2.4.0 ▾
|
from botocore.retries.standard import (
DEFAULT_MAX_ATTEMPTS,
ExponentialBackoff,
MaxAttemptsChecker,
ModeledRetryableChecker,
OrRetryChecker,
RetryEventAdapter,
RetryHandler,
RetryPolicy,
RetryQuotaChecker,
StandardRetryConditions,
ThrottledRetryableChecker,
TransientRetryableChecker,
logger,
quota,
special,
)
from .._helpers import async_any, resolve_awaitable
from .special import AioRetryDDBChecksumError
def register_retry_handler(client, max_attempts=DEFAULT_MAX_ATTEMPTS):
    """Wire an async retry handler into ``client``'s event system.

    Two handlers are registered on ``client.meta.events``, both keyed by
    the hyphenized service id:

    * ``after-call.<service>`` -> the quota checker's
      ``release_retry_quota``.
    * ``needs-retry.<service>`` -> the new handler's ``needs_retry``,
      registered under the unique id ``retry-config-<service>``.

    :param client: Client object exposing ``meta.service_model`` and
        ``meta.events``.
    :param max_attempts: Forwarded to ``AioStandardRetryConditions``;
        defaults to ``DEFAULT_MAX_ATTEMPTS``.
    :return: The ``AioRetryHandler`` instance that was registered.
    """
    quota_checker = RetryQuotaChecker(quota.RetryQuota())

    service_event_name = client.meta.service_model.service_id.hyphenize()
    event_emitter = client.meta.events
    event_emitter.register(
        f'after-call.{service_event_name}', quota_checker.release_retry_quota
    )

    # Assemble the handler from the inside out: conditions -> policy ->
    # handler.
    conditions = AioStandardRetryConditions(max_attempts=max_attempts)
    policy = AioRetryPolicy(
        retry_checker=conditions,
        retry_backoff=ExponentialBackoff(),
    )
    handler = AioRetryHandler(
        retry_policy=policy,
        retry_event_adapter=RetryEventAdapter(),
        retry_quota=quota_checker,
    )

    event_emitter.register(
        'needs-retry.%s' % service_event_name,
        handler.needs_retry,
        unique_id='retry-config-%s' % service_event_name,
    )
    return handler
class AioRetryHandler(RetryHandler):
    """``RetryHandler`` variant whose ``needs_retry`` is a coroutine."""

    async def needs_retry(self, **kwargs):
        """Handle a needs-retry event and decide whether to retry.

        A retry context is built from ``kwargs``, the retry policy is
        awaited for its verdict, and the retry quota is consulted before
        a delay is computed. The event adapter is always given the
        context afterwards, whichever branch was taken.

        :return: The delay produced by the policy's
            ``compute_retry_delay`` when a retry will happen, otherwise
            ``None``.
        """
        adapter = self._retry_event_adapter
        context = adapter.create_retry_context(**kwargs)
        retry_delay = None

        if not await self._retry_policy.should_retry(context):
            logger.debug("Not retrying request.")
        elif not self._retry_quota.acquire_retry_quota(context):
            # The policy wants a retry, but no quota could be acquired
            # for it.
            logger.debug(
                "Retry needed but retry quota reached, "
                "not retrying request."
            )
        else:
            retry_delay = self._retry_policy.compute_retry_delay(context)
            logger.debug(
                "Retry needed, retrying request after delay of: %s",
                retry_delay,
            )

        adapter.adapt_retry_response_from_context(context)
        return retry_delay
class AioRetryPolicy(RetryPolicy):
    """``RetryPolicy`` variant with an awaitable ``should_retry``."""

    async def should_retry(self, context):
        """Return the retry checker's verdict for ``context``.

        The value returned by the checker's ``is_retryable`` is passed
        through ``resolve_awaitable`` and that result is awaited.
        """
        verdict = self._retry_checker.is_retryable(context)
        return await resolve_awaitable(verdict)
class AioStandardRetryConditions(StandardRetryConditions):
    """Bundle a max-attempts check with the retryable-error checkers.

    The parent initializer is not called; both attributes are assigned
    here so that the OR-combinators are the ``AioOrRetryChecker`` kind.
    """

    def __init__(self, max_attempts=DEFAULT_MAX_ATTEMPTS):  # noqa: E501, lgtm [py/missing-call-to-init]
        """Create the max-attempts checker and the combined checker.

        :param max_attempts: Passed to ``MaxAttemptsChecker``.
        """
        # Convenience class: keeps the whole standard retry condition in
        # one place.
        self._max_attempts_checker = MaxAttemptsChecker(max_attempts)

        checkers = [
            TransientRetryableChecker(),
            ThrottledRetryableChecker(),
            ModeledRetryableChecker(),
        ]
        # The special-case checkers are grouped under their own nested
        # OR-checker.
        checkers.append(
            AioOrRetryChecker(
                [
                    special.RetryIDPCommunicationError(),
                    AioRetryDDBChecksumError(),
                ]
            )
        )
        self._additional_checkers = AioOrRetryChecker(checkers)

    async def is_retryable(self, context):
        """Return a truthy value when ``context`` should be retried.

        The max-attempts check runs first. If it is falsy its result is
        returned directly and the additional checkers are not consulted.
        """
        attempts_ok = self._max_attempts_checker.is_retryable(context)
        if not attempts_ok:
            return attempts_ok
        return await resolve_awaitable(
            self._additional_checkers.is_retryable(context)
        )
class AioOrRetryChecker(OrRetryChecker):
    """``OrRetryChecker`` variant with an awaitable ``is_retryable``."""

    async def is_retryable(self, context):
        """Combine the wrapped checkers' verdicts through ``async_any``.

        Each checker's ``is_retryable(context)`` result is produced
        lazily by a generator, which is handed to ``async_any``.
        """
        # Kept as a generator (not a list) so checkers are only invoked
        # as async_any consumes them.
        verdicts = (
            checker.is_retryable(context) for checker in self._checkers
        )
        return await async_any(verdicts)