Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""A registry that contains information about all the clients that the
server is currently connected to.
"""

from blinker import Signal
from collections import defaultdict
from contextlib import contextmanager
from time import time
from typing import Iterable, Iterator

from flockwave.server.model.client import Client
from flockwave.server.registries.channels import ChannelTypeRegistry
from flockwave.server.logger import log as base_log

from .base import RegistryBase

__all__ = ("ClientRegistry",)

log = base_log.getChild("registries.clients")


class ClientRegistry(RegistryBase[Client]):
    """Registry that contains information about all the clients that the
    server is currently connected to.

    Attributes:
        added: signal that is sent by the registry when a new client has been
            added to the registry. The signal has a keyword argument named
            ``client`` that contains the client that has just been added to the
            registry.

        channel_type_registry: the channel type registry that the client
            registry turns to when it has to construct a new communication
            channel instance to a client.

        count_changed: signal that is sent by the registry when the
            number of connected clients changed. This can be used by
            extensions to optimize their behaviour when no clients are
            connected.

        removed: signal that is sent by the registry when a client has been
            removed from the registry. The signal has a keyword argument named
            ``client`` that contains the client that has just been removed from
            the registry.
    """

    added: Signal = Signal()
    count_changed: Signal = Signal()
    removed: Signal = Signal()

    channel_type_registry: ChannelTypeRegistry

    _client_id_to_channel_type: dict[str, str]
    _entries_by_channel_type: dict[str, set[str]]

    def __init__(self, channel_type_registry: ChannelTypeRegistry):
        """Constructor."""
        super().__init__()
        self.channel_type_registry = channel_type_registry
        self._client_id_to_channel_type = {}
        self._entries_by_channel_type = defaultdict(set)

    def add(self, client_id: str, channel_type: str) -> Client:
        """Adds a new client to the set of clients connected to the server.

        This function is a no-op if the client is already added. It is
        assumed that a client may not connect twice to the server with the
        same ID.

        Arguments:
            client_id: the ID of the client
            channel_type: the type of the communication channel that
                connects the client to the server. It must be registered in
                the channel type registry.

        Returns:
            the client object that was added
        """
        if client_id in self:
            return self[client_id]

        channel = self.channel_type_registry.create_channel_for(channel_type)
        client = Client(_id=client_id, _channel=channel, _connected_at=time())
        channel.bind_to(client)

        self._entries[client_id] = client
        self._client_id_to_channel_type[client_id] = channel_type
        self._entries_by_channel_type[channel_type].add(client_id)

        log.info("Client connected", extra={"id": client_id})

        self.added.send(self, client=client)
        self.count_changed.send(self)

        return client

    def client_ids_for_channel_type(self, channel_type: str) -> Iterable[str]:
        """Returns an iterator that contains the IDs of all the clients who
        are registered in the registry with the given channel type.

        Arguments:
            channel_type (str): the communication channel type to query

        Returns:
            Iterator[str]: an iterable that yields the IDs of all the clients
                who are registered in the registry with the given channel type
        """
        return iter(self._entries_by_channel_type.get(channel_type, []))

    def has_clients_for_channel_type(self, channel_type: str) -> bool:
        """Returns whether there is at least one connected client for the
        given channel type.

        Arguments:
            channel_type: the communication channel type to query

        Returns:
            ``True`` if there is at least one connected client with the given
            channel type, ``False`` otherwise
        """
        return bool(self._entries_by_channel_type.get(channel_type))

    @property
    def num_entries(self) -> int:
        """Returns the number of clients currently connected to the
        server.
        """
        return len(self._entries)

    def remove(self, client_id: str) -> None:
        """Removes a client from the set of clients connected to the server.

        This function is a no-op if the client was already removed.

        Arguments:
            client_id: the ID of the client to remove
        """
        try:
            client = self._entries.pop(client_id)
        except KeyError:
            return

        try:
            channel_type = self._client_id_to_channel_type.pop(client_id)
        except KeyError:
            # This should not happen
            log.warning(
                "Cannot find channel type for client ID {0!r}".format(client_id)
            )
            return

        try:
            self._entries_by_channel_type[channel_type].remove(client_id)
        except KeyError:
            # This should not happen
            log.warning(
                f"Cannot remove channel type index entry for client ID {client_id!r}"
            )

        log.info("Client disconnected", extra={"id": client_id})

        self.count_changed.send(self)
        self.removed.send(self, client=client)

    @contextmanager
    def use(self, client_id: str, channel_type: str) -> Iterator[Client]:
        """Temporarily adds a new client with the given client ID and
        channel type, hands control back to the caller in a context, and
        then removes the client when the caller exits the context.

        Arguments:
            client_id: the ID of the client
            channel_type: the type of the communication channel that
                connects the client to the server. It must be registered in
                the channel type registry.

        Yields:
            Client: the client object that was added
        """
        if client_id in self:
            yield self[client_id]
        else:
            client = self.add(client_id, channel_type)
            try:
                yield client
            finally:
                self.remove(client_id)