Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / src / flockwave / server / ext / show / takeoff.py
Size: Mime:
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from logging import Logger
from time import time
from trio import (
    TASK_STATUS_IGNORED,
    CapacityLimiter,
    MemorySendChannel,
    current_time,
    open_memory_channel,
    open_nursery,
    sleep,
    TooSlowError,
    WouldBlock,
)
from trio.lowlevel import ParkingLot
from trio_util import periodic
from typing import Generic, Iterator, Optional, TypeVar
from weakref import WeakKeyDictionary

from flockwave.server.model import UAV

from .config import (
    AuthorizationScope,
    DroneShowConfiguration,
    StartMethod,
)

__all__ = ("ScheduledTakeoffManager", "TakeoffConfiguration")


@dataclass
class TakeoffConfiguration:
    """Simple value object that encapsulates an optional desired start time as a UNIX
    timestamp, an authorization scope, and whether the start time should be
    updated on the drones or not.
    """

    authorization_scope: AuthorizationScope
    """The scope of authorization for the start of the show."""

    should_update_takeoff_time: bool = True
    """Whether the start time should be updated on the drones according to the
    `takeoff_time` property of this object.
    """

    takeoff_time: Optional[int] = None
    """The desired takeoff time of the swarm; `None` if the takeoff time should
    be cleared. Ignored if `should_update_takeoff_time` is set to `False`.
    """

    @classmethod
    def from_show_config(
        cls, config: DroneShowConfiguration, start_time: Optional[float]
    ):
        """Returns the desired start time in seconds and the desired state
        of the takeoff authorization flag on all the UAVs.
        """
        # We need to decide whether to set or clear the start time of each drone,
        # and whether to set or clear the authorization flag.
        #
        # The rules are as follows:
        #
        # If the swarm is configured to start automatically
        # =================================================
        #
        # First we check whether there is a configured start time and if so, we
        # forward the start time to the swarm. If there is no configured start
        # time, we attempt to clear the start time on the swarm. At the same
        # time, we check whether the start has been authorized and update the
        # "authorized" flag on the swarm accordingly.
        #
        # If the swarm is configured to start with the RC
        # ===============================================
        #
        # First we check whether the start has been authorized and update the
        # "authorized" flag on the swarm accordingly. If the start has been
        # authorized, we never mess around with the scheduled start time of the
        # drone. If the start has not been authorized, we clear the scheduled
        # start time of the drone.

        desired_auth_scope = config.scope_iff_authorized

        if config.start_method == StartMethod.AUTO:
            if start_time is not None:
                # User has a show clock and the show clock has a scheduled
                # start time so we want to use that
                return cls(
                    takeoff_time=int(start_time), authorization_scope=desired_auth_scope
                )
            else:
                # User has no show clock or the show clock is stopped, so we
                # want to clear what there is on the drone
                return cls(authorization_scope=desired_auth_scope)

        elif config.start_method == StartMethod.RC:
            if desired_auth_scope is not AuthorizationScope.NONE:
                # User authorized the start so we don't mess around with the
                # takeoff time, it is the responsibility of the person holding
                # the RC to set the takeoff time
                return cls(
                    authorization_scope=desired_auth_scope,
                    should_update_takeoff_time=False,
                )
            else:
                # User did not authorize the start yet so the start time must
                # be cleared
                return cls(authorization_scope=desired_auth_scope)

        else:
            return cls(authorization_scope=AuthorizationScope.NONE)

    @property
    def authorized(self) -> bool:
        """Returns whether the takeoff is authorized to start (by any means,
        i.e. lights-only mode is also considered as being authorized).
        """
        return self.authorization_scope is not AuthorizationScope.NONE

    @property
    def is_takeoff_in_the_future(self) -> bool:
        """Returns whether the desired takeoff time is in the future."""
        return self.takeoff_time is not None and self.takeoff_time >= time()

    @property
    def takeoff_time_in_legacy_format(self) -> Optional[int]:
        """Returns the desired takeoff time in the legacy format we used in
        earlier versions of the code.

        Returns:
            -1 if the takeoff time should not be updated, `None` if the takeoff
            time should be cleared, or the real takeoff time otherwise
        """
        return self.takeoff_time if self.should_update_takeoff_time else -1


TUAV = TypeVar("TUAV", bound="UAV")


class ScheduledTakeoffManager(ABC, Generic[TUAV]):
    """Class that manages the automatic takeoff process on a group of drones.

    The class provides the following facilities:

    - maintains a current show configuration and takeoff time
    - broadcasts messages containing the current takeoff time and authorization
      scope periodically (typically at 1 Hz)
    - watches UAVs and sends individual configuration messages to them if they
      did not respond to broadcast requests in time

    This class is meant to serve as a base class for concrete implementations
    in the context of another extension. For instance, in the `mavlink`
    extension, each MAVLink network has an instance of this object that manages
    the takeoff for that given MAVLink network.
    """

    _config: Optional[DroneShowConfiguration] = None
    """The configuration of the show to start, including the start method,
    the clock that the start is synchronized to, the start time according to
    the given clock, and the list of UAVs to start.
    """

    _limiter: CapacityLimiter
    """Capacity limiter that controls how many individual drones we are trying
    to configure at the same time if the broadcast messages did not reach them
    in time.
    """

    _log: Optional[Logger] = None
    """The logger that the takeoff manager uses to log events."""

    _parking_lot: ParkingLot
    """Low-level task coordination primitive that manages the execution of
    background tasks performed by this object.
    """

    _start_time: Optional[float] = None
    """The start time of the show, expressed as the number of seconds since
    the UNIX epoch.
    """

    _uavs_to_update: set[TUAV]
    """Set of UAVs waiting to be updated individually when the start time or the
    start configuration of the show changes.
    """

    _uavs_last_updated_at: WeakKeyDictionary[TUAV, float]
    """Records the timestamps when UAVs were last updated on an individual
    basis. Used for rate-limiting the individual configuration requests.
    """

    unicast_cooldown_period: float = 3
    """Number of seconds to wait between consecutive attempts to configure a
    drone via unicast requests.
    """

    def __init__(
        self,
        *,
        log: Optional[Logger] = None,
        capacity_limiter: Optional[CapacityLimiter] = None,
    ):
        """Constructor.

        Parameters:
            log: the logger to use to log messages from this object
            capacity_limiter: the capacity limiter to use to limit the number of
            concurrent updates
        """
        self._log = log

        self._limiter = capacity_limiter or CapacityLimiter(5)
        self._parking_lot = ParkingLot()
        self._uavs_to_update = set()
        self._uavs_last_updated_at = WeakKeyDictionary()

    @abstractmethod
    async def broadcast_takeoff_configuration(
        self, config: TakeoffConfiguration
    ) -> None:
        """Broadcasts a message that configures the start time and authorization
        scope for all UAVs.

        May be a no-op if broadcasts are not supported; in this case the manager
        will fall back to individual configuration requests.

        Exceptions from this method are caught and ignored by the manager.
        If you want to log them, add your own logging in the implementation of
        this method.
        """
        # packet = create_start_time_configuration_packet(
        #     start_time=config.takeoff_time,
        #     authorization_scope=config.authorization_scope,
        #     should_update_takeoff_time=config.should_update_takeoff_time,
        # )
        # await self._network.broadcast_packet(packet)
        ...

    @abstractmethod
    def iter_uavs_to_schedule(self) -> Iterator[TUAV]:
        """Returns an iterator over the UAVs managed by this object that are
        to be updated on an individual basis if they do not receive the
        broadcast configuration packet or do not respond to it.

        May return an empty iterator if you do not want to support individual
        configuration for the UAVs.
        """
        # for uav in self._network.uavs():
        #     if uav.is_connected and uav.supports_scheduled_takeoff:
        #         yield uav
        ...

    @abstractmethod
    def uav_needs_update(self, uav: TUAV, config: TakeoffConfiguration) -> bool:
        """Returns whether the given UAV needs to be updated if the desired
        takeoff configuration is the one provided as `config`.

        May return False unconditionally if you do not want to support individual
        configuration for the UAVs.

        Args:
            uav: the UAV to check
            config: the desired takeoff configuration to check against
        """
        ...

    @abstractmethod
    async def update_uav(self, uav: TUAV, config: TakeoffConfiguration) -> None:
        """Updates the given UAV with the desired takeoff configuration.

        This method is called by the manager when it needs to update a UAV
        individually. It should not block for too long, as it is called from
        a background task that processes multiple UAVs in parallel.
        """
        ...

    @property
    def config(self) -> Optional[DroneShowConfiguration]:
        return self._config

    def notify_config_changed(self, config: DroneShowConfiguration) -> None:
        """Notifies the manager that the scheduled takeoff configuration has
        changed.
        """
        self._config = config
        self._parking_lot.unpark_all()
        self._trigger_uav_updates_soon()

    def notify_start_time_changed(self, start_time: Optional[float]) -> None:
        """Notifies the manager that the scheduled start time of the show has
        been changed. This is typically a side effect of the user adjusting the
        start time manually, but it may also be related to the adjustment of
        some other clock that the show clock is synchronized to.
        """
        self._start_time = start_time
        self._trigger_uav_updates_soon()

    async def run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
        """Background task that checks the scheduled start times on the UAVs
        of this network regularly and updates them as needed.
        """
        while True:
            try:
                await self._run(task_status=task_status)
            except Exception:
                if self._log:
                    self._log.exception(
                        "Scheduled takeoff manager stopped unexpectedly, restarting..."
                    )

                # Ensure that we call task_status.started() only once
                task_status = TASK_STATUS_IGNORED
                await sleep(0.5)

    async def _run(self, *, task_status=TASK_STATUS_IGNORED) -> None:
        log = self._log

        async with open_nursery() as nursery:
            queue_tx: MemorySendChannel[TUAV] = await nursery.start(
                self._process_uavs_scheduled_for_updates
            )
            task_status.started()

            async with queue_tx:
                async for _ in periodic(1):
                    config = self._config
                    if not config:
                        # No scheduled takeoff config yet, wait for one
                        await self._parking_lot.park()
                        continue

                    # Figure out what the desired takeoff time and the auth flag
                    # should be
                    # TODO(ntamas): this could be cached; the desired takeoff
                    # configuration should depend on 'config' only
                    takeoff_config = TakeoffConfiguration.from_show_config(
                        config, self._start_time
                    )

                    # Broadcast a packet that contains the desired takeoff time
                    # and the auth scope. If it fails, well, it does not matter
                    # because we will check the UAVs one by one as well. We
                    # send this packet only if the start time is in the future
                    # or if the takeoff time was cleared in the configuration.
                    if (
                        takeoff_config.takeoff_time is None
                        or takeoff_config.is_takeoff_in_the_future
                    ):
                        try:
                            await self.broadcast_takeoff_configuration(takeoff_config)
                        except Exception:
                            # Do not blow up if the broadcasting fails for any reason
                            pass

                    # First we scan the _uavs array and find all UAVs that need to
                    # be configured. The actual configuration will take place in a
                    # separate task to ensure that we don't block the entire process
                    # with a single UAV that takes too much time to configure
                    for uav in self.iter_uavs_to_schedule():
                        if uav in self._uavs_to_update:
                            # An update is already scheduled for this UAV so
                            # we can skip it for now
                            continue

                        if not self.uav_needs_update(uav, takeoff_config):
                            # UAV does not need an update
                            continue

                        timestamp = self._uavs_last_updated_at.get(uav)
                        if (
                            timestamp
                            and current_time() - timestamp
                            < self.unicast_cooldown_period
                        ):
                            # We have tried updating this UAV recently so
                            # let's wait a bit more
                            continue

                        try:
                            queue_tx.send_nowait(uav)
                        except WouldBlock:
                            # Okay, doesn't matter, we'll try again in the next
                            # iteration
                            if log:
                                log.warning(
                                    "Cannot schedule UAV for an update, will try later",
                                    extra={"id": uav.id},
                                )
                        else:
                            self._uavs_to_update.add(uav)

    async def _process_uavs_scheduled_for_updates(
        self, *, task_status=TASK_STATUS_IGNORED
    ) -> None:
        """Task that reads the queue in which we put the UAVs scheduled for an
        update and processes them one by one by spawning further background
        tasks for it.
        """
        queue_tx, queue_rx = open_memory_channel[TUAV](1024)
        async with open_nursery() as nursery:
            async with queue_rx:
                task_status.started(queue_tx)
                async for uav in queue_rx:
                    nursery.start_soon(
                        self._process_single_uav_scheduled_for_update, uav
                    )

    async def _process_single_uav_scheduled_for_update(self, uav: TUAV) -> None:
        """Background task updates the desired start time and automatic takeoff
        authorization on a single UAV.

        Many of these tasks may be executed in parallel when we are configuring
        UAVs.

        Parameters:
            uav: the UAV to configure
        """
        assert self._config is not None

        try:
            async with self._limiter:
                log = self._log
                if log:
                    log.debug(f"Updating takeoff configuration of {uav.id}")

                takeoff_config = TakeoffConfiguration.from_show_config(
                    self._config, self._start_time
                )
                await self.update_uav(uav, takeoff_config)
        except TooSlowError:
            log = self._log
            if log:
                log.warning(
                    f"UAV {uav.id} did not respond to a takeoff configuration request"
                )
        except Exception:
            log = self._log
            if log:
                log.exception(
                    f"Unexpected exception while updating takeoff configuration on UAV {uav.id}"
                )
        finally:
            # Remember that we sent a command to update the takeoff configuration
            # on this UAV and that it was sent successfully so we don't try it
            # again in the next few seconds even if the status of the UAV is
            # not updated yet from another status packet
            self._uavs_last_updated_at[uav] = current_time()

            try:
                self._uavs_to_update.remove(uav)
            except KeyError:
                log = self._log
                if log:
                    log.warning(
                        f"UAV {uav.id} missing from _uavs_to_update, might be a bug"
                    )

    def _trigger_uav_updates_soon(self) -> None:
        """Ensures that new configurations get propagated to the UAVs as
        soon as possible.

        This is achieved by clearing the "last updated" timestamps of the UAVs;
        otherwise the manager would not try to update a UAV if it was updated
        recently in the last three seconds.
        """
        self._uavs_last_updated_at.clear()