Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""Application object for the Skybrush server."""

from collections import Counter, defaultdict
from inspect import isawaitable, isasyncgen
from os import environ
from platformdirs import AppDirs
from trio import (
    BrokenResourceError,
    move_on_after,
)
from typing import (
    Any,
    Iterable,
    Optional,
    Sequence,
    Union,
)

from flockwave.app_framework import DaemonApp
from flockwave.app_framework.configurator import AppConfigurator, Configuration
from flockwave.connections.base import ConnectionState
from flockwave.gps.vectors import GPSCoordinate
from flockwave.server.ports import get_port_map, set_base_port
from flockwave.server.utils import divide_by, rename_keys
from flockwave.server.utils.packaging import is_packaged
from flockwave.server.utils.system_time import (
    can_set_system_time_detailed_async,
    get_system_time_msec,
    set_system_time_msec_async,
)

from .commands import CommandExecutionManager, CommandExecutionStatus
from .errors import NotSupportedError
from .logger import log
from .message_hub import (
    BatchMessageRateLimiter,
    ConnectionStatusMessageRateLimiter,
    MessageHub,
    RateLimiters,
    UAVMessageRateLimiter,
)
from .message_handlers import MessageBodyTransformationSpec, transform_message_body
from .model.client import Client
from .model.devices import DeviceTree, DeviceTreeSubscriptionManager
from .model.errors import ClientNotSubscribedError, NoSuchPathError
from .model.log import LogMessage, Severity
from .model.messages import FlockwaveMessage, FlockwaveNotification, FlockwaveResponse
from .model.object import ModelObject
from .model.transport import TransportOptions
from .model.uav import is_uav, UAV, UAVDriver
from .model.world import World
from .registries import (
    ChannelTypeRegistry,
    ClientRegistry,
    ConnectionRegistry,
    ConnectionRegistryEntry,
    ObjectRegistry,
    UAVDriverRegistry,
    find_in_registry,
)
from .version import __version__ as server_version

# Public API of this module. NOTE(review): ``app`` is not defined in the part
# of the file visible here -- presumably it is created further down.
__all__ = ("app",)

# Dotted name of the package that contains this module, i.e. everything in
# ``__name__`` before its last dot (empty string if there is no dot).
PACKAGE_NAME = __name__.rpartition(".")[0]


#: Table that describes the handlers of several UAV-related command requests.
#:
#: Keys are the ``type`` values of incoming messages. Each value is a pair:
#:
#: - the name of the method that ``dispatch_to_uav()`` / ``dispatch_to_uavs()``
#:   look up on the UAV driver with ``getattr()``
#: - the transformation spec that is handed to ``transform_message_body()``
#:   together with the remaining message parameters before the driver method
#:   is called (``None`` presumably means "no transformation" -- confirm
#:   against ``transform_message_body()``)
UAV_COMMAND_HANDLERS: dict[str, tuple[str, MessageBodyTransformationSpec]] = {
    "LOG-DATA": ("get_log", rename_keys({"logId": "log_id"})),
    "LOG-INF": ("get_log_list", None),
    "OBJ-CMD": ("send_command", None),
    "PRM-GET": ("get_parameter", None),
    "PRM-SET": ("set_parameter", None),
    "PRM-SET-MANY": ("set_parameters", None),
    "UAV-CALIB": ("calibrate_component", None),
    "UAV-FLY": (
        "send_fly_to_target_signal",
        {"target": GPSCoordinate.from_json},
    ),
    "UAV-HALT": ("send_shutdown_signal", {"transport": TransportOptions.from_json}),
    "UAV-HOVER": ("send_hover_signal", {"transport": TransportOptions.from_json}),
    "UAV-LAND": ("send_landing_signal", {"transport": TransportOptions.from_json}),
    "UAV-MOTOR": (
        "send_motor_start_stop_signal",
        {"transport": TransportOptions.from_json},
    ),
    "UAV-PREFLT": ("request_preflight_report", None),
    "UAV-RST": ("send_reset_signal", {"transport": TransportOptions.from_json}),
    "UAV-RTH": (
        "send_return_to_home_signal",
        {"transport": TransportOptions.from_json},
    ),
    # NOTE(review): ``divide_by(1000)`` looks like a milliseconds-to-seconds
    # conversion of ``duration`` -- confirm the units with the protocol spec.
    "UAV-SIGNAL": (
        "send_light_or_sound_emission_signal",
        {"duration": divide_by(1000), "transport": TransportOptions.from_json},
    ),
    "UAV-SLEEP": (
        "enter_low_power_mode",
        {"transport": TransportOptions.from_json},
    ),
    "UAV-TAKEOFF": ("send_takeoff_signal", {"transport": TransportOptions.from_json}),
    "UAV-TEST": ("test_component", None),
    "UAV-VER": ("request_version_info", None),
    "UAV-WAKEUP": (
        "resume_from_low_power_mode",
        {"transport": TransportOptions.from_json},
    ),
}

#: Fallback entry used when a message type is missing from
#: ``UAV_COMMAND_HANDLERS``. Its method name is ``None``, so the subsequent
#: ``getattr(driver, None)`` lookup raises ``TypeError``, which the dispatch
#: functions report as "Operation not supported".
NULL_HANDLER = (None, None)


class SkybrushServer(DaemonApp):
    """Main application object for the Skybrush server."""

    channel_type_registry: ChannelTypeRegistry
    """Central registry for types of communication channels that the server can
    handle and manage. Types of communication channels include Socket.IO
    streams, TCP or UDP sockets and so on.
    """

    client_registry: ClientRegistry
    """Registry for the clients that are currently connected to the server."""

    command_execution_manager: CommandExecutionManager
    """Object that manages the asynchronous execution of commands on remote UAVs
    (i.e. commands that cannot be executed immediately in a synchronous manner)
    """

    device_tree: DeviceTree
    """Tree-like data structure that contains a first-level node for every UAV
    and then contains additional nodes in each UAV subtree for the devices and
    channels of the UAV.
    """

    message_hub: MessageHub
    """Central messaging hub via which one can send Flockwave messages."""

    object_registry: ObjectRegistry
    """Central registry for the objects known to the server."""

    uav_driver_registry: UAVDriverRegistry
    """Registry for UAV drivers that are currently registered in the server."""

    world: World
    """A representation of the "world" in which the flock of UAVs live. By
    default, the world is empty but extensions may extend it with objects.
    """

    _registry_full_error_counts: Counter[Any]
    """Object that counts how many times we did report a registry full error
    for a given source. This is used to limit the number of warnings we print.
    """

    def cancel_async_operations(
        self, receipt_ids: Iterable[str], in_response_to: FlockwaveMessage
    ) -> FlockwaveResponse:
        """Cancels the pending asynchronous operations identified by the given
        receipt IDs and reports the outcome for each ID.

        Every ID is validated first; the actual cancellations are performed
        only after all the IDs have been recorded in the response.

        Parameters:
            receipt_ids: the receipt IDs of the pending asynchronous operations
            in_response_to: the message that the constructed message will
                respond to

        Returns:
            the response where valid receipt IDs are marked as successful and
            unknown ones carry a ``no such receipt`` error
        """
        reply = self.message_hub.create_response_or_notification(
            body={}, in_response_to=in_response_to
        )
        executor = self.command_execution_manager

        to_cancel: list[str] = []
        for receipt_id in receipt_ids:
            if not executor.is_valid_receipt_id(receipt_id):
                reply.add_error(receipt_id, "no such receipt")
                continue

            to_cancel.append(receipt_id)
            reply.add_success(receipt_id)

        # Cancel in a second pass, after every ID has been classified
        for receipt_id in to_cancel:
            executor.cancel(receipt_id)

        return reply

    def create_CONN_INF_message_for(
        self,
        connection_ids: Iterable[str],
        in_response_to: Optional[FlockwaveMessage] = None,
    ) -> FlockwaveMessage:
        """Builds a CONN-INF message describing the connections with the
        given IDs.

        Parameters:
            connection_ids: IDs of the connections to include in the message
            in_response_to: the message that the constructed message will
                respond to. ``None`` means that the constructed message will
                be a notification.

        Returns:
            the CONN-INF message; its ``status`` mapping holds the JSON
            representation of every connection that was found
        """
        # The mapping is placed in the body first and filled in afterwards
        # because looking up a connection needs the response object.
        connection_info = {}
        reply = self.message_hub.create_response_or_notification(
            body={"status": connection_info, "type": "CONN-INF"},
            in_response_to=in_response_to,
        )

        for connection_id in connection_ids:
            entry = self._find_connection_by_id(connection_id, reply)
            if not entry:
                continue
            connection_info[connection_id] = entry.json

        return reply

    def create_DEV_INF_message_for(
        self, paths: Iterable[str], in_response_to: Optional[FlockwaveMessage] = None
    ) -> FlockwaveMessage:
        """Builds a DEV-INF message with the current values of the channels in
        the device tree subtrees matched by the given paths.

        The work is delegated to the device tree subscription manager.

        Parameters:
            paths: list of device tree paths
            in_response_to: the message that the constructed message will
                respond to. ``None`` means that the constructed message will be
                a notification.

        Returns:
            the DEV-INF message produced by the subscription manager
        """
        subscription_manager = self.device_tree_subscriptions
        return subscription_manager.create_DEV_INF_message_for(paths, in_response_to)

    def create_DEV_LIST_message_for(
        self,
        object_ids: Iterable[str],
        in_response_to: FlockwaveMessage,
    ) -> FlockwaveMessage:
        """Builds a DEV-LIST message with the device trees of the objects
        identified by the given IDs.

        Parameters:
            object_ids: list of object IDs
            in_response_to: the message that the constructed message will
                respond to.

        Returns:
            the DEV-LIST message; its ``devices`` mapping holds the JSON form
            of the device tree node of every object that was found, or an
            empty dict for objects whose device tree node is falsy
        """
        # The mapping is placed in the body first and filled in afterwards
        # because looking up an object needs the response object.
        trees = {}
        reply = self.message_hub.create_response_or_notification(
            body={"devices": trees, "type": "DEV-LIST"},
            in_response_to=in_response_to,
        )

        for object_id in object_ids:
            found = self._find_object_by_id(object_id, reply)
            if not found:
                continue

            trees[object_id] = (
                found.device_tree_node.json  # type: ignore
                if found.device_tree_node
                else {}
            )

        return reply

    def create_DEV_LISTSUB_message_for(
        self,
        client: Client,
        path_filter: Iterable[str],
        in_response_to: FlockwaveMessage,
    ):
        """Builds a DEV-LISTSUB message listing the device tree paths that the
        given client is subscribed to.

        Parameters:
            client: the client whose subscriptions we are interested in
            path_filter: list of device tree paths whose subtrees
                the client is interested in
            in_response_to: the message that the constructed message will
                respond to; it is passed on to the message hub unchanged

        Returns:
            the DEV-LISTSUB message whose ``paths`` entry lists the matching
            subscriptions, as returned by ``elements()`` of the subscription
            listing
        """
        matched = self.device_tree_subscriptions.list_subscriptions(
            client, path_filter
        )
        subscribed_paths = list(matched.elements())

        return self.message_hub.create_response_or_notification(
            body={"paths": subscribed_paths, "type": "DEV-LISTSUB"},
            in_response_to=in_response_to,
        )

    def create_DEV_SUB_message_for(
        self,
        client: Client,
        paths: Iterable[str],
        lazy: bool,
        in_response_to: FlockwaveMessage,
    ) -> FlockwaveMessage:
        """Subscribes the given client to the given device tree paths and
        builds the DEV-SUB response that reports the outcome per path.

        Parameters:
            client: the client to subscribe to the given paths
            paths: list of device tree paths to subscribe the client to
            lazy: whether the client is allowed to subscribe to paths that do
                not exist yet.
            in_response_to: the message that the constructed message will
                respond to.

        Returns:
            the DEV-SUB message where each path is either marked as successful
            or carries an error if the subscription manager raised
            ``NoSuchPathError`` for it
        """
        subscription_manager = self.device_tree_subscriptions
        reply = self.message_hub.create_response_or_notification(
            {}, in_response_to=in_response_to
        )

        for path in paths:
            try:
                subscription_manager.subscribe(client, path, lazy)
            except NoSuchPathError:
                reply.add_error(path, "No such device tree path")
                continue

            reply.add_success(path)

        return reply

    def create_DEV_UNSUB_message_for(
        self,
        client: Client,
        paths: Iterable[str],
        *,
        in_response_to: FlockwaveMessage,
        remove_all: bool,
        include_subtrees: bool,
    ) -> FlockwaveResponse:
        """Unsubscribes the given client from the given device tree paths and
        builds the DEV-UNSUB response that reports the outcome per path.

        Parameters:
            client: the client to unsubscribe from the given paths
            paths: list of device tree paths to unsubscribe the
                given client from
            in_response_to: the message that the
                constructed message will respond to.
            remove_all: when ``True``, the client will be unsubscribed
                from the given paths no matter how many times it is
                subscribed to them. When ``False``, an unsubscription will
                decrease the number of subscriptions to the given path by
                1 only.
            include_subtrees: when ``True``, subscriptions to nodes
                that are in the subtrees of the given paths will also be
                removed

        Returns:
            the DEV-UNSUB message where each processed path is either marked
            as successful or carries an error explaining why the client was
            not unsubscribed from it
        """
        subscription_manager = self.device_tree_subscriptions
        reply = self.message_hub.create_response_or_notification(
            {}, in_response_to=in_response_to
        )

        targets = paths
        if include_subtrees:
            # Expand the request to every subscription of the client found in
            # the given subtrees, as if the client had listed them explicitly
            targets = subscription_manager.list_subscriptions(client, paths)

        for path in targets:
            try:
                subscription_manager.unsubscribe(client, path, force=remove_all)
            except NoSuchPathError:
                reply.add_error(path, "No such device tree path")
                continue
            except ClientNotSubscribedError:
                reply.add_error(path, "Not subscribed to this path")
                continue

            reply.add_success(path)

        return reply

    def create_SYS_MSG_message_from(
        self, messages: Iterable[LogMessage]
    ) -> FlockwaveNotification:
        """Builds a SYS-MSG notification that carries the given log messages.

        Typically, you should not use this method (unless you know what you
        are doing) because it allows one to bypass the built-in rate limiting
        for SYS-MSG messages. If you only want to broadcast SYS-MSG messages
        to all interested parties, use ``request_to_send_SYS_MSG_message()``
        instead, which will send the notification immediately if the rate
        limiting constraints allow, but it may also wait a bit if the SYS-MSG
        messages are sent too frequently.

        Parameters:
            messages: iterable of log messages to put in the generated SYS-MSG
                message

        Returns:
            the SYS-MSG message with the given log messages in its ``items``
            entry
        """
        items = list(messages)
        return self.message_hub.create_response_or_notification(
            body={"items": items, "type": "SYS-MSG"}
        )

    def create_UAV_INF_message_for(
        self, uav_ids: Iterable[str], in_response_to: Optional[FlockwaveMessage] = None
    ):
        """Builds an UAV-INF message with the status of the UAVs identified by
        the given IDs.

        Typically, you should not use this method from extensions because
        it allows one to bypass the built-in rate limiting for UAV-INF
        messages. The only exception is when ``in_response_to`` is set to
        a certain message, in which case it makes sense to send the UAV-INF
        response immediately (after all, it was requested explicitly). If you
        only want to broadcast UAV-INF messages to all interested parties,
        use ``request_to_send_UAV_INF_message_for()`` instead, which will
        send the notification immediately if the rate limiting constraints
        allow, but it may also wait a bit if the UAV-INF messages are sent
        too frequently.

        Parameters:
            uav_ids: list of UAV IDs
            in_response_to: the message that the constructed message will
                respond to. ``None`` means that the constructed message will be
                a notification.

        Returns:
            the UAV-INF message; its ``status`` mapping holds the JSON form of
            the status of every UAV that was found
        """
        # The mapping is placed in the body first and filled in afterwards
        # because looking up a UAV needs the response object.
        uav_info = {}
        reply = self.message_hub.create_response_or_notification(
            body={"status": uav_info, "type": "UAV-INF"},
            in_response_to=in_response_to,
        )

        for uav_id in uav_ids:
            uav = self.find_uav_by_id(uav_id, reply)
            if not uav:
                continue
            uav_info[uav_id] = uav.status.json  # type: ignore

        return reply

    async def disconnect_client(
        self, client: Client, reason: Optional[str] = None, timeout: float = 10
    ) -> None:
        """Disconnects the given client from the server.

        Nothing happens if the client has no channel.

        Parameters:
            client: the client to disconnect
            reason: the reason for disconnection. When it is truthy, a
                ``SYS-CLOSE`` message carrying the reason is sent to the
                client before the connection is closed.
            timeout: maximum number of seconds to wait for the disconnection
                to happen gracefully. A forceful disconnection is attempted
                if the timeout expires.
        """
        if not client.channel:
            return

        farewell = None
        if reason:
            farewell = self.message_hub.create_notification(
                body={"type": "SYS-CLOSE", "reason": reason}
            )

        # Graceful phase: everything in this block is bounded by the timeout
        with move_on_after(timeout) as scope:
            if farewell:
                pending = await self.message_hub.send_message(farewell, to=client)
                await pending.wait_until_sent()
            await client.channel.close()

        # The timeout expired before the graceful phase finished
        if scope.cancelled_caught:
            await client.channel.close(force=True)

    async def dispatch_to_uav(
        self, message: FlockwaveMessage, sender: Client, *, id_property: str = "id"
    ) -> FlockwaveMessage:
        """Dispatches a message intended for a single UAV to the appropriate
        UAV driver.

        The driver method to call is selected from ``UAV_COMMAND_HANDLERS``
        based on the ``type`` of the message; the remaining entries of the
        message body (after an optional transformation) are passed to the
        method as keyword arguments.

        Parameters:
            message: the message that contains a request that is to be forwarded
                to a single UAV. The message is expected to have an ``id``
                property that contains the ID of the UAV to dispatch the message
                to. The name of the property can be overridden with the
                ``id_property`` parameter.
            sender: the client that sent the message; its ID is registered as
                the client to notify when the handler returns an awaitable
                or an asynchronous generator
            id_property: name of the property in the message that contains the
                ID of the UAV to dispatch the message to

        Returns:
            a response to the original message that contains exactly one of the
            following three keys: ``result`` for the result of a successful
            message dispatch, ``error`` for a message dispatch that threw an
            error (or returned an exception object), or ``receipt`` if calling
            the message handler returned an awaitable or an asynchronous
            generator

        Raises:
            KeyError: if the message body has no ``type`` entry; the lookup
                happens before the error handling block
        """
        # Create the response
        response = self.message_hub.create_response_or_notification(
            body={}, in_response_to=message
        )

        # Process the body. We work on a shallow copy so popping the type and
        # the UAV ID does not modify the body of the incoming message.
        parameters = dict(message.body)
        message_type = parameters.pop("type")
        uav_id: Optional[str] = parameters.pop(id_property, None)
        uav: Optional[UAV] = None
        error: Optional[str] = None
        result: Any = None

        try:
            if uav_id is None:
                raise RuntimeError("message must contain a UAV ID")

            # Find the UAV itself; its driver is looked up below
            uav = self.find_uav_by_id(uav_id)
            if uav is None:
                raise RuntimeError("no such UAV")

            # Find the method to invoke on the driver. Unknown message types
            # fall back to NULL_HANDLER, whose method name is None.
            method_name, transformer = UAV_COMMAND_HANDLERS.get(
                message_type, NULL_HANDLER
            )

            # Transform the incoming arguments if needed before sending them
            # to the driver method
            parameters = transform_message_body(transformer, parameters)

            # Look up the method in the driver. A missing method raises
            # AttributeError and a None method name raises TypeError; both are
            # reported to the client as an unsupported operation.
            try:
                method = getattr(uav.driver, method_name)  # type: ignore
            except (AttributeError, RuntimeError, TypeError):
                raise RuntimeError("Operation not supported") from None

            # Execute the method and catch all runtime errors
            result = method(uav, **parameters)
        except NotImplementedError:
            # NotImplementedError is a subclass of RuntimeError, so this
            # handler must come before the generic RuntimeError handler
            error = "Operation not implemented"
        except NotSupportedError:
            error = "Operation not supported"
        except RuntimeError as ex:
            # Runtime errors are reported verbatim and are not logged
            error = str(ex)
        except Exception as ex:
            # Anything else is unexpected, so it is logged as well
            error = "Unexpected error: {0}".format(ex)
            log.exception(ex)

        # Update the response
        if error is not None:
            response.body["error"] = error
        elif isinstance(result, Exception):
            # The handler returned (rather than raised) an exception
            response.body["error"] = str(result)
        elif isawaitable(result) or isasyncgen(result):
            # The result is not awaited here. The client gets a receipt ID
            # instead, and the pending result is handed to the command
            # execution manager through the ``when_sent()`` hook of the
            # response (presumably invoked once the response has been sent --
            # confirm against the message hub).
            assert uav is not None
            cmd_manager = self.command_execution_manager
            receipt = cmd_manager.new(client_to_notify=sender.id)
            response.body["receipt"] = receipt.id
            response.when_sent(cmd_manager.mark_as_clients_notified, receipt.id, result)
        else:
            response.body["result"] = result

        return response

    async def dispatch_to_uavs(
        self, message: FlockwaveMessage, sender: Client
    ) -> FlockwaveMessage:
        """Dispatches a message intended for multiple UAVs to the appropriate
        UAV drivers.

        The targeted UAVs are grouped by their drivers, and the driver method
        selected from ``UAV_COMMAND_HANDLERS`` (based on the ``type`` of the
        message) is called once per driver with the list of its UAVs and the
        remaining entries of the message body as keyword arguments.

        Parameters:
            message: the message that contains a request that is to be forwarded
                to multiple UAVs. The message is expected to have an ``ids``
                property that lists the UAVs to dispatch the message to.
            sender: the client that sent the message; its ID is registered as
                the client to notify for every per-UAV result that is an
                awaitable or an asynchronous generator

        Returns:
            a response to the original message that records, for each UAV,
            either a result, an error or a receipt of a pending asynchronous
            operation

        Raises:
            KeyError: if the message body has no ``type`` entry

        Note:
            Unlike in ``dispatch_to_uav()``, the transformation of the message
            body is not wrapped in an error handler here, so exceptions raised
            by ``transform_message_body()`` propagate to the caller.
        """
        # Create the response
        response = self.message_hub.create_response_or_notification(
            body={}, in_response_to=message
        )

        # Process the body. We work on a shallow copy so popping the type and
        # the UAV IDs does not modify the body of the incoming message. Note
        # that ``transport`` is only read here, not removed, so it is still
        # passed on to the driver method later.
        parameters = dict(message.body)
        message_type = parameters.pop("type")
        uav_ids: Sequence[str] = parameters.pop("ids", ())
        transport: Any = parameters.get("transport")

        # Sort the UAVs being targeted by drivers. If the raw (not yet
        # transformed) ``transport`` entry of the message is a dict whose
        # ``ignoreIds`` key is truthy, get hold of all registered UAV drivers
        # as well and extend the uavs_by_drivers dict so that every driver is
        # called, even the ones with no targeted UAVs
        uavs_by_drivers = self.sort_uavs_by_drivers(uav_ids, response)
        if transport and isinstance(transport, dict) and transport.get("ignoreIds"):
            # TODO(ntamas): we do not have legitimate ways to communicate an
            # error back from a driver if the driver has no associated UAVs.
            for driver in self.uav_driver_registry:
                if driver not in uavs_by_drivers:
                    uavs_by_drivers[driver] = []

        # Find the method to invoke on the driver. Unknown message types fall
        # back to NULL_HANDLER, whose method name is None.
        method_name, transformer = UAV_COMMAND_HANDLERS.get(message_type, NULL_HANDLER)

        # Transform the incoming arguments if needed before sending them
        # to the driver method
        parameters = transform_message_body(transformer, parameters)

        # Ask each affected driver to send the message to the UAV
        for driver, uavs in uavs_by_drivers.items():
            # Look up the method in the driver. A missing method raises
            # AttributeError and a None method name raises TypeError; both
            # are reported as an unsupported operation for all the UAVs of
            # the driver.
            common_error, results = None, None
            try:
                method = getattr(driver, method_name)  # type: ignore
            except (AttributeError, RuntimeError, TypeError):
                common_error = "Operation not supported"
                method = None

            # Execute the method and catch all runtime errors
            if method is not None:
                try:
                    results = method(uavs, **parameters)
                except NotImplementedError:
                    common_error = "Operation not implemented"
                except NotSupportedError:
                    common_error = "Operation not supported"
                except Exception as ex:
                    common_error = "Unexpected error: {0}".format(ex)
                    log.exception(ex)

            # Update the response
            if common_error is not None:
                for uav in uavs:
                    response.add_error(uav.id, common_error)
            else:
                if isawaitable(results):
                    # Results are produced by an async function; we have to wait
                    # for it
                    # TODO(ntamas): no, we don't have to wait for it; we have
                    # to create a receipt for each UAV and then send a response
                    # now
                    try:
                        results = await results
                    except RuntimeError as ex:
                        # this is probably okay
                        results = ex
                    except Exception as ex:
                        # this is unexpected; let's log it
                        results = ex
                        log.exception(ex)

                if isinstance(results, Exception):
                    # Received an exception; send it back for all UAVs
                    for uav in uavs:
                        response.add_error(uav.id, str(results))
                elif not isinstance(results, dict):
                    # Common result has arrived, send it back for all UAVs
                    for uav in uavs:
                        response.add_result(uav.id, results)
                else:
                    # Results have arrived for each UAV individually, process
                    # them. The keys of the dict are used as UAV objects here
                    # (their ``id`` attribute is read).
                    for uav, result in results.items():
                        if isinstance(result, Exception):
                            response.add_error(uav.id, str(result))
                        elif isawaitable(result) or isasyncgen(result):
                            # Not awaited here: the client gets a receipt, and
                            # the pending result is handed to the command
                            # execution manager through the ``when_sent()``
                            # hook of the response
                            cmd_manager = self.command_execution_manager
                            receipt = cmd_manager.new(client_to_notify=sender.id)
                            response.add_receipt(uav.id, receipt)
                            response.when_sent(
                                cmd_manager.mark_as_clients_notified, receipt.id, result
                            )
                        else:
                            response.add_result(uav.id, result)

        return response

    def find_uav_by_id(
        self,
        uav_id: str,
        response: Optional[Union[FlockwaveResponse, FlockwaveNotification]] = None,
    ) -> Optional[UAV]:
        """Looks up the UAV with the given ID in the object registry.

        The lookup is delegated to ``find_in_registry()`` with ``is_uav`` as
        the predicate; the optional response object and the failure reason
        ``No such UAV`` are handed over to it so it can register the failure.

        Parameters:
            uav_id: the ID of the UAV to find
            response: the response in which the failure can be registered

        Returns:
            the UAV with the given ID or ``None`` if there is no such UAV
        """
        uav = find_in_registry(
            self.object_registry,
            uav_id,
            predicate=is_uav,
            response=response,
            failure_reason="No such UAV",
        )
        return uav  # type: ignore

    def handle_registry_full_error(self, source: Any, object: str) -> None:
        """Common handler for events when an extension tries to register an
        object in the object registry and the registry reaches the limits
        dictated by the license.

        A warning is logged for the first six events of a given source; the
        sixth one is followed by a notice that further warnings will be
        suppressed. Later events from the same source are ignored silently.

        Args:
            source: the source of the event. This object is only used as a
                key to count the warnings printed for a single source.
            object: the type of the object that the caller attempted to register,
                in a human-readable form. This will be used in the warning being
                printed from this function.
        """
        counts = self._registry_full_error_counts
        reported_so_far = counts[source]

        # Enough warnings have been printed for this source already
        if reported_so_far > 5:
            return

        counts[source] += 1
        self.log.warning(
            f"Error while registering a new {object}: server limits reached",
        )

        if reported_so_far == 5:
            self.log.warning(
                f"Further warnings about limits affecting a new {object} "
                f"will be suppressed"
            )

    @property
    def num_clients(self) -> int:
        """The number of clients connected to the server."""
        return self.client_registry.num_entries

    def resume_async_operations(
        self,
        receipt_ids: Iterable[str],
        values: dict[str, Any],
        in_response_to: FlockwaveMessage,
    ) -> FlockwaveResponse:
        """Resumes the suspended asynchronous operations identified by the
        given receipt IDs and reports the outcome for each ID.

        Every ID is validated first; the operations are resumed only after
        all the IDs have been recorded in the response.

        Parameters:
            receipt_ids: the receipt IDs of the suspended asynchronous operations
            values: mapping from receipt IDs to the values to send back into
                the suspended asynchronous operations. Operations with no
                entry in the mapping are resumed with ``None``.
            in_response_to: the message that the constructed message will
                respond to

        Returns:
            the response where resumable receipt IDs are marked as successful
            and the others carry an error: ``invalid values`` for all of them
            if ``values`` is not a dict, ``no such receipt`` for unknown IDs
            and ``command is not suspended`` for operations that are not
            suspended
        """
        reply = self.message_hub.create_response_or_notification(
            body={}, in_response_to=in_response_to
        )

        # Without a proper mapping of values nothing can be resumed
        if not isinstance(values, dict):
            for receipt_id in receipt_ids:
                reply.add_error(receipt_id, "invalid values")
            return reply

        executor = self.command_execution_manager

        resumable: list[str] = []
        for receipt_id in receipt_ids:
            if not executor.is_valid_receipt_id(receipt_id):
                reply.add_error(receipt_id, "no such receipt")
            elif executor.find_by_id(receipt_id).is_suspended:
                resumable.append(receipt_id)
                reply.add_success(receipt_id)
            else:
                reply.add_error(receipt_id, "command is not suspended")

        # Resume in a second pass, after every ID has been classified
        for receipt_id in resumable:
            executor.resume(receipt_id, values.get(receipt_id))

        return reply

    def request_to_send_SYS_MSG_message(
        self,
        message: str,
        *,
        severity: Severity = Severity.INFO,
        sender: Optional[str] = None,
        timestamp: Optional[int] = None,
    ):
        """Asks the application to send a SYS-MSG message to the connected
        clients with the given message body, severity, sender ID and timestamp.

        The request is wrapped in a log message entry and handed to the rate
        limiters, so the application may send the message immediately or opt
        to delay it a bit in order to ensure that SYS-MSG notifications are
        not emitted too frequently.

        Parameters:
            message: the body of the message
            severity: the severity level of the message
            sender: the ID of the object that the message originates from if
                the server is relaying messages from an object that it manages
                (e.g. an UAV), or `None` if the server sends the message on its
                own
            timestamp: the timestamp of the message; `None` means that the
                timestamp is not relevant and it will be omitted from the
                generated message
        """
        fields = {
            "message": message,
            "severity": severity,
            "sender": sender,
            "timestamp": timestamp,
        }
        log_entry = LogMessage(**fields)

        limiters = self.rate_limiters
        limiters.request_to_send("SYS-MSG", log_entry)

    def request_to_send_UAV_INF_message_for(self, uav_ids: Iterable[str]) -> None:
        """Asks the application to emit an UAV-INF message with information
        about the UAVs with the given IDs. The request goes through the rate
        limiter registered under the ``UAV-INF`` name, so the message may be
        sent right away or slightly later to prevent UAV-INF notifications
        from being emitted too often.

        Parameters:
            uav_ids: the IDs of the UAVs to include in the message
        """
        limiters = self.rate_limiters
        limiters.request_to_send("UAV-INF", uav_ids)

    def prepare(self, config: str | None = None, debug: bool = False) -> int | None:
        """Resets the registry-full error counters, schedules the ``run``
        methods of the command execution manager, the message hub and the
        rate limiters in the background, then defers to the superclass.

        Parameters:
            config: forwarded to the superclass implementation as is
            debug: forwarded to the superclass implementation as is

        Returns:
            whatever the superclass implementation returns
        """
        self._registry_full_error_counts = Counter()

        background_tasks = (
            self.command_execution_manager.run,
            self.message_hub.run,
            self.rate_limiters.run,
        )
        for task in background_tasks:
            self.run_in_background(task)

        return super().prepare(config, debug)

    def sort_uavs_by_drivers(
        self, uav_ids: Iterable[str], response: Optional[FlockwaveResponse] = None
    ) -> dict[UAVDriver, list[UAV]]:
        """Groups the UAVs identified by the given IDs according to the
        drivers that are responsible for them.

        Parameters:
            uav_ids: the IDs of the UAVs to group
            response: optional response in which UAV lookup failures can be
                registered

        Returns:
            mapping from UAV drivers to the list of UAVs that were selected by
            the given IDs and belong to that driver; IDs for which the lookup
            yields nothing are left out
        """
        by_driver: defaultdict[UAVDriver, list[UAV]] = defaultdict(list)

        # The lookups are evaluated lazily, one by one, as the loop advances
        lookups = (self.find_uav_by_id(uav_id, response) for uav_id in uav_ids)
        for uav in lookups:
            if not uav:
                continue
            by_driver[uav.driver].append(uav)

        return by_driver

    def _create_components(self) -> None:
        """Creates the registries, managers, the message hub, the rate
        limiters, the world object and the device tree of the application,
        and connects their signals to the corresponding handlers of this
        object. Finally, asks the extension manager to rescan the available
        extensions.
        """
        # Register skybrush.server.ext as an entry point group that is used to
        # discover extensions
        self.extension_manager.module_finder.add_entry_point_group(
            "skybrush.server.ext"
        )

        # Log requests to restart an extension
        self.extension_manager.restart_requested.connect(
            self._on_restart_requested, sender=self.extension_manager
        )

        # Create an object that can be used to get hold of commonly used
        # directories within the app
        self.dirs = AppDirs("Skybrush Server", "CollMot Robotics")

        # Create an object to hold information about all the registered
        # communication channel types that the server can handle
        self.channel_type_registry = ChannelTypeRegistry()

        # Create an object to hold information about all the connected
        # clients that the server can talk to
        self.client_registry = ClientRegistry(self.channel_type_registry)
        self.client_registry.count_changed.connect(
            self._on_client_count_changed, sender=self.client_registry
        )

        # Create an object that keeps track of commands being executed
        # asynchronously on remote UAVs
        self.command_execution_manager = CommandExecutionManager()
        self.command_execution_manager.progress_updated.connect(
            self._on_command_execution_progress_updated,
            sender=self.command_execution_manager,
        )
        self.command_execution_manager.suspended.connect(
            self._on_command_execution_suspended,
            sender=self.command_execution_manager,
        )
        self.command_execution_manager.expired.connect(
            self._on_command_execution_timeout, sender=self.command_execution_manager
        )
        self.command_execution_manager.finished.connect(
            self._on_command_execution_finished, sender=self.command_execution_manager
        )

        # Creates an object to hold information about all the connections
        # to external data sources that the server manages
        self.connection_registry = ConnectionRegistry()
        self.connection_registry.connection_state_changed.connect(
            self._on_connection_state_changed, sender=self.connection_registry
        )
        self.connection_registry.added.connect(
            self._on_connection_added, sender=self.connection_registry
        )
        self.connection_registry.removed.connect(
            self._on_connection_removed, sender=self.connection_registry
        )

        # Create an object that keeps track of registered UAV drivers
        self.uav_driver_registry = UAVDriverRegistry()

        # Create a message hub that will handle incoming and outgoing
        # messages
        self.message_hub = MessageHub()
        self.message_hub.channel_type_registry = self.channel_type_registry
        self.message_hub.client_registry = self.client_registry

        # Create an object that manages rate-limiting for specific types of
        # messages
        self.rate_limiters = RateLimiters(dispatcher=self.message_hub.send_message)
        self.rate_limiters.register(
            "CONN-INF",
            ConnectionStatusMessageRateLimiter(self.create_CONN_INF_message_for),
        )
        self.rate_limiters.register(
            "SYS-MSG", BatchMessageRateLimiter(self.create_SYS_MSG_message_from)
        )
        self.rate_limiters.register(
            "UAV-INF", UAVMessageRateLimiter(self.create_UAV_INF_message_for)
        )

        # Create an object to hold information about all the objects that
        # the server knows about
        self.object_registry = ObjectRegistry()
        self.object_registry.removed.connect(
            self._on_object_removed, sender=self.object_registry
        )

        # Create the global world object
        self.world = World()

        # Create a global device tree and ensure that new UAVs are
        # registered in it
        self.device_tree = DeviceTree()
        self.device_tree.object_registry = self.object_registry

        # Create an object to manage the associations between clients and
        # the device tree paths that the clients are subscribed to
        self.device_tree_subscriptions = DeviceTreeSubscriptionManager(
            self.device_tree,
            client_registry=self.client_registry,
            message_hub=self.message_hub,
        )

        # Ask the extension manager to scan the entry points for user-defined
        # extensions and plugins
        self.extension_manager.rescan()

    def _find_connection_by_id(
        self,
        connection_id: str,
        response: Optional[Union[FlockwaveResponse, FlockwaveNotification]] = None,
    ) -> Optional[ConnectionRegistryEntry]:
        """Looks up a connection in the connection registry by its ID. When
        the lookup fails, the failure can be recorded in the given response
        object with the reason ``No such connection``.

        Parameters:
            connection_id: the ID of the connection to find
            response: the response in which the failure can be registered

        Returns:
            the entry of the connection registry with the given ID or ``None``
            if there is no such connection
        """
        registry = self.connection_registry
        entry = find_in_registry(
            registry,
            connection_id,
            response=response,
            failure_reason="No such connection",
        )
        return entry

    def _find_object_by_id(
        self,
        object_id: str,
        response: Optional[Union[FlockwaveResponse, FlockwaveNotification]] = None,
    ) -> Optional[ModelObject]:
        """Looks up an object in the object registry by its ID. When the
        lookup fails, the failure can be recorded in the given response object
        with the reason ``No such object``.

        Parameters:
            object_id: the ID of the object to find
            response: the response in which the failure can be registered

        Returns:
            the object with the given ID or ``None`` if there is no such object
        """
        registry = self.object_registry
        found = find_in_registry(
            registry,
            object_id,
            response=response,
            failure_reason="No such object",
        )
        return found

    def _on_client_count_changed(self, sender: ClientRegistry) -> None:
        """Reacts to a change in the number of clients attached to the server
        by scheduling a call to the ``set_spinning`` method of the extension
        manager in the background, passing whether there is at least one
        client.

        Parameters:
            sender: the client registry
        """
        manager = self.extension_manager
        if not manager:
            # No extension manager, nothing to notify
            return

        self.run_in_background(manager.set_spinning, self.num_clients > 0)

    def _on_connection_state_changed(
        self,
        sender: ConnectionRegistry,
        entry: ConnectionRegistryEntry,
        old_state: ConnectionState,
        new_state: ConnectionState,
    ) -> None:
        """Reacts to a state change of a connection somewhere within the
        server by requesting a ``CONN-INF`` message from the rate limiters.

        Parameters:
            sender: the connection registry
            entry: the entry of the connection registry whose state changed
            old_state: the old state of the connection
            new_state: the new state of the connection
        """
        connection_id = entry.id
        limiters = self.rate_limiters
        limiters.request_to_send("CONN-INF", connection_id, old_state, new_state)

    def _on_command_execution_finished(
        self, sender: CommandExecutionManager, status: CommandExecutionStatus
    ) -> None:
        """Reacts to the completion of a remote asynchronous command by
        enqueueing an ``ASYNC-RESP`` message for every client that has to be
        notified about it.

        Parameters:
            sender: the command execution manager
            status: the status object corresponding to the command whose
                execution has just finished
        """
        payload = {"type": "ASYNC-RESP", "id": status.id}

        failure = status.error
        if not failure:
            payload["result"] = status.result
        elif hasattr(failure, "json"):
            # Errors that know how to represent themselves in JSON are sent
            # in that form
            payload["error"] = failure.json  # type: ignore
        else:
            payload["error"] = str(failure)

        hub = self.message_hub
        notification = hub.create_response_or_notification(payload)
        for recipient in status.clients_to_notify:
            hub.enqueue_message(notification, to=recipient)

    def _on_command_execution_progress_updated(
        self, sender: CommandExecutionManager, status: CommandExecutionStatus
    ) -> None:
        """Reacts to a progress update of a remote asynchronous command by
        enqueueing an ``ASYNC-ST`` message for every client that has to be
        notified about it. The message also carries a ``suspended`` flag when
        the status reports that the command is suspended.

        Parameters:
            sender: the command execution manager
            status: the status object corresponding to the command whose
                progress was updated
        """
        payload = {
            "type": "ASYNC-ST",
            "id": status.id,
            "progress": status.progress.json,  # type: ignore
        }
        if status.is_suspended:
            payload["suspended"] = True

        hub = self.message_hub
        notification = hub.create_response_or_notification(payload)
        for recipient in status.clients_to_notify:
            hub.enqueue_message(notification, to=recipient)

    # Suspension events are handled by the progress update handler, which adds
    # a ``suspended`` flag to the ASYNC-ST body when the status is suspended
    _on_command_execution_suspended = _on_command_execution_progress_updated

    def _on_command_execution_timeout(
        self,
        sender: CommandExecutionManager,
        statuses: Iterable[CommandExecutionStatus],
    ) -> None:
        """Reacts to remote asynchronous commands being abandoned with a
        timeout by enqueueing one ``ASYNC-TIMEOUT`` message per affected
        client.

        Parameters:
            sender: the command execution manager
            statuses: the status objects corresponding to the commands whose
                execution has timed out
        """
        # Several commands may time out together, so the receipt IDs are
        # grouped by the clients to notify first; this way each client gets a
        # single ASYNC-TIMEOUT message listing all of its receipts
        timed_out_ids_of: dict[str, list[str]] = {}
        for status in statuses:
            timed_out_id = status.id
            for recipient in status.clients_to_notify:
                timed_out_ids_of.setdefault(recipient, []).append(timed_out_id)

        hub = self.message_hub
        for recipient, timed_out_ids in timed_out_ids_of.items():
            notification = hub.create_response_or_notification(
                {"type": "ASYNC-TIMEOUT", "ids": timed_out_ids}
            )
            hub.enqueue_message(notification, to=recipient)

    def _on_connection_added(
        self, sender: ConnectionRegistry, entry: ConnectionRegistryEntry
    ) -> None:
        """Reacts to a connection being added to the connection registry.

        Enqueues a CONN-INF notification in the message hub so the connected
        clients know that the connection was added.

        Parameters:
            sender: the connection registry
            entry: the connection that was added
        """
        added_ids = [entry.id]
        conn_inf = self.create_CONN_INF_message_for(added_ids)
        self.message_hub.enqueue_message(conn_inf)

    def _on_connection_removed(
        self, sender: ConnectionRegistry, entry: ConnectionRegistryEntry
    ) -> None:
        """Reacts to a connection being removed from the connection registry.

        Enqueues a CONN-DEL notification in the message hub so the connected
        clients know that the connection was removed.

        Parameters:
            sender: the connection registry
            entry: the connection that was removed
        """
        hub = self.message_hub
        payload = {"type": "CONN-DEL", "ids": [entry.id]}
        conn_del = hub.create_response_or_notification(payload)
        try:
            hub.enqueue_message(conn_del)
        except BrokenResourceError:
            # The app is probably shutting down, so it is fine to drop the
            # notification
            return

    def _on_object_removed(self, sender: ObjectRegistry, object: ModelObject) -> None:
        """Reacts to an object being removed from the object registry by
        enqueueing an OBJ-DEL notification with the ID of the object.

        Parameters:
            sender: the object registry
            object: the object that was removed
        """
        hub = self.message_hub
        payload = {"type": "OBJ-DEL", "ids": [object.id]}
        obj_del = hub.create_response_or_notification(payload)
        try:
            hub.enqueue_message(obj_del)
        except BrokenResourceError:
            # The app is probably shutting down, so it is fine to drop the
            # notification
            return

    def _on_restart_requested(self, sender, name: str) -> None:
        """Reacts to a restart request received through the
        ``restart_requested`` signal of the extension manager by logging a
        warning; the server itself is not restarted here.

        Parameters:
            sender: the sender of the signal
            name: passed to the logger as the ``id`` of the log record
        """
        log_context = {"id": name}
        self.log.warning(
            "The server should be restarted in order for the changes to take effect",
            extra=log_context,
        )

    def _process_configuration(self, config: Configuration) -> Optional[int]:
        """Applies the loaded configuration to the application.

        Sets the timeout of the command execution manager, overrides the base
        port if needed and ensures that the ``ext_manager`` and ``license``
        extensions are listed in the configuration.

        The base port is taken from the ``PORT`` environment variable when it
        holds a valid integer, otherwise from the ``PORT`` key of the
        configuration. An invalid ``PORT`` environment variable is reported
        with a warning instead of being ignored silently.

        Parameters:
            config: the configuration to process; its ``EXTENSIONS`` entry is
                modified in place

        Returns:
            ``None`` in all cases handled here
        """
        # Process the configuration options
        cfg = config.get("COMMAND_EXECUTION_MANAGER", {})
        self.command_execution_manager.timeout = cfg.get("timeout", 90)

        # Override the base port if needed; the environment variable takes
        # precedence over the configuration file
        port_from_env: Optional[str] = environ.get("PORT")
        port: Optional[int] = config.get("PORT")
        if port_from_env:
            try:
                port = int(port_from_env)
            except ValueError:
                # Keep the port from the configuration (if any), but let the
                # operator know that the environment variable was not used
                self.log.warning(
                    f"Ignoring invalid PORT environment variable: {port_from_env!r}"
                )
        if port is not None:
            set_base_port(port)

        # Force-load the ext_manager and the licensing extension
        cfg = config.setdefault("EXTENSIONS", {})
        cfg["ext_manager"] = {}
        cfg["license"] = {}

        return None

    def _setup_app_configurator(self, configurator: AppConfigurator) -> None:
        """Adjusts the settings of the configurator of the application.

        Parameters:
            configurator: the configurator whose ``key_filter``, ``merge_keys``
                and ``safe`` attributes are set
        """
        # The configurator is put in "safe" mode exactly when the server runs
        # from a packaged build
        running_packaged = is_packaged()

        # Keys are filtered with str.isupper
        configurator.key_filter = str.isupper
        configurator.merge_keys = ["EXTENSIONS"]
        configurator.safe = running_packaged


############################################################################

# Module-level application instance; the message handlers below are registered
# on its message hub with the ``@app.message_hub.on(...)`` decorator
app = SkybrushServer("skybrush", PACKAGE_NAME)

# ######################################################################## #


@app.message_hub.on("ASYNC-CANCEL")
def handle_ASYNC_CANCEL(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles ASYNC-CANCEL messages by asking the application to cancel the
    asynchronous operations whose IDs are listed in the message.
    """
    receipt_ids = message.get_ids()
    return app.cancel_async_operations(receipt_ids, in_response_to=message)


@app.message_hub.on("ASYNC-RESUME")
def handle_ASYNC_RESUME(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles ASYNC-RESUME messages by asking the application to resume the
    asynchronous operations whose IDs are listed in the message.
    """
    receipt_ids = message.get_ids()
    # A missing or empty "values" entry is treated as an empty mapping
    values = message.body.get("values") or {}
    return app.resume_async_operations(receipt_ids, values, in_response_to=message)


@app.message_hub.on("CONN-INF")
def handle_CONN_INF(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles CONN-INF messages by responding with the CONN-INF message that
    the application creates for the IDs listed in the request.
    """
    connection_ids = message.get_ids()
    return app.create_CONN_INF_message_for(connection_ids, in_response_to=message)


@app.message_hub.on("CONN-LIST")
def handle_CONN_LIST(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles CONN-LIST messages by responding with the IDs found in the
    connection registry.
    """
    connection_ids = [*app.connection_registry.ids]
    return {"ids": connection_ids}


@app.message_hub.on("DEV-INF")
def handle_DEV_INF(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles DEV-INF messages by responding with the DEV-INF message that
    the application creates for the ``paths`` entry of the request body.
    """
    paths = message.body["paths"]
    return app.create_DEV_INF_message_for(paths, in_response_to=message)


@app.message_hub.on("DEV-LIST")
def handle_DEV_LIST(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles DEV-LIST messages by responding with the DEV-LIST message that
    the application creates for the IDs listed in the request.
    """
    object_ids = message.get_ids()
    return app.create_DEV_LIST_message_for(object_ids, in_response_to=message)


@app.message_hub.on("DEV-LISTSUB")
def handle_DEV_LISTSUB(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles DEV-LISTSUB messages by responding with the DEV-LISTSUB message
    that the application creates for the sender of the request.
    """
    # The path filter defaults to the root path when the request omits it
    path_filter = message.body.get("pathFilter", ("/",))
    return app.create_DEV_LISTSUB_message_for(
        client=sender, path_filter=path_filter, in_response_to=message
    )


@app.message_hub.on("DEV-SUB")
def handle_DEV_SUB(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles DEV-SUB messages by responding with the DEV-SUB message that
    the application creates for the sender and the ``paths`` entry of the
    request body.
    """
    paths = message.body["paths"]
    # Any truthy "lazy" entry in the body turns the flag on
    lazy = bool(message.body.get("lazy"))
    return app.create_DEV_SUB_message_for(
        client=sender, paths=paths, lazy=lazy, in_response_to=message
    )


@app.message_hub.on("DEV-UNSUB")
def handle_DEV_UNSUB(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles DEV-UNSUB messages by responding with the DEV-UNSUB message
    that the application creates for the sender and the ``paths`` entry of the
    request body. The ``removeAll`` and ``includeSubtrees`` entries of the body
    are forwarded as well and default to ``False``.
    """
    paths = message.body["paths"]
    remove_all = message.body.get("removeAll", False)
    include_subtrees = message.body.get("includeSubtrees", False)
    return app.create_DEV_UNSUB_message_for(
        client=sender,
        paths=paths,
        in_response_to=message,
        remove_all=remove_all,
        include_subtrees=include_subtrees,
    )


@app.message_hub.on("OBJ-LIST")
def handle_OBJ_LIST(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles OBJ-LIST messages by responding with the IDs found in the
    object registry. When the request body has a ``filter`` entry, only the
    IDs that the registry returns for that filter are listed.
    """
    type_filter = message.body.get("filter")
    registry = app.object_registry
    object_ids = (
        registry.ids if type_filter is None else registry.ids_by_types(type_filter)
    )
    return {"ids": list(object_ids)}


@app.message_hub.on("SYS-PING")
def handle_SYS_PING(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles SYS-PING messages by acknowledging them via the message hub."""
    acknowledgment = hub.acknowledge(message)
    return acknowledgment


@app.message_hub.on("SYS-PORTS")
def handle_SYS_PORTS(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles SYS-PORTS messages by responding with a copy of the current
    port map as a plain dictionary.
    """
    port_map = dict(get_port_map())
    return {"ports": port_map}


@app.message_hub.on("SYS-TIME")
async def handle_SYS_TIME(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles SYS-TIME messages by responding with the current system time
    and optionally adjusting the system time first.

    When the request body has an ``adjustment`` entry, it is added to the
    current system time (as returned by ``get_system_time_msec()``) and the
    result is set as the new system time. The request is acknowledged
    negatively when the adjustment is not a number, when the system time
    cannot be set or when setting the system time fails.

    Returns:
        a response body with the current timestamp, or a negative
        acknowledgment if the requested adjustment could not be performed
    """
    adjustment = message.body.get("adjustment")
    if adjustment is not None:
        # The adjustment comes from the client, so it must not be trusted to
        # be numeric; reject it explicitly instead of raising from the handler
        try:
            adjustment = float(adjustment)
        except (TypeError, ValueError):
            return hub.acknowledge(
                message, outcome=False, reason="Invalid adjustment"
            )

        allowed, reason = await can_set_system_time_detailed_async()
        if not allowed:
            return hub.acknowledge(
                message, outcome=False, reason=f"Permission denied. {reason}"
            )

        if adjustment != 0:
            # A zero adjustment skips the actual clock change so the client
            # can test whether time adjustments are supported by sending an
            # adjustment with zero delta
            adjusted_time_msec = get_system_time_msec() + adjustment
            try:
                await set_system_time_msec_async(adjusted_time_msec)
            except Exception as ex:
                return hub.acknowledge(message, outcome=False, reason=str(ex))

    return {"timestamp": get_system_time_msec()}


@app.message_hub.on("SYS-VER")
def handle_SYS_VER(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles SYS-VER messages by responding with the name and the version
    of the server software.
    """
    return dict(software="skybrushd", version=server_version)


@app.message_hub.on("UAV-INF")
def handle_UAV_INF(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles UAV-INF messages by responding with the UAV-INF message that
    the application creates for the IDs listed in the request.
    """
    uav_ids = message.get_ids()
    return app.create_UAV_INF_message_for(uav_ids, in_response_to=message)


@app.message_hub.on("UAV-LIST")
def handle_UAV_LIST(message: FlockwaveMessage, sender: Client, hub: MessageHub):
    """Handles UAV-LIST messages by responding with the IDs that the object
    registry returns for the ``UAV`` type.
    """
    uav_ids = [*app.object_registry.ids_by_type(UAV)]
    return {"ids": uav_ids}


@app.message_hub.on("LOG-DATA")
async def handle_single_uav_operations(
    message: FlockwaveMessage, sender: Client, hub: MessageHub
):
    """Handles messages that target a single UAV by dispatching them to the
    UAV via the application.

    The name of the property holding the ID of the UAV is ``uavId`` for
    LOG-DATA messages and ``id`` for any other message type.
    """
    id_property = "uavId" if message.get_type() == "LOG-DATA" else "id"
    response = await app.dispatch_to_uav(message, sender, id_property=id_property)
    return response


@app.message_hub.on(
    "LOG-INF",
    "OBJ-CMD",
    "PRM-GET",
    "PRM-SET",
    "PRM-SET-MANY",
    "UAV-CALIB",
    "UAV-FLY",
    "UAV-HALT",
    "UAV-HOVER",
    "UAV-LAND",
    "UAV-MOTOR",
    "UAV-PREFLT",
    "UAV-RST",
    "UAV-RTH",
    "UAV-SLEEP",
    "UAV-SIGNAL",
    "UAV-TAKEOFF",
    "UAV-TEST",
    "UAV-VER",
    "UAV-WAKEUP",
)
async def handle_multi_uav_operations(
    message: FlockwaveMessage, sender: Client, hub: MessageHub
):
    """Handles the message types listed in the decorator by passing the
    message and its sender to ``app.dispatch_to_uavs()`` and returning the
    result.
    """
    response = await app.dispatch_to_uavs(message, sender)
    return response


# ######################################################################## #