Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / crazyflie / driver.py
Size: Mime:
"""Driver class for Crazyflie drones."""

from __future__ import annotations

from collections import defaultdict
from colour import Color
from contextlib import asynccontextmanager, AsyncExitStack
from errno import EIO
from functools import partial
from logging import Logger
from math import ceil, hypot
from pathlib import Path
from random import random
from struct import Struct
from trio import Nursery, open_nursery, sleep
from trio_util import periodic
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Iterable,
    Optional,
    Sequence,
    TYPE_CHECKING,
    cast,
)

from aiocflib.crazyflie import Crazyflie
from aiocflib.crtp.crtpstack import MemoryType
from aiocflib.crazyflie.high_level_commander import TrajectoryType
from aiocflib.crazyflie.log import LogSession
from aiocflib.crazyflie.mem import write_with_checksum
from aiocflib.errors import TimeoutError

from flockwave.gps.vectors import PositionXYZ, VelocityXYZ
from flockwave.server.command_handlers import (
    create_color_command_handler,
    create_parameter_command_handler,
    create_test_command_handler,
    create_version_command_handler,
)
from flockwave.server.errors import NotSupportedError
from flockwave.server.model.preflight import PreflightCheckInfo, PreflightCheckResult
from flockwave.server.model.transport import TransportOptions
from flockwave.server.model.uav import BatteryInfo, UAVBase, UAVDriver, VersionInfo
from flockwave.server.registries.errors import RegistryFull
from flockwave.server.show import (
    get_group_index_from_show_specification,
    get_home_position_from_show_specification,
    get_light_program_from_show_specification,
    get_trajectory_from_show_specification,
    TrajectorySpecification,
)
from flockwave.server.types import GCSLogMessageSender
from flockwave.server.utils import color_to_rgb8_triplet, nop, optional_float
from flockwave.spec.errors import FlockwaveErrorCode
from flockwave.spec.ids import make_valid_object_id

from .crtp_extensions import (
    DRONE_SHOW_PORT,
    DroneShowCommand,
    DroneShowExecutionStage,
    DroneShowStatus,
    GCSLightEffectType,
    LightProgramLocation,
    LightProgramType,
    PreflightCheckStatus,
)
from .fence import Fence, FenceConfiguration
from .trajectory import encode_trajectory, TrajectoryEncoding
from .types import ControllerType

if TYPE_CHECKING:
    from flockwave.server.app import SkybrushServer

__all__ = ("CrazyflieDriver",)


class CrazyflieDriver(UAVDriver["CrazyflieUAV"]):
    """Driver class for Crazyflie drones.

    Attributes:
        app (SkybrushServer): the app in which the driver lives
        fence_config: configuration of the safety fence to apply on drones
            managed by this driver
        id_format: Python format string that receives a numeric drone ID in the
            flock and returns its preferred formatted identifier that is used
            when the drone is registered in the server, or any other object that
            has a ``format()`` method accepting a single integer as an argument
            and returning the preferred UAV identifier
        preferred_controller: controller type to set on a drone during a show
            upload; ``None`` if no change is needed
        status_interval: number of seconds that should pass between consecutive
            status requests sent to a drone
        takeoff_altitude: altitude that a UAV should take off to when receiving
            a takeoff command
    """

    app: "SkybrushServer"
    debug: bool
    id_format: str
    log: Logger
    fence_config: FenceConfiguration
    preferred_controller: Optional[ControllerType]
    status_interval: float = 0.5
    takeoff_altitude: float = 1
    use_test_mode: bool = False

    _cache_folder: Optional[str]
    _address_space_by_uav_id: dict[str, Any]
    _uav_ids_by_address_space: dict[Any, dict[int, str]]

    def __init__(
        self,
        app=None,
        id_format: str = "{0:02}",
        cache: Optional[Path] = None,
    ):
        """Constructor.

        Parameters:
            app (SkybrushServer): the app in which the driver lives
            debug (bool): whether to log the incoming and outgoing messages of
                each drone created by the driver
            id_format: the format of the UAV IDs used by this driver.
                See the class documentation for more details.
            cache: optional cache folder that the driver can use to store the
                parameter and log TOCs of the Crazyflie drones that it encounters
        """
        super().__init__()

        self.app = app  # type: ignore
        self.debug = False
        self.fence_config = FenceConfiguration()
        self.id_format = id_format
        self.preferred_controller = None
        self.use_test_mode = False

        self._cache_folder = str(cache.resolve()) if cache else None
        self._address_space_by_uav_id = {}
        self._uav_ids_by_address_space = defaultdict(dict)

    def _create_uav(self, formatted_id: str) -> "CrazyflieUAV":
        """Creates a new UAV that is to be managed by this driver.

        Parameters:
            formatted_id: the formatted string identifier of the UAV
                to create

        Returns:
            an appropriate UAV object
        """
        uav = CrazyflieUAV(formatted_id, driver=self)
        uav.notify_updated = partial(
            self.app.request_to_send_UAV_INF_message_for, [formatted_id]
        )
        uav.send_log_message_to_gcs = partial(
            self.app.request_to_send_SYS_MSG_message, sender=uav.id
        )
        return uav

    @property
    def cache_folder(self) -> Optional[str]:
        """Returns the full path to a folder where the driver can store
        the parameter TOC files of the Crazyflie drones that it sees.
        """
        return self._cache_folder

    def get_or_create_uav(self, address_space, index: int) -> Optional["CrazyflieUAV"]:
        """Retrieves the UAV with the given index in the given address space
        or creates one if the driver has not seen a UAV with the given index in
        the given address space yet.

        Parameters:
            address_space: the address space
            index: the index of the address within the address space

        Returns:
            an appropriate UAV object or `None` if the UAV object cannot be
            added to the object registry due to the registry being full
        """
        uav_id_map = self._uav_ids_by_address_space.get(address_space)
        formatted_id = uav_id_map.get(index) if uav_id_map else None
        if formatted_id is None:
            formatted_id = make_valid_object_id(
                self.id_format.format(index, address_space)
            )
            self._uav_ids_by_address_space[address_space][index] = formatted_id
            self._address_space_by_uav_id[formatted_id] = address_space

        try:
            uav = cast(
                Any,
                self.app.object_registry.add_if_missing(
                    formatted_id, factory=self._create_uav
                ),
            )
        except RegistryFull:
            self.app.handle_registry_full_error(self, "Crazyflie UAV")
            return None

        if uav.uri is None:
            uav.uri = address_space[index]

        return uav

    async def handle_command_alt(
        self,
        uav: "CrazyflieUAV",
        z: Optional[str] = None,
    ):
        """Command that sends the UAV to a given altitude."""
        try:
            z_num = optional_float(z)
        except ValueError:
            raise RuntimeError("Invalid number found in input") from None

        if z_num is None:
            current = uav.status.position_xyz
            if current is None:
                raise RuntimeError("UAV has no known position yet")
            return f"Current altitude is {current.z:.2f} m"
        else:
            x_num, y_num, z_num = await uav.go_to(None, None, z_num)
            return f"Target set to ({x_num:.2f}, {y_num:.2f}, {z_num:.2f}) m"

    async def handle_command_fence(
        self,
        uav: "CrazyflieUAV",
        subcommand: Optional[str] = None,
        x_min: Optional[str] = None,
        y_min: Optional[str] = None,
        z_min: Optional[str] = None,
        x_max: Optional[str] = None,
        y_max: Optional[str] = None,
        z_max: Optional[str] = None,
    ):
        """Command that activates or deactivates the geofence on the UAV, or
        sets up a geofence based on an axis-aligned bounding box.
        """
        if subcommand is None:
            subcommand = "get"

        subcommand = subcommand.lower()
        fence = uav.fence

        if fence is None:
            raise RuntimeError("Drone has no fence object; this is probably a bug.")

        if subcommand == "get":
            enabled = await fence.is_enabled()
            return "Fence is active" if enabled else "Fence is disabled"
        elif subcommand == "on":
            await fence.set_enabled(True)
            return "Fence activated"
        elif subcommand == "off":
            await fence.set_enabled(False)
            return "Fence disabled"
        elif subcommand == "set":
            try:
                bounds = [float(x) for x in (x_min, y_min, z_min, x_max, y_max, z_max)]  # type: ignore
            except (TypeError, ValueError):
                raise RuntimeError(
                    "Invalid fence coordinates; expected xMin, yMin, zMin, "
                    "xMax, yMax, zMax, separated by spaces"
                ) from None

            await fence.set_axis_aligned_bounding_box(bounds[:3], bounds[3:])
            return "Fence activated"
        else:
            raise RuntimeError(f"Unknown subcommand: {subcommand}")

    async def handle_command_go(
        self,
        uav: "CrazyflieUAV",
        x: Optional[str] = None,
        y: Optional[str] = None,
        z: Optional[str] = None,
    ):
        """Command that sends the UAV to a given coordinate."""
        if x is None and y is None and z is None:
            raise RuntimeError(
                "You need to specify the target coordinate in X-Y-Z format"
            )

        try:
            coords = optional_float(x), optional_float(y), optional_float(z)
        except ValueError:
            raise RuntimeError("Invalid number found in input") from None

        x_num, y_num, z_num = coords
        x_num, y_num, z_num = await uav.go_to(x_num, y_num, z_num)
        return f"Target set to ({x_num:.2f}, {y_num:.2f}, {z_num:.2f}) m"

    async def handle_command_home(self, uav: "CrazyflieUAV"):
        """Command that retrieves the current home position of the UAV."""
        home = await uav.get_home_position()
        if home is None:
            return "No home position yet"
        else:
            x, y, z = home
            return f"Home: [{x:.2f}, {y:.2f}, {z:.2f}] m"

    async def handle_command_kalman(
        self, uav: "CrazyflieUAV", command: Optional[str] = None
    ) -> str:
        if command is None:
            return "Run 'kalman reset' to reset the Kalman filter"
        elif command == "reset":
            await uav.set_parameter("kalman.resetEstimation", 1)
            return "Kalman filter reset successfully"
        else:
            raise RuntimeError(f"Unknown command: {command}")

    async def handle_command_land(self, uav: "CrazyflieUAV") -> str:
        await uav.land()
        return "Land command sent successfully"

    async def handle_command_rush(
        self,
        uav: "CrazyflieUAV",
        x: Optional[str] = None,
        y: Optional[str] = None,
        z: Optional[str] = None,
        multiplier: Optional[str] = None,
    ):
        """Temporary command for Nina's project that sends the UAV to a given
        coordinate _very_ quickly."""
        if x is None and y is None and z is None:
            raise RuntimeError(
                "You need to specify the target coordinate in X-Y-Z format"
            )

        try:
            coords = optional_float(x), optional_float(y), optional_float(z)
            multiplier_num = optional_float(multiplier)
        except ValueError:
            raise RuntimeError("Invalid number found in input") from None

        if multiplier_num is None:
            multiplier_num = 1

        x_num, y_num, z_num = coords
        velocity_xy, velocity_z = 4, 1
        velocity_xy *= multiplier_num
        velocity_z *= multiplier_num
        x_num, y_num, z_num = await uav.go_to(
            x_num, y_num, z_num, velocity_xy, velocity_z, min_travel_time=0.2
        )

        return f"Target set to ({x_num:.2f}, {y_num:.2f}, {z_num:.2f}) m"

    async def handle_command_show(
        self, uav: "CrazyflieUAV", command: Optional[str] = None
    ) -> str:
        if command is None:
            raise RuntimeError(
                "Missing subcommand; run 'show remove' to remove the current show."
            )
        elif command in ("clear", "remove"):
            if uav.has_previously_uploaded_show:
                uav.forget_last_uploaded_show()
                await uav.reboot()
                return "Last uploaded show cleared, drone rebooted."
            else:
                return "No show was recently uploaded to this drone."
        else:
            raise RuntimeError(f"Unknown subcommand: {command!r}")

    async def handle_command_stop(self, uav: "CrazyflieUAV") -> str:
        """Stops the motors of the UAV immediately."""
        await uav.stop()
        return "Motor stop signal sent"

    async def handle_command_trick(self, uav: "CrazyflieUAV", *params: str) -> str:
        """Non-public command used for Nina's show as a last resort hack to send
        the drone through the ceiling if the high-level commander is not suitable
        for the task.
        """
        z_velocity = 1
        distance = 3
        hover_time = 5

        if len(params) > 0:
            z_velocity = max(0.2, float(params[0]))
        if len(params) > 1:
            distance = max(0.2, float(params[1]))
        if len(params) > 2:
            hover_time = float(params[2])

        # Nina's shoot-the-drone-through-the-ceiling trick
        await uav.perform_nina_trick(
            z_velocity=z_velocity, distance=distance, hover_time=hover_time
        )

        return (
            f"Velocity = {z_velocity} m/s, distance = {distance} m, "
            f"hover time = {hover_time} s"
        )

    async def handle_command___show_upload(self, uav: "CrazyflieUAV", *, show):
        """Handles a drone show upload request for the given UAV.

        This is a temporary solution until we figure out something that is
        more sustainable in the long run.

        Parameters:
            show: the show data
        """
        await uav.upload_show(show, remember=True)
        if self.preferred_controller is not None:
            await uav.set_parameter(
                "stabilizer.controller", int(self.preferred_controller)
            )

    handle_command_color = create_color_command_handler()
    handle_command_motoroff = handle_command_stop
    handle_command_param = create_parameter_command_handler()
    handle_command_test = create_test_command_handler(("battery", "motor", "led"))
    handle_command_version = create_version_command_handler()

    def sort_uav_ids_by_address_spaces(
        self, ids: Iterable[str]
    ) -> dict[Any, list[str]]:
        """Given a list of UAV IDs, returns a dictionary that maps address
        spaces to the UAV IDs accessible through these address spaces.
        """
        result: dict[Any, list[str]] = defaultdict(list)
        for uav_id in ids:
            address_space = self._address_space_by_uav_id.get(uav_id)
            if address_space is not None:
                result[address_space].append(uav_id)
        return dict(result)

    async def _enter_low_power_mode_single(
        self, uav: "CrazyflieUAV", *, transport: Optional[TransportOptions]
    ) -> None:
        await uav.enter_low_power_mode()

    async def _resume_from_low_power_mode_single(
        self, uav: "CrazyflieUAV", *, transport: Optional[TransportOptions]
    ) -> None:
        await uav.resume_from_low_power_mode()

    def _request_preflight_report_single(
        self, uav: "CrazyflieUAV"
    ) -> PreflightCheckInfo:
        return uav.preflight_status

    async def _request_version_info_single(self, uav: "CrazyflieUAV") -> VersionInfo:
        return await uav.get_version_info()

    async def _send_landing_signal_single(
        self, uav: "CrazyflieUAV", *, transport: Optional[TransportOptions]
    ) -> None:
        if uav.is_in_drone_show_mode:
            await uav.stop_drone_show()
        else:
            await uav.land()

    async def _send_light_or_sound_emission_signal_single(
        self,
        uav: "CrazyflieUAV",
        signals,
        duration,
        *,
        transport: Optional[TransportOptions],
    ) -> None:
        if "light" in signals:
            await uav.emit_light_signal()

    async def _send_motor_start_stop_signal_single(
        self,
        uav: "CrazyflieUAV",
        start: bool,
        force: bool,
        *,
        transport: Optional[TransportOptions],
    ) -> None:
        if start:
            await uav.arm(force=force)
        else:
            await uav.disarm(force=force)

    async def _send_reset_signal_single(
        self,
        uav: "CrazyflieUAV",
        component: str,
        *,
        transport: Optional[TransportOptions],
    ) -> None:
        if not component:
            # Resetting the whole UAV, this is supported
            # TODO(ntamas): log blocks have to be re-configured after reboot
            await uav.reboot()
        else:
            # No component resets are implemented on this UAV yet
            raise RuntimeError(f"Resetting {component!r} is not supported")

    async def _send_shutdown_signal_single(
        self, uav: "CrazyflieUAV", *, transport: Optional[TransportOptions]
    ) -> None:
        await uav.shutdown()

    async def _send_takeoff_signal_single(
        self, uav, *, scheduled: bool = False, transport: Optional[TransportOptions]
    ) -> None:
        if scheduled:
            # Handled by a broadcast signal in the extension class
            return

        if uav.is_in_drone_show_mode:
            await uav.start_drone_show()
        else:
            await uav.takeoff(altitude=self.takeoff_altitude, relative=True)

    async def _set_parameter_single(
        self, uav: "CrazyflieUAV", name: str, value: Any
    ) -> None:
        try:
            value_as_float = float(value)
        except ValueError:
            raise RuntimeError("parameter value must be numeric") from None
        await uav.set_parameter(name, value_as_float)


class CrazyflieUAV(UAVBase):
    """Subclass for UAVs created by the driver for Crazyflie drones.

    Attributes:
        uri: the Crazyflie URI of the drone
    """

    _preflight_result_map = [
        PreflightCheckResult.OFF,
        PreflightCheckResult.FAILURE,
        PreflightCheckResult.RUNNING,
        PreflightCheckResult.PASS,
        PreflightCheckResult.SOFT_FAILURE,
    ]

    driver: CrazyflieDriver
    notify_updated: Callable[[], None]
    notify_shutdown_suspend_or_reboot: Callable[[], None]
    send_log_message_to_gcs: GCSLogMessageSender
    uri: Optional[str]

    _airborne: bool
    _armed: bool
    _crazyflie: Optional[Crazyflie]
    _fence: Optional[Fence]
    _fence_breached: bool
    _log_session: Optional[LogSession]

    def __init__(self, *args, **kwds):
        super().__init__(*args, **kwds)

        self.uri = None
        self.notify_updated = nop
        self.notify_shutdown_suspend_or_reboot = nop
        self.send_log_message_to_gcs = nop

        self._crazyflie = None
        self._fence = None
        self._log_session = None
        self._last_uploaded_show = None

        self._reset_status_variables()

    def _get_crazyflie(self) -> Crazyflie:
        """Returns the internal Crazyflie_ object that represents the connection
        to the drone.

        Raises:
            RuntimeError: if the connection to the Crazyflie is not established
        """
        if self._crazyflie is None:
            raise RuntimeError(f"Not connected to the Crazyflie drone at {self.uri}")
        return self._crazyflie

    @property
    def fence(self) -> Optional[Fence]:
        return self._fence

    async def arm(self, force: bool = False) -> None:
        """Arms the motors of the Crazyflie."""
        await self._get_crazyflie().run_command(
            port=DRONE_SHOW_PORT,
            command=DroneShowCommand.ARM_OR_DISARM,
            data=Struct("<B").pack(3 if force else 1),
        )

    async def disarm(self, force: bool = False) -> None:
        """Disarms or force-disarms the motors of the Crazyflie."""
        await self._get_crazyflie().run_command(
            port=DRONE_SHOW_PORT,
            command=DroneShowCommand.ARM_OR_DISARM,
            data=Struct("<B").pack(2 if force else 0),
        )

    async def emit_light_signal(self) -> None:
        """Asks the UAV to emit a visible light signal from its LED ring to
        attract attention.
        """
        await self._get_crazyflie().led_ring.flash()

    async def enter_low_power_mode(self) -> None:
        """Sends the UAV to a low-power mode where only the radio chip is
        listening for incoming packets.
        """
        await self._get_crazyflie().suspend()
        self.notify_shutdown_suspend_or_reboot()

    def forget_last_uploaded_show(self) -> None:
        """Forgets the last uploaded show to this drone so it does not get
        re-uploaded if the drone is rebooted.
        """
        self._last_uploaded_show = None

    async def get_home_position(self) -> Optional[tuple[float, float, float]]:
        """Returns the current home position of the UAV."""
        x = await self.get_parameter("preflight.homeX", fetch=True)
        y = await self.get_parameter("preflight.homeY", fetch=True)
        z = await self.get_parameter("preflight.homeZ", fetch=True)
        if not x and not y and z <= -10000:
            return None
        else:
            return x, y, z

    async def get_parameter(self, name: str, fetch: bool = False) -> float:
        """Returns the value of a parameter from the Crazyflie."""
        return await self._get_crazyflie().param.get(name, fetch=fetch)

    async def get_version_info(self) -> VersionInfo:
        cf = self._get_crazyflie()
        version = await cf.platform.get_firmware_version()
        revision = await cf.platform.get_firmware_revision()
        return {"firmware": version or "", "revision": revision or ""}

    async def go_to(
        self,
        x: Optional[float] = None,
        y: Optional[float] = None,
        z: Optional[float] = None,
        velocity_xy: float = 2,
        velocity_z: float = 0.5,
        min_travel_time: float = 1,
    ) -> tuple[float, float, float]:
        """Sends the UAV to a given coordinate.

        Parameters:
            x: the X coordinate of the target; ``None`` means to use the current
                X coordinate
            y: the Y coordinate of the target; ``None`` means to use the current
                Y coordinate
            z: the Z coordinate of the target; ``None`` means to use the current
                Z coordinate
            velocity_xy: maximum allowed horizontal velocity, in m/s
            velocity_z: maximum allowed vertical velocity, in m/s
            min_travel_time: minimum travel time; useful for very small changes
                in the target position
        """
        current = self.status.position_xyz
        if current is None:
            raise RuntimeError("UAV has no known position yet")

        cf = self._get_crazyflie()

        target_x = current.x if x is None else x
        target_y = current.y if y is None else y
        target_z = current.z if z is None else z

        dx, dy, dz = target_x - current.x, target_y - current.y, target_z - current.z
        travel_time = max(
            min_travel_time, hypot(dx, dy) / velocity_xy, abs(dz) / velocity_z
        )

        # TODO(ntamas): keep current yaw!
        await cf.high_level_commander.go_to(
            target_x, target_y, target_z, yaw=0, duration=travel_time
        )

        return target_x, target_y, target_z

    @property
    def has_previously_uploaded_show(self) -> bool:
        """Returns whether the UAV knows about a show that was already uploaded
        to it at least once, possibly during a previous boot.
        """
        return self._last_uploaded_show is not None

    @property
    def is_in_drone_show_mode(self) -> bool:
        """Returns whether the UAV is in drone show mode."""
        return self._status.mode == "show"

    @property
    def is_running_show(self) -> bool:
        """Returns whether the UAV is currently executing a show."""
        return not self._show_execution_stage.is_idle

    async def land(self, altitude: float = 0.0, velocity: float = 0.5):
        """Initiates a landing to the given altitude (absolute or relative).

        Parameters:
            altitude: the altitude to reach at the end of the landing operation,
                in meters
            velocity: the desired takeoff velocity, in meters per second
        """
        # Let's try to make sure that the transition takes at least three
        # seconds. Otherwise, when landing from a low altitude (say, 30 cm),
        # the transition duration will be too low, the drone won't have time
        # to speed up, and then it will cut power to the motors mid-air.
        if altitude < self._position.z:
            delta_height = self._position.z - altitude
            dt = delta_height / velocity
            if dt < 3:
                dt = 3
                velocity = delta_height / dt
                if velocity < 0.1:
                    velocity = 0.1

        await self._get_crazyflie().high_level_commander.land(
            altitude, velocity=velocity
        )

    @property
    def last_uploaded_show(self):
        """Reference to the last show data that was uploaded to this Crazyflie,
        even if it was rebooted in the meanwhile.
        """
        return self._last_uploaded_show

    @property
    def log_session(self) -> Optional[LogSession]:
        """Returns the logging session that the Crazyflie currently uses."""
        return self._log_session

    @property
    def preflight_status(self) -> PreflightCheckInfo:
        return self._preflight_status

    async def perform_nina_trick(
        self, z_velocity: float = 1, distance: float = 3, hover_time: float = 5
    ) -> None:
        """Hacky implementation of Nina's send-the-drone-through-the-ceiling
        trick.

        Parameters:
            z_velocity: vertical velocity of the drone during ascent, in
                meters per seocnd
            distance: distance to travel during ascent, in meters. The duration
                of the ascent will be derived from the Z velocity and the distance
            hover_time: hover time _after_ the ascent and _before_ the motors are
                turned off, in seconds
        """
        cf = self._get_crazyflie()
        dt = 0.1

        # Disable the high-level commander so the show subsystem knows that we
        # have intervened. Try this multiple times in case the connection is
        # flaky.
        tries = 5
        while True:
            try:
                await cf.high_level_commander.disable()
            except Exception:
                if tries > 0:
                    tries -= 1
                else:
                    raise
            else:
                break

        # Perform the upward ascent
        duration = distance / z_velocity
        iterations = int(ceil(duration / dt))
        for _ in range(iterations):
            try:
                await cf.commander.send_altitude_hold_setpoint(z_velocity=z_velocity)
            except Exception:
                pass
            await sleep(dt)

        # Wait and hover a bit until the trap door is closed
        iterations = int(ceil(hover_time / dt))
        for _ in range(iterations):
            try:
                await cf.commander.send_altitude_hold_setpoint(z_velocity=0)
            except Exception:
                pass
            await sleep(dt)

        # Stop the motors and prevent them from starting again by pretending
        # that the Crazyflie has flipped
        tries = 20
        while True:
            try:
                await cf.commander.send_stop_setpoint()
                await cf.param.set("stabilizer.stop", 1)
            except Exception:
                if tries > 0:
                    tries -= 1
                else:
                    raise
            else:
                break

    async def process_console_messages(self) -> None:
        """Runs a task that processes incoming console messages and forwards
        them to the logger of the extension.
        """
        extra = {"id": self.id}
        console = self._get_crazyflie().console
        async for message in console.messages():
            self.driver.log.info(message, extra=extra)
            self.send_log_message_to_gcs(message)

    async def process_drone_show_status_messages(self, period: float = 0.5) -> None:
        """Runs a task that requests a drone show related status report from
        the Crazyflie drone repeatedly.

        Parameters:
            period: the number of seconds elapsed between consecutive status
                requests, in seconds
        """
        await sleep(random() * period)
        async for _ in periodic(period):
            cf = self._get_crazyflie()
            try:
                status = await cf.run_command(
                    port=DRONE_SHOW_PORT, command=DroneShowCommand.STATUS
                )
                status = DroneShowStatus.from_bytes(status)
            except TimeoutError:
                status = None
            except Exception:
                self.driver.log.warning(
                    "Malformed drone show status packet received, ignoring"
                )
                status = None

            if status:
                message = status.show_execution_stage.get_short_explanation()
                if status.testing:
                    message = f"(test) {message}"
                message = message.encode("utf-8")

                self._airborne = status.airborne
                self._armed = status.armed
                self._battery.charging = status.charging
                self._battery.voltage = status.battery_voltage
                self._battery.percentage = status.battery_percentage
                self._fence_breached = status.fence_breached
                self._position.update(
                    x=status.position[0], y=status.position[1], z=status.position[2]
                )
                self._show_execution_stage = status.show_execution_stage
                self._update_preflight_status_from_result_codes(status.preflight_checks)
                self._update_error_codes()
                self.update_status(
                    battery=self._battery,
                    mode=status.mode,
                    light=status.light,
                    position_xyz=self._position,
                    debug=message,
                    heading=status.yaw / 10,
                )
                self.notify_updated()

    async def process_log_messages(self) -> None:
        """Runs a task that processes incoming log messages and calls the
        appropriate log message handlers.
        """
        assert self._log_session is not None
        await self._log_session.process_messages()

    async def reboot(self):
        """Reboots the UAV."""
        await self._get_crazyflie().reboot()
        self.notify_shutdown_suspend_or_reboot()

    async def resume_from_low_power_mode(self) -> None:
        """Wakes up the UAV if it has been sent to a low-power mode earlier."""
        await self._get_crazyflie().resume()

    async def reupload_last_show(self) -> None:
        """Uploads the last show that was uploaded to this drone again."""
        if self.last_uploaded_show:
            await self.upload_show(self.last_uploaded_show, remember=True)

    async def run(self, disposer: Optional[Callable[[], None]] = None):
        """Starts the main message handler task of the UAV."""
        try:
            await CrazyflieHandlerTask(
                self,
                debug=self.driver.debug,
                log=self.driver.log,
                status_interval=self.driver.status_interval,
            ).run()
        finally:
            if disposer:
                disposer()

    async def set_parameter(self, name: str, value: float) -> None:
        """Sets the value of a parameter on the Crazyflie."""
        # If the value is also an integer, cast it into an integer. This is
        # because aiocflib is totally happy with sending a float parameter when
        # you provide it as an int, but not the other way round
        if isinstance(value, float) and value.is_integer():
            value_as_int_or_float = int(value)
        else:
            value_as_int_or_float = value
        await self._get_crazyflie().param.set(name, value_as_int_or_float)

    @asynccontextmanager
    async def set_and_restore_parameter(
        self, name: str, value: float
    ) -> AsyncIterator[None]:
        """Context manager that sets the value of a parameter on the UAV upon
        entering the context and resets it upon exiting.
        """
        async with self._get_crazyflie().param.set_and_restore(name, value):
            yield

    async def set_home_position(
        self, pos: Optional[tuple[float, float, float]]
    ) -> None:
        """Sets or clears the home position of the UAV.

        Parameters:
            pos: the home position of the UAV, in the local coordinate system.
                Units are in meters. `None` means to clear the home position.
        """
        x, y, z = (0, 0, -10000) if pos is None else pos
        await self.set_parameter("preflight.homeX", x)
        await self.set_parameter("preflight.homeY", y)
        await self.set_parameter("preflight.homeZ", z)

    async def set_led_color(self, color: Optional[Color] = None):
        """Overrides the color of the LED ring of the UAV.

        Parameters:
            color: the color to apply; `None` turns off the override.
        """
        if color is not None:
            red, green, blue = color_to_rgb8_triplet(color)
        else:
            red, green, blue = 0, 0, 0

        await self._get_crazyflie().run_command(
            port=DRONE_SHOW_PORT,
            command=DroneShowCommand.TRIGGER_GCS_LIGHT_EFFECT,
            data=Struct("<BBBB").pack(
                (
                    GCSLightEffectType.SOLID if color else GCSLightEffectType.OFF
                ),  # effect ID
                red,
                green,
                blue,
            ),
        )

    async def stop(self) -> None:
        """Stops the motors of the UAV immediately."""
        cf = self._get_crazyflie()
        await cf.commander.stop()
        await cf.high_level_commander.stop()

    async def shutdown(self) -> None:
        """Shuts down the UAV."""
        await self._get_crazyflie().shutdown()
        self.notify_shutdown_suspend_or_reboot()

    async def start_drone_show(self, delay: float = 0):
        """Instructs the UAV to start the pre-programmed drone show in drone
        show mode. Assumes that the UAV is already in drone show mode; it will
        _not_ attempt to switch the mode.
        """
        if delay > 0:
            delay = int(delay * 1000)
            if abs(delay) >= 32000:
                raise RuntimeError("Maximum allowed delay is 32 seconds")
            data = Struct("<h").pack(delay)
        else:
            data = None

        await self._get_crazyflie().run_command(
            port=DRONE_SHOW_PORT, command=DroneShowCommand.START, data=data
        )

    async def stop_drone_show(self):
        """Instructs the UAV to stop the pre-programmed drone show in drone
        show mode. Assumes that the UAV is already in drone show mode; it will
        _not_ attempt to switch the mode.
        """
        await self._get_crazyflie().run_command(
            port=DRONE_SHOW_PORT, command=DroneShowCommand.STOP
        )

    async def takeoff(
        self, altitude: float = 1.0, relative: bool = False, velocity: float = 0.5
    ):
        """Initiates a takeoff to the given altitude (absolute or relative).

        Parameters:
            altitude: the altitude to reach at the end of the takeoff operation,
                in meters
            relative: whether the altitude is relative to the current position
            velocity: the desired takeoff velocity, in meters per second
        """
        await self._get_crazyflie().high_level_commander.takeoff(
            altitude, relative=relative, velocity=velocity
        )

    async def test_component(self, component: str) -> None:
        """Tests a component of the UAV.

        Parameters:
            component: the component to test; currently we support ``motor``,
                ``battery`` and ``led``
        """
        if component == "motor":
            await self.set_parameter("health.startPropTest", 1)
        elif component == "led":
            await self._get_crazyflie().led_ring.test()
        elif component == "battery":
            await self.set_parameter("health.startBatTest", 1)
        else:
            raise NotSupportedError

    async def upload_show(self, show, *, remember: bool = True) -> None:
        home = get_home_position_from_show_specification(show)
        trajectory = get_trajectory_from_show_specification(show)
        group_index = get_group_index_from_show_specification(show)
        if group_index > 7:
            raise RuntimeError("Crazyflie drones support at most 8 groups only")

        scale = trajectory.propose_scaling_factor()
        if scale > 1:
            raise RuntimeError("Trajectory covers too large an area for a Crazyflie")

        light_program = get_light_program_from_show_specification(show)
        try:
            await self._upload_light_program(light_program)
        except OSError as ex:
            if ex.errno == EIO:
                raise RuntimeError(
                    "IO error while uploading light program; is it too large?"
                ) from None
            else:
                raise

        # TODO: support yaw control for Crazyflie drones as well

        try:
            await self._upload_trajectory_and_fence(
                trajectory, home, fence_config=self.driver.fence_config
            )
        except OSError as ex:
            if ex.errno == EIO:
                raise RuntimeError(
                    "IO error while uploading trajectory; is it too large?"
                ) from None
            else:
                raise

        assert self._crazyflie is not None
        await self._crazyflie.high_level_commander.set_group_mask(1 << group_index)

        await self._enable_show_mode()

        self._last_uploaded_show = show if remember else None

    @asynccontextmanager
    async def use(self, debug: bool = False):
        """Async context manager that establishes a low-level connection to the
        drone given its URI when the context is entered, and closes the
        connection when the context is exited.

        Parameters:
            debug: whether to print the messages passed between the drone and
                the server to the console
        """
        uri = self.uri

        if uri is None:
            raise RuntimeError("Crazyflie URI is not set yet")

        if debug and "+log" not in uri:
            uri = uri.replace("://", "+log://")

        try:
            async with Crazyflie(
                uri, cache=self.driver.cache_folder
            ) as self._crazyflie:
                self._fence = Fence(self._crazyflie)
                await self._crazyflie.log.validate()
                try:
                    self._log_session = self._setup_logging_session()
                    yield self._crazyflie
                finally:
                    self._log_session = None
        finally:
            self._fence = None
            self._crazyflie = None

    async def setup_flight_mode(self):
        """Sets up the appropriate flight mode (high level controller or drone
        show mode). This function should be called after (re-)establishing
        connection with a Crazyflie.

        The rule is that we set the Crazyflie into drone show mode if a drone
        show has been uploaded to it at least once (even if it was with an
        earlier booting attempt), otherwise we simply turn on the high level
        commander only.

        Note that the function is _not_ a context manager, i.e. it does not
        restore the original flight mode when exiting the context. This is
        intentional -- we don't want to get the Crazyflie out of its current
        flight mode if we accidentally lose contact with it for a split second
        only.
        """
        needs_show_mode = self.has_previously_uploaded_show
        cf = self._get_crazyflie()

        await cf.param.validate()
        await cf.high_level_commander.enable()

        if needs_show_mode:
            await self._enable_show_mode()

    @staticmethod
    def _create_empty_preflight_status_report() -> PreflightCheckInfo:
        """Creates an empty preflight status report that will be updated
        periodically.
        """
        report = PreflightCheckInfo()
        report.add_item("battery", "Battery")
        report.add_item("stabilizer", "Sensors")
        report.add_item("kalman", "Kalman filter")
        report.add_item("positioning", "Positioning")
        report.add_item("home", "Home position")
        report.add_item("trajectory", "Trajectory and lights")
        return report

    async def _enable_show_mode(self) -> None:
        """Enables the drone-show mode on the Crazyflie."""
        cf = self._get_crazyflie()
        await cf.param.set("kalman.robustTdoa", 1)
        await cf.param.set("show.enabled", 1)
        if self.driver.use_test_mode:
            await cf.param.set("show.testing", 1)

    def _on_battery_and_system_state_received(self, message):
        self._battery.voltage = message.items[0]
        self._battery.charging = message.items[1] == 1  # PM state 1 = charging
        self._armed = bool(message.items[2])
        self._update_error_codes()
        self.update_status(battery=self._battery)
        self.notify_updated()

    def _on_position_velocity_info_received(self, message):
        self._position.x, self._position.y, self._position.z = message.items[0:3]
        self._velocity.x, self._velocity.y, self._velocity.z = message.items[3:6]

        self._position.x /= 1000
        self._position.y /= 1000
        self._position.z /= 1000

        self._velocity.x /= 1000
        self._velocity.y /= 1000
        self._velocity.z /= 1000

        self._update_error_codes()

        self.update_status(
            position_xyz=self._position,
            velocity_xyz=self._velocity,
            heading=message.items[6],
        )

        self.notify_updated()

    def _reset_status_variables(self) -> None:
        """Resets the status variables of the UAV, typically after connecting
        to the UAV or after re-establishing a connection.
        """
        self._preflight_status = self._create_empty_preflight_status_report()
        self._airborne = False
        self._armed = True  # Crazyflies typically boot in an armed state
        self._fence_breached = False
        self._battery = BatteryInfo()
        self._position = PositionXYZ()
        self._show_execution_stage = DroneShowExecutionStage.UNKNOWN
        self._velocity = VelocityXYZ()

    def _setup_logging_session(self) -> LogSession:
        """Sets up the log blocks that contain the variables we need from the
        Crazyflie, and returns a LogSession object.
        """
        assert self._crazyflie is not None

        session = self._crazyflie.log.create_session()
        session.configure(graceful_cleanup=True)
        return session

        session.create_block(
            "pm.vbat",
            "pm.state",
            "sys.armed",
            period=1,
            handler=self._on_battery_and_system_state_received,
        )
        session.create_block(
            "stateEstimateZ.x",
            "stateEstimateZ.y",
            "stateEstimateZ.z",
            "stateEstimateZ.vx",
            "stateEstimateZ.vy",
            "stateEstimateZ.vz",
            "stateEstimate.yaw",
            period=1,
            handler=self._on_position_velocity_info_received,
        )
        return session

    def _update_error_codes(self) -> None:
        """Updates the set of error codes based on what we know about the current
        state of the drone.
        """
        self.ensure_error(
            FlockwaveErrorCode.ON_GROUND,
            present=not self._airborne
            and not self._show_execution_stage.is_likely_airborne,
        )

        self.ensure_error(
            FlockwaveErrorCode.PREARM_CHECK_IN_PROGRESS,
            present=(
                self._preflight_status.in_progress
                or self._show_execution_stage
                is DroneShowExecutionStage.WAIT_FOR_PREFLIGHT_CHECKS
            ),
        )
        self.ensure_error(
            FlockwaveErrorCode.PREARM_CHECK_FAILURE,
            present=self._preflight_status.failed_conclusively,
        )

        self.ensure_error(FlockwaveErrorCode.DISARMED, present=not self._armed)

        # TODO(ntamas): use GEOFENCE_VIOLATION_WARNING if the motors are not
        # running. Currently we have no information about whether the motors are
        # running or not.
        self.ensure_error(
            FlockwaveErrorCode.GEOFENCE_VIOLATION, present=self._fence_breached
        )

        self.ensure_error(
            FlockwaveErrorCode.TAKEOFF,
            present=self._show_execution_stage is DroneShowExecutionStage.TAKEOFF,
        )
        self.ensure_error(
            FlockwaveErrorCode.LANDING,
            present=self._show_execution_stage is DroneShowExecutionStage.LANDING,
        )
        self.ensure_error(
            FlockwaveErrorCode.LANDED,
            present=self._show_execution_stage is DroneShowExecutionStage.LANDED,
        )

        voltage = self._battery.voltage
        self.ensure_error(
            FlockwaveErrorCode.BATTERY_LOW_ERROR,
            present=voltage is not None and voltage <= 3.1,
        )
        self.ensure_error(
            FlockwaveErrorCode.BATTERY_LOW_WARNING,
            present=voltage is not None and (voltage <= 3.3 and voltage > 3.1),
        )

    def _update_preflight_status_from_result_codes(
        self, codes: Sequence[PreflightCheckStatus]
    ) -> None:
        """Updates the result of the local preflight check report data structure
        from the result codes received in a stauts package.
        """
        for check, code in zip(self._preflight_status.items, codes):
            code = code & 0x03
            if code == 3 and check.id == "kalman":
                # The Kalman filter is a soft failure only; the drone is
                # constantly attempting to bring the filter back into a
                # convergent state
                code = 4
            check.result = self._preflight_result_map[code]

        self._preflight_status.update_summary()

    async def _upload_light_program(self, data: bytes) -> None:
        """Uploads the given light program to the Crazyflie drone."""
        cf = self._get_crazyflie()
        try:
            memory = await cf.mem.find(MemoryType.APP)
        except ValueError:
            raise RuntimeError(
                "Light programs are not supported on this drone"
            ) from None
        addr = await write_with_checksum(memory, 0, data, only_if_changed=True)
        await cf.run_command(
            port=DRONE_SHOW_PORT,
            command=DroneShowCommand.DEFINE_LIGHT_PROGRAM,
            data=Struct("<BBBBII").pack(
                0,  # light program ID
                LightProgramLocation.MEM,
                LightProgramType.SKYBRUSH,
                0,  # fps, not used
                addr,  # address in memory
                len(data),  # length of light program
            ),
        )

    async def _upload_trajectory_and_fence(
        self,
        trajectory: TrajectorySpecification,
        home: Optional[tuple[float, float, float]],
        fence_config: FenceConfiguration,
    ) -> None:
        """Uploads the given trajectory data to the Crazyflie drone and applies
        the given safety fence configuration.
        """
        cf = self._get_crazyflie()

        try:
            trajectory_memory = await cf.mem.find(MemoryType.TRAJECTORY)
        except ValueError:
            raise RuntimeError("Trajectories are not supported on this drone") from None

        supports_fence = await self.fence.is_supported() if self.fence else False
        if not supports_fence:
            self.driver.log.warning(
                "Geofence is not supported on this drone; please update the firmware",
                extra={"id": self.id},
            )

        # Define the home position and the takeoff time first
        await self.set_home_position(home or trajectory.home_position)
        await self.set_parameter("show.takeoffTime", trajectory.takeoff_time)

        # Set the landing height
        await self.set_parameter("show.landingHeight", trajectory.landing_height)

        # Encode the trajectory and write it to the Crazyflie memory
        data = encode_trajectory(trajectory, encoding=TrajectoryEncoding.COMPRESSED)
        addr = await write_with_checksum(
            trajectory_memory, 0, data, only_if_changed=True
        )

        # Define the geofence first (for safety reasons)
        if supports_fence:
            assert self.fence is not None
            await fence_config.apply(self.fence, trajectory)

        # Now we can define the entire trajectory as well
        await cf.high_level_commander.define_trajectory(
            0, addr=addr, type=TrajectoryType.COMPRESSED
        )


class CrazyflieHandlerTask:
    """Class responsible for handling communication with a single Crazyflie
    drone.
    """

    _debug: bool
    _log: Logger
    _status_interval: float
    _uav: CrazyflieUAV

    def __init__(
        self,
        uav: CrazyflieUAV,
        log: Logger,
        debug: bool = False,
        status_interval: float = 0.5,
    ):
        """Constructor.

        Parameters:
            uav: the Crazyflie UAV to communicate with
            debug: whether to log the communication with the UAV on the console
            status_interval: number of seconds that should pass between consecutive
                status requests sent to a drone
        """
        self._log = log
        self._uav = uav
        self._debug = bool(debug)
        self._status_interval = float(status_interval)

    async def run(self) -> None:
        """Executes the task that handles communication with the associated
        Crazyflie drone.

        This task is guaranteed not to throw an exception so it won't crash the
        parent nursery it is running in. However, it will not handle
        reconnections either -- it will simply exit in case of a connection
        error.
        """
        try:
            await self._run()
        except IOError as ex:
            self._log.error(
                f"Error while handling Crazyflie: {str(ex)}",
                extra={"id": self._uav.id, "telemetry": "ignore"},
            )
            # We do not log the stack trace of IOErrors -- the stack trace is too long
            # and in 99% of the cases it is simply a communication error
        except Exception as ex:
            self._log.exception(
                f"Error while handling Crazyflie: {str(ex)}", extra={"id": self._uav.id}
            )

    async def _run(self) -> None:
        """Implementation of the task itself.

        This task is guaranteed not to throw an exception so it won't crash the
        parent nursery it is running in. However, it will not handle
        reconnections either -- it will simply exit in case of a connection
        error.
        """
        self._uav._reset_status_variables()

        try:
            async with AsyncExitStack() as stack:
                enter = stack.enter_async_context

                try:
                    await enter(self._uav.use(debug=self._debug))
                    assert self._uav.log_session is not None
                    await enter(self._uav.log_session)
                except TimeoutError:
                    self._log.error(
                        "Communication timeout while initializing connection",
                        extra={"id": self._uav.id, "telemetry": "ignore"},
                    )
                    return
                except Exception as ex:
                    self._log.error(
                        f"Error while initializing connection: {str(ex)}",
                        extra={"id": self._uav.id},
                    )
                    if not isinstance(ex, IOError):
                        self._log.exception(ex)
                    else:
                        # We do not log IOErrors -- the stack trace is too long
                        # and in 99% of the cases it is simply a communication error
                        pass
                    return

                nursery: Nursery = await enter(open_nursery())  # type: ignore
                self._uav.notify_shutdown_suspend_or_reboot = (
                    nursery.cancel_scope.cancel
                )
                nursery.start_soon(self._uav.process_console_messages)
                nursery.start_soon(
                    self._uav.process_drone_show_status_messages, self._status_interval
                )
                nursery.start_soon(self._uav.process_log_messages)

                await self._reupload_last_show_if_needed()

                # We need to set up the flight mode here after a bit of delay,
                # otherwise we try to set it too fast after a reboot and the
                # drone will ignore the request
                try:
                    await sleep(2)
                    await self._uav.setup_flight_mode()
                except TimeoutError:
                    self._log.error(
                        "Communication timeout while setting flight mode",
                        extra={"id": self._uav.id, "telemetry": "ignore"},
                    )
                    return
                except Exception as ex:
                    self._log.error(
                        f"Error while setting flight mode: {str(ex)}",
                        extra={"id": self._uav.id},
                    )
                    if not isinstance(ex, IOError):
                        self._log.exception(ex)
                    else:
                        # We do not log IOErrors -- the stack trace is too long
                        # and in 99% of the cases it is simply a communication error
                        pass
                    return
        finally:
            self._uav.notify_shutdown_suspend_or_reboot = nop

    async def _reupload_last_show_if_needed(self) -> None:
        try:
            if self._uav.has_previously_uploaded_show:
                # UAV was rebooted but we have already uploaded a show to it
                # before, so we should upload it again if the show framework
                # is in the idle state. First we wait two seconds to be sure
                # that we receive at least one show status packet from the
                # drone
                await sleep(2)
                if not self._uav.is_running_show:
                    await self._uav.reupload_last_show()
        except TimeoutError:
            # This is normal, it comes from aiocflib when the Crazyflie is
            # turned off again
            self._log.warning(
                "Failed to re-upload previously uploaded show to possibly "
                "rebooted drone due to a communication timeout",
                extra={"id": self._uav.id},
            )
        except Exception as ex:
            self._log.warning(
                "Failed to re-upload previously uploaded show to possibly "
                "rebooted drone",
                extra={"id": self._uav.id},
            )
            self._log.exception(ex)