Repository URL to install this package:
|
Version:
2.40.3 ▾
|
"""Driver class for MAVLink-based drones."""
from __future__ import annotations
from collections import defaultdict
from contextlib import aclosing, asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import partial
from logging import Logger
from math import inf, isfinite
from time import monotonic
from typing import Any, AsyncIterator, Awaitable, Callable, Optional, Union
from colour import Color
from flockwave.concurrency import FutureCancelled, delayed
from flockwave.gps.time import datetime_to_gps_time_of_week, gps_time_of_week_to_utc
from flockwave.gps.vectors import GPSCoordinate, VelocityNED
from flockwave.spec.errors import FlockwaveErrorCode
from trio import Event, TooSlowError, fail_after, move_on_after, sleep
from flockwave.server.command_handlers import (
create_calibration_command_handler,
create_color_command_handler,
create_parameter_command_handler,
create_test_command_handler,
create_version_command_handler,
)
from flockwave.server.errors import NotSupportedError
from flockwave.server.ext.show.config import AuthorizationScope
from flockwave.server.model.battery import BatteryInfo
from flockwave.server.model.commands import (
Progress,
ProgressEvents,
ProgressEventsWithSuspension,
)
from flockwave.server.model.devices import DeviceTreeMutator
from flockwave.server.model.geofence import GeofenceConfigurationRequest, GeofenceStatus
from flockwave.server.model.gps import GPSFix
from flockwave.server.model.gps import GPSFixType as OurGPSFixType
from flockwave.server.model.log import FlightLog, FlightLogMetadata
from flockwave.server.model.preflight import PreflightCheckInfo, PreflightCheckResult
from flockwave.server.model.safety import SafetyConfigurationRequest
from flockwave.server.model.transport import TransportOptions
from flockwave.server.model.uav import (
BulkParameterUploadResponse,
UAVBase,
UAVDriver,
VersionInfo,
)
from flockwave.server.show import (
get_altitude_reference_from_show_specification,
get_coordinate_system_from_show_specification,
get_geofence_configuration_from_show_specification,
get_light_program_from_show_specification,
get_trajectory_from_show_specification,
)
from flockwave.server.show.formats import SkybrushBinaryShowFile
from flockwave.server.types import GCSLogMessageSender
from flockwave.server.utils import color_to_rgb8_triplet, to_uppercase_string
from flockwave.server.utils.generic import nop
from .accelerometer import AccelerometerCalibration
from .autopilots import ArduPilot, Autopilot, UnknownAutopilot
from .channel import Channel
from .compass import CompassCalibration
from .enums import (
ConnectionState,
GPSFixType,
MAVCommand,
MAVDataStream,
MAVFrame,
MAVMessageType,
MAVModeFlag,
MAVProtocolCapability,
MAVResult,
MAVState,
MAVSysStatusSensor,
MAVType,
MotorTestOrder,
MotorTestThrottleType,
PositionTargetTypemask,
RebootShutdownConditions,
SkybrushUserCommand,
)
from .ftp import MAVFTP
from .log_download import MAVLinkLogDownloader
from .packets import (
DroneShowExecutionStage,
DroneShowStatus,
authorization_scope_to_int,
create_led_control_packet,
)
from .rssi import RSSIMode, rtcm_counter_to_rssi
from .types import MAVLinkMessage, PacketBroadcasterFn, PacketSenderFn, spec
from .utils import (
can_communicate_infer_from_heartbeat,
log_id_for_uav,
mavlink_version_number_to_semver,
)
__all__ = ("MAVLinkDriver",)
#: Conversion constant from seconds to microseconds
SEC_TO_USEC = 1000000
#: Magic number to force an arming or disarming operation even if it is unsafe
#: to do so
FORCE_MAGIC = 21196
#: "Not a number" constant, used in some MAVLink messages to indicate a default
#: value
nan = float("nan")
def transport_options_to_channel(options: Optional[TransportOptions]) -> str:
"""Converts a transport options object sent by the user to a specific
MAVLink channel that satisfies the transport options.
We do not check whether the channel exists or not; it is the responsibility
of the CommunicationManager to fall back to another channel if the specified
channel is not open.
"""
if options is not None and getattr(options, "channel", 0) > 0:
return Channel.SECONDARY
else:
return Channel.PRIMARY
class MAVLinkDriver(UAVDriver["MAVLinkUAV"]):
"""Driver class for MAVLink-based drones."""
assume_data_streams_configured: bool = False
"""Whether to assume that UAVs managed by this driver already have their
MAVLink data streams configured appropriately. Used to skip the initialization
part, which is useful if you have thousands of drones and you know that
they are configured correctly.
"""
autopilot_factory: Optional[Callable[[], Autopilot]] = None
"""Factory function that returns a new Autopilot_ instance to be used by
drones managed by this driver. `None` means to infer the autopilot type
automatically from the heartbeat and the autopilot capabilities. Used to
skip the extra messages associated with the process, which is useful if you
have thousands of drones and you know which autopilot they are using.
"""
broadcast_packet: PacketBroadcasterFn
"""A function that should be called by the driver whenever it wants to
broadcast a packet. The function must be called with the packet to send.
"""
create_device_tree_mutator: Callable[[], DeviceTreeMutator]
"""A function that should be called by the driver as a context manager
whenever it wants to mutate the state of the device tree.
"""
log: Logger
"""Logger to use to write log messages."""
mandatory_custom_mode: Optional[int]
"""Custom mode to switch drones to when they are seen for the first time."""
run_in_background: Callable[[Callable[[], Awaitable[None]]], None]
"""A function that should be called by the driver whenever it wants to
run an asynchronous function in the background.
"""
send_packet: PacketSenderFn
"""A function that should be called by the driver whenever it wants to send
a packet. The function must be called with the packet to send, and a pair
formed by the medium via which the packet should be forwarded and the
destination address in that medium.
"""
use_bulk_parameter_uploads: bool = False
"""Whether to use bulk parameter uploads instead of individual uploads if
the autopilot supports bulk uploads.
"""
def __init__(self, app=None):
"""Constructor.
Parameters:
app: the app in which the driver lives
"""
super().__init__()
self.app = app # type: ignore
self.autopilot_factory = None
self.broadcast_packet = None # type: ignore
self.create_device_tree_mutator = None # type: ignore
self.log = None # type: ignore
self.mandatory_custom_mode = None
self.run_in_background = None # type: ignore
self.send_packet = None # type: ignore
self._default_timeout = 2
self._default_retries = 10
self._default_delay = 0.1
async def broadcast_command_long_with_retries(
self,
command_id: int,
param1: float = 0,
param2: float = 0,
param3: float = 0,
param4: float = 0,
param5: float = 0,
param6: float = 0,
param7: float = 0,
*,
timeout: Optional[float] = None,
retries: Optional[int] = None,
delay: Optional[float] = None,
channel: str = Channel.PRIMARY,
) -> None:
"""Broadcasts a MAVLink command to all UAVs reachable on a given
communication channel a given number of times, without waiting for an
acknowledgment.
Due to the broadcasting nature of this command, we cannot wait for
acknowledgments as we do not know how many acknowledgments we are
expecting.
Parameters:
target: the UAV to send the command to
param1: the first parameter of the command
param2: the second parameter of the command
param3: the third parameter of the command
param4: the fourth parameter of the command
param5: the fifth parameter of the command
param6: the sixth parameter of the command
param7: the seventh parameter of the command
timeout: timeout in seconds for _sending_ the command; `None` means
to use the default timeout for the driver
retries: number of times the command will be sent, no matter whether
a response is received or not; `None` means to use the default
retry count for the driver
delay: number of seconds to wait between sending attempts; `None`
means to use the default setting for the driver
channel: the channel to send the command on
Raises:
NotSupportedError: if the driver does not support broadcasting
"""
if self.broadcast_packet is None:
raise NotSupportedError("This driver does not support broadcasting")
if timeout is None or timeout <= 0:
timeout = self._default_timeout
if retries is None or retries <= 0:
retries = self._default_retries
if delay is None or delay < 0:
delay = self._default_delay
tried, sent = 0, 0
while tried < retries:
try:
with fail_after(timeout):
message = spec.command_long(
command=command_id,
param1=param1,
param2=param2,
param3=param3,
param4=param4,
param5=param5,
param6=param6,
param7=param7,
confirmation=0,
target_system=0,
target_component=0,
)
tried += 1
await self.broadcast_packet(message, channel=channel)
sent += 1
except TooSlowError:
pass
if delay > 0:
await sleep(delay)
if sent < tried:
if sent > 1:
self.log.warning(
f"Tried to send broadcast command {tried} times but only "
f"{sent} were successful"
)
elif sent > 0:
self.log.warning(
f"Tried to send broadcast command {tried} times but only "
f"one was successful"
)
elif tried > 1:
self.log.warning(
f"Tried to send broadcast command {tried} times but none "
f"were successful"
)
else:
self.log.warning("Failed to send broadcast command")
def create_uav(self, id: str) -> "MAVLinkUAV":
"""Creates a new UAV that is to be managed by this driver.
Parameters:
id: the identifier of the UAV to create
Returns:
MAVLinkUAV: an appropriate MAVLink UAV object
"""
assert self.app is not None
uav = MAVLinkUAV(id, driver=self)
uav.notify_updated = partial(self.app.request_to_send_UAV_INF_message_for, [id])
uav.send_log_message_to_gcs = partial(
self.app.request_to_send_SYS_MSG_message, sender=id
)
return uav
def get_time_boot_ms(self) -> int:
"""Returns a monotonic "time since boot" timestamp in milliseconds that
can be used in MAVLink messages.
"""
return int(monotonic() * 1000)
handle_command_calib = create_calibration_command_handler(
("accel", "baro", "compass", "gyro", "level")
)
handle_command_color = create_color_command_handler()
handle_command_param = create_parameter_command_handler(
name_validator=to_uppercase_string
)
handle_command_test = create_test_command_handler(
("camera", "motor", "led", "pyro")
)
handle_command_version = create_version_command_handler()
async def handle_command_mode(self, uav: "MAVLinkUAV", mode: Optional[str] = None):
"""Returns or sets the (custom) flight mode of the UAV.
Parameters:
mode: the name of the mode to set
"""
if mode is None:
return getattr(uav.status, "mode", "unknown mode")
else:
await uav.set_mode(mode)
return f"Mode changed to {mode!r}"
async def handle_command_servo(
self, uav: "MAVLinkUAV", servo: Union[int, str], value: Union[int, str]
):
"""Sets the value of a servo channel on the UAV.
Parameters:
servo: the servo channel to set (1-based)
value: the value to set for the servo channel as a raw PWM value
"""
servo = int(servo)
value = int(value)
await uav.set_servo(servo, value)
return f"Servo {servo} set to {value}"
async def handle_command_show(
self, uav: "MAVLinkUAV", command: Optional[str] = None
):
"""Allows the user to remove the current show file.
Parameters:
command: must be 'remove' to remove the current show
"""
if command is None:
raise RuntimeError(
"Missing subcommand; add 'remove' to remove the current show."
)
elif command in ("clear", "remove"):
await uav.remove_show()
return "Show removed."
else:
raise RuntimeError(f"Unknown subcommand: {command!r}")
async def handle_command___show_upload(self, uav: "MAVLinkUAV", *, show):
"""Handles a drone show upload request for the given UAV.
This is a temporary solution until we figure out something that is
more sustainable in the long run.
Parameters:
show: the show data
"""
try:
await uav.upload_show(show)
except TooSlowError as ex:
self.log.error(str(ex), extra={"id": log_id_for_uav(uav)})
raise
except Exception as ex:
self.log.error(str(ex), extra={"id": log_id_for_uav(uav)})
raise
async def send_command_int(
self,
target: "MAVLinkUAV",
command_id: int,
param1: float = 0,
param2: float = 0,
param3: float = 0,
param4: float = 0,
x: int = 0,
y: int = 0,
z: float = 0,
*,
frame: MAVFrame = MAVFrame.GLOBAL,
timeout: Optional[float] = None,
retries: Optional[int] = None,
) -> bool:
"""Sends a MAVLink command to a given UAV and waits for an acknowlegment.
Parameters 5 and 6 are integers; everything else is a float.
Parameters:
target: the UAV to send the command to
param1: the first parameter of the command
param2: the second parameter of the command
param3: the third parameter of the command
param4: the fourth parameter of the command
x: the fifth parameter of the command
y: the sixth parameter of the command
z: the seventh parameter of the command
frame: the reference frame of the coordinates transmitted in the command
timeout: command timeout in seconds; `None` means to use the default
timeout for the driver. Retries will be attempted if no response
arrives to the command within the given time interval
retries: maximum number of retries for the command (not counting the
initial attempt); `None` means to use the default retry count
for the driver.
Returns:
True if the command was sent successfully and a positive acknowledgment
(`MAV_RESULT_ACCEPTED`) was received in time, False if the command
was sent successfully and the response was not `MAV_RESULT_ACCEPTED`
and not `MAV_RESULT_UNSUPPORTED`. An exception will be thrown if no
acknowledgment is received in time (even after resending).
Raises:
TooSlowError: if the UAV failed to respond in time, even after
re-sending the command as needed
NotSupportedError: if the command is not supported by the UAV (i.e.
we received a response with `MAV_RESULT_UNSUPPORTED`)
"""
if timeout is None or timeout <= 0:
timeout = self._default_timeout
if retries is None or retries < 0:
retries = self._default_retries
result = None
while retries >= 0:
try:
with fail_after(timeout):
message = spec.command_int(
frame=frame,
command=command_id,
current=0,
autocontinue=0,
param1=param1,
param2=param2,
param3=param3,
param4=param4,
x=x,
y=y,
z=z,
)
response = await self.send_packet(
message,
target,
wait_for_response=("COMMAND_ACK", {"command": command_id}),
)
assert response is not None
result = response.result
break
except TooSlowError:
retries -= 1
if result is None:
raise TooSlowError(f"No response received for command {command_id} in time")
if result == MAVResult.UNSUPPORTED:
raise NotSupportedError
return result == MAVResult.ACCEPTED
async def send_command_long(
self,
target: "MAVLinkUAV",
command_id: int,
param1: float = 0,
param2: float = 0,
param3: float = 0,
param4: float = 0,
param5: float = 0,
param6: float = 0,
param7: float = 0,
*,
timeout: Optional[float] = None,
retries: Optional[int] = None,
channel: str = Channel.PRIMARY,
allow_in_progress: bool = False,
) -> bool:
"""Sends a MAVLink command to a given UAV and waits for an acknowlegment.
Parameters:
target: the UAV to send the command to
param1: the first parameter of the command
param2: the second parameter of the command
param3: the third parameter of the command
param4: the fourth parameter of the command
param5: the fifth parameter of the command
param6: the sixth parameter of the command
param7: the seventh parameter of the command
timeout: command timeout in seconds; `None` means to use the default
timeout for the driver. Retries will be attempted if no response
arrives to the command within the given time interval
retries: maximum number of retries for the command (not counting the
initial attempt); `None` means to use the default retry count
for the driver.
channel: the channel to send the command on
allow_in_progress: whether to treat MAVResult.IN_PROGRESS as a
result value indicating success.
Returns:
True if the command was sent successfully and a positive acknowledgment
(`MAV_RESULT_ACCEPTED`) was received in time, False if the command
was sent successfully and the response was not `MAV_RESULT_ACCEPTED`
and not `MAV_RESULT_UNSUPPORTED`. An exception will be thrown if no
acknowledgment is received in time (even after resending).
Raises:
TooSlowError: if the UAV failed to respond in time, even after
re-sending the command as needed
NotSupportedError: if the command is not supported by the UAV (i.e.
we received a response with `MAV_RESULT_UNSUPPORTED`)
"""
if channel != Channel.PRIMARY:
# We allow and expect ACKs only on the primary channel; the backup
# channel is assumed to be one-way only so we fall back to the
# non-ACKed version
await self.send_command_long_without_ack(
target,
command_id,
param1,
param2,
param3,
param4,
param5,
param6,
param7,
timeout=timeout,
channel=channel,
)
# Pretend that we have received an ACK
return True
if timeout is None or timeout <= 0:
timeout = self._default_timeout
if retries is None or retries < 0:
retries = self._default_retries
confirmation = 0
result = None
while retries >= 0:
try:
with fail_after(timeout):
message = spec.command_long(
command=command_id,
param1=param1,
param2=param2,
param3=param3,
param4=param4,
param5=param5,
param6=param6,
param7=param7,
confirmation=confirmation,
)
response = await self.send_packet(
message,
target,
wait_for_response=("COMMAND_ACK", {"command": command_id}),
channel=channel,
)
assert response is not None
result = response.result
break
except TooSlowError:
retries -= 1
confirmation = 1
if result is None:
raise TooSlowError(f"No response received for command {command_id} in time")
if result == MAVResult.UNSUPPORTED:
raise NotSupportedError
return result == MAVResult.ACCEPTED or (
allow_in_progress and result == MAVResult.IN_PROGRESS
)
async def send_command_long_without_ack(
self,
target: "MAVLinkUAV",
command_id: int,
param1: float = 0,
param2: float = 0,
param3: float = 0,
param4: float = 0,
param5: float = 0,
param6: float = 0,
param7: float = 0,
*,
timeout: Optional[float] = None,
channel: str = Channel.PRIMARY,
) -> None:
"""Sends a MAVLink command to a given UAV, without waiting for an
acknowledgment.
This function may be useful in one-way radio links where the UAV has
no way to respond.
Parameters:
target: the UAV to send the command to
param1: the first parameter of the command
param2: the second parameter of the command
param3: the third parameter of the command
param4: the fourth parameter of the command
param5: the fifth parameter of the command
param6: the sixth parameter of the command
param7: the seventh parameter of the command
timeout: timeout in seconds for _sending_ the command; `None` means
to use the default timeout for the driver
channel: the channel to send the command on
Raises:
TooSlowError: if the link to the UAV failed to send the command in
time, even after re-sending the command as needed
"""
if timeout is None or timeout <= 0:
timeout = self._default_timeout
sent = False
try:
with fail_after(timeout):
message = spec.command_long(
command=command_id,
param1=param1,
param2=param2,
param3=param3,
param4=param4,
param5=param5,
param6=param6,
param7=param7,
confirmation=0,
)
await self.send_packet(message, target, channel=channel)
sent = True
except TooSlowError:
pass
if not sent:
raise TooSlowError(f"failed to send command {command_id} in time")
async def send_packet_with_retries(
self,
spec,
target,
*,
wait_for_response=None,
wait_for_one_of=None,
timeout: Optional[float] = None,
retries: Optional[int] = None,
channel: str = Channel.PRIMARY,
) -> MAVLinkMessage:
"""Sends a packet to the given target UAV, waiting for a matching reply
packet and re-sending the packet a given number of times when no
response arrives in time.
Parameters:
spec: the specification of the MAVLink message to send
target: the UAV to send the message to
wait_for_response: when not `None`, specifies a MAVLink message to
wait for as a response. The message specification will be
matched with all incoming MAVLink messages that have the same
type as the type in the specification; all parameters of the
incoming message must be equal to the template specified in
this argument to accept it as a response. The source system of
the MAVLink message must also be equal to the system ID of the
UAV where this message was sent.
timeout: timeout in seconds; `None` means to use the default
timeout for the driver. Retries will be attempted if no response
arrives to the packet within the given time interval
retries: maximum number of retries for the packet (not counting the
initial attempt); `None` means to use the default retry count
for the driver.
channel: the channel to send the packet on
Raises:
TooSlowError: if the UAV failed to respond in time
"""
if timeout is None or timeout <= 0:
timeout = self._default_timeout
if retries is None or retries < 0:
retries = self._default_retries
response = None
while retries >= 0:
try:
with fail_after(timeout):
if wait_for_response is not None:
response = await self.send_packet(
spec,
target,
wait_for_response=wait_for_response,
channel=channel,
)
elif wait_for_one_of is not None:
response = await self.send_packet(
spec,
target,
wait_for_one_of=wait_for_one_of,
channel=channel,
)
else:
raise RuntimeError(
"At least one of 'wait_for_response' and 'wait_for_one_of' "
"must be provided"
)
break
except TooSlowError:
retries -= 1
if response is None:
raise TooSlowError("No response received for the outbound packet in time")
return response
async def _enter_low_power_mode_broadcast(
self, *, transport: Optional[TransportOptions] = None
) -> None:
channel = transport_options_to_channel(transport)
await self.broadcast_command_long_with_retries(
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
param1=126, # request low power mode
channel=channel,
)
async def _enter_low_power_mode_single(
self, uav: "MAVLinkUAV", *, transport: Optional[TransportOptions] = None
) -> None:
# Effectively the same as shutdown, but without the attempt to stop the
# motors (so drones where the motors are running will not be affected)
channel = transport_options_to_channel(transport)
if not await self.send_command_long(
uav,
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
126, # request low power mode
channel=channel,
):
raise RuntimeError("Failed to request low-power mode from autopilot")
async def get_log(
self, uav: "MAVLinkUAV", log_id: str
) -> ProgressEvents[Optional[FlightLog]]:
try:
log_number = int(log_id)
except ValueError:
raise RuntimeError(f"Invalid log ID: {log_id!r}") from None
async for maybe_log_or_progress in uav.log_downloader.get_log(log_number):
if maybe_log_or_progress is None:
raise RuntimeError("No log with the given ID: {log_number!r}")
else:
yield maybe_log_or_progress
async def _get_log_list_single(self, uav: "MAVLinkUAV") -> list[FlightLogMetadata]:
return await uav.log_downloader.get_log_list()
async def _get_parameter_single(self, uav: "MAVLinkUAV", name: str) -> float:
return await uav.get_parameter(name)
def _request_preflight_report_single(self, uav: "MAVLinkUAV") -> PreflightCheckInfo:
return uav.preflight_status
def _request_version_info_single(self, uav: "MAVLinkUAV") -> VersionInfo:
return uav.get_version_info()
async def _resume_from_low_power_mode_broadcast(
self, *, transport: Optional[TransportOptions] = None
) -> None:
# This is not supported by standard MAVLink so it relies on a custom
# protocol extension
channel = transport_options_to_channel(transport)
await self.broadcast_command_long_with_retries(
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
param1=127, # resume autopilot
channel=channel,
)
# TODO(ntamas): shall we notify all the UAVs that they are about to
# be resumed (i.e. _notify_rebooted_by_us())?
async def _resume_from_low_power_mode_single(
self, uav: "MAVLinkUAV", *, transport: Optional[TransportOptions] = None
) -> None:
# This is not supported by standard MAVLink so it relies on a custom
# protocol extension
channel = transport_options_to_channel(transport)
if not await self.send_command_long(
uav,
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
127, # resume autopilot
channel=channel,
):
raise RuntimeError("Failed to wake up autopilot from low-power mode")
uav._notify_rebooted_by_us()
async def _send_fly_to_target_signal_single(
self, uav: "MAVLinkUAV", target: GPSCoordinate
) -> None:
await uav.fly_to(target)
async def _send_hover_signal_broadcast(self, *, transport=None) -> None:
channel = transport_options_to_channel(transport)
# TODO(ntamas): HACK HACK HACK This won't work for a PixHawk as we are
# hardcoding the ArduCopter mode numbers here. If we wanted to do this
# properly, we would not be able to broadcast because different UAVs
# may have different autopilots and the mode numbers might be different.
base_mode, mode, submode = MAVModeFlag.CUSTOM_MODE_ENABLED, 16, 0
await self.broadcast_command_long_with_retries(
MAVCommand.DO_SET_MODE,
param1=float(base_mode),
param2=float(mode),
param3=float(submode),
channel=channel,
)
async def _send_hover_signal_single(
self, uav: "MAVLinkUAV", *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
await uav.set_mode("pos hold", channel=channel)
async def _send_landing_signal_broadcast(self, *, transport=None) -> None:
channel = transport_options_to_channel(transport)
await self.broadcast_command_long_with_retries(
MAVCommand.NAV_LAND, channel=channel
)
async def _send_landing_signal_single(
self, uav: "MAVLinkUAV", *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
success = await self.send_command_long(
uav, MAVCommand.NAV_LAND, channel=channel
)
if not success:
raise RuntimeError("Landing command failed")
async def _send_light_or_sound_emission_signal_broadcast(
self, signals: list[str], duration: int, *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
if "light" in signals:
message = create_led_control_packet(broadcast=True)
await self.broadcast_packet(message, channel=channel)
async def _send_light_or_sound_emission_signal_single(
self, uav: "MAVLinkUAV", signals: list[str], duration: int, *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
if "light" in signals:
message = create_led_control_packet()
await self.send_packet(message, uav, channel=channel)
async def _send_motor_start_stop_signal_broadcast(
self, start: bool, force: bool = False, *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
await self.broadcast_command_long_with_retries(
MAVCommand.COMPONENT_ARM_DISARM,
1 if start else 0,
FORCE_MAGIC if force else 0,
channel=channel,
)
async def _send_motor_start_stop_signal_single(
self, uav: "MAVLinkUAV", start: bool, force: bool = False, *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
if not await self.send_command_long(
uav,
MAVCommand.COMPONENT_ARM_DISARM,
1 if start else 0,
FORCE_MAGIC if force else 0,
channel=channel,
):
raise RuntimeError(
"Failed to arm motors" if start else "Failed to disarm motors"
)
async def _send_reset_signal_broadcast(self, component, *, transport=None) -> None:
channel = transport_options_to_channel(transport)
if not component:
# Resetting all the UAVs, this is supported
await self.broadcast_command_long_with_retries(
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
1, # reboot autopilot
channel=channel,
)
# TODO(ntamas): shall we notify all the UAVs that they are about to
# be rebooted (i.e. _notify_rebooted_by_us())?
else:
# No per-component resets are implemented yet
raise RuntimeError(f"Resetting {component!r} is not supported")
async def _send_reset_signal_single(
self, uav: "MAVLinkUAV", component: str, *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
if not component:
# Resetting the whole UAV, this is supported
await uav.reboot(channel=channel)
else:
# No per-component resets are implemented on this UAV yet
raise RuntimeError(f"Resetting {component!r} is not supported")
async def _send_return_to_home_signal_broadcast(self, *, transport=None) -> None:
channel = transport_options_to_channel(transport)
await self.broadcast_command_long_with_retries(
MAVCommand.NAV_RETURN_TO_LAUNCH, channel=channel
)
async def _send_return_to_home_signal_single(
self, uav: "MAVLinkUAV", *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
success = await self.send_command_long(
uav, MAVCommand.NAV_RETURN_TO_LAUNCH, channel=channel
)
if not success:
raise RuntimeError("Return to home command failed")
async def _send_shutdown_signal_broadcast(self, *, transport=None) -> None:
channel = transport_options_to_channel(transport)
await self._send_motor_start_stop_signal_broadcast(
start=False, force=True, transport=transport
)
await self.broadcast_command_long_with_retries(
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
2, # shutdown autopilot
param6=RebootShutdownConditions.FORCE,
channel=channel,
)
async def _send_shutdown_signal_single(
self, uav: "MAVLinkUAV", *, transport=None
) -> None:
channel = transport_options_to_channel(transport)
await self._send_motor_start_stop_signal_single(
uav, start=False, force=True, transport=transport
)
if not await self.send_command_long(
uav,
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
2, # shutdown autopilot
param6=RebootShutdownConditions.FORCE,
channel=channel,
):
raise RuntimeError("Failed to send shutdown command to autopilot")
async def _send_takeoff_signal_single(
self, uav: "MAVLinkUAV", *, scheduled: bool = False, transport=None
) -> None:
if scheduled:
# Ignore this; scheduled takeoffs are managed by the ScheduledTakeoffManager
return
channel = transport_options_to_channel(transport)
await self._send_motor_start_stop_signal_single(
uav, start=True, transport=transport
)
# Wait a bit to give the autopilot some time to start the motors, just
# in case. Not sure whether this is needed.
await sleep(0.1)
# Send the takeoff command
await uav.takeoff_to_relative_altitude(2.5, channel=channel)
async def _set_parameter_single(
self, uav: "MAVLinkUAV", name: str, value: Any
) -> None:
try:
value_as_float = float(value)
except ValueError:
raise RuntimeError(f"Value of parameter {name!r} must be numeric") from None
await uav.set_parameter(name, value_as_float)
async def _set_parameters_single(
self, uav: "MAVLinkUAV", parameters: dict[str, Any]
) -> ProgressEvents[BulkParameterUploadResponse]:
if self.use_bulk_parameter_uploads:
parameters_as_float = {}
for name, value in parameters.items():
try:
value_as_float = float(value)
except ValueError:
raise RuntimeError(
f"Value of parameter {name!r} must be numeric"
) from None
parameters_as_float[name] = value_as_float
try:
await uav.set_parameters(parameters_as_float)
except Exception:
if self.log:
self.log.exception("Failed to set parameters")
yield {"success": False}
else:
yield {"success": True}
else:
async for event in super()._set_parameters_single(uav, parameters):
yield event
@dataclass
class MAVLinkMessageRecord:
"""Simple object holding a pair of a MAVLink message and the corresponding
monotonic timestamp when the message was observed.
"""
message: MAVLinkMessage = None
timestamp: float = 0
@property
def age(self) -> float:
"""Returns the number of seconds elapsed since the record was updated
the last time.
"""
return monotonic() - self.timestamp
def update(self, message: MAVLinkMessage) -> None:
"""Updates the record with a new MAVLink message."""
self.message = message
self.timestamp = monotonic()
class MAVLinkUAV(UAVBase[MAVLinkDriver]):
"""Subclass for UAVs created by the driver for MAVLink-based drones."""
notify_updated: Callable[[], None]
send_log_message_to_gcs: GCSLogMessageSender
_accelerometer_calibration: Optional[AccelerometerCalibration] = None
"""Accelerometer calibration status of the drone, constructed lazily.
Use the `accelerometer_calibration` getter to access this property; this
will ensure that the accelerometer calibration status object is created
on-demand.
"""
_battery: BatteryInfo
"""Battery status of the drone"""
_compass_calibration: Optional[CompassCalibration] = None
"""Compass calibration status of the drone, constructed lazily.
Use the `compass_calibration` getter to access this property; this will
ensure that the compass calibration status object is created on-demand.
"""
_connected_event: Event
"""Event that is emitted when the connection state of the UAV becomes
connected.
"""
_connection_state: ConnectionState = ConnectionState.DISCONNECTED
"""State of the connection to this drone."""
_gps_fix: GPSFix
"""Current GPS fix status and position accuracy of the drone."""
_last_messages: defaultdict[int, MAVLinkMessageRecord]
"""Stores a mapping of each MAVLink message type received from the drone
to the most recent copy of the message of that type. Some of these
records may be cleared when we don't detect a heartbeat from the
drone any more.
"""
_last_skybrush_status_info: Optional[DroneShowStatus] = None
_log_downloader: Optional[MAVLinkLogDownloader] = None
"""Log downloader for the drone, constructed lazily.
Use the `log_downloader` getter to access this property; this will
ensure that the log downloader object is created on-demand.
"""
_network_id: str = ""
"""Stores the MAVLink network ID of the drone (not part of the MAVLink
messages; used by us to track which MAVLink network of ours the
drone belongs to).
"""
_system_id: int = 0
"""MAVLink system ID of the drone. Zero if unspecified or unknown."""
_autopilot: Autopilot
"""Model of the autopilot used by this drone"""
_battery: BatteryInfo
"""Battery status of the drone"""
_configuring_data_streams: bool = False
"""Stores whether we are currently configuring the data stream rates for
the drone. Used to avoid multiple parallel configuration attempts.
"""
_first_connection: bool = True
"""Stores whether we are connecting to the drone for the first time;
used to prevent a "probably rebooted warning" for the first connection.
"""
_gps_fix: GPSFix
"""Current GPS fix status of the drone"""
_last_autopilot_capabilities_requested_at: Optional[float] = None
"""Stores the time when we attempted to retrieve the autopilot capabilities
for the last time. Used to avoid frequent requests.
"""
_last_data_stream_configuration_attempted_at: Optional[float] = None
"""Stores the time when we attempted to configure the data streams for
the last time. Used to avoid frequent reconfiguration attempts.
"""
_last_skybrush_status_info: Optional[DroneShowStatus] = None
"""The last Skybrush-specific status packet received from the UAV if it ever
sent one.
"""
_preflight_status: PreflightCheckInfo
"""The status of the preflight checks on the drone"""
_position: GPSCoordinate
"""The current global position of the drone"""
_rssi_mode: RSSIMode
"""Specifies how the RSSI values of the communication channels of the drone
should be calculated.
"""
_scheduled_takeoff_authorization_scope: AuthorizationScope = AuthorizationScope.NONE
"""The current authorization scope of the scheduled takeoff of the drone."""
_scheduled_takeoff_time: Optional[int] = None
"""Scheduled takeoff time of the drone, as a UNIX timestamp, in seconds"""
_scheduled_takeoff_time_gps_time_of_week: Optional[int] = None
"""Scheduled takeoff time of the drone, as a GPS time-of-week timestamp,
in seconds"""
_velocity: VelocityNED
"""Current velocity of the drone in NED coordinate system, m/sec"""
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
make_autopilot = self.driver.autopilot_factory or UnknownAutopilot
self._autopilot = make_autopilot()
self._battery = BatteryInfo()
self._connected_event = Event()
self._gps_fix = GPSFix()
self._last_messages = defaultdict(MAVLinkMessageRecord) # type: ignore
self._preflight_status = PreflightCheckInfo()
self._position = GPSCoordinate()
self._rssi_mode = RSSIMode.NONE
self._velocity = VelocityNED()
self.notify_updated = None # type: ignore
self.send_log_message_to_gcs = nop
self._reset_mavlink_version()
def assign_to_network_and_system_id(self, network_id: str, system_id: int) -> None:
"""Assigns the UAV to the MAVLink network with the given network ID.
The UAV is assumed to have the given system ID in the given network, and
it is assumed to have a component ID of 1 (primary autopilot). We are
not talking to any other component of a MAVLink system yet.
"""
if self._network_id:
raise RuntimeError(
f"This UAV is already a member of MAVLink network {self._network_id}"
)
elif not network_id:
raise RuntimeError("MAVLink network ID may not be empty")
self._network_id = network_id
self._system_id = system_id
async def calibrate_accelerometer(self) -> ProgressEventsWithSuspension[None, str]:
"""Calibrates the accelerometers of the UAV.
Yields:
events describing the progress of the calibration
Raises:
NotSupportedError: if the accelerometer calibration is not supported
on the UAV
"""
try:
async for event in self._autopilot.calibrate_accelerometer(self): # type: ignore
yield event
except NotImplementedError:
# Turn NotImplementedError from the autopilot into a NotSupportedError
raise NotSupportedError from None
async def calibrate_compass(self) -> ProgressEventsWithSuspension[None, str]:
"""Calibrates the compasses of the UAV.
Yields:
events describing the progress of the calibration
Raises:
NotSupportedError: if the compass calibration is not supported on
the UAV
"""
try:
async for event in self._autopilot.calibrate_compass(self): # type: ignore
yield event
except NotImplementedError:
# Turn NotImplementedError from the autopilot into a NotSupportedError
raise NotSupportedError from None
async def calibrate_component(
self, component: str
) -> ProgressEventsWithSuspension[None, str]:
"""Calibrates a component of the UAV.
Parameters:
component: the component to calibrate; currently we support
``accel``, ``baro``, ``compass``, ``gyro`` or ``level``.
Raises:
NotSupportedError: if the calibration of the given component is not
supported on this UAV
RuntimeError: if the UAV rejected to calibrate the component
"""
if component == "accel":
# Acceleration calibration takes a long time, needs user input and
# involves progress handling so that's handled in a separate function
async for event in self.calibrate_accelerometer():
yield event
return
if component == "compass":
# Compass calibration takes a long time and involves progress
# handling so that's handled in a separate function
async for event in self.calibrate_compass():
yield event
return
# all other calibration procedures are handled with a single command
params = [0] * 7
if component == "baro":
params[2] = 1
elif component == "gyro":
params[0] = 1
elif component == "level":
params[4] = 2
else:
raise NotSupportedError
# In ArduCopter 4.5 (and maybe in other versions), the baro calibration
# returns MAVResult.IN_PROGRESS if there is an airspeed sensor, which
# we need to treat as a successful calibration.
success = await self.driver.send_command_long(
self,
MAVCommand.PREFLIGHT_CALIBRATION,
*params,
allow_in_progress=(component == "baro"),
)
if not success:
raise RuntimeError(f"Failed to calibrate component: {component!r}")
def can_handle_firmware_update_target(self, target_id: str) -> bool:
"""Returns whether the virtual UAV can handle uploads with the given
target.
"""
return self._autopilot.can_handle_firmware_update_target(target_id)
async def clear_scheduled_takeoff_time(self) -> None:
"""Clears the scheduled takeoff time of the UAV."""
await self.set_scheduled_takeoff_time(None)
async def configure_geofence(
self, configuration: GeofenceConfigurationRequest
) -> None:
"""Configures the geofence on the UAV."""
return await self._autopilot.configure_geofence(self, configuration)
async def configure_safety(self, configuration: SafetyConfigurationRequest) -> None:
"""Configures the safety features on the UAV."""
return await self._autopilot.configure_safety(self, configuration)
def get_age_of_message(self, type: int, now: Optional[float] = None) -> float:
"""Returns the number of seconds elapsed since we have last seen a
message of the given type.
"""
record = self._last_messages.get(int(type))
if now is None:
now = monotonic()
return now - record.timestamp if record else inf
async def get_geofence_status(self) -> GeofenceStatus:
"""Returns the status of the geofence of the UAV."""
return await self._autopilot.get_geofence_status(self)
def get_last_message(self, type: int) -> Optional[MAVLinkMessage]:
"""Returns the last MAVLink message that was observed with the given
type or `None` if we have not observed such a message yet.
"""
record = self._last_messages.get(int(type))
return record.message if record else None
async def get_parameter(self, name: str, fetch: bool = False) -> float:
"""Returns the value of a parameter from the UAV.
Due to the nature of the MAVLink protocol, we will not be able to
detect if a parameter does not exist as there will be no reply from
the drone -- which is indistinguishable from a lost packet.
"""
response = await self._get_parameter(name)
return self._autopilot.decode_param_from_wire_representation(
response.param_value, response.param_type
)
async def _get_parameter(self, name: str) -> MAVLinkMessage:
"""Retrieves the value of a parameter from the UAV and returns a
MAVLink message encapsulating the name, index, value and type of
the parameter.
"""
param_id = name.encode("utf-8")[:16]
return await self.driver.send_packet_with_retries(
spec.param_request_read(param_id=param_id, param_index=-1),
target=self,
wait_for_response=spec.param_value(param_id=param_id),
timeout=0.7,
)
def get_version_info(self) -> VersionInfo:
"""Returns a dictionary mapping component names of this UAV to the
corresponding version numbers.
"""
version_info = self.get_last_message(MAVMessageType.AUTOPILOT_VERSION)
result = {}
for version in ("flight", "middleware", "os"):
if getattr(version_info, f"{version}_sw_version", 0) > 0:
result[f"{version}_sw"] = mavlink_version_number_to_semver(
getattr(version_info, f"{version}_sw_version", 0),
getattr(version_info, f"{version}_custom_version", None),
)
if version_info is not None and version_info.board_version > 0:
result["board"] = mavlink_version_number_to_semver(
version_info.board_version
)
return result
async def flash_led(self, *, channel: str = Channel.PRIMARY) -> None:
"""Flashes the LED of the drone.
Parameters:
channel: the communication channel to send the command on
"""
message = create_led_control_packet()
await self.driver.send_packet(message, self, channel=channel)
async def fly_to(self, target: GPSCoordinate) -> None:
"""Sends a command to the UAV to reposition it to the given coordinate,
where the altitude may be specified in AMSL or AHL.
"""
if self._autopilot.supports_repositioning:
# Implementation of fly_to() with the MAVLink DO_REPOSITION command
await self._fly_to_with_repositioning(target)
else:
# Implementation of fly_to() with a guided mode command
await self._fly_to_in_guided_mode(target)
async def _fly_to_in_guided_mode(self, target: GPSCoordinate) -> None:
"""Implementation of `fly_to()` using a MAVLink
SET_POSITION_TARGET_GLOBAL_INT guided mode message.
"""
type_mask = (
PositionTargetTypemask.VX_IGNORE
| PositionTargetTypemask.VY_IGNORE
| PositionTargetTypemask.VZ_IGNORE
| PositionTargetTypemask.AX_IGNORE
| PositionTargetTypemask.AY_IGNORE
| PositionTargetTypemask.AZ_IGNORE
| PositionTargetTypemask.YAW_IGNORE
| PositionTargetTypemask.YAW_RATE_IGNORE
)
if target.amsl is None:
frame = MAVFrame.GLOBAL_RELATIVE_ALT_INT
if target.ahl is None:
# We cannot simply set Z_IGNORE in the type mask because that
# does not work with ArduCopter (it would ignore the whole
# position).
altitude = self.status.position.ahl
else:
altitude = target.ahl
else:
frame = MAVFrame.GLOBAL_INT
altitude = target.amsl
lat, lon = int(target.lat * 1e7), int(target.lon * 1e7)
message = spec.set_position_target_global_int(
time_boot_ms=self.driver.get_time_boot_ms(),
coordinate_frame=frame,
type_mask=type_mask,
# position
lat_int=lat,
lon_int=lon,
alt=altitude,
# velocity
vx=0,
vy=0,
vz=0,
# acceleration or force
afx=0,
afy=0,
afz=0,
# yaw
yaw=0,
yaw_rate=0,
)
# There is usually no confirmation for the guided mode command, unless
# the drone streams us the POSITION_TARGET_GLOBAL_INT message, which it
# does not necessarily do. So, we send the packet five times, with
# 200 msec between attempts, and wait for a matching POSITION_TARGET_GLOBAL_INT
# message, but we don't freak out if we don't get any response.
response = spec.position_target_global_int(
# position
lat_int=lat,
lon_int=lon,
# note that we don't check the altitude in the response because the
# position target feedback could come in AMSL or AHL
)
try:
await self.driver.send_packet_with_retries(
message, self, wait_for_response=response, timeout=0.2, retries=4
)
except TooSlowError:
# Maybe it's okay anyway, see comment above
pass
async def _fly_to_with_repositioning(self, target: GPSCoordinate) -> None:
"""Implementation of `fly_to()` using a MAVLink DO_REPOSITION command
with proper confirmation.
"""
# PX4 supports AMSL only so we always convert to AMSL; NaN means to
# hold the current altitude
if target.amsl is not None:
altitude = target.amsl
else:
altitude = (
self.convert_ahl_to_amsl(target.ahl) if target.ahl is not None else nan
)
lat, lon = int(target.lat * 1e7), int(target.lon * 1e7)
success = await self.driver.send_command_int(
self,
MAVCommand.DO_REPOSITION,
frame=MAVFrame.GLOBAL_INT,
param1=-1, # speed (default)
param2=0, # flags
param3=0, # reserved
param4=nan, # yaw mode
x=lat, # latitude
y=lon, # longitude
z=altitude, # altitude
)
if not success:
raise RuntimeError("Fly to waypoint command failed")
@property
def scheduled_takeoff_authorization_scope(self) -> AuthorizationScope:
"""Returns whether the UAV is authorized to do a scheduled takeoff and
if so, what the scope of the authorization is.
"""
return self._scheduled_takeoff_authorization_scope
@property
def scheduled_takeoff_time(self) -> Optional[int]:
"""Returns the scheduled takeoff time of the UAV as a UNIX timestamp
in seconds, truncated to an integer, or `None` if the UAV is not
scheduled for an automatic takeoff.
"""
return self._scheduled_takeoff_time
@property
def scheduled_takeoff_time_gps_time_of_week(self) -> Optional[int]:
"""Returns the scheduled takeoff time of the UAV as a GPS time of week
value, or `None` if the UAV is not scheduled for an automatic takeoff.
"""
return self._scheduled_takeoff_time_gps_time_of_week
async def _set_parameter_single(self, name: str, value: float) -> None:
"""Sets the value of a single parameter on the UAV.
This function assumes that all sanity checks on the name and the value
have already been performed by the caller.
"""
# We need to retrieve the current value of the parameter first because
# we need its type
param_id = name.encode("utf-8")[:16]
response = await self._get_parameter(name)
param_type = response.param_type
encoded_value = self._autopilot.encode_param_to_wire_representation(
value, param_type
)
try:
# Try to set the new parameter value. In normal circumstances, we
# will get a PARAM_VALUE message in response, with the same
# parameter ID. However, in ArduPilot, when SERIALx_OPTIONS bit 10
# is set for the primary telemetry channel, we will _not_ get
# PARAM_VALUE messages, at least not in ArduPilot 4.4. In this case,
# we make one final attempt to read the parameter value explicitly.
await self.driver.send_packet_with_retries(
spec.param_set(
param_id=param_id,
param_value=encoded_value,
param_type=param_type,
),
target=self,
wait_for_response=spec.param_value(param_id=param_id),
timeout=0.7,
)
except TooSlowError:
# This is where we try to recover
observed_value = await self.get_parameter(name)
if value != observed_value:
raise RuntimeError(
f"Failed to set parameter {name!r}, "
f"tried to set {value!r}, got {observed_value!r}"
) from None
async def set_parameter(self, name: str, value: float) -> None:
"""Sets the value of a single parameter on the UAV."""
return await self.set_parameters({name: value})
async def set_parameters(self, parameters: dict[str, float]) -> None:
"""Sets the value of multiple parameters on the UAV, preferably in a
more efficient manner if the autopilot of the drone supports MAVFTP
parameter uploads.
"""
if not parameters:
return
# Basic sanity check on the values
for name, value in parameters.items():
if not isfinite(value):
raise RuntimeError(f"Value of parameter {name!r} must be finite")
if len(parameters) > 1 and self._autopilot.supports_mavftp_parameter_upload:
# Do a bulk upload
async with aclosing(MAVFTP.for_uav(self)) as ftp:
filename, contents = self._autopilot.prepare_mavftp_parameter_upload(
parameters
)
# TODO(ntamas): handle error code when closing the file
await ftp.put(contents, filename, skip_crc_check=True)
else:
# No support for bulk uploads, or we only have a single parameter,
# so just do it one by one
for name, value in sorted(parameters.items()):
await self._set_parameter_single(name, value)
async def test_component(
self, component: str, *, channel: str = Channel.PRIMARY
) -> None:
"""Tests a component of the UAV.
Parameters:
component: the component to test; currently we support ``camera``,
``led``, ``motor`` and ``pyro``
channel: the communication channel to use when sending the command
"""
if component == "motor":
# Older versions of ArduCopter did not support the motor count
# parameter so let's just test all the motors one by one
heartbeat = self.get_last_message(MAVMessageType.HEARTBEAT)
motor_count = 4 if not heartbeat else MAVType(heartbeat.type).motor_count
for i in range(motor_count):
await self.driver.send_command_long(
self,
MAVCommand.DO_MOTOR_TEST,
i + 1, # motor instance number
float(MotorTestThrottleType.PERCENT),
15, # 15%
2, # timeout: 2 seconds
0, # 1 motor only
float(MotorTestOrder.DEFAULT),
channel=channel,
)
await sleep(3)
elif component == "led":
color_sequence = [
Color(name)
for name in "red lime blue yellow cyan magenta white".split()
]
for index, color in enumerate(color_sequence):
if index > 0:
await sleep(1)
await self.set_led_color(color, channel=channel, duration=2)
elif component == "pyro":
await self.start_pyro_test()
elif component == "camera":
await self.trigger_camera_shutter()
else:
raise NotSupportedError
async def handle_firmware_update(
self, target_id: str, blob: bytes
) -> AsyncIterator[Progress]:
async for event in self._autopilot.handle_firmware_update(
self, target_id, blob
):
yield event
self._clear_autopilot_capabilities()
def handle_message_autopilot_version(self, message: MAVLinkMessage):
"""Handles an incoming MAVLink AUTOPILOT_VERSION message targeted at
this UAV.
"""
self._autopilot = self._autopilot.refine_with_capabilities(message.capabilities)
self._store_message(message)
if self._mavlink_version < 2:
if message.capabilities & MAVProtocolCapability.MAVLINK2:
# Autopilot supports MAVLink 2 so switch to it
self._mavlink_version = 2
# The other side has to know that we have switched; we do it by
# sending it a REQUEST_AUTOPILOT_CAPABILITIES message again
self.driver.run_in_background(self._request_autopilot_capabilities)
else:
# MAVLink 2 not supported by the drone. We do not support MAVLink 1
# as most of the messages we use are MAVLink 2 only, so indicate
# a protocol error at this point and bail out.
self.ensure_error(FlockwaveErrorCode.AUTOPILOT_PROTOCOL_ERROR)
def handle_message_command_long(self, message: MAVLinkMessage):
if message.command == MAVCommand.ACCELCAL_VEHICLE_POS:
self.accelerometer_calibration.handle_message_accelcal_vehicle_pos(message)
else:
# Do not raise an exception here, otherwise it would be an easy way
# to crash the extension -- just send an unsupported COMMAND_LONG
pass
def handle_message_drone_show_status(self, message: MAVLinkMessage):
"""Handles an incoming drone show specific status message targeted at
this UAV.
"""
data = DroneShowStatus.from_mavlink_message(message)
self._last_skybrush_status_info = data
# Process the basic part of the packet that is always present (both with
# the standard and the compact telemetry profile)
self._update_gps_fix_type_and_satellite_count(data.gps_fix, data.num_satellites)
gps_start_time = data.start_time if data.start_time >= 0 else None
if gps_start_time != self._scheduled_takeoff_time_gps_time_of_week:
self._scheduled_takeoff_time_gps_time_of_week = gps_start_time
if gps_start_time is None:
self._scheduled_takeoff_time = None
else:
self._scheduled_takeoff_time = int(
gps_time_of_week_to_utc(gps_start_time).timestamp()
)
self._scheduled_takeoff_authorization_scope = data.authorization_scope
debug = data.message.encode("utf-8")
self._update_errors_from_drone_show_status_packet(data)
updates = {"light": data.light, "gps": self._gps_fix, "debug": debug}
if self._rssi_mode is RSSIMode.RTCM_COUNTERS:
updates["rssi"] = [
rtcm_counter_to_rssi(data.rtcm_counters[0]),
rtcm_counter_to_rssi(data.rtcm_counters[1]),
]
# If the status packet has an extended section, process the extended part
if data.extension:
extended_part = data.extension
# Process the missing information that the standard telemetry
# provides with the GLOBAL_POSITION_INT packet
self._position.lat = extended_part.lat
self._position.lon = extended_part.lng
self._position.amsl = extended_part.alt
self._position.ahl = extended_part.relative_alt
self._velocity.x = extended_part.vx
self._velocity.y = extended_part.vy
self._velocity.z = extended_part.vz
# Process the missing information that the standard telemetry
# provides with the GPS_RAW_INT packet
self._gps_fix.horizontal_accuracy = extended_part.h_acc
self._gps_fix.vertical_accuracy = extended_part.v_acc
updates["position"] = self._position
updates["velocity"] = self._velocity
updates["heading"] = extended_part.heading
self.update_status(**updates)
self.notify_updated()
def handle_message_heartbeat(self, message: MAVLinkMessage):
"""Handles an incoming MAVLink HEARTBEAT message targeted at this UAV."""
if self._mavlink_version < 2 and message.get_msgbuf()[0] == 253:
# Other side sent a MAVLink 2 heartbeat so we can switch to MAVLink
# 2 as well
self._mavlink_version = 2
# Get the age of the _last_ heartbeat, will be used later
age_of_last_heartbeat = self.get_age_of_message(MAVMessageType.HEARTBEAT)
# Store a copy of the heartbeat
self._store_message(message)
# Determine whether the heartbeat indicates that it makes sense trying
# to initiate communication with the UAV. Heartbeats that indicate that
# the UAV is powering down or is completely powered down will not trigger
# further communication with the UAV.
can_communicate = can_communicate_infer_from_heartbeat(message)
# If the heartbeat indicates that we might not be able to communicate
# with the drone yet (maybe the heartbeat is sent by the wifi module on
# behalf of the drone while the FC is off), let's not initiate
# communication but just keep track of whatever we've found in the
# heartbeat
if not can_communicate:
self._update_errors_from_inactive_state()
self.update_status(mode="off", gps=OurGPSFixType.NO_FIX)
self.notify_updated()
return
if self._connection_state is not ConnectionState.CONNECTED:
self.notify_reconnection(message)
# Do we already have basic information about the autopilot capabilities?
# If we don't, ask for them.
if not self.driver.autopilot_factory and not self.get_last_message(
MAVMessageType.AUTOPILOT_VERSION
):
now = monotonic()
if (
self._last_autopilot_capabilities_requested_at is None
or now - self._last_autopilot_capabilities_requested_at > 2
):
self._last_autopilot_capabilities_requested_at = now
self.driver.run_in_background(self._request_autopilot_capabilities)
# If we haven't received a SYS_STATUS message for a while but we keep
# on receiving heartbeats, chances are that the data streams are not
# configured correctly so we configure them.
#
# TODO(ntamas): This can be problematic with Skynet if it is deduplicating
# HEARTBEAT or SYS_STATUS messages that contain no change. Make sure that
# in these cases self.driver.assume_data_streams_configured is True
if (
not self.driver.assume_data_streams_configured
and age_of_last_heartbeat < 2
and self.get_age_of_message(MAVMessageType.SYS_STATUS) > 5
):
self._configure_data_streams_soon()
# Update error codes and basic status info if needed
if self._autopilot.is_duplicate_message(message):
self.touch_status()
else:
self._update_errors_from_sys_status_and_heartbeat()
self.update_status(
mode=self._autopilot.describe_mode(
message.base_mode, message.custom_mode, message.type
)
)
self.notify_updated()
def handle_message_global_position_int(self, message: MAVLinkMessage):
# TODO(ntamas): reboot detection with time_boot_ms
if abs(message.lat) <= 900000000:
self._position.lat = message.lat / 1e7
self._position.lon = message.lon / 1e7
self._position.amsl = message.alt / 1e3
self._position.ahl = message.relative_alt / 1e3
else:
# Some drones, such as the Parrot Bebop 2, use 2^31-1 as latitude
# and longitude to indicate that no GPS fix has been obtained yet,
# so treat any values outside the valid latitude range as invalid
self._position.lat = self._position.lon = self._position.amsl = (
self._position.ahl
) = 0
self._velocity.x = message.vx / 100
self._velocity.y = message.vy / 100
self._velocity.z = message.vz / 100
if abs(message.hdg) <= 36000:
heading = message.hdg / 100
else:
heading = 0
self.update_status(
position=self._position, velocity=self._velocity, heading=heading
)
self.notify_updated()
def handle_message_gps_raw_int(self, message: MAVLinkMessage):
num_sats = message.satellites_visible
self._gps_fix.horizontal_accuracy = (
message.h_acc / 1000.0 if message.h_acc > 0 else None
)
self._gps_fix.vertical_accuracy = (
message.v_acc / 1000.0 if message.v_acc > 0 else None
)
self._update_gps_fix_type_and_satellite_count(
GPSFixType(message.fix_type).to_ours(), num_sats if num_sats < 255 else None
)
self.update_status(gps=self._gps_fix)
self.notify_updated()
def handle_message_log_data(self, message: MAVLinkMessage):
self.log_downloader.handle_message_log_data(message)
def handle_message_log_entry(self, message: MAVLinkMessage):
self.log_downloader.handle_message_log_entry(message)
def handle_message_mag_cal_progress(self, message: MAVLinkMessage):
self.compass_calibration.handle_message_mag_cal_progress(message)
def handle_message_mag_cal_report(self, message: MAVLinkMessage):
self.compass_calibration.handle_message_mag_cal_report(message)
def handle_message_radio_status(self, message: MAVLinkMessage):
if self._rssi_mode is not RSSIMode.RADIO_STATUS:
return
# Limitations:
# - Currently we do not account for multiple connections; we always
# update the RSSI of the first connection. This is fine for the time
# being because we assume that the wifi connection (which provides
# the RSSI value) is the first connection, and the radio connection is
# not supposed to be bi-directional.
# - We handle the conventions of the MAVESP8266 firmware only.
# MAVESP8266 simply uses WiFi.RSSI(), which is a 8-bit signed int
# representing the signal level in dBm. We scale the value to a
# 0% - 100% range such that anything less than -100 dBm is 0% and
# anything larger than -50 dBm is 100%.
# - In AP mode, the MAVESP8266 firmware transmits 0 as the RSSI value,
# which is interpreted as 0 dBm here and translated to 200%, which then
# gets clamped to 100%.
# Unsigned-to-signed conversion to work around MAVLink data type issues
rssi_dbm = message.rssi if message.rssi <= 127 else message.rssi - 256
rssi = min(max(0, int((rssi_dbm + 100) * 2)), 100)
self.update_rssi(index=0, value=rssi)
def handle_message_sys_status(self, message: MAVLinkMessage):
self._store_message(message)
if self._autopilot.is_duplicate_message(message):
self.touch_status()
else:
self._update_errors_from_sys_status_and_heartbeat()
# Update battery status
if message.voltage_battery < 65535:
self._battery.voltage = message.voltage_battery / 1000
else:
self._battery.voltage = 0.0
if message.battery_remaining == -1:
self._battery.percentage = None
elif self._autopilot.is_battery_percentage_reliable:
self._battery.percentage = message.battery_remaining
else:
self._battery.percentage = None
self.update_status(battery=self._battery)
self.notify_updated()
def handle_message_system_time(self, message: MAVLinkMessage):
"""Handles an incoming MAVLink SYSTEM_TIME message targeted at this UAV."""
previous_message = self._get_previous_copy_of_message(message)
if previous_message:
# TODO(ntamas): compare the time since boot with the previous
# version to detect reboot events
pass
self._store_message(message)
@property
def accelerometer_calibration(self) -> AccelerometerCalibration:
"""State object of the accelerometer calibration procedure."""
if self._accelerometer_calibration is None:
self._accelerometer_calibration = AccelerometerCalibration()
return self._accelerometer_calibration
@property
def compass_calibration(self) -> CompassCalibration:
"""State object of the compass calibration procedure."""
if self._compass_calibration is None:
self._compass_calibration = CompassCalibration()
return self._compass_calibration
@property
def is_connected(self) -> bool:
"""Returns whether the UAV is connected to the ground station and we
have seen heartbeats from it recently (even if they indicated that the
drone is in a sleep state).
"""
return self._connection_state is not ConnectionState.DISCONNECTED
@property
def log_downloader(self) -> MAVLinkLogDownloader:
"""State object of the log download procedure."""
if self._log_downloader is None:
self._log_downloader = MAVLinkLogDownloader.for_uav(self)
return self._log_downloader
@property
def mavlink_version(self) -> int:
"""The MAVLink version supported by this UAV."""
return self._mavlink_version
@property
def network_id(self) -> str:
"""The network ID of the UAV."""
return self._network_id
@property
def system_id(self) -> int:
"""The system ID of the UAV."""
return self._system_id
def notify_disconnection(self) -> None:
"""Notifies the UAV state object that we have detected that it has been
disconnected from the network. In other words, the heartbeats from the
drone have ceased arriving.
"""
self._set_connection_state(ConnectionState.DISCONNECTED, None)
def _notify_rebooted_by_us(self) -> None:
"""Notifies the UAV state object that we have rebooted the UAV ourselves
and we should configure its data streams again soon once we re-establish
connection.
"""
self.driver.run_in_background(
delayed(1, self.notify_disconnection, ensure_async=True) # type: ignore[reportArgumentType]
)
def _reset_mavlink_version(self) -> None:
"""Resets the MAVLink protocol version used by messages sent to this
UAV to the default value.
Currently we assume that all the drones we are trying to talk to
support MAVLink 2, so we always reset to MAVLink 2.
"""
self._mavlink_version = 2
def notify_prearm_failure(self, message: str) -> None:
"""Notifies the UAV state object that a prearm check has failed."""
self._preflight_status.message = message
self._preflight_status.result = PreflightCheckResult.FAILURE
def notify_reconnection(self, heartbeat: MAVLinkMessage) -> None:
"""Notifies the UAV state object that it has been reconnected to the
network.
"""
if can_communicate_infer_from_heartbeat(heartbeat):
new_state = ConnectionState.CONNECTED
else:
new_state = ConnectionState.SLEEPING
self._set_connection_state(new_state, heartbeat)
@property
def preflight_status(self) -> PreflightCheckInfo:
return self._preflight_status
async def reboot(self, channel: str = Channel.PRIMARY) -> None:
"""Reboots the autopilot of the UAV."""
success = await self.driver.send_command_long(
self,
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
1, # reboot autopilot
channel=channel,
)
if not success:
raise RuntimeError("Reset command failed")
else:
self._notify_rebooted_by_us()
async def reboot_after_update(self, channel: str = Channel.PRIMARY) -> None:
"""Reboots the autopilot of the UAV and keeps it in the bootloader
until upgraded.
This function should be called after an over-the-air update if the UAV
supports over-the-air updates.
"""
success = await self.driver.send_command_long(
self,
MAVCommand.PREFLIGHT_REBOOT_SHUTDOWN,
3, # reboot autopilot and stay in bootloader until updated
channel=channel,
)
if not success:
raise RuntimeError("Reset and update command failed")
else:
self._notify_rebooted_by_us()
async def reload_show(self) -> None:
"""Asks the UAV to reload the current drone show file."""
# param1 = 0 if we want to reload the show file
success = await self.driver.send_command_long(
self, MAVCommand.USER_1, SkybrushUserCommand.RELOAD_SHOW
)
if not success:
raise RuntimeError("Failed to reload show file")
async def remove_show(self) -> None:
"""Asks the UAV to remove the current drone show file."""
# param1 = 1 if we want to clear the show file
success = await self.driver.send_command_long(
self, MAVCommand.USER_1, SkybrushUserCommand.REMOVE_SHOW
)
if not success:
raise RuntimeError("Failed to remove show file")
async def set_mode(
self, mode: Union[int, str], *, channel: str = Channel.PRIMARY
) -> None:
"""Attempts to set the UAV in the given custom mode."""
if isinstance(mode, str):
try:
mode = int(mode)
except ValueError:
pass
if isinstance(mode, int):
base_mode, submode = MAVModeFlag.CUSTOM_MODE_ENABLED, 0
elif isinstance(mode, str):
try:
base_mode, mode, submode = self._autopilot.get_flight_mode_numbers(mode)
except NotSupportedError:
raise ValueError(
"setting flight modes by name is not supported"
) from None
else:
raise TypeError("flight mode must be numeric or string")
success = await self.driver.send_command_long(
self,
MAVCommand.DO_SET_MODE,
param1=float(base_mode),
param2=float(mode),
param3=float(submode),
channel=channel,
)
if not success:
raise RuntimeError(f"UAV rejected flight mode {mode}")
async def set_servo(
self, servo: int, value: int, *, channel: str = Channel.PRIMARY
) -> None:
"""Asks the UAV to set one of its servo channels to a given value.
Args:
servo: the index of the servo channel (1-based)
value: the PWM value to set on the servo channel, in microseconds
"""
if servo < 1:
raise RuntimeError("Invalid servo channel index")
if value < 0:
raise RuntimeError("Invalid servo channel value")
success = await self.driver.send_command_long(
self,
MAVCommand.DO_SET_SERVO,
param1=float(servo),
param2=float(value),
channel=channel,
)
if not success:
raise RuntimeError(f"UAV rejected setting servo channel {servo} to {value}")
@property
def supports_scheduled_takeoff(self) -> bool:
"""Returns whether the UAV supports scheduled takeoffs."""
return self._autopilot and self._autopilot.supports_scheduled_takeoff
async def set_authorization_scope(self, scope: AuthorizationScope) -> None:
"""Sets or clears whether the UAV has authorization to perform an
automatic takeoff.
"""
await self.set_parameter("SHOW_START_AUTH", authorization_scope_to_int(scope))
async def set_scheduled_takeoff_time(self, seconds: Optional[int]) -> None:
"""Sets the scheduled takeoff time of the UAV to the given timestamp in
seconds. Only integer seconds are supported. Setting the takeoff time
to `None` or a negative number will clear the takeoff time.
"""
# The UAV needs GPS time of week so we convert it first. Note that we
# convert the UNIX timestamp to a datetime first because UNIX timestamps
# do not have leap seconds (every day is 86400 seconds in UNIX time) so
# they are inherently ambiguous
if seconds is None or seconds < 0:
gps_time_of_week = -1
else:
dt = datetime.fromtimestamp(int(seconds), tz=timezone.utc)
_, gps_time_of_week = datetime_to_gps_time_of_week(dt)
await self.set_parameter("SHOW_START_TIME", gps_time_of_week)
async def set_led_color(
self,
color: Optional[Color],
*,
channel: str = Channel.PRIMARY,
duration: float = 5,
) -> None:
"""Sets the color of the drone LED to a specific RGB color."""
if color is not None:
red, green, blue = color_to_rgb8_triplet(color)
duration_msec = max(0, min(int(duration * 1000), 65535))
effect = 1
else:
red, green, blue = 0, 0, 0
duration_msec = 0
effect = 0
message = create_led_control_packet(
[
red,
green,
blue,
duration_msec & 0xFF,
duration_msec >> 8,
effect,
]
)
await self.driver.send_packet(message, self, channel=channel)
async def start_pyro_test(
self, channel: Optional[Union[int, tuple[int, int]]] = None, delay: float = 2
) -> None:
"""Asks the UAV to start testing its pyro channels.
Args:
channel: the channel index to test or the channels to test. ``None``
means to test all channels.
delay: time between tests of consecutive channels, in seconds
"""
if channel is None:
start, end = 0, 256
elif isinstance(channel, int):
start, end = channel, channel + 1
else:
start, end = channel
num_channels = end - start
if num_channels <= 0:
return
success = await self.driver.send_command_long(
self,
MAVCommand.USER_1,
SkybrushUserCommand.TEST_PYRO,
start,
num_channels,
delay,
)
if not success:
raise RuntimeError("Failed to start pyro test")
async def takeoff_to_relative_altitude(
self, altitude: float = 2.5, *, channel: str = Channel.PRIMARY
) -> None:
"""Instructs the UAV to take off to a relative altitude above its
current position.
Parameters:
altitude: the relative altitude above the current position of the
UAV where we should take off to
channel: the channel to use to send the command
Raises:
RuntimeError: if the command cannot be sent to the UAV or if it does
not acknowledge the takeoff command
"""
# Okay, so the NAV_TAKEOFF command sucks big time. ArduCopter interprets
# the last argument as relative altitude above the ground, which totally
# makes sense. PX4 interprets it as _absolute_ AMSL instead, and treats
# NaN as "just pick a sensible takeoff altitude". Furthermore, ArduCopter
# ignores the latitude and longitude while PX4 insists them to be NaN
# (otherwise they are treated as coordinates to take off to). This stems
# from the fact that MAV reference frames are not supported in PX4:
#
# https://github.com/PX4/PX4-Autopilot/issues/10246
#
# The lowest common denominator is to send NaN as the latitude and
# longitude, and make the takeoff altitude dependent on whether the
# autopilot supports the local reference frame. However, the ArduCopter
# SITL simulator blows up when we do so -- so for ArduCopter, we send
# zeros instead.
if not self._autopilot.supports_local_frame:
try:
# We assume that we are at zero meters AHL
altitude = self.convert_ahl_to_amsl(altitude, current_ahl=0)
except RuntimeError:
# No position yet, just send NaN and hope for the best
altitude = nan
# set takeoff coordinate. PX4 needs NaN / NaN, ArduPilot needs 0 / 0
lat, lon = nan, nan
if isinstance(self._autopilot, ArduPilot):
lat, lon = 0, 0
if not await self.driver.send_command_long(
self,
MAVCommand.NAV_TAKEOFF,
param4=nan, # yaw should stay the same
param5=lat, # latitude
param6=lon, # longitude
param7=altitude, # takeoff altitude
channel=channel,
):
raise RuntimeError("Failed to send takeoff command")
@asynccontextmanager
async def temporarily_request_messages(
self, messages: dict[int, float]
) -> AsyncIterator[None]:
"""Temporarily requests the UAV to send a given set of messages while
the execution is in the context, resetting the messages upon exiting
the context. Resetting is done at a best-effort basis; failures will be
ignored.
Parameters:
messages: a dict mapping MAVLink message IDs to their corresponding
stream rates in Hz
"""
successful = []
try:
for message, rate in messages.items():
success = await self.driver.send_command_long(
self,
MAVCommand.SET_MESSAGE_INTERVAL,
message,
1000000 / rate, # one per second
)
if success:
successful.append(message)
else:
raise RuntimeError(
f"UAV rejected message stream rate of {rate} Hz for "
f"message {message}"
)
yield
finally:
failed = []
for message in successful:
try:
await self.driver.send_command_long(
self,
MAVCommand.SET_MESSAGE_INTERVAL,
message,
0, # off
)
except Exception:
failed.append(message)
for message in failed:
self.driver.log.warning(
f"Failed to reset data stream rate(s) for message(s) {message}",
extra={"id": log_id_for_uav(self)},
)
async def trigger_camera_shutter(self) -> None:
"""Asks the UAV to trigger the camera shutter (if it has a camera)."""
success = await self.driver.send_command_long(
self,
MAVCommand.DO_DIGICAM_CONTROL,
param5=1,
)
if not success:
raise RuntimeError("Failed to trigger camera shutter")
async def upload_show(self, show) -> None:
coordinate_system = get_coordinate_system_from_show_specification(show)
if coordinate_system.type != "nwu":
raise RuntimeError("Only NWU coordinate systems are supported")
altitude_reference = get_altitude_reference_from_show_specification(show)
light_program = get_light_program_from_show_specification(show)
trajectory = get_trajectory_from_show_specification(show)
geofence = get_geofence_configuration_from_show_specification(show)
pyro_program = None
rth_plan = None
yaw_setpoints = None
pro_keys = set(show.keys()).intersection(["pyro", "rthPlan", "yawControl"])
if pro_keys:
try:
api = self.driver.app.import_api("show_pro")
if not api.loaded:
raise RuntimeError(
f"Show pro extension is not loaded, neglecting {'and'.join(pro_keys)} from the show"
)
except RuntimeError as ex:
self.driver.log.warning(str(ex))
else:
pyro_program = api.encode_pyro(show)
rth_plan = api.encode_rth_plan(show)
yaw_setpoints = api.encode_yaw(show)
async with SkybrushBinaryShowFile.create_in_memory() as show_file:
await show_file.add_trajectory(trajectory)
await show_file.add_encoded_light_program(light_program)
if pyro_program:
await show_file.add_encoded_event_list(pyro_program)
if rth_plan:
await show_file.add_encoded_rth_plan(rth_plan)
if yaw_setpoints:
await show_file.add_encoded_yaw_setpoints(yaw_setpoints)
await show_file.finalize()
data = show_file.get_contents()
# Upload show file
async with aclosing(MAVFTP.for_uav(self)) as ftp:
await ftp.put(data, "/collmot/show.skyb")
# We give some time for the filesystem to flush caches etc before
# asking the drone to reload the show file. There were some reports
# that sometimes the show file was read only partially, and I suspect
# this could have been because the filesystem was not flushed fully
# to the SD card before we tried to reload the show. We could not debug
# it properly as it happened very rarely.
# Encode latitude and longitude of show origin
# TODO(ntamas): this is not entirely accurate due to the back-and-forth
# conversion happening between floats and ints; sometimes the 7th
# decimal digit is off by one.
encoded_lat = int(coordinate_system.origin.lat * 1e7)
encoded_lon = int(coordinate_system.origin.lon * 1e7)
encoded_amsl = (
int(altitude_reference * 1e3)
if altitude_reference is not None
else -32768000
)
# Try configuring with a single USER_2 command first, falling back to
# the old parameter-based configuration if USER_2 is not supported.
try:
success = await self.driver.send_command_int(
self,
MAVCommand.USER_2,
0, # command code
0, # unused
0, # unused
coordinate_system.orientation,
encoded_lat,
encoded_lon,
encoded_amsl,
)
except NotSupportedError:
success = False
if not success:
# Configure show origin, orientation and altitude reference using
# the old method. There is no version of the firmware that would
# support SHOW_ORIGIN_AMSL but does not support the new-style
# configuration method so we raise an error if the user tries to
# set an AMSL value
if altitude_reference is not None:
raise NotSupportedError(
"AMSL-based control is not supported in this firmware"
)
orientation = coordinate_system.orientation % 360
await self.set_parameter("SHOW_ORIGIN_LAT", encoded_lat)
await self.set_parameter("SHOW_ORIGIN_LNG", encoded_lon)
await self.set_parameter("SHOW_ORIENTATION", orientation)
# Configure and enable geofence
await self.configure_geofence(geofence)
# Ask drone to reload show file now that we are done with everything
# else
await self.reload_show()
async def wait_until_connected(self) -> None:
"""Waits until the UAV becomes connected (i.e. when we see the next
heartbeat message from the drone).
Returns immediately if the drone is currently considered connected.
"""
if self._connection_state is not ConnectionState.CONNECTED:
await self._connected_event.wait()
def _configure_data_streams_soon(self, force: bool = False) -> None:
"""Schedules a call to configure the data streams that we want to receive
from the UAV, as soon as possible.
Parameters:
force: when `False` and a configuration request has been scheduled
recently, the call will be ignored. When `True`, repeated attempts
to configure the data streams will all be processed.
"""
if not force and self._last_data_stream_configuration_attempted_at:
now = monotonic()
if now - self._last_data_stream_configuration_attempted_at < 5:
return
self.driver.run_in_background(self._configure_data_streams)
self._last_data_stream_configuration_attempted_at = monotonic()
async def _configure_data_streams(self) -> None:
"""Configures the data streams that we want to receive from the UAV."""
success = False
# We give ourselves 60 seconds to configure everything. Most of the
# internal functions time out on their own anyway
with move_on_after(60):
self._configuring_data_streams = True
try:
await self._configure_data_streams_with_fine_grained_commands()
success = True
except NotSupportedError:
await self._configure_data_streams_with_legacy_commands()
success = True
except TooSlowError:
# attempt timed out, even after retries, so we just give up
pass
except FutureCancelled:
# This is okay, server is shutting down
return
finally:
self._configuring_data_streams = False
# In case of a failure, we only print a warning here. Soon the GCS will
# realize again that it is not receiving status updates from the drone
# and will attempt to configure again.
if not success:
self.driver.log.warning(
"Failed to configure data stream rates, trying again later",
extra={"id": log_id_for_uav(self)},
)
async def _configure_data_streams_with_fine_grained_commands(self) -> None:
"""Configures the intervals of the messages that we want to receive from
the UAV using the newer `SET_MESSAGE_INTERVAL` MAVLink command.
"""
stream_rates = [
(MAVMessageType.SYS_STATUS, 1),
(MAVMessageType.GPS_RAW_INT, 1),
(MAVMessageType.GLOBAL_POSITION_INT, 2),
]
for message_id, interval_hz in stream_rates:
success = await self.driver.send_command_long(
self,
MAVCommand.SET_MESSAGE_INTERVAL,
param1=message_id,
param2=1000000 / interval_hz,
)
if not success:
self.driver.log.warning(
f"Failed to configure data stream rate for message {message_id}",
extra={"id": log_id_for_uav(self)},
)
async def _configure_data_streams_with_legacy_commands(self) -> None:
"""Configures the data streams that we want to receive from the UAV
using the deprecated `REQUEST_DATA_STREAM` MAVLink command.
"""
# TODO(ntamas): this is unsafe; there are no confirmations for
# REQUEST_DATA_STREAM commands so we never know if we succeeded or
# not
await self.driver.send_packet(
spec.request_data_stream(req_stream_id=0, req_message_rate=0, start_stop=0),
target=self,
)
# EXTENDED_STATUS: we need SYS_STATUS from it for the general status
# flags and GPS_RAW_INT for the GPS fix info.
await self.driver.send_packet(
spec.request_data_stream(
req_stream_id=MAVDataStream.EXTENDED_STATUS,
req_message_rate=1,
start_stop=1,
),
target=self,
)
# POSITION: we need GLOBAL_POSITION_INT for position and velocity
await self.driver.send_packet(
spec.request_data_stream(
req_stream_id=MAVDataStream.POSITION,
req_message_rate=2,
start_stop=1,
),
target=self,
)
async def _configure_mandatory_custom_mode(self) -> None:
"""Sets the drone to its mandatory custom mode after connection; used
only for local experimentation with the SITL simulator where it is
convenient to set up a custom mode in advance without an RC.
"""
await sleep(2)
if self.driver.mandatory_custom_mode is None:
return
try:
await self.set_mode(self.driver.mandatory_custom_mode)
except TooSlowError:
self.driver.log.warning(
"Failed to configure custom mode; no response in time",
extra={"id": log_id_for_uav(self)},
)
except Exception:
self.driver.log.exception(
"Failed to configure custom mode", extra={"id": log_id_for_uav(self)}
)
def _handle_reboot(self) -> None:
"""Handles a reboot event on the autopilot and attempts to re-initialize
the data streams.
"""
if not self.driver.assume_data_streams_configured:
self._configure_data_streams_soon(force=True)
# No need to request the autopilot capabilities here; we do it after
# every heartbeat if we don't have them yet. See the comment in
# `self._request_autopilot_capabilities()` for an explanation.
if self.driver.mandatory_custom_mode is not None:
# Don't set the mode immediately because the drone might not
# respond right after bootup
self.driver.run_in_background(self._configure_mandatory_custom_mode)
# Reset our internal state object of the compass calibration procedure
self.compass_calibration.reset()
def _clear_autopilot_capabilities(self) -> None:
"""Clears the cached autopilot capabilities and firmware version number
of the UAV.
This function should be called after a firmware update to ensure that
we query the new firmware version after the update.
"""
self._last_messages.pop(MAVMessageType.AUTOPILOT_VERSION, None)
async def _request_autopilot_capabilities(self) -> None:
"""Sends a request to the autopilot to send its capabilities via MAVLink
in a separate packet.
"""
try:
success = await self.driver.send_command_long(
self, MAVCommand.REQUEST_AUTOPILOT_CAPABILITIES, param1=1
)
except FutureCancelled:
# This is okay, server is shutting down
return
except TooSlowError:
self.driver.log.warning(
"Failed to request autopilot capabilities; no confirmation "
"received in time",
extra={"id": log_id_for_uav(self)},
)
return
if not success:
self.driver.log.warning(
"UAV rejected to send autopilot capabilities",
extra={"id": log_id_for_uav(self)},
)
# At this point, we only received an acknowledgment from the drone that
# it _will_ send the AUTOPILOT_VERSION packet -- we don't know whether
# it really will and even if it does, it might get lost in transit.
# Therefore, we check whether we already have an AUTOPILOT_VERSION
# packet in our stash after receiving a heartbeat, and if we don't, we
# ask the drone to send one by calling this function.
def _get_previous_copy_of_message(
self, message: MAVLinkMessage
) -> Optional[MAVLinkMessage]:
"""Returns the previous copy of this MAVLink message, or `None` if we
have not observed such a message yet.
"""
record = self._get_previous_record_of_message(message)
return record.message if record else None
def _get_previous_record_of_message(
self, message: MAVLinkMessage
) -> Optional[MAVLinkMessageRecord]:
"""Returns the previous copy of this MAVLink message and its timestamp,
or `None` if we have not observed such a message yet.
"""
return self._last_messages.get(message.get_msgId())
def _set_connection_state(
self, value: ConnectionState, heartbeat: Optional[MAVLinkMessage]
) -> None:
if self._connection_state is value:
return
if value is ConnectionState.CONNECTED:
if self._connection_state is ConnectionState.SLEEPING:
# Clear any previously stored GPS coordinate, velocity,
# attitude and heading info
self.update_status(
position=None,
position_xyz=None,
velocity=None,
velocity_xyz=None,
attitude=None,
heading=None,
)
self._connection_state = value
if value is ConnectionState.DISCONNECTED:
# Revert to the lowest MAVLink version that we support in case the UAV
# was somehow reset and it does not "understand" MAVLink v2 in its new
# configuration
self._reset_mavlink_version()
elif value is ConnectionState.CONNECTED:
# We assume that the autopilot type stays the same even if we lost
# contact with the drone or if it was rebooted. However, if we do not
# know the autopilot type yet, we create an instance based on the
# heartbeat
if isinstance(self._autopilot, UnknownAutopilot) and heartbeat is not None:
autopilot_cls = Autopilot.from_heartbeat(heartbeat)
self._autopilot = autopilot_cls()
if self._was_probably_rebooted_after_reconnection():
if not self._first_connection:
self.driver.log.warning(
f"UAV {self.id} might have been rebooted; reconfiguring"
)
self._first_connection = False
self._handle_reboot()
# Send "connected" event to listeners
event = self._connected_event
self._connected_event = Event()
event.set()
def _store_message(self, message: MAVLinkMessage) -> None:
"""Stores the given MAVLink message in the dictionary that maps
MAVLink message types to their most recent versions that were seen
for this UAV.
"""
self._last_messages[message.get_msgId()].update(message)
def _get_heartbeat_sys_status_geofence_error_and_motor_state(
self,
) -> tuple[Any, Any, bool, bool]:
"""Returns a tuple indicating whether there is a geofence error and
whether the motors are currently running, based on the most recent
HEARTBEAT and SYS_STATUS messages.
If either message is not available, returns `(False, False)`.
This function encapsulates some logic that is shared between
`_update_errors_from_drone_show_status_packet()` and
`_update_errors_from_sys_status_and_heartbeat()` and it is required
to update the FlockwaveErrorCode.GEOFENCE_VIOLATION_WARNING flag
correctly from both places.
Returns:
the last heartbeat message, the last SYS_STATUS message, whether
the SYS_STATUS packet indicated a geofence error, and whether the
motors are running according to the heartbeat.
"""
heartbeat = self.get_last_message(MAVMessageType.HEARTBEAT)
sys_status = self.get_last_message(MAVMessageType.SYS_STATUS)
if not heartbeat or not sys_status:
return None, None, False, False
sensor_mask: int = (
sys_status.onboard_control_sensors_enabled
& sys_status.onboard_control_sensors_present
)
not_healthy_sensors: int = sensor_mask & (
# Python has no proper bitwise negation on unsigned integers
# so we use XOR instead
sys_status.onboard_control_sensors_health ^ 0xFFFFFFFF
)
has_geofence_error = not_healthy_sensors & MAVSysStatusSensor.GEOFENCE.value
are_motors_running = heartbeat.base_mode & MAVModeFlag.SAFETY_ARMED.value
return heartbeat, sys_status, bool(has_geofence_error), bool(are_motors_running)
def _update_errors_from_drone_show_status_packet(self, status: DroneShowStatus):
"""Updates the error codes based on the most recent drone show status
message.
The error codes managed by this function are disjoint from the error
codes managed by `_update_errors_from_sys_status_and_heartbeat()`,
except for FlockwaveErrorCode.GEOFENCE_VIOLATION_WARNING, which is
handled in both places because its presence depends on both SYS_STATUS
and the drone show status packets.
"""
_, _, has_geofence_error, are_motors_running = (
self._get_heartbeat_sys_status_geofence_error_and_motor_state()
)
errors: dict[int, bool] = {
FlockwaveErrorCode.TIMESYNC_ERROR.value: status.has_timesync_error,
FlockwaveErrorCode.FAR_FROM_TAKEOFF_POSITION.value: status.is_misplaced_before_takeoff,
FlockwaveErrorCode.GEOFENCE_VIOLATION_WARNING.value: (
# Geofence error reported from SYS_STATUS...
(has_geofence_error and not are_motors_running)
# ...or no error reported from SYS_STATUS, but a geofence breach
# was reported in the Skybrush-specific status packet
or (not has_geofence_error and status.is_geofence_breached)
),
}
self.ensure_errors(errors)
def _update_errors_from_inactive_state(self) -> None:
"""Updates the error codes in the case when we received a heartbeat
message that seems to indicate that the drone cannot communicate now.
Either this function or `_update_errors_from_sys_status_and_heartbeat()`
will be called for a heartbeat packet but not both. The policy is that
we clear error codes with a severity less than a "real" error and then
mark the drone as "sleeping". This way we won't lose genuine error
codes that happened while the drone was still awake.
"""
self.clear_errors_up_to_and_including(FlockwaveErrorCode.UNSPECIFIED_WARNING)
errors = {
FlockwaveErrorCode.AUTOPILOT_INIT_FAILED.value: False,
FlockwaveErrorCode.SLEEPING.value: True,
}
self.ensure_errors(errors)
def _update_errors_from_sys_status_and_heartbeat(self) -> None:
"""Updates the error codes based on the most recent HEARTBEAT and
SYS_STATUS messages. We need both to have an accurate picture of what is
going on, hence a separate function that is called from both message
handlers.
The error codes managed by this function are disjoint from the error
codes managed by `_update_errors_from_drone_show_status_packet()`,
except for FlockwaveErrorCode.GEOFENCE_VIOLATION_WARNING, which is
handled in both places because its presence depends on both SYS_STATUS
and the drone show status packets. Try to keep things this way as it
simplifies things.
This function does _not_ update the timestamp of the status information;
you need to do it on your own by calling `update_status()` after calling
this function.
"""
# This function is called frequently, and Python enums are a bit slow
# so we optimize enum access by using the 'value' property on them
heartbeat, sys_status, has_geofence_error, are_motors_running = (
self._get_heartbeat_sys_status_geofence_error_and_motor_state()
)
if not heartbeat or not sys_status:
return
# Check error conditions from SYS_STATUS
sensor_mask: int = (
sys_status.onboard_control_sensors_enabled
& sys_status.onboard_control_sensors_present
)
not_healthy_sensors: int = sensor_mask & (
# Python has no proper bitwise negation on unsigned integers
# so we use XOR instead
sys_status.onboard_control_sensors_health ^ 0xFFFFFFFF
)
has_gyro_error = not_healthy_sensors & (
MAVSysStatusSensor.GYRO_3D.value | MAVSysStatusSensor.GYRO2_3D.value
)
has_mag_error = not_healthy_sensors & (
MAVSysStatusSensor.MAG_3D.value | MAVSysStatusSensor.MAG2_3D.value
)
has_accel_error = not_healthy_sensors & (
MAVSysStatusSensor.ACCEL_3D.value | MAVSysStatusSensor.ACCEL2_3D.value
)
has_baro_error = not_healthy_sensors & (
MAVSysStatusSensor.ABSOLUTE_PRESSURE.value
| MAVSysStatusSensor.DIFFERENTIAL_PRESSURE.value
)
has_gps_error = not_healthy_sensors & MAVSysStatusSensor.GPS.value
has_proximity_error = not_healthy_sensors & MAVSysStatusSensor.PROXIMITY.value
has_motor_error = not_healthy_sensors & (
MAVSysStatusSensor.MOTOR_OUTPUTS.value
| MAVSysStatusSensor.REVERSE_MOTOR.value
)
# has_geofence_error already parsed above
has_rc_error = not_healthy_sensors & MAVSysStatusSensor.RC_RECEIVER.value
has_battery_error = not_healthy_sensors & MAVSysStatusSensor.BATTERY.value
has_logging_error = not_healthy_sensors & MAVSysStatusSensor.LOGGING.value
are_motor_outputs_disabled = self._autopilot.are_motor_outputs_disabled(
heartbeat, sys_status
)
# are_motors_running already parsed above
is_prearm_check_in_progress = self._autopilot.is_prearm_check_in_progress(
heartbeat, sys_status
)
is_returning_home = self._autopilot.is_rth_flight_mode(
heartbeat.base_mode, heartbeat.custom_mode
)
is_in_standby = heartbeat.system_status == MAVState.STANDBY.value
# The geofence status is a bit of a mess. ArduCopter and PX4 both report
# geofence violations by marking the geofence sensor as "present,
# enabled but not healthy". However, if the geofence action is set to
# "report only" in ArduCopter, it does _not_ consider the geofence
# sensor as enabled and we never get notifications about geofence
# breaches (apart from the "Fence breached" STATUSTEXT message, which
# we ignore because there is no corresponding "Breach resolved" message
# so we don't know for how long we should mark the geofence breached).
# That's why our Skybrush-specific status packet contains a bit to
# indicate geofence breaches even if the geofence mode is set to
# "report only" -- but it means that we have _two_ sources to check to
# determine the error code to use for the geofence.
show_stage = (
self._last_skybrush_status_info.stage
if self._last_skybrush_status_info
else DroneShowExecutionStage.UNKNOWN
)
# We do not use the LANDED error code yet because the current versions
# of the Skybrush firmware report "LANDED" for a long time after landing,
# which means that we would get an all-blue display in Live after a
# successful show.
errors = {
FlockwaveErrorCode.SLEEPING.value: False,
FlockwaveErrorCode.LANDING.value: show_stage
is DroneShowExecutionStage.LANDING,
FlockwaveErrorCode.TAKEOFF.value: show_stage
is DroneShowExecutionStage.TAKEOFF,
FlockwaveErrorCode.AUTOPILOT_INIT_FAILED.value: (
heartbeat.system_status == MAVState.UNINIT.value
),
FlockwaveErrorCode.AUTOPILOT_INITIALIZING.value: (
heartbeat.system_status == MAVState.BOOT.value
),
FlockwaveErrorCode.UNSPECIFIED_ERROR.value: (
# RC errors apparently trigger this error condition with
# ArduCopter if we don't exclude it explicitly
heartbeat.system_status == MAVState.CRITICAL.value
and not not_healthy_sensors
and not has_rc_error
),
FlockwaveErrorCode.UNSPECIFIED_CRITICAL_ERROR.value: (
heartbeat.system_status == MAVState.EMERGENCY.value
and not not_healthy_sensors
),
FlockwaveErrorCode.MAGNETIC_ERROR.value: has_mag_error,
FlockwaveErrorCode.GYROSCOPE_ERROR.value: has_gyro_error,
FlockwaveErrorCode.ACCELEROMETER_ERROR.value: has_accel_error,
FlockwaveErrorCode.PRESSURE_SENSOR_ERROR.value: has_baro_error,
FlockwaveErrorCode.GPS_SIGNAL_LOST.value: has_gps_error,
FlockwaveErrorCode.PROXIMITY_ERROR.value: has_proximity_error,
FlockwaveErrorCode.MOTOR_MALFUNCTION.value: has_motor_error
or (
self._last_skybrush_status_info
and self._last_skybrush_status_info.has_high_esc_error_rate
),
FlockwaveErrorCode.GEOFENCE_VIOLATION.value: (
has_geofence_error and are_motors_running
),
FlockwaveErrorCode.GEOFENCE_VIOLATION_WARNING.value: (
# Geofence error reported from SYS_STATUS...
(has_geofence_error and not are_motors_running)
# ...or no error reported from SYS_STATUS, but a geofence breach
# was reported in the Skybrush-specific status packet
or (
not has_geofence_error
and self._last_skybrush_status_info
and self._last_skybrush_status_info.is_geofence_breached
)
),
FlockwaveErrorCode.DRIFT_FROM_DESIRED_POSITION.value: (
self._last_skybrush_status_info
and self._last_skybrush_status_info.is_far_from_expected_position
),
FlockwaveErrorCode.RC_SIGNAL_LOST_WARNING.value: has_rc_error,
FlockwaveErrorCode.BATTERY_CRITICAL.value: has_battery_error,
FlockwaveErrorCode.LOGGING_DEACTIVATED.value: has_logging_error,
FlockwaveErrorCode.DISARMED.value: are_motor_outputs_disabled,
FlockwaveErrorCode.PREARM_CHECK_IN_PROGRESS.value: is_prearm_check_in_progress,
# If the motors are not running yet but we are on the ground, ready
# to fly, we use an informational flag to let the user know
FlockwaveErrorCode.ON_GROUND.value: not are_motors_running
and is_in_standby,
# If the motors are running but we are not in the air yet; we use an
# informational flag to let the user know
FlockwaveErrorCode.MOTORS_RUNNING_WHILE_ON_GROUND.value: are_motors_running
and is_in_standby,
# Use the special RTH error code if the drone is in RTH or smart RTH mode
# and its mode index is larger than the standby mode (typically:
# active, critical, emergency, poweroff, termination)
FlockwaveErrorCode.RETURN_TO_HOME.value: is_returning_home
and heartbeat.system_status > MAVState.STANDBY.value,
}
# Clear the collected prearm failure messages if the heartbeat and/or
# the system status shows that we are not in the prearm check phase any
# more
if not is_prearm_check_in_progress:
self._preflight_status.message = "Passed"
self._preflight_status.result = PreflightCheckResult.PASS
# Update the error flags as needed
self.ensure_errors(errors)
def _update_gps_fix_type_and_satellite_count(
self, type: OurGPSFixType, num_satellites: Optional[int]
) -> None:
"""Updates the GPS fix and the number of satellites in the internal
GPSFix_ object that is then later used to update the status object of
the UAV.
"""
self._gps_fix.type = type
self._gps_fix.num_satellites = num_satellites
def _was_probably_rebooted_after_reconnection(self) -> bool:
"""Returns whether the UAV was probably rebooted recently, _assuming_
that a reconnection event happened.
This function _must_ be called only after a reconnection event. Right
now we always return `False`, but we could implement a more sophisticated
check in the future based on the `SYSTEM_TIME` messages and whether the
`time_boot_ms` timestamp has decreased.
"""
return False