Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / registries / channels.py
Size: Mime:
"""A registry that contains information about all the different types of
communication channels that the server can handle.

Communication channels may include things like Socket.IO connections, bare
TCP or UDP connections and so on.

Note that the registry keeps track of the different *types* of communication
channels, not each individual channel between a client and the server.
"""

from blinker import Signal
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Callable, Generic, Iterator, TypeVar

from flockwave.connections import IPAddressAndPort

from ..logger import log as base_log
from ..model import CommunicationChannel
from .base import RegistryBase

__all__ = ("ChannelTypeRegistry",)

log = base_log.getChild("registries.channels")

T = TypeVar("T")


@dataclass(frozen=True)
class ChannelTypeDescriptor(Generic[T]):
    id: str
    """The ID of the communication channel type."""

    factory: Callable[[], CommunicationChannel[T]]
    """A callable that constructs a new communication channel of this type
    when invoked with no arguments.
    """

    broadcaster: Callable[[T], None] | None = None
    """An optional callable that implements broadcasting a message to all clients
    who are currently connected to the server with this communication channel
    type. The callable will be called with the message to be sent as its only
    argument.

    ``None`` means that no such broadcasting is possible and the application
    has to fall back to sending individual messages.
    """

    ssdp_location: Callable[[IPAddressAndPort | None], str | None] | None = None
    """An optional callable that can be called with a single host-port pair or
    ``None`` and that returns a URI describing the location where the communication
    channel can be accessed from the outside. `None` as the input argument means
    a generic query without a specific remote address in mind.
    """

    def get_ssdp_location(self, source: IPAddressAndPort | None = None) -> str | None:
        locator = self.ssdp_location
        return locator(source) if callable(locator) else None


class ChannelTypeRegistry(RegistryBase[ChannelTypeDescriptor], Generic[T]):
    """Registry that contains information about all the communication channel
    types that the server can handle.

    Attributes:
        added (Signal): signal that is sent by the registry when a new
            communication channel type has been registered in the registry.
            The signal has a keyword argment named ``descriptor`` that
            contains information about the channel type that was added.
            ``descriptor`` will be an instance of ChannelTypeDescriptor_.

        count_changed (Signal): signal that is sent by the registry when the
            number of registered communication channel types has changed.

        removed (Signal): signal that is sent by the registry when a
            communication channel type has been removed from the registry.
            The signal has a keyword argment named ``descriptor`` that
            contains information about the channel type that was removed.
            ``descriptor`` will be an instance of ChannelTypeDescriptor_.
    """

    added = Signal()
    count_changed = Signal()
    removed = Signal()

    def add(
        self,
        channel_id: str,
        factory: Callable[[], CommunicationChannel[T]],
        broadcaster: Callable[[T], None] | None = None,
        ssdp_location: Callable[[IPAddressAndPort | None], str | None] | None = None,
    ):
        """Adds a new communication channel class to the registry.

        This function throws an error if the ID is already taken.

        Arguments:
            channel_id: the ID of the communication channel type
            factory: a callable that constructs a new communication channel
                of this type when invoked with no arguments. The callable is
                typically a class that extends CommunicationChannel_ and has an
                appropriate constructor, but can be an arbitrary callable as
                long as it returns an instance of CommunicationChannel_.
            address: a callable that can be called with a single host-port pair
                or ``None`` and that returns a tuple consisting of a hostname
                and the corresponding port where the communication channel can
                be accessed from the outside. The callable may be ``None`` or
                may return ``None`` if such a location cannot sensibly be derived.
                The argument of the callable describes the remote address that is
                interested in the location of the channel; the implementation
                should strive to return an IP address that is on the same subnet
                as the remote address.
            broadcaster: a callable that implements broadcasting a message to
                all clients that are currently connected to the server with this
                communication channel type. The callable will be called with the
                message to be sent as its only argument. When this property is
                ``None``, it is assumed that there is no compact way to broadcast
                a message to all the clients who are connected with this
                channel type, and the application will fall back to sending
                individual messages.
            ssdp_location: a callable that can be called with a single host-port
                pair or ``None`` and that returns a URI describing the location
                where the communication channel can be accessed from the outside.
                For instance, a TCP channel may return ``tcp://192.168.1.17:1234``
                there if the server is listening on 192.168.1.17, port 1234. The
                callable may be ``None`` or may return ``None`` if such a
                location cannot sensibly be derived. The argument of the
                callable describes the remote address that is interested in the
                channel; the implementation should strive to return an IP
                address that is on the same subnet as the remote address.
        """
        if channel_id in self:
            return

        descriptor = ChannelTypeDescriptor(
            id=channel_id,
            factory=factory,
            broadcaster=broadcaster,
            ssdp_location=ssdp_location,
        )
        self._entries[channel_id] = descriptor

        log.debug("Channel registered", extra={"id": channel_id})

        self.added.send(self, id=channel_id, descriptor=descriptor)
        self.count_changed.send(self)

    def create_channel_for(self, channel_id: str) -> CommunicationChannel[T]:
        """Creates a new communication channel with the type whose ID is
        given in the first argument.

        Arguments:
            channel_id (str): the ID of the communication channel type

        Returns:
            CommunicationChannel: a new communication channel of the given
                type.
        """
        result = self._entries[channel_id].factory()
        assert isinstance(result, CommunicationChannel), (
            "communication channel factory did not return a CommunicationChannel"
        )
        return result

    @property
    def num_entries(self):
        """Returns the number of channel types currently registered in the
        registry.
        """
        return len(self._entries)

    def remove(self, channel_id) -> None:
        """Removes a communication channel class by ID from the set of
        channels registered in the registry.

        This function is a no-op if the channel class was already removed.

        The behaviour of the server is undefined if there are still clients
        who use a communication channel of this type.

        Arguments:
            channel_id (str): the ID of the channel type to remove
        """
        try:
            descriptor = self._entries.pop(channel_id)
        except KeyError:
            return

        log.debug("Channel deregistered", extra={"id": channel_id})
        self.count_changed.send(self)
        self.removed.send(self, id=channel_id, descriptor=descriptor)

    @contextmanager
    def use(
        self,
        name: str,
        *,
        factory: Callable[[], CommunicationChannel[T]],
        broadcaster: Callable[[T], None] | None = None,
        ssdp_location: Callable[[IPAddressAndPort | None], str | None] | None = None,
    ) -> Iterator[None]:
        """Context manager that temporarily adds a channel to the channel
        registry and unregisters it upon exiting the context.

        Keyword arguments are forwarded intact to `add()`. See the list of
        supported arguments there.

        Parameters:
            name: name of the channel to register
        """
        try:
            self.add(
                name,
                factory=factory,
                broadcaster=broadcaster,
                ssdp_location=ssdp_location,
            )
            yield
        finally:
            self.remove(name)