Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / mavlink / autopilots / ardupilot.py
Size: Mime:
from __future__ import annotations

import logging
from contextlib import aclosing
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from io import BytesIO
from struct import Struct
from time import monotonic
from typing import (
    IO,
    TYPE_CHECKING,
    AsyncIterator,
    Iterable,
    Sequence,
    Type,
    Union,
    cast,
)

from trio import TooSlowError, sleep

from flockwave.server.errors import NotSupportedError
from flockwave.server.model.commands import (
    Progress,
    ProgressEventsWithSuspension,
    Suspend,
)
from flockwave.server.model.geofence import (
    GeofenceAction,
    GeofenceConfigurationRequest,
    GeofenceStatus,
)
from flockwave.server.model.safety import (
    LowBatteryThresholdType,
    SafetyConfigurationRequest,
)
from flockwave.server.utils import clamp

from ..enums import (
    MAVAutopilot,
    MAVCommand,
    MAVMessageType,
    MAVModeFlag,
    MAVParamType,
    MAVProtocolCapability,
    MAVState,
    MAVSysStatusSensor,
    MAVType,
)
from ..errors import UnknownFlightModeError
from ..ftp import MAVFTP
from ..fw_upload import FirmwareUpdateResult, FirmwareUpdateTarget
from ..geofence import GeofenceManager, GeofenceType
from ..types import MAVLinkFlightModeNumbers, MAVLinkMessage
from ..utils import log_id_for_uav

if TYPE_CHECKING:
    from ..driver import MAVLinkUAV

from .base import Autopilot
from .registry import register_for_mavlink_type

__all__ = ("ArduPilot", "ArduPilotWithSkybrush")

log = logging.getLogger(__name__)

FlightModeMap = dict[int, tuple[str, ...]]


@register_for_mavlink_type(MAVAutopilot.ARDUPILOTMEGA)
class ArduPilot(Autopilot):
    """Class representing the ArduPilot autopilot firmware."""

    name = "ArduPilot"

    # Explicit quadcopter (multirotor) custom modes
    _quadcopter_custom_modes: FlightModeMap = {
        0: ("stab", "stabilize"),
        1: ("acro",),
        2: ("alt", "alt hold"),
        3: ("auto",),
        4: ("guided",),
        5: ("loiter",),
        6: ("rth",),
        7: ("circle",),
        9: ("land",),
        11: ("drift",),
        13: ("sport",),
        14: ("flip",),
        15: ("tune",),
        16: ("pos", "pos hold"),
        17: ("brake",),
        18: ("throw",),
        19: ("avoid ADSB", "avoid"),
        20: ("guided no GPS",),
        21: ("smart RTH",),
        22: ("flow", "flow hold"),
        23: ("follow",),
        24: ("zigzag",),
        25: ("system ID",),
        26: ("heli autorotate", "autorotate"),
        27: ("auto RTH",),
        28: ("turtle",),
    }

    # Backwards-compatible alias used elsewhere in the codebase
    _custom_modes: FlightModeMap = _quadcopter_custom_modes

    # Rover-specific custom modes (used for MAVType.GROUND_ROVER)
    _rover_custom_modes: FlightModeMap = {
        0: ("manual",),
        1: ("learning",),
        2: ("steer", "steering"),
        3: ("hold",),
        4: ("loiter",),
        10: ("auto",),
        11: ("rtl", "return"),
        15: ("guided",),
        16: ("pos", "position"),
        17: ("brake",),
    }

    # Per-vehicle-type custom modes mapping. Keys are MAVType enum values.
    _custom_modes_by_mav_type: dict[int, FlightModeMap] = {
        MAVType.QUADROTOR.value: _quadcopter_custom_modes,
        MAVType.GROUND_ROVER.value: _rover_custom_modes,
        # other vehicle types may be added here
    }

    _geofence_actions: dict[int, tuple[GeofenceAction, ...]] = {
        0: (GeofenceAction.REPORT,),
        1: (GeofenceAction.RETURN, GeofenceAction.LAND),
        2: (GeofenceAction.LAND,),
        3: (GeofenceAction.SMART_RETURN, GeofenceAction.RETURN, GeofenceAction.LAND),
        4: (GeofenceAction.STOP, GeofenceAction.LAND),
    }

    MAX_ACCELEROMETER_CALIBRATION_DURATION = 120
    """Maximum allowed duration of an accelerometer calibration, in seconds"""

    MAX_COMPASS_CALIBRATION_DURATION = 60
    """Maximum allowed duration of a compass calibration, in seconds"""

    @classmethod
    def describe_custom_mode(
        cls, base_mode: int, custom_mode: int, vehicle_type: int | MAVType | None = None
    ) -> str:
        """Returns the description of the current custom mode that the autopilot
        is in, given the base and the custom mode in the heartbeat message.

        Args:
            base_mode: the base mode from the heartbeat message
            custom_mode: the custom mode from the heartbeat message
            vehicle_type: the vehicle type from the heartbeat message. May be
                omitted; in this case the legacy/default mapping is used and
                a warning message is logged.
        """
        # Determine which mapping to use. Accept both MAVType enum members and ints.
        mapping: FlightModeMap

        if vehicle_type is not None:
            try:
                vt = (
                    int(cast(MAVType, vehicle_type).value)
                    if hasattr(vehicle_type, "value")
                    else int(vehicle_type)
                )
            except Exception:
                vt = None
            if vt is not None:
                mapping = cls._custom_modes_by_mav_type.get(vt, cls._custom_modes)
        else:
            # Warn if vehicle type is not provided so the user knows we're using the default
            log.warning("Vehicle type unknown; using default custom mode mapping")
            mapping = cls._custom_modes

        mode_attrs = mapping.get(custom_mode)
        return mode_attrs[0] if mode_attrs else f"mode {custom_mode}"

    def are_motor_outputs_disabled(
        self, heartbeat: MAVLinkMessage, sys_status: MAVLinkMessage
    ) -> bool:
        # ArduPilot uses the MOTOR_OUTPUTS "sensor" to indicate whether the
        # motor outputs are disabled. More precisely, it always has
        # MOTOR_OUTPUTS in the "present" and "health" field and the "enabled"
        # field specifies whether the motor outputs are enabled
        if (
            sys_status.onboard_control_sensors_health
            & MAVSysStatusSensor.MOTOR_OUTPUTS.value
            and sys_status.onboard_control_sensors_present
            & MAVSysStatusSensor.MOTOR_OUTPUTS.value
        ):
            return not bool(
                sys_status.onboard_control_sensors_enabled
                & MAVSysStatusSensor.MOTOR_OUTPUTS.value
            )
        else:
            return False

    async def calibrate_accelerometer(
        self, uav: MAVLinkUAV
    ) -> ProgressEventsWithSuspension[None, str]:
        # Reset our internal state object of the accelerometer calibration procedure
        uav.accelerometer_calibration.reset()

        # accelerometer calibration starts with sending a proper preflight
        # calib command
        success = await uav.driver.send_command_long(
            uav, MAVCommand.PREFLIGHT_CALIBRATION, 0, 0, 0, 0, 1
        )
        if not success:
            raise RuntimeError("Failed to start accelerometer calibration")

        successful_calibration = False
        timeout = self.MAX_ACCELEROMETER_CALIBRATION_DURATION

        try:
            async for progress in uav.accelerometer_calibration.updates(
                timeout=timeout, fail_on_timeout=False
            ):
                yield progress
                if isinstance(progress, Suspend):
                    # Accel calibration was suspended, but then we got here, so
                    # the user must have resumed the operation. Let's forward
                    # the resume instruction to the UAV.
                    success = await uav.driver.send_command_long(
                        uav,
                        MAVCommand.ACCELCAL_VEHICLE_POS,
                        uav.accelerometer_calibration.next_step,
                    )
                    if not success:
                        raise RuntimeError("Failed to resume accelerometer calibration")

                    uav.accelerometer_calibration.notify_resumed()
                elif isinstance(progress, Progress):
                    if progress.percentage == 100:
                        successful_calibration = True

        except TooSlowError:
            raise RuntimeError(
                f"Accelerometer calibration did not finish in {timeout} seconds"
            ) from None

        if successful_calibration:
            # Indicate to the user that we are now rebooting the drone, otherwise
            # it's confusing that the UI shows 100% but the operation is still in
            # progress
            yield Progress.done("Rebooting...")

            # Wait a bit so the user sees the LED flashes on the drone that indicate a
            # successful calibration
            await sleep(1.5)

            try:
                await uav.reboot()
            except Exception:
                raise RuntimeError(
                    "Failed to reboot UAV after successful accelerometer calibration"
                ) from None

        yield Progress.done("Acceelerometer calibration successful.")

    async def calibrate_compass(
        self, uav: MAVLinkUAV
    ) -> ProgressEventsWithSuspension[None, str]:
        calibration_messages = {
            int(MAVMessageType.MAG_CAL_PROGRESS): 1.0,
            int(MAVMessageType.MAG_CAL_REPORT): 1.0,
        }
        started, success = False, False
        timeout = self.MAX_COMPASS_CALIBRATION_DURATION

        try:
            async with uav.temporarily_request_messages(calibration_messages):
                # Reset our internal state object of the compass calibration procedure
                uav.compass_calibration.reset()

                # Messages are not handled here but in the MAVLinkNetwork,
                # which forwards them to the UAV, which in turn refreshes its
                # state variables in its CompassCalibration object. This is not
                # nice, but it works.
                await uav.driver.send_command_long(
                    uav,
                    MAVCommand.DO_START_MAG_CAL,
                    0,  # calibrate all compasses
                    0,  # retry on failure
                    1,  # autosave on success
                )
                started = True

                async for progress in uav.compass_calibration.updates(
                    timeout=timeout, fail_on_timeout=False
                ):
                    if isinstance(progress, Progress):
                        if progress.percentage == 100:
                            success = True
                    yield progress

        except TooSlowError:
            raise RuntimeError(
                f"Compass calibration did not finish in {timeout} seconds"
            ) from None

        except RuntimeError:
            raise

        except Exception:
            if not started:
                raise RuntimeError("Failed to start compass calibration") from None
            try:
                await uav.driver.send_command_long(uav, MAVCommand.DO_CANCEL_MAG_CAL)
            except Exception:
                uav.driver.log.warning(
                    "Failed to cancel compass calibration",
                    extra={"id": log_id_for_uav(uav)},
                )
            raise RuntimeError("Compass calibration terminated unexpectedly") from None

        if success:
            # Indicate to the user that we are now rebooting the drone, otherwise
            # it's confusing that the UI shows 100% but the operation is still in
            # progress
            yield Progress.done("Rebooting...")

            # Wait a bit so the user sees the LED flashes on the drone that indicate a
            # successful calibration
            await sleep(1.5)

            try:
                await uav.reboot()
            except Exception:
                raise RuntimeError(
                    "Failed to reboot UAV after successful compass calibration"
                ) from None

        yield Progress.done("Compass calibration successful.")

    def can_handle_firmware_update_target(self, target_id: str) -> bool:
        return target_id == FirmwareUpdateTarget.ABIN.value

    async def configure_geofence(
        self, uav: MAVLinkUAV, configuration: GeofenceConfigurationRequest
    ) -> None:
        fence_type = GeofenceType.OFF

        if configuration.min_altitude is not None:
            # Update the minimum altitude limit; note that ArduCopter supports
            # only the [-100; 100] range.
            min_altitude = float(clamp(configuration.min_altitude, -100, 100))
            await uav.set_parameter("FENCE_ALT_MIN", min_altitude)
            fence_type |= GeofenceType.FLOOR
        else:
            # Assume that the minimum altitude limit is disabled
            pass

        if configuration.max_altitude is not None:
            # Update the maximum altitude limit; note that the ArduCopter
            # parameter documentation suggests that it supports only the [10;
            # 1000] range, but it is most likely a software limitation only,
            # so we allow higher values here.
            max_altitude = float(clamp(configuration.max_altitude, 10, 3000))
            if max_altitude < configuration.max_altitude:
                raise NotSupportedError(
                    f"Geofence max altitude ({configuration.max_altitude} m) "
                    f"exceeds safety limit ({max_altitude} m)",
                )

            await uav.set_parameter("FENCE_ALT_MAX", max_altitude)
            fence_type |= GeofenceType.ALTITUDE
        else:
            # Assume that the maximum altitude limit is disabled
            pass

        if configuration.max_distance is not None:
            # Update the maximum distance; note that the ArduCopter
            # parameter documentation suggests that it supports only the [10;
            # 10000] range. 10 km is actually quite a large distance so we
            # stick to the recommendations here.
            max_distance = float(clamp(configuration.max_distance, 30, 10000))
            if max_distance < configuration.max_distance:
                raise NotSupportedError(
                    f"Geofence max distance ({configuration.max_distance} m) "
                    f"exceeds ArduPilot limit ({max_distance} m)",
                )

            await uav.set_parameter("FENCE_RADIUS", max_distance)
            fence_type |= GeofenceType.CIRCLE
        else:
            # Assume that the distance limit is disabled
            pass

        if configuration.polygons is not None:
            # Update geofence polygons
            manager = GeofenceManager.for_uav(uav)
            await manager.set_geofence_areas(configuration.polygons)
            fence_type |= GeofenceType.POLYGON
        else:
            # Assume that the polygon fence is disabled
            pass

        if configuration.rally_points is not None:
            if configuration.rally_points:
                raise NotImplementedError("rally points not supported yet")

        if configuration.action is not None:
            # Update geofence action
            action_map = {
                GeofenceAction.LAND: 2,  # always land
                GeofenceAction.REPORT: 0,  # report only
                GeofenceAction.RETURN: 1,  # RTH or land
                GeofenceAction.STOP: 4,  # brake or land
            }
            mapped_action = action_map.get(configuration.action)
            if mapped_action is not None:
                await uav.set_parameter("FENCE_ACTION", int(mapped_action))
            else:
                raise NotSupportedError(
                    f"geofence action {configuration.action!r} not supported on ArduPilot"
                )
        else:
            # Assume that we do not need to change the geofence action
            pass

        # Update the type of the geofence
        await uav.set_parameter("FENCE_TYPE", int(fence_type))

        if configuration.enabled is None:
            # Infer whether the fence should be enabled or disabled based on
            # fence_type
            fence_enabled = bool(fence_type)
        else:
            fence_enabled = bool(configuration.enabled)

        # Update whether the fence is enabled or disabled
        await uav.set_parameter("FENCE_ENABLE", int(fence_enabled))

    async def configure_safety(
        self, uav, configuration: SafetyConfigurationRequest
    ) -> None:
        if configuration.low_battery_threshold is not None:
            if configuration.low_battery_threshold.type == LowBatteryThresholdType.OFF:
                await uav.set_parameter("BATT_LOW_MAH", 0)
                await uav.set_parameter("BATT_LOW_VOLT", 0)
            elif (
                configuration.low_battery_threshold.type
                == LowBatteryThresholdType.VOLTAGE
            ):
                await uav.set_parameter(
                    "BATT_LOW_VOLT", configuration.low_battery_threshold.value
                )
                await uav.set_parameter("BATT_LOW_MAH", 0)
            elif (
                configuration.low_battery_threshold.type
                == LowBatteryThresholdType.PERCENTAGE
            ):
                capacity = await uav.get_parameter("BATT_CAPACITY")
                await uav.set_parameter(
                    "BATT_LOW_MAH",
                    capacity * configuration.low_battery_threshold.value / 100,
                )
                await uav.set_parameter("BATT_LOW_VOLT", 0)
            else:
                raise RuntimeError(
                    f"Unknown low battery threshold type: {configuration.low_battery_threshold.type!r}"
                )

        if configuration.critical_battery_voltage is not None:
            await uav.set_parameter(
                "BATT_CRT_VOLT", configuration.critical_battery_voltage
            )
        if configuration.return_to_home_altitude is not None:
            await uav.set_parameter(
                "RTL_ALT",
                int(configuration.return_to_home_altitude * 100),  # [m] -> [cm]
            )
        if configuration.return_to_home_speed is not None:
            await uav.set_parameter(
                "RTL_SPEED",
                int(configuration.return_to_home_speed * 100),  # [m/s] -> [cm/s]
            )

    def decode_param_from_wire_representation(
        self, value: Union[int, float], type: MAVParamType
    ) -> float:
        # ArduCopter does not implement the MAVLink specification correctly and
        # requires all parameter values to be sent as floats, no matter what
        # their type is. See this link from Gitter:
        #
        # https://gitter.im/ArduPilot/pymavlink?at=5bfb975587c4b86bcc1af3ee
        return float(value)

    def encode_param_to_wire_representation(
        self, value: Union[int, float], type: MAVParamType
    ) -> float:
        # ArduCopter does not implement the MAVLink specification correctly and
        # requires all parameter values to be sent as floats, no matter what
        # their type is. See this link from Gitter:
        #
        # https://gitter.im/ArduPilot/pymavlink?at=5bfb975587c4b86bcc1af3ee
        return float(value)

    def get_flight_mode_numbers(self, mode: str) -> MAVLinkFlightModeNumbers:
        mode = mode.lower().replace(" ", "")
        for number, names in self._custom_modes.items():
            for name in names:
                name = name.lower().replace(" ", "")
                if name == mode:
                    return (MAVModeFlag.CUSTOM_MODE_ENABLED, number, 0)

        raise UnknownFlightModeError(mode)

    async def get_geofence_status(self, uav: MAVLinkUAV) -> GeofenceStatus:
        status = GeofenceStatus()

        # Generic stuff comes here
        manager = GeofenceManager.for_uav(uav)
        await manager.get_geofence_areas_and_rally_points(status)

        # ArduCopter-specific parameters are used to extend the status
        value = await uav.get_parameter("FENCE_ENABLE")
        status.enabled = bool(value)

        value = await uav.get_parameter("FENCE_ALT_MIN")
        status.min_altitude = float(value)

        value = await uav.get_parameter("FENCE_ALT_MAX")
        status.max_altitude = float(value)

        value = await uav.get_parameter("FENCE_RADIUS")
        status.max_distance = float(value)

        value = await uav.get_parameter("FENCE_ACTION")
        status.actions = list(self._geofence_actions.get(int(value), ()))

        return status

    async def handle_firmware_update(
        self, uav: MAVLinkUAV, target_id: str, blob: bytes
    ) -> AsyncIterator[Progress]:
        assert self.can_handle_firmware_update_target(target_id)

        # TODO(ntamas): validate .abin firmware

        # Upload firmware
        async with aclosing(MAVFTP.for_uav(uav)) as ftp:
            async with ftp.put_gen(blob, "/ardupilot.abin") as gen:
                async for progress in gen:
                    # Scale progress down to a max of 90% -- the remaining
                    # 10% will be rebooting and checking the result
                    percentage = progress.percentage
                    if percentage is not None:
                        yield progress.update(percentage=int(percentage * 0.9))

        # Progress is now at 90%. Ask for a reboot to the bootloader
        yield Progress(percentage=90)
        await uav.reboot_after_update()

        # Wait until the UAV becomes disconnected, but at least two seconds
        start = monotonic()
        while True:
            await sleep(0.5)

            dt = monotonic() - start
            if dt >= 2 and not uav.is_connected:
                break
            if dt >= 5:
                raise RuntimeError("UAV failed to reboot after uploading new firmware")

        # We have no way to know when the update is finished, so we pretend
        # that it is going to take about a minute. We wait for at most two
        # minutes and update the progress slowly.
        start = monotonic()
        while not uav.is_connected:
            await sleep(0.5)

            dt = monotonic() - start
            if dt > 120:
                # We waited for two minutes, so we give up
                raise RuntimeError("Firmware update timed out")
            elif dt >= 100:
                # Pretend a slower percentage update from 98% onwards
                yield Progress(percentage=99)
            elif dt >= 80:
                # Pretend a slower percentage update from 98% onwards
                yield Progress(percentage=98)
            else:
                # Pretend a percentage update of 1% every 10 seconds
                yield Progress(percentage=90 + int(dt) // 10)

        # Wait 2 more seconds to make sure that the initialization process
        # has finished on the drone
        yield Progress(percentage=99)
        await sleep(2)

        # Check whether the firmware update was successful
        async with aclosing(MAVFTP.for_uav(uav)) as ftp:
            async with ftp.ls("/") as gen:
                entries: list[str] = []
                async for entry in gen:
                    entries.append(entry.name.lower())

            if "ardupilot.abin" in entries:
                result = FirmwareUpdateResult.UNSUPPORTED
            elif "ardupilot-verify.abin" in entries:
                result = FirmwareUpdateResult.FAILED_TO_VERIFY
            elif "ardupilot-verify-failed.abin" in entries:
                result = FirmwareUpdateResult.INVALID
            elif "ardupilot-flash.abin" in entries:
                result = FirmwareUpdateResult.FLASHING_FAILED
            elif "ardupilot-flashed.abin" in entries:
                result = FirmwareUpdateResult.SUCCESS
            else:
                result = FirmwareUpdateResult.UNSUPPORTED

            if not result.successful:
                raise RuntimeError(result.describe())

            # Try to delete the firmware file now that it is not needed but do
            # not raise an error if it fails
            try:
                await ftp.rm("/ardupilot-flashed.abin")
            except Exception:
                uav.driver.log.warning(
                    "Failed to delete the firmware file after update",
                    extra={"id": log_id_for_uav(uav)},
                )

        yield Progress(percentage=100)

    def is_duplicate_message(self, message: MAVLinkMessage) -> bool:
        return False

    def is_prearm_check_in_progress(
        self, heartbeat: MAVLinkMessage, sys_status: MAVLinkMessage
    ) -> bool:
        # Information not reported by ArduPilot by default
        return False

    def is_prearm_error_message(self, text: str) -> bool:
        return text.startswith("PreArm: ") or text.startswith("Arm: ")

    def is_rth_flight_mode(self, base_mode: int, custom_mode: int) -> bool:
        return bool(base_mode & 1) and (custom_mode == 6 or custom_mode == 21)

    def prepare_mavftp_parameter_upload(
        self, parameters: dict[str, float]
    ) -> tuple[str, bytes]:
        data = encode_parameters_to_packed_format(parameters)
        return "@PARAM/param.pck", data

    def process_prearm_error_message(self, text: str) -> str:
        return text[8:]

    def refine_with_capabilities(self, capabilities: int):
        result = super().refine_with_capabilities(capabilities)

        if isinstance(result, self.__class__) and not isinstance(
            result, ArduPilotWithSkybrush
        ):
            mask = ArduPilotWithSkybrush.CAPABILITY_MASK
            if (capabilities & mask) == mask:
                result = ArduPilotWithSkybrush(self)  # pyright: ignore[reportAbstractUsage]

        return result

    @property
    def is_battery_percentage_reliable(self) -> bool:
        # The battery percentage estimate of the stock ArduPilot is broken;
        # it is based on discharged current only so it always reports a
        # newly inserted battery as fully charged
        return False

    @property
    def supports_local_frame(self) -> bool:
        return True

    @property
    def supports_mavftp_parameter_upload(self) -> bool:
        return True

    @property
    def supports_repositioning(self) -> bool:
        # ArduCopter supports MAV_CMD_DO_REPOSITION since ArduCopter 4.1.0,
        # BUT it does not accept NaN in the altitude field. PX4 accepts NaN
        # and we rely on this to express our intention to use the current
        # altitude, so we cannot return True here until ArduCopter gains a
        # similar feature.
        return False

    @property
    def supports_scheduled_takeoff(self):
        return False


def extend_custom_modes(
    super: Type[ArduPilot], vehicle_type: int | MAVType, _new_modes: FlightModeMap
):
    """Helper function to extend the custom modes of an Autopilot_ subclass
    with new modes.
    """
    result = deepcopy(super._custom_modes_by_mav_type)
    mode_map = result.setdefault(vehicle_type, {})
    mode_map.update(_new_modes)
    return result


class ArduPilotWithSkybrush(ArduPilot):
    """Class representing the ArduCopter firmware with Skybrush-specific
    extensions to support drone shows.
    """

    name = "ArduPilot + Skybrush"
    _custom_modes_by_mav_type = extend_custom_modes(
        ArduPilot, MAVType.QUADROTOR, {127: ("show",)}
    )

    CAPABILITY_MASK = (
        MAVProtocolCapability.PARAM_FLOAT
        | MAVProtocolCapability.FTP
        | MAVProtocolCapability.SET_POSITION_TARGET_GLOBAL_INT
        | MAVProtocolCapability.SET_POSITION_TARGET_LOCAL_NED
        | MAVProtocolCapability.MAVLINK2
        | MAVProtocolCapability.DRONE_SHOW_MODE
    )

    def is_duplicate_message(self, message: MAVLinkMessage) -> bool:
        # We use the MSB of the compatibility flags to indicate that the message
        # is semantically equivalent to an earlier message of the same type from
        # the same source
        return message.get_header().compat_flags & 0x80

    def is_prearm_check_in_progress(
        self, heartbeat: MAVLinkMessage, sys_status: MAVLinkMessage
    ) -> bool:
        # Our patched firmware (ab)uses the CALIBRATING state in the heartbeat
        # for this before ArduCopter 4.0.5. From ArduCopter 4.0.5 onwwards,
        # there is a "preflight check" sensor so we use that
        mask = MAVSysStatusSensor.PREARM_CHECK.value
        if sys_status.onboard_control_sensors_present & mask:
            # ArduCopter version reports prearm check status with this message
            if sys_status.onboard_control_sensors_enabled & mask:
                # Prearm checks are enabled so return whether they pass or not
                return not bool(sys_status.onboard_control_sensors_health & mask)
            else:
                # Prearm checks are disabled so they are never in progress
                return False
        else:
            # ArduCopter version does not know about this flag so we assume that
            # we are running our firmware and that the CALIBRATING status is
            # used for reporting this
            return heartbeat.system_status == MAVState.CALIBRATING

    @ArduPilot.supports_scheduled_takeoff.getter
    def supports_scheduled_takeoff(self):
        return True


################################################################################
## ArduPilot packed parameter format handling
################################################################################


@dataclass
class PackedParameter:
    """A single entry in an ArduPilot-specific packed parameter representation."""

    name: bytes
    """Name of the parameter."""

    type: MAVParamType | None
    """Type of the parameter in the packed representation. Not necessarily the
    same as the type of the parameter in the onboard storage; ArduPilot will
    convert between the two if needed.
    """

    value: float
    """The value of the parameter."""

    default_value: float | None = None
    """The default value of the parameter, if known. Can be left empty if you
    want to _encode_ parameters into a packed representation instead of
    decoding them.
    """


_packed_param_header = Struct("<HHH")
_packed_type_to_mav_type: list[int] = [
    0,
    MAVParamType.INT8,
    MAVParamType.INT16,
    MAVParamType.INT32,
    MAVParamType.REAL32,
] + [0] * 11
_mav_type_to_packed_type: dict[MAVParamType, int] = {
    MAVParamType.INT8: 1,
    MAVParamType.INT16: 2,
    MAVParamType.INT32: 3,
    MAVParamType.REAL32: 4,
}
_packed_param_formats: list[Struct | None] = [
    None,
    Struct("<b"),
    Struct("<h"),
    Struct("<i"),
    Struct("<f"),
] + [None] * 11


def decode_parameters_from_packed_format(
    data: bytes | IO[bytes],
) -> Iterable[PackedParameter]:
    """Decodes an ArduPilot packed parameter bundle into an iterable of
    PackedParameter_ instances.

    The input must be a packed parameter bundle retrieved from the vehicle
    by downloading ``@PARAM/param.pck`` via MAVFTP. See the following file for
    more details:

    https://github.com/ArduPilot/ardupilot/blob/master/libraries/AP_Filesystem/README.md

    Args:
        data: the packed parameter bundle as a bytes object or as a readable
            stream

    Yields:
        PackedParameter_ instances decoded from the packed parameter bundle.
    """
    fp: IO[bytes] = BytesIO(data) if isinstance(data, bytes) else cast(IO[bytes], data)

    header_bytes = fp.read(_packed_param_header.size)

    magic: int
    num_params: int

    magic, num_params, _ = _packed_param_header.unpack(header_bytes)

    if magic != 0x671B and magic != 0x671C:
        raise RuntimeError("invalid magic bytes in packed param stream")

    has_defaults = magic == 0x671C

    prev_name = b""

    reader = partial(fp.read, 1)

    for _ in range(num_params):
        # Skip leading zeros
        for byte in iter(reader, b""):
            val = ord(byte)
            if val:
                type = val & 0x0F
                flags = (val & 0xF0) >> 4
                break
        else:
            # End of file, stop iteration
            break

        byte = reader()
        common_length, length = ord(byte) & 0x0F, (ord(byte) >> 4) + 1

        name = prev_name[:common_length] + fp.read(length)
        param_type: MAVParamType = MAVParamType(_packed_type_to_mav_type[type])
        struct = _packed_param_formats[type]
        value = struct.unpack(fp.read(struct.size)) if struct else None

        if has_defaults:
            if flags & 0x01:
                # Default value also provided
                default_value = struct.unpack(fp.read(struct.size)) if struct else None
            else:
                # Parameter is at its default value
                default_value = value
        else:
            default_value = None

        if value is not None:
            yield PackedParameter(
                name,
                param_type,
                value[0],
                default_value[0] if default_value is not None else None,
            )

        prev_name = name


def _propose_mav_type_for_value(value: float) -> MAVParamType:
    if isinstance(value, int) or value.is_integer():
        if -128 <= value <= 127:
            return MAVParamType.INT8
        elif -32768 <= value <= 32767:
            return MAVParamType.INT16
        elif -2147483648 <= value <= 2147483647:
            return MAVParamType.INT32
    return MAVParamType.REAL32


def encode_parameters_to_packed_format(
    parameters: Sequence[PackedParameter] | dict[str, float],
) -> bytes:
    """Encodes a sequence of PackedParameter_ instances into a packed parameter
    bundle that is suitable for uploading to `@PARAM/param.pck` via MAVFTP.

    Default values are ignored in the input. You may also provide a dict of
    name-value pairs.

    Note that the decoding and the encoding process is not symmetric. During
    encoding, the `total_length` field in the header is the total length of the
    bundle, in bytes. During decoding, the field contains the total number of
    parameters on the vehicle.

    Args:
        parameters: the PackedParameter_ instances or name-value pairs to encode

    Returns:
        The packed parameter bundle as a bytes object.
    """
    buf: list[bytes] = []

    # We don't know the total length yet so encode it as zero
    buf.append(_packed_param_header.pack(0x671B, len(parameters), 0))

    # Construct the parameter iterator
    if isinstance(parameters, dict):
        param_iter = (
            PackedParameter(name.upper().encode("ascii", "replace"), None, float(value))
            for name, value in parameters.items()
        )
    else:
        param_iter = iter(parameters)

    prev_name = b""

    for param in sorted(param_iter, key=lambda p: p.name.upper()):
        name = param.name
        if len(name) > 16:
            raise RuntimeError(f"Parameter name too long: {name!r}")

        mav_type = param.type or _propose_mav_type_for_value(param.value)
        packed_type = _mav_type_to_packed_type[mav_type]

        # Find length of longest common prefix of name and prev_name
        for i, (a, b) in enumerate(zip(name, prev_name, strict=False)):
            if a != b:
                common_len = i
                break
        else:
            # Since the iterator is sorted by name, this can happen only if
            # we have duplicate names or if prev_name is a prefix of name
            if name == prev_name:
                raise RuntimeError(f"Duplicate parameter name: {param.name!r}")
            else:
                common_len = len(prev_name)

        assert common_len < 16
        name_len = len(name) - common_len
        encoded_length = common_len | ((name_len - 1) << 4)

        buf.append(bytes([packed_type, encoded_length]))
        buf.append(name[common_len:])

        param_format = _packed_param_formats[packed_type]
        assert param_format is not None

        value = (
            int(param.value) if mav_type != MAVParamType.REAL32 else float(param.value)
        )
        buf.append(param_format.pack(value))

        prev_name = name

    total_length = sum(len(x) for x in buf)
    if total_length > 65535:
        raise RuntimeError(f"Packed parameter bundle too large: {total_length} bytes")

    # Now we can re-encode the header
    buf[0] = _packed_param_header.pack(0x671B, len(parameters), total_length)
    return b"".join(buf)