Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""Base listener classes."""

import logging

from abc import ABCMeta, abstractmethod, abstractproperty
from blinker import Signal
from enum import Enum
from trio import CancelScope, Event, Nursery, TASK_STATUS_IGNORED
from trio.abc import Stream
from trio_util import AsyncBool
from typing import Callable, Optional


__all__ = ("Listener", "ListenerState", "ListenerBase")


class ListenerState(Enum):
    CLOSED = "CLOSED"
    PREPARING = "PREPARING"
    OPEN = "OPEN"
    CLOSING = "CLOSING"

    @property
    def is_transitioning(self) -> bool:
        return self in (ListenerState.PREPARING, ListenerState.CLOSING)


log = logging.getLogger(__name__.rpartition(".")[0])


class Listener(metaclass=ABCMeta):
    """Interface specification for stateful listener objects."""

    opened = Signal(
        doc="Signal sent after the listener started listening for incoming connections."
    )
    closed = Signal(
        doc="Signal sent after the listener stopped listening for incoming connections."
    )
    state_changed = Signal(
        doc="""\
        Signal sent whenever the state of the listener changes.

        Parameters:
            new_state: the new state
            old_state: the old state
        """
    )

    @abstractmethod
    async def open(self) -> None:
        """Opens the listener. No-op if the listener is listening already."""
        raise NotImplementedError

    @abstractmethod
    async def close(self) -> None:
        """Closes the listener. No-op if the listener is closed already."""
        raise NotImplementedError

    @property
    def is_closed(self) -> bool:
        """Returns whether the listener is closed (and not closing and
        not preparing)."""
        return self.state is ListenerState.CLOSED

    @property
    def is_open(self) -> bool:
        """Returns whether the connection is open."""
        return self.state is ListenerState.OPEN

    @property
    def is_transitioning(self) -> bool:
        """Returns whether listener is currently transitioning."""
        return self.state.is_transitioning

    @abstractproperty
    def state(self) -> ListenerState:
        """Returns the state of the listener; one of the constants from
        the ``ConnectionState`` enum.
        """
        raise NotImplementedError

    @abstractmethod
    async def wait_until_open(self) -> None:
        """Blocks the current task until the listener becomes open. Returns
        immediately if the listener is already open.
        """
        raise NotImplementedError

    @abstractmethod
    async def wait_until_closed(self) -> None:
        """Blocks the execution until the listener becomes closed. Return
        immediately if the connection is already disconnected.
        """
        raise NotImplementedError

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()


class ListenerBase(Listener):
    """Base class for stateful listener objects.

    Listener objects may be in one of the following four states:

        - ``CLOSED``: the listener is closed and not listening for incoming
          connections

        - ``PREPARING``: the listener is being prepared to accept connections

        - ``OPEN``: the listener is open and is actively listening for
          incoming connections

        - ``CLOSING``: the listener is being closed

    Each listener object provides three signals that interested parties
    may connect to if they want to be notified about changes in the listener
    states: ``state_changed``, ``open`` and ``closed``.
    ``state_changed`` is fired whenever the listener state changes.
    ``open`` is fired when the listener enters the ``OPEN`` state
    from any other state. ``closed`` is fired when the connection enters
    the ``CLOSED`` state from any other state.

    Classes derived from this base class *MUST NOT* set the ``_state`` variable
    directly; they *MUST* use the ``_set_state`` method instead to ensure that
    the signals are dispatched appropriately.
    """

    def __init__(self):
        """Constructor."""
        self._state = ListenerState.CLOSED

        self._is_open = AsyncBool(False)
        self._is_closed = AsyncBool(True)

    @property
    def state(self):
        """The state of the listener."""
        return self._state

    def _set_state(self, new_state):
        """Sets the state of the listener to a new value and sends the
        appropriate signals.
        """
        old_state = self._state
        if new_state == old_state:
            return

        self._state = new_state

        self.state_changed.send(self, old_state=old_state, new_state=new_state)

        if not self._is_open.value and new_state is ListenerState.OPEN:
            self._is_open.value = True
            self.opened.send(self)

        if not self._is_closed.value and new_state is ListenerState.CLOSED:
            self._is_closed.value = True
            self.closed.send(self)

        if self._is_open.value and new_state is not ListenerState.OPEN:
            self._is_open.value = False

        if self._is_closed.value and new_state is not ListenerState.CLOSED:
            self._is_closed.value = False

    async def close(self):
        """Base implementation of Listener.close() that manages the state of
        the connection correctly.

        Typically, you don't need to override this method in subclasses;
        override `_close()` instead.
        """
        if self.state is ListenerState.CLOSED:
            return
        elif self.state is ListenerState.CLOSING:
            return await self.wait_until_closed()
        elif self.state is ListenerState.PREPARING:
            await self.wait_until_open()

        self._set_state(ListenerState.CLOSING)
        success = False
        try:
            # TODO(ntamas): use a timeout here!
            await self._close()
            success = True
        finally:
            self._set_state(ListenerState.CLOSED if success else ListenerState.OPEN)

    async def open(self):
        """Base implementation of Listener.open() that manages the state
        of the connection correctly.

        Typically, you don't need to override this method in subclasses;
        override `_open()` instead.
        """
        if self.state is ListenerState.OPEN:
            return
        elif self.state is ListenerState.PREPARING:
            return await self.wait_until_open()
        elif self.state is ListenerState.CLOSING:
            await self.wait_until_closed()

        self._set_state(ListenerState.PREPARING)
        success = False
        try:
            # TODO(ntamas): use a timeout here!
            await self._open()
            success = True
        finally:
            self._set_state(ListenerState.OPEN if success else ListenerState.CLOSED)

    async def wait_until_open(self):
        """Blocks the execution until the listener becomes open."""
        await self._is_open.wait_value(True)

    async def wait_until_closed(self):
        """Blocks the execution until the listener becomes closed."""
        await self._is_closed.wait_value(True)

    @abstractmethod
    async def _open(self):
        """Internal implementation of `ListenerBase.open()`.

        Override this method in subclasses to implement how your listener
        is opened. No need to update the state variable from inside this
        method; the caller will do it automatically.
        """
        raise NotImplementedError

    @abstractmethod
    async def _close(self):
        """Internal implementation of `ListenerBase.close()`.

        Override this method in subclasses to implement how your listener
        is closed. No need to update the state variable from inside this
        method; the caller will do it automatically.
        """
        raise NotImplementedError


TrioConnectionHandler = Callable[[Stream], None]


class TrioListenerBase(ListenerBase):
    """Specialization of ListenerBase_ for Trio-style listeners.

    In Trio, the common pattern is that a listener can be opened by spawning
    a task with a dedicated function such as `serve_tcp()`. The listener
    listens for inbound connections in the task, and it can be closed by
    cancelling the task itself.

    Trio-style listeners need a Trio nursery that can be used to spawn a
    new task. The nursery has to be provided to the constructor of the listener,
    or, alternatively, after construction in the `nursery` property.

    In general, the owner of the nursery should not be worried about exceptions
    propagating out from the listener task; TrioListenerBase_ will handle these
    exceptions appropriately.
    """

    def __init__(
        self,
        *,
        handler: Optional[TrioConnectionHandler] = None,
        nursery: Optional[Nursery] = None,
    ):
        """Constructor.

        Parameters:
            handler: the handler function that will be called for every
                connection that was established
            nursery: the Trio nursery that will be the owner of the listener
                task spawned by this instance
        """
        super().__init__()

        self.nursery = nursery
        self.handler = handler

        self._cancel_scope = None
        self._task_exited = None

    async def _close(self):
        if self._cancel_scope:
            self._cancel_scope.cancel()
            self._cancel_scope = None

        await self._task_exited.wait()

    def _handle_exception(self, ex):
        """Handles an exception that happened while running the listener task.

        The default implementation logs the exception.
        """
        log.exception(ex)

    async def _open(self):
        if self.nursery is None:
            raise RuntimeError(
                "You must assign a nursery to a {!r} before opening it".format(
                    self.__class__
                )
            )

        await self.nursery.start(self._run_internal_trio_task)

    async def _run_internal_trio_task(self, *, task_status=TASK_STATUS_IGNORED):
        """Executes the internal, cancellable Trio task that listens for
        incoming connections.

        Do not override this method unless you know what you are doing.
        Subclasses should override the `_run()` method instead. This function
        sets up the cancellation scope for the internal task and catches
        exceptions raised from the task so you don't have to deal with these
        in `_run()`.
        """
        self._cancel_scope = CancelScope()
        self._task_exited = Event()

        with self._cancel_scope:
            try:
                await self._run(handler=self.handler, task_status=task_status)
            except Exception as ex:
                self._handle_exception(ex)

            # Task exited without it being cancelled; this means that the
            # listener must be closed explicitly.
            self.nursery.start_soon(self.close)

        self._cancel_scope = None
        self._task_exited.set()

    @abstractmethod
    async def _run(self, handler, task_status) -> None:
        """Executes the internal Trio task that listens for incoming
        connections.

        The listener is considered to be open while this task is running.
        When the task exits or is cancelled, the connection will be closed
        automatically.

        Parameters:
            handler: a connection handler that will be called by Trio for every
                connection that was established by the listener
            task_status: Trio task status argument. It is the responsibility
                of this method to call the `started()` method on this object
                to notify the listener instance that it is now open and
                accepting connections, or to arrange for someone else (e.g.,
                Trio's `serve_tcp()` or similar method) to call this method
                when needed. Failure to do so will mean that the listener is
                never considered as open.
        """
        raise NotImplementedError