# Repository URL to install this package:
# Version: 4.2.19
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import time
import threading
class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Signal that a consumption request was larger than what is allowed

        Whoever receives this error is expected to wait ``retry_time``
        and then retry the request.

        :type requested_amt: int
        :param requested_amt: The byte amount that was originally requested

        :type retry_time: float
        :param retry_time: How long to wait before retrying the request
        """
        details = (requested_amt, retry_time)
        template = (
            'Request amount %s exceeded the amount available. Retry in %s'
        )
        super(RequestExceededException, self).__init__(template % details)
        # Expose both values so callers can read them without parsing the
        # message.
        self.requested_amt = requested_amt
        self.retry_time = retry_time
class RequestToken(object):
    """Opaque identifier handed to the LeakyBucket when consuming

    Instances carry no state of their own; each instance is simply a
    distinct object used to identify a consumption request.
    """
class TimeUtils(object):
    """Thin wrapper around the ``time`` module

    Other classes in this module accept an instance of this class so that
    time access goes through a replaceable object.
    """

    def time(self):
        """Return the current time

        :rtype: float
        :returns: The current time in seconds, as given by ``time.time()``
        """
        now = time.time()
        return now

    def sleep(self, value):
        """Block for the given duration

        :type value: float
        :param value: The number of seconds to sleep for
        """
        # The result of time.sleep() is passed straight through.
        return time.sleep(value)
class BandwidthLimiter(object):
    def __init__(self, leaky_bucket, time_utils=None):
        """Hands out bandwidth limited streams that share one leaky bucket

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket passed to every stream that
            this limiter creates

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
            A new ``TimeUtils`` is created when this is ``None``.
        """
        self._leaky_bucket = leaky_bucket
        self._time_utils = TimeUtils() if time_utils is None else time_utils

    def get_bandwith_limited_stream(self, fileobj, transfer_coordinator,
                                    enabled=True):
        """Wrap a fileobj in a bandwidth limited stream

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to start

        :rtype: BandwidthLimitedStream
        :returns: The stream wrapping ``fileobj``
        """
        limited_stream = BandwidthLimitedStream(
            fileobj,
            self._leaky_bucket,
            transfer_coordinator,
            self._time_utils,
        )
        if enabled:
            # Streams start out with limiting switched on.
            return limited_stream
        limited_stream.disable_bandwidth_limiting()
        return limited_stream
class BandwidthLimitedStream(object):
    def __init__(self, fileobj, leaky_bucket, transfer_coordinator,
                 time_utils=None, bytes_threshold=256 * 1024):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time.
            A new ``TimeUtils`` is created when this is ``None``.

        :type bytes_threshold: int
        :param bytes_threshold: The number of requested bytes to accumulate
            across reads before consuming from the leaky bucket
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()
        self._bandwidth_limiting_enabled = True
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.

        :type amount: int
        :param amount: The number of bytes to request from the wrapped
            stream

        :returns: Whatever the wrapped stream's ``read(amount)`` returns
        """
        if not self._bandwidth_limiting_enabled:
            return self._fileobj.read(amount)

        # We do not want to be calling consume on every read as the read
        # amounts can be small causing the lock of the leaky bucket to
        # introduce noticeable overhead. So instead we keep track of
        # how many bytes we have seen and only call consume once we pass a
        # certain threshold.
        self._bytes_seen += amount
        if self._bytes_seen >= self._bytes_threshold:
            self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        """Consume the accumulated byte count from the leaky bucket

        Retries, sleeping for the ``retry_time`` carried by each
        ``RequestExceededException``, until the bucket accepts the amount.

        :raises: The transfer coordinator's exception once one is set
        """
        # NOTE: If the read amount on the stream are high, it will result
        # in large bursty behavior as there is not an interface for partial
        # reads. However given the read's on this abstraction are at most 256KB
        # (via downloads), it reduces the burstiness to be small KB bursts at
        # worst.
        while not self._transfer_coordinator.exception:
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token)
                self._bytes_seen = 0
                return
            except RequestExceededException as e:
                self._time_utils.sleep(e.retry_time)
        # The loop only falls through (rather than returning) when the
        # transfer coordinator reports an exception.
        raise self._transfer_coordinator.exception

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        """Move the position of the wrapped stream

        :type where: int
        :param where: The offset to seek to

        :type whence: int
        :param whence: How ``where`` is interpreted, passed through to the
            wrapped stream's ``seek()``. Defaults to ``0``.
        """
        if whence == 0:
            # Keep the original single-argument call so that wrapped objects
            # whose seek() only accepts an offset continue to work.
            self._fileobj.seek(where)
        else:
            self._fileobj.seek(where, whence)

    def tell(self):
        """Return the wrapped stream's current position"""
        return self._fileobj.tell()

    def close(self):
        """Account for any unconsumed bytes and close the wrapped stream

        The wrapped stream is closed even when consuming from the leaky
        bucket raises.
        """
        try:
            if self._bandwidth_limiting_enabled and self._bytes_seen:
                # This handles the case where the file is small enough to
                # never trigger the threshold and thus is never subjugated to
                # the leaky bucket on read(). This specifically happens for
                # small uploads. So instead to account for those bytes, have
                # it go through the leaky bucket when the file gets closed.
                self._consume_through_leaky_bucket()
        finally:
            # Consuming raises when the transfer has already failed; the
            # underlying file must still be released in that case.
            self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class LeakyBucket(object):
    def __init__(self, max_rate, time_utils=None, rate_tracker=None,
                 consumption_scheduler=None):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow. This rate is in terms of
            bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary
        """
        self._max_rate = float(max_rate)
        self._lock = threading.Lock()
        # Each collaborator falls back to a default instance when omitted.
        self._time_utils = TimeUtils() if time_utils is None else time_utils
        self._rate_tracker = (
            BandwidthRateTracker() if rate_tracker is None else rate_tracker
        )
        self._consumption_scheduler = (
            ConsumptionScheduler()
            if consumption_scheduler is None
            else consumption_scheduler
        )

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token identifying this consumption
            request. When a RequestExceededException is raised, the same
            token should be passed to the retrying consume() call.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            now = self._time_utils.time()
            if self._consumption_scheduler.is_scheduled(request_token):
                # A request that was previously scheduled is released
                # without checking the projected rate again.
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, now)
            if not self._projected_to_exceed_max_rate(amt, now):
                return self._release_requested_amt(amt, now)
            self._raise_request_exceeded_exception(amt, request_token, now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        # True when consuming ``amt`` at ``time_now`` would push the tracked
        # rate above the configured maximum.
        would_be_rate = self._rate_tracker.get_projected_rate(amt, time_now)
        return would_be_rate > self._max_rate

    def _release_requested_amt_for_scheduled_request(self, amt, request_token,
                                                     time_now):
        # Clear the scheduler entry for the token, then release the amount.
        scheduler = self._consumption_scheduler
        scheduler.process_scheduled_consumption(request_token)
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        # The time this amount takes to transfer at exactly the maximum rate.
        seconds_at_max_rate = amt / float(self._max_rate)
        wait = self._consumption_scheduler.schedule_consumption(
            amt, request_token, seconds_at_max_rate)
        raise RequestExceededException(requested_amt=amt, retry_time=wait)

    def _release_requested_amt(self, amt, time_now):
        # Record the consumption with the tracker and hand the amount back.
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt
class ConsumptionScheduler(object):
    def __init__(self):
        """Schedules when to consume a desired amount"""
        # Running sum of the time_to_consume of all outstanding requests.
        self._total_wait = 0
        # Maps each outstanding token to the details of its scheduled wait.
        self._tokens_to_scheduled_consumption = {}

    def is_scheduled(self, token):
        """Tell whether a consumption request is currently scheduled

        :type token: RequestToken
        :param token: The token identifying the consumption request

        :rtype: bool
        :returns: True if the token has a scheduled consumption
        """
        return token in self._tokens_to_scheduled_consumption

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedule a wait time before an amount may be consumed

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token identifying the consumption request

        :type time_to_consume: float
        :param time_to_consume: The time this specific request should take
            to be consumed, regardless of previously scheduled requests

        :rtype: float
        :returns: The total time to wait before actually consuming the
            amount, which includes the waits of the requests still
            outstanding.
        """
        cumulative_wait = self._total_wait + time_to_consume
        self._total_wait = cumulative_wait
        entry = {
            'wait_duration': cumulative_wait,
            'time_to_consume': time_to_consume,
        }
        self._tokens_to_scheduled_consumption[token] = entry
        return cumulative_wait

    def process_scheduled_consumption(self, token):
        """Remove a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token identifying the consumption request

        :raises KeyError: If the token has no scheduled consumption
        """
        finished = self._tokens_to_scheduled_consumption.pop(token)
        remaining = self._total_wait - finished['time_to_consume']
        # The total wait is never allowed to drop below zero.
        self._total_wait = max(remaining, 0)
class BandwidthRateTracker(object):
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: The constant to use in calculating the exponential
            moving average of the bandwidth rate. Specifically it is used in
            the following calculation:

            current_rate = alpha * new_rate + (1 - alpha) * current_rate

            The value of this constant should be between 0 and 1.
        """
        self._alpha = alpha
        # Both stay None until the first consumption is recorded.
        self._current_rate = None
        self._last_time = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate, or 0.0 when nothing
            has been recorded yet
        """
        return 0.0 if self._last_time is None else self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        nothing_recorded = self._last_time is None
        if nothing_recorded:
            return 0.0
        return self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption)

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was consumed
        """
        if self._last_time is not None:
            # The average is computed against the previous time point, so
            # the stored time is only updated afterwards.
            self._current_rate = (
                self._calculate_exponential_moving_average_rate(
                    amt, time_at_consumption)
            )
        else:
            # The first record only establishes the starting time point.
            self._current_rate = 0.0
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        elapsed = time_at_consumption - self._last_time
        if elapsed <= 0:
            # While it is really unlikely to see this in an actual transfer,
            # we do not want to be returning back a negative rate or try to
            # divide the amount by zero. So instead return back an infinite
            # rate as the time delta is infinitesimally small.
            return float('inf')
        return amt / (elapsed)

    def _calculate_exponential_moving_average_rate(self, amt,
                                                   time_at_consumption):
        # Blend the newest rate sample with the previously tracked rate.
        sample = self._calculate_rate(amt, time_at_consumption)
        return self._alpha * sample + (1 - self._alpha) * self._current_rate