Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2017, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base class for concurrency policy."""

from __future__ import absolute_import, division

import abc
import collections
import copy
import logging
import random
import time

from google.api_core import exceptions
import six

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import _consumer
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber._protocol import histogram


# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Management request types, re-exported from ``_protocol.requests`` under
# this module's namespace. Used by the Message class to communicate items
# of work back to the policy; each alias corresponds to the policy method
# that consumes it (``ack``, ``drop``, ``lease``, ``modify_ack_deadline``
# and ``nack``).
AckRequest = requests.AckRequest
DropRequest = requests.DropRequest
LeaseRequest = requests.LeaseRequest
ModAckRequest = requests.ModAckRequest
NackRequest = requests.NackRequest

# Per-message bookkeeping record stored in ``BasePolicy.leased_messages``:
# ``added_time`` is the ``time.time()`` value captured when the message was
# first leased and ``size`` is the byte size reported for it.
_LeasedMessage = collections.namedtuple(
    '_LeasedMessage', 'added_time size')


@six.add_metaclass(abc.ABCMeta)
class BasePolicy(object):
    """Abstract class defining a subscription policy.

    Although the :class:`~.pubsub_v1.subscriber.policy.thread.Policy` class,
    based on :class:`threading.Thread`, is fine for most cases,
    advanced users may need to implement something based on a different
    concurrency model.

    This class defines the interface for the policy implementation;
    subclasses may be passed as the ``policy_class`` argument to
    :class:`~.pubsub_v1.client.SubscriberClient`.

    Subclasses must implement :meth:`close`, :meth:`on_exception`,
    :meth:`on_response` and :meth:`open`.

    Args:
        client (google.cloud.pubsub_v1.subscriber.client.Client): The
            subscriber client used to create this instance.
        subscription (str): The name of the subscription. The canonical
            format for this is
            ``projects/{project}/subscriptions/{subscription}``.
        flow_control (google.cloud.pubsub_v1.types.FlowControl): The flow
            control settings.
        histogram_data (dict): Optional: A structure to store the histogram
            data for predicting appropriate ack times. If set, this should
            be a dictionary-like object.

            .. note::
                Additionally, the histogram relies on the assumption
                that the dictionary will properly sort keys provided
                that all keys are positive integers. If you are sending
                your own dictionary class, ensure this assumption holds
                or you will get strange behavior.
    """
    # Stream errors that are considered retryable. Not referenced by this
    # base class itself; it is available to concrete policies.
    _RETRYABLE_STREAM_ERRORS = (
        exceptions.DeadlineExceeded,
        exceptions.ServiceUnavailable,
        exceptions.InternalServerError,
        exceptions.Unknown,
        exceptions.GatewayTimeout,
    )

    def __init__(self, client, subscription,
                 flow_control=types.FlowControl(), histogram_data=None):
        self._client = client
        self._subscription = subscription
        self._consumer = _consumer.Consumer()
        # Ack deadline (seconds) returned by ``ack_deadline`` until the
        # histogram has collected enough data to compute a better one.
        self._ack_deadline = 10
        # Histogram size at the time ``_ack_deadline`` was last recomputed.
        self._last_histogram_size = 0
        self._future = None
        self.flow_control = flow_control
        self.histogram = histogram.Histogram(data=histogram_data)
        """.Histogram: the histogram tracking ack latency."""
        self.leased_messages = {}
        """dict[str, _LeasedMessage]: A mapping of ack IDs to a record of
            the local time when the ack ID was initially leased (in seconds
            since the epoch) and the size of the message in bytes."""

        # These are for internal flow control tracking.
        # They should not need to be used by subclasses.
        self._bytes = 0
        self._ack_on_resume = set()

    @property
    def ack_deadline(self):
        """Return the appropriate ack deadline.

        This method is "sticky". It will only perform the computations to
        check on the right ack deadline if the histogram has gained a
        significant amount of new information: the deadline is recomputed
        only once the histogram holds more entries than the smaller of
        "twice the size at the last recomputation" and "that size plus 100".

        Returns:
            int: The correct ack deadline.
        """
        histogram_size = len(self.histogram)
        target = min(
            self._last_histogram_size * 2,
            self._last_histogram_size + 100,
        )
        if histogram_size > target:
            self._ack_deadline = self.histogram.percentile(percent=99)
            # Remember how much data this value was based on; without this
            # the threshold never moves and the percentile would be
            # recomputed on every access.
            self._last_histogram_size = histogram_size
        return self._ack_deadline

    @property
    def future(self):
        """Return the Future in use, if any.

        Returns:
            google.cloud.pubsub_v1.subscriber.futures.Future: A Future
            conforming to the :class:`~concurrent.futures.Future` interface.
        """
        return self._future

    @property
    def subscription(self):
        """Return the subscription.

        Returns:
            str: The subscription
        """
        return self._subscription

    @property
    def _load(self):
        """Return the current load.

        The load is represented as a float, where 1.0 represents having
        hit one of the flow control limits, and values between 0.0 and 1.0
        represent how close we are to them. (0.5 means we have exactly half
        of what the flow control setting allows, for example.)

        There are (currently) three flow control settings -- the number of
        leased messages, the total leased bytes, and the number of pending
        requests on the consumer; this property computes how close the
        subscriber is to each of them, and returns whichever value is
        highest. (It does not matter that we have lots of running room on
        setting A if setting B is over.)

        Returns:
            float: The load value.
        """
        # ``from __future__ import division`` at module level makes these
        # true divisions on Python 2 as well.
        return max(
            len(self.leased_messages) / self.flow_control.max_messages,
            self._bytes / self.flow_control.max_bytes,
            self._consumer.pending_requests / self.flow_control.max_requests,
        )

    def _maybe_resume_consumer(self):
        """Check the current load and resume the consumer if needed."""
        # If we have been paused by flow control, check and see if we are
        # back within our limits.
        #
        # In order to not thrash too much, require us to have passed below
        # the configured resume threshold before restarting.
        if not self._consumer.paused:
            return

        # Evaluate the load once so that the value logged is the same value
        # that was compared against the threshold.
        load = self._load
        if load < self.flow_control.resume_threshold:
            self._consumer.resume()
        else:
            _LOGGER.debug('Did not resume, current load is %s', load)

    def ack(self, items):
        """Acknowledge the given messages.

        Any timing information on the items is recorded in the histogram,
        the acks are sent (or queued if the consumer is inactive), and the
        messages are removed from lease management.

        Args:
            items(Sequence[AckRequest]): The items to acknowledge.
        """
        # If we got timing information, add it to the histogram.
        for item in items:
            time_to_ack = item.time_to_ack
            if time_to_ack is not None:
                self.histogram.add(int(time_to_ack))

        ack_ids = [item.ack_id for item in items]
        if self._consumer.active:
            # Send the request to ack the message.
            request = types.StreamingPullRequest(ack_ids=ack_ids)
            self._consumer.send_request(request)
        else:
            # If the consumer is inactive, then queue the ack_ids here; it
            # will be acked as part of the initial request when the consumer
            # is started again.
            self._ack_on_resume.update(ack_ids)

        # Remove the message from lease management.
        self.drop(items)

    def call_rpc(self, request_generator):
        """Invoke the Pub/Sub streaming pull RPC.

        Args:
            request_generator (Generator): A generator that yields requests,
                and blocks if there are no outstanding requests (until such
                time as there are).

        Returns:
            Iterable[~google.cloud.pubsub_v1.types.StreamingPullResponse]: An
            iterable of pull responses.
        """
        return self._client.api.streaming_pull(request_generator)

    def drop(self, items):
        """Remove the given messages from lease management.

        Items that are not currently lease managed are ignored (and logged
        at debug level). After the bookkeeping is updated, the consumer is
        resumed if it was paused and the load has dropped far enough.

        Args:
            items(Sequence[DropRequest]): The items to drop.
        """
        # Remove the ack ID from lease management, and decrement the
        # byte counter.
        for item in items:
            if self.leased_messages.pop(item.ack_id, None) is not None:
                self._bytes -= item.byte_size
            else:
                _LOGGER.debug('Item %s was not managed.', item.ack_id)

        # Clamp the counter so that a bookkeeping mismatch cannot push the
        # computed load below zero.
        if self._bytes < 0:
            _LOGGER.debug(
                'Bytes was unexpectedly negative: %d', self._bytes)
            self._bytes = 0

        self._maybe_resume_consumer()

    def get_initial_request(self):
        """Return the initial request.

        This defines the initial request that must always be sent to Pub/Sub
        immediately upon opening the subscription.

        As a side effect, the set of ack IDs queued while the consumer was
        inactive is emptied, since those IDs are included in the returned
        request.

        Returns:
            google.cloud.pubsub_v1.types.StreamingPullRequest: A request
            suitable for being the first request on the stream (and not
            suitable for any other purpose).
        """
        # Any ack IDs that are under lease management and not being acked
        # need to have their deadline extended immediately.
        lease_ids = set(self.leased_messages.keys())
        # Exclude any IDs that we're about to ack.
        lease_ids = lease_ids.difference(self._ack_on_resume)

        # Put the request together.
        request = types.StreamingPullRequest(
            ack_ids=list(self._ack_on_resume),
            modify_deadline_ack_ids=list(lease_ids),
            modify_deadline_seconds=[self.ack_deadline] * len(lease_ids),
            stream_ack_deadline_seconds=self.histogram.percentile(99),
            subscription=self.subscription,
        )

        # Clear the ack_ids set.
        self._ack_on_resume.clear()

        # Return the initial request.
        return request

    def lease(self, items):
        """Add the given messages to lease management.

        Messages that are already lease managed are left untouched. If the
        resulting load reaches a flow control limit, the consumer is paused.

        Args:
            items(Sequence[LeaseRequest]): The items to lease.
        """
        for item in items:
            # Add the ack ID to the set of managed ack IDs, and increment
            # the size counter.
            if item.ack_id not in self.leased_messages:
                self.leased_messages[item.ack_id] = _LeasedMessage(
                    added_time=time.time(),
                    size=item.byte_size)
                self._bytes += item.byte_size
            else:
                _LOGGER.debug(
                    'Message %s is already lease managed', item.ack_id)

        # Sanity check: Do we have too many things in our inventory?
        # If we do, we need to stop the stream.
        if self._load >= 1.0:
            self._consumer.pause()

    def maintain_leases(self):
        """Maintain all of the leases being managed by the policy.

        This method loops until the consumer becomes inactive. On every
        iteration it drops messages that have been leased for longer than
        ``flow_control.max_lease_duration``, modifies the ack deadline for
        all of the remaining managed ack IDs, and then waits for most of
        that time (but with jitter) before repeating.

        .. warning::
            This method blocks, and generally should be run in a separate
            thread or process.

            Additionally, you should not have to call this method yourself,
            unless you are implementing your own policy. If you are
            implementing your own policy, you _should_ call this method
            in an appropriate form of subprocess.
        """
        while True:
            # Sanity check: Should this infinite loop quit?
            if not self._consumer.active:
                _LOGGER.debug('Consumer inactive, ending lease maintenance.')
                return

            # Determine the appropriate duration for the lease. This is
            # based off of how long previous messages have taken to ack, with
            # a sensible default and within the ranges allowed by Pub/Sub.
            p99 = self.histogram.percentile(99)
            _LOGGER.debug('The current p99 value is %d seconds.', p99)

            # Make a copy of the leased messages. This is needed because it's
            # possible for another thread to modify the dictionary while
            # we're iterating over it.
            leased_messages = copy.copy(self.leased_messages)

            # Drop any leases that are well beyond max lease time. This
            # ensures that in the event of a badly behaving actor, we can
            # drop messages and allow Pub/Sub to resend them.
            cutoff = time.time() - self.flow_control.max_lease_duration
            to_drop = [
                DropRequest(ack_id, item.size)
                for ack_id, item
                in six.iteritems(leased_messages)
                if item.added_time < cutoff]

            if to_drop:
                _LOGGER.warning(
                    'Dropping %s items because they were leased too long.',
                    len(to_drop))
                self.drop(to_drop)

            # Remove dropped items from our copy of the leased messages (they
            # have already been removed from the real one by self.drop).
            for item in to_drop:
                leased_messages.pop(item.ack_id)

            # Create a streaming pull request.
            # We do not actually call `modify_ack_deadline` over and over
            # because it is more efficient to make a single request.
            ack_ids = list(leased_messages.keys())
            if ack_ids:
                _LOGGER.debug('Renewing lease for %d ack IDs.', len(ack_ids))

                # NOTE: This may not work as expected if ``consumer.active``
                #       has changed since we checked it. An implementation
                #       without any sort of race condition would require a
                #       way for ``send_request`` to fail when the consumer
                #       is inactive.
                self.modify_ack_deadline([
                    ModAckRequest(ack_id, p99) for ack_id in ack_ids])

            # Now wait an appropriate period of time and do this again.
            #
            # We determine the appropriate period of time based on a random
            # period between 0 seconds and 90% of the lease. This use of
            # jitter (http://bit.ly/2s2ekL7) helps decrease contention in cases
            # where there are many clients.
            snooze = random.uniform(0.0, p99 * 0.9)
            _LOGGER.debug('Snoozing lease management for %f seconds.', snooze)
            time.sleep(snooze)

    def modify_ack_deadline(self, items):
        """Modify the ack deadline for the given messages.

        All of the items are sent in a single streaming pull request.

        Args:
            items(Sequence[ModAckRequest]): The items to modify.
        """
        ack_ids = [item.ack_id for item in items]
        seconds = [item.seconds for item in items]

        request = types.StreamingPullRequest(
            modify_deadline_ack_ids=ack_ids,
            modify_deadline_seconds=seconds,
        )
        self._consumer.send_request(request)

    def nack(self, items):
        """Explicitly deny receipt of messages.

        The ack deadline of every item is set to zero seconds and the items
        are then removed from lease management.

        Args:
            items(Sequence[NackRequest]): The items to deny.
        """
        self.modify_ack_deadline([
            ModAckRequest(ack_id=item.ack_id, seconds=0)
            for item in items])
        # Each nack request is unpacked positionally into a drop request,
        # so the two request types must share the same field layout.
        self.drop(
            [DropRequest(*item) for item in items])

    @abc.abstractmethod
    def close(self):
        """Close the existing connection.

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError

    @abc.abstractmethod
    def on_exception(self, exception):
        """Called when a gRPC exception occurs.

        If this method does nothing, then the stream is re-started. If this
        raises an exception, it will stop the consumer thread. This is
        executed on the response consumer helper thread.

        Implementations should return :data:`True` if they want the consumer
        thread to remain active, otherwise they should return :data:`False`.

        Args:
            exception (Exception): The exception raised by the RPC.

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError

    def on_request(self, request):
        """Called whenever a request has been sent to gRPC.

        This allows the policy to measure the rate of requests sent along the
        stream and apply backpressure by pausing or resuming the consumer
        if needed.

        The base implementation ignores ``request`` and only checks whether
        a paused consumer can be resumed.

        Args:
            request (Any): The protobuf request that was sent to gRPC.
        """
        self._maybe_resume_consumer()

    @abc.abstractmethod
    def on_response(self, response):
        """Process a response from gRPC.

        This gives the consumer control over how responses are scheduled to
        be processed. This method is expected to not block and instead
        schedule the response to be consumed by some sort of concurrency.

        For example, if the Policy implementation takes a callback in its
        constructor, you can schedule the callback using a
        :class:`concurrent.futures.ThreadPoolExecutor`::

            self._pool.submit(self._callback, response)

        This is called from the response consumer helper thread.

        Args:
            response (Any): The protobuf response from the RPC.

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError

    @abc.abstractmethod
    def open(self, callback):
        """Open a streaming pull connection and begin receiving messages.

        For each message received, the ``callback`` function is fired with
        a :class:`~.pubsub_v1.subscriber.message.Message` as its only
        argument.

        This method is virtual, but concrete implementations should return
        a :class:`~google.api_core.future.Future` that provides an interface
        to block on the subscription if desired, and handle errors.

        Args:
            callback (Callable[Message]): A callable that receives a
                Pub/Sub Message.

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError