Repository URL to install this package:
|
Version:
2.35.1 ▾
|
"""Classes and functions related to MAVLink networks, i.e. a set of MAVLink
connections over which the system IDs of the devices share the same namespace.
For instance, a Skybrush server may participate in a MAVLink network with two
connections: a wifi connection and a fallback radio connection. The system ID
of a MAVLink message received on either of these two connections refers to the
same device. However, the same system ID in a different MAVLink network may
refer to a completely different device. The introduction of the concept of
MAVLink networks in the Skybrush server allows us to manage multiple independent
MAVLink-based drone swarms.
"""
from __future__ import annotations
from collections import defaultdict
from contextlib import contextmanager, ExitStack
from logging import Logger
from time import time_ns
from trio import move_on_after, open_nursery, to_thread
from trio.abc import ReceiveChannel
from trio_util import periodic
from typing import (
Any,
Callable,
Iterable,
Iterator,
Optional,
Sequence,
Union,
TYPE_CHECKING,
)
from flockwave.connections import (
Connection,
create_connection,
ListenerConnection,
UDPListenerConnection,
)
from flockwave.concurrency import Future, race
from flockwave.networking import find_interfaces_with_address
from flockwave.server.comm import CommunicationManager
from flockwave.server.model import ConnectionPurpose
from flockwave.server.utils import nop, overridden
from .comm import (
create_communication_manager,
Channel,
MAVLinkMessage,
)
from .driver import MAVLinkDriver, MAVLinkUAV
from .enums import MAVAutopilot, MAVComponent, MAVMessageType, MAVState, MAVType
from .led_lights import MAVLinkLEDLightConfigurationManager
from .packets import DroneShowStatus
from .rssi import RSSIMode
from .rtk import RTKCorrectionPacketEncoder
from .signing import MAVLinkSigningConfiguration
from .takeoff import MAVLinkScheduledTakeoffManager
from .types import (
MAVLinkMessageMatcher,
MAVLinkMessageSpecification,
MAVLinkNetworkSpecification,
MAVLinkStatusTextTargetSpecification,
spec,
)
from .utils import (
flockwave_severity_from_mavlink_severity,
log_id_for_uav,
python_log_level_from_mavlink_severity,
log_id_from_message,
)
if TYPE_CHECKING:
from flockwave.server.ext.show.config import DroneShowConfiguration
from flockwave.server.tasks.led_lights import LightConfiguration
__all__ = ("MAVLinkNetwork",)
#: MAVLink message specification for heartbeat messages that we are sending
#: to connected UAVs to keep them sending telemetry data
HEARTBEAT_SPEC = (
"HEARTBEAT",
{
"type": MAVType.GCS,
"autopilot": MAVAutopilot.INVALID,
"base_mode": 0,
"custom_mode": 0,
"system_status": MAVState.STANDBY,
},
)
Matchers = dict[str, list[tuple[Optional[int], MAVLinkMessageMatcher, Future]]]
class MAVLinkNetwork:
"""Representation of a MAVLink network."""
driver: MAVLinkDriver
log: Logger
manager: CommunicationManager[MAVLinkMessageSpecification, Any]
_connections: list[Connection]
_uav_addresses: dict[MAVLinkUAV, Any]
_id: str
"""ID of the network."""
_id_formatter: Callable[[int, str], str]
"""Formatter function that is called with the system ID of a UAV and the
network ID and returns the final ID that should be assigned to the UAV in
Skybrush.
"""
_matchers: Matchers
"""Dictionary mapping MAVLink message types to lists of tuples consisting
of an optional MAVLink system ID, a MAVLink message matching criterion and a
future that will be resolved when a MAVLink message matching the criterion
is received from the given MAVLink system ID (or any system ID if no
system ID was specified).
"""
_routing: dict[str, list[int]]
_rssi_mode: RSSIMode
"""Specifies how this network derives RSSI values for the drones in the
network.
"""
_signing: MAVLinkSigningConfiguration
"""Object that stores how the MAVLink connections should handle signed
messages (in both the inbound and the outbound directions).
"""
_statustext_targets: MAVLinkStatusTextTargetSpecification
"""Object specifying how MAVLink STATUSTEXT messages should be handled."""
_uav_system_id_offset: int = 0
"""Offset to add to the system ID of each UAV in the network before it is
sent to the ID formatter that derives the final ID in Skybrush.
"""
_uavs: dict[int, MAVLinkUAV]
"""Dictionary mapping MAVLink system IDs in the network to the corresponding
UAVs.
"""
_use_broadcast_rate_limiting: bool = False
"""Whether to use artificial rate limiting for broadcast packets to work
around flow control problems in certain types of links.
Typically you can leave this at ``False`` unless you have packet loss
problems and you suspect that they are due to buffer overflows in the
links that the network uses.
"""
@classmethod
def from_specification(cls, spec: MAVLinkNetworkSpecification):
"""Creates a MAVLink network from its specification, typically found in
a configuration file.
"""
result = cls(
spec.id,
system_id=spec.system_id,
id_formatter=spec.id_format.format,
packet_loss=spec.packet_loss,
statustext_targets=spec.statustext_targets,
routing=spec.routing,
rssi_mode=spec.rssi_mode,
signing=spec.signing,
uav_system_id_offset=spec.id_offset,
use_broadcast_rate_limiting=spec.use_broadcast_rate_limiting,
)
for connection_spec in spec.connections:
connection = create_connection(connection_spec)
result.add_connection(connection)
return result
def __init__(
self,
id: str,
*,
system_id: int = 254,
id_formatter: Callable[[int, str], str] = "{0}".format,
packet_loss: float = 0,
statustext_targets: MAVLinkStatusTextTargetSpecification = MAVLinkStatusTextTargetSpecification.DEFAULT,
routing: Optional[dict[str, list[int]]] = None,
rssi_mode: RSSIMode = RSSIMode.RADIO_STATUS,
signing: MAVLinkSigningConfiguration = MAVLinkSigningConfiguration.DISABLED,
uav_system_id_offset: int = 0,
use_broadcast_rate_limiting: bool = False,
):
"""Constructor.
Creates a new MAVLink network with the given network ID. Network
identifiers must be unique in the Skybrush server.
Parameters:
id: the network ID
system_id: the MAVLink system ID of the Skybrush server within the
network
id_formatter: function that can be called with a MAVLink system ID
and the network ID, and that must return a string that will be
used for the drone with the given system ID on the network
packet_loss: when larger than zero, simulates packet loss on the
network by randomly dropping received and sent MAVLink messages
statustext_targets: specifies where to forward MAVLink status text
messages. When the set contains the string `"server"`, the
status messages will be sent to the server log. When the
set contains the string `"client"`, they will be sent to the
connected clients in SYS-MSG messages. The two options are not
mutually exclusive; both can be configured at the same time.
routing: dictionary that specifies which link should be used for
certain types of packets. The keys of the dictionary are the
packet types; the values are the indices of the connections to
use for sending that particular packet type. Not including a
particular packet type in the dictionary will let the system
choose the link on its own.
rssi_mode: specifies how this network will derive RSSI values for
the drones in the network
signing: object that specifies whether outgoing messages should be
signed and whether incoming unsigned messages are accepted.
uav_system_id_offset: offset to add to the system ID of each UAV
before it is sent to the formatter function
"""
self.log = None # type: ignore
self._matchers = None # type: ignore
self._id = id
self._id_formatter = id_formatter
self._led_light_configuration_manager = MAVLinkLEDLightConfigurationManager(
self
)
self._packet_loss = max(float(packet_loss), 0.0)
self._routing = routing or {}
self._rssi_mode = rssi_mode
self._scheduled_takeoff_manager = MAVLinkScheduledTakeoffManager(self)
self._signing = signing
self._statustext_targets = statustext_targets
self._system_id = max(min(int(system_id), 255), 1)
self._uav_system_id_offset = int(uav_system_id_offset)
self._use_broadcast_rate_limiting = bool(use_broadcast_rate_limiting)
self._connections = []
self._uavs = {}
self._uav_addresses = {}
self._rtk_correction_packet_encoder = RTKCorrectionPacketEncoder()
def add_connection(self, connection: Connection):
"""Adds the given connection object to this network.
Parameters:
connection: the connection to add
"""
self._connections.append(connection)
@contextmanager
def expect_packet(
self,
type: Union[int, str, MAVMessageType],
params: MAVLinkMessageMatcher = None,
system_id: Optional[int] = None,
) -> Iterator[Future[MAVLinkMessage]]:
"""Sets up a handler that waits for a MAVLink packet of a given type,
optionally matching its content with the given parameter values based
on strict equality.
Parameters:
type: the type of the MAVLink message to wait for
params: dictionary mapping parameter names to the values that we
expect to see in the matched packet, or a callable that
receives a MAVLinkMessage and returns `True` if it matches the
packet we are looking for
system_id: the system ID of the sender of the message; `None` means
any system ID
Returns:
a Future that resolves to the next MAVLink message that matches the
given type and parameter values.
"""
type_str = type if isinstance(type, str) else MAVMessageType(type).name
# Map values of type 'bytes' to 'str' in the params dict because
# pymavlink never returns 'bytes'
if params is not None and not callable(params):
for name, value in params.items():
if isinstance(value, bytes):
try:
params[name] = value.decode("utf-8")
except ValueError:
pass
future = Future()
item = (system_id, params, future)
matchers = self._matchers[type_str]
matchers.append(item)
try:
yield future
finally:
matchers.pop(matchers.index(item))
@property
def id(self) -> str:
"""The unique identifier of this MAVLink network."""
return self._id
async def run(
self,
*,
driver,
log,
register_uav,
supervisor,
use_connection,
):
"""Starts the network manager.
Parameters:
driver: the driver object for MAVLink-based drones
log: a logging object where the network manager can log messages
register_uav: a callable that can be called with a single UAV_
object as an argument to get it registered in the application
supervisor: the application supervisor that can be used to re-open
connections if they get closed
use_connection: context manager that must be entered when the
network manager wishes to register a connection in the
application
"""
if len(self._connections) > 1:
if self.id:
id_format = "{0}/{1}"
else:
id_format = "{1}"
else:
id_format = "{0}"
# Create names for the communication links
connection_names = [
id_format.format(self.id, index)
for index, connection in enumerate(self._connections)
]
# Register the communication links
with ExitStack() as stack:
for connection, name in zip(self._connections, connection_names):
description = (
"MAVLink listener"
if isinstance(connection, ListenerConnection)
else "MAVLink connection"
)
if name:
description += f" ({name})"
stack.enter_context(
use_connection(
connection,
f"MAVLink: {name}" if name else "MAVLink",
description=description,
purpose=ConnectionPurpose.uavRadioLink,
)
)
# Create the communication manager
manager = create_communication_manager(
packet_loss=self._packet_loss,
network_id=self.id,
system_id=self._system_id,
signing=self._signing,
use_broadcast_rate_limiting=self._use_broadcast_rate_limiting,
)
# Warn the user about the simulated packet loss setting
if self._packet_loss > 0:
percentage = round(min(1, self._packet_loss) * 100)
log.warning(
f"Simulating {percentage}% packet loss",
extra={"id": self._id},
)
# Warn the user when the rate limiting is enabled for broadcasts
if self._use_broadcast_rate_limiting:
log.info(
"Rate limiting enabled for broadcast packets",
extra={"id": self._id},
)
# Register the links with the communication manager. The order is
# important here; the ones coming first will primarily be used for
# sending, falling back to later ones if sending on the first one
# fails
for connection, name in zip(self._connections, connection_names):
manager.add(connection, name=name)
# Register the connection aliases
self._register_connection_aliases(manager, connection_names, stack, log=log)
# Set up a dictionary that will map from MAVLink message types that
# we are waiting for to lists of corresponding (predicate, future)
# pairs
matchers: Matchers = defaultdict(list)
# Override some of our properties with the values we were called with
stack.enter_context(
overridden(
self,
log=log,
driver=driver,
manager=manager,
register_uav=register_uav,
_matchers=matchers,
)
)
async with open_nursery() as nursery:
# Start background tasks that check the configured start times
# on the drones at regular intervals and that take care of
# broadcasting the current light configuration to the drones
nursery.start_soon(self._scheduled_takeoff_manager.run)
nursery.start_soon(self._led_light_configuration_manager.run)
# Start the communication manager
try:
await manager.run(
consumer=self._handle_inbound_messages,
supervisor=supervisor,
log=log,
tasks=[self._generate_heartbeats],
)
finally:
for matcher in matchers.values():
for _, _, future in matcher:
future.cancel()
# Cancel all tasks in this nursery as we are about to shut down
nursery.cancel_scope.cancel()
async def broadcast_packet(
self, spec: MAVLinkMessageSpecification, channel: Optional[str] = None
) -> None:
"""Broadcasts a message to all UAVs in the network.
Parameters:
spec: the specification of the MAVLink message to send
channel: specifies the channel that the packet should be sent on;
defaults to the primary channel of the network
"""
await self.manager.broadcast_packet(spec, destination=channel)
def enqueue_rc_override_packet(self, channels: list[int]) -> None:
"""Handles a list of a RC channels that the server wishes to forward
to the drones as RC override.
Parameters:
channels: the values of the RC channels to send in a MAVLink
`RC_CHANNELS_OVERRIDE` message
"""
message = spec.rc_channels_override(
target_system=0,
target_component=0,
chan1_raw=channels[0],
chan2_raw=channels[1],
chan3_raw=channels[2],
chan4_raw=channels[3],
chan5_raw=channels[4],
chan6_raw=channels[5],
chan7_raw=channels[6],
chan8_raw=channels[7],
chan9_raw=channels[8],
chan10_raw=channels[9],
chan11_raw=channels[10],
chan12_raw=channels[11],
chan13_raw=channels[12],
chan14_raw=channels[13],
chan15_raw=channels[14],
chan16_raw=channels[15],
chan17_raw=channels[16],
chan18_raw=channels[17],
)
self.manager.enqueue_broadcast_packet(
message, destination=Channel.RC, allow_failure=True
)
def enqueue_rtk_correction_packet(self, packet: bytes) -> None:
"""Handles an RTK correction packet that the server wishes to forward
to the drones in this network.
Parameters:
packet: the raw RTK correction packet to forward to the drones in
this network
"""
if not self.manager:
return
# Do not send the RTK correction packet if the network has no drones yet
if self.num_uavs == 0:
return
for message in self._rtk_correction_packet_encoder.encode(packet):
self.manager.enqueue_broadcast_packet(
message, destination=Channel.RTK, allow_failure=True
)
def notify_led_light_config_changed(self, config: LightConfiguration):
"""Notifies the network that the LED light configuration of the drones
has changed in the system. The network will then update the LED light
configuration of each drone.
"""
self._led_light_configuration_manager.notify_config_changed(config)
def notify_scheduled_takeoff_config_changed(self, config: DroneShowConfiguration):
"""Notifies the network that the automatic start configuration of the
drones has changed in the system. The network will then update the
start configuration of each drone.
"""
self._scheduled_takeoff_manager.notify_config_changed(config)
def notify_show_clock_start_time_changed(self, start_time: Optional[float]) -> None:
"""Notifies the network that the show clock was started, stopped or
adjusted in the system.
"""
self._scheduled_takeoff_manager.notify_start_time_changed(start_time)
@property
def num_uavs(self) -> int:
"""Returns the number of UAVs in this network."""
return len(self._uavs)
async def send_heartbeat(self, target: MAVLinkUAV) -> Optional[MAVLinkMessage]:
"""Sends a heartbeat targeted to the given UAV.
It is assumed (and not checked) that the UAV belongs to this network.
Parameters:
target: the UAV to send the heartbeat to
"""
spec = HEARTBEAT_SPEC
address = self._uav_addresses.get(target)
if address is None:
raise RuntimeError("UAV has no address in this network")
destination = (Channel.PRIMARY, address)
await self.manager.send_packet(spec, destination)
async def send_packet(
self,
spec: MAVLinkMessageSpecification,
target: MAVLinkUAV,
*,
wait_for_response: Optional[tuple[str, MAVLinkMessageMatcher]] = None,
wait_for_one_of: Optional[dict[str, MAVLinkMessageSpecification]] = None,
channel: Optional[str] = None,
) -> Union[None, MAVLinkMessage, tuple[str, MAVLinkMessage]]:
"""Sends a message to the given UAV and optionally waits for a matching
response.
It is assumed (and not checked) that the UAV belongs to this network.
Parameters:
spec: the specification of the MAVLink message to send
target: the UAV to send the message to
wait_for_response: when not `None`, specifies a MAVLink message
type to wait for as a response, and an additional message
matcher that examines the message further to decide whether this
is really the response we are interested in. The matcher may be
`None` to match all messages of the given type, a dictionary
mapping MAVLink field names to expected values, or a callable
that gets called with the retrieved MAVLink message of the
given type and must return `True` if and only if the message
matches our expectations. The source system of the MAVLink
reply must also be equal to the system ID of the UAV where
the original message was sent.
channel: specifies the channel that the packet should be sent on;
defaults to the primary channel of the network
Returns:
``None`` if `wait_for_response` and `wait_for_one_of` are both
``None``; the received response if `wait_for_response` was not
``None``; the key of the matched message specification and the
message itself if `wait_for_one_of` was not ``None``.
"""
spec[1].update(
target_system=target.system_id,
target_component=MAVComponent.AUTOPILOT1,
_mavlink_version=target.mavlink_version,
)
address = self._uav_addresses.get(target)
if address is None:
raise RuntimeError("UAV has no address in this network")
destination = (channel or Channel.PRIMARY, address)
if wait_for_response:
response_type, response_fields = wait_for_response
with self.expect_packet(
response_type, response_fields, system_id=target.system_id
) as future:
# TODO(ntamas): in theory, we could be getting a matching packet
# _before_ we sent ours. Sort this out if it causes problems.
await self.manager.send_packet(spec, destination)
return await future.wait()
elif wait_for_one_of:
tasks = {}
with ExitStack() as stack:
# Prepare futures for every single message type that we expect,
# and then send the message itself
for key, (response_type, response_fields) in wait_for_one_of.items():
future = stack.enter_context(
self.expect_packet(
response_type, response_fields, system_id=target.system_id
)
)
tasks[key] = future.wait
# Now send the message and wait for _any_ of the futures to
# succeed
await self.manager.send_packet(spec, destination)
return await race(tasks)
else:
await self.manager.send_packet(spec, destination)
def uavs(self) -> Iterable[MAVLinkUAV]:
"""Returns an iterator that iterates over the UAVs in this network.
Make sure that you do not add new UAVs or remove existing ones while the
iteration takes place.
"""
return self._uavs.values() if self._uavs else []
def _create_uav(self, system_id: int) -> MAVLinkUAV:
"""Creates a new UAV with the given system ID in this network and
registers it in the UAV registry.
"""
uav_id = self._id_formatter(system_id + self._uav_system_id_offset, self.id)
self._uavs[system_id] = uav = self.driver.create_uav(uav_id)
uav.assign_to_network_and_system_id(self.id, system_id)
uav._rssi_mode = self._rssi_mode
self.register_uav(uav)
return uav
def _find_uav_from_message(
self, message: MAVLinkMessage, address: Any
) -> Optional[MAVLinkUAV]:
"""Finds the UAV that this message is sent from, based on its system ID,
creating a new UAV object if we have not seen the UAV yet.
Parameters:
message: the message
address: the address that the message was sent from
Returns:
the UAV belonging to the system ID of the message or `None` if the
message was a broadcast message
"""
system_id: int = message.get_srcSystem()
if system_id == 0:
return None
else:
uav = self._uavs.get(system_id)
if not uav:
uav = self._create_uav(system_id)
# TODO(ntamas): protect from address hijacking!
self._uav_addresses[uav] = address
return uav
async def _generate_heartbeats(self, manager: CommunicationManager):
"""Generates heartbeat messages on the channels corresponding to the
network.
"""
async for _ in periodic(1):
await manager.broadcast_packet(
HEARTBEAT_SPEC, destination=Channel.PRIMARY, allow_failure=True
)
async def _handle_inbound_messages(
self, channel: ReceiveChannel[tuple[str, tuple[MAVLinkMessage, Any]]]
):
"""Handles inbound MAVLink messages from all the communication links
that the extension manages.
Parameters:
channel: a Trio receive channel that yields inbound MAVLink messages.
"""
# We need to respond to RADIO_STATUS messages only if we are parsing the
# RSSI values from there, otherwise they can be ignored
radio_status_handler = (
self._handle_message_radio_status
if self._rssi_mode is RSSIMode.RADIO_STATUS
else nop
)
handlers = {
"AUTOPILOT_VERSION": self._handle_message_autopilot_version,
"BAD_DATA": nop,
"COMMAND_ACK": nop,
"COMMAND_LONG": self._handle_message_command_long,
"DATA16": self._handle_message_data,
"DATA32": self._handle_message_data,
"DATA64": self._handle_message_data,
"DATA96": self._handle_message_data,
"FENCE_STATUS": nop,
"FILE_TRANSFER_PROTOCOL": nop,
"GLOBAL_POSITION_INT": self._handle_message_global_position_int,
"GPS_GLOBAL_ORIGIN": nop,
"GPS_RAW_INT": self._handle_message_gps_raw_int,
"HEARTBEAT": self._handle_message_heartbeat,
"HOME_POSITION": nop,
"HWSTATUS": nop,
"LOCAL_POSITION_NED": nop, # maybe later?
"LOG_DATA": self._handle_message_log_data,
"LOG_ENTRY": self._handle_message_log_entry,
"MAG_CAL_PROGRESS": self._handle_message_mag_cal_progress,
"MAG_CAL_REPORT": self._handle_message_mag_cal_report,
"MEMINFO": nop,
"MISSION_ACK": nop, # used for mission and geofence download / upload
"MISSION_COUNT": nop, # used for mission and geofence download / upload
"MISSION_CURRENT": nop, # maybe later?
"MISSION_ITEM_INT": nop, # used for mission and geofence download / upload
"MISSION_REQUEST": nop, # used for mission and geofence download / upload
"NAV_CONTROLLER_OUTPUT": nop,
"PARAM_VALUE": nop,
"POSITION_TARGET_GLOBAL_INT": nop,
"POWER_STATUS": nop,
"RADIO_STATUS": radio_status_handler,
"STATUSTEXT": self._handle_message_statustext,
"SYS_STATUS": self._handle_message_sys_status,
"TIMESYNC": self._handle_message_timesync,
"V2_EXTENSION": self._handle_message_v2_extension,
}
autopilot_component_id = MAVComponent.AUTOPILOT1
udp_bridge_id = MAVComponent.UDP_BRIDGE
# Many third-party MAVLink-based drones do not respond to broadcast
# messages sent to them with an IP address of 255.255.255.255 as they
# listen to the subnet-specific broadcast address only (e.g., 192.168.0.255).
# Therefore, we need to re-bind the broadcast address of the channel
# as soon as we have received the first packet from it, based on the
# address of that packet and the netmasks of the network interfaces.
# The following dict stores whether a link was already switched to its
# subnet-specific broadcast address
broadcast_address_updated: dict[str, bool] = defaultdict(bool)
async for connection_id, (message, address) in channel:
# Uncomment this for debugging
# self.log.info(repr(message))
# SiK radios use system ID = 51 and component ID = 68
# (MAV_COMP_ID_TELEMETRY_RADIO)
# mavesp8266 uses the correct system ID and component ID = 0xf0
# (MAV_COMP_ID_UDP_BRIDGE)
# Get the source component and the message type
src_component = message.get_srcComponent()
type = message.get_type()
# Determine whether we should process this message
should_process = src_component == autopilot_component_id or (
src_component == udp_bridge_id and type == "RADIO_STATUS"
)
if not should_process:
continue
# Update the broadcast address to a subnet-specific one if needed
if not broadcast_address_updated[connection_id]:
await self._update_broadcast_address_of_channel_to_subnet(
connection_id, address
)
broadcast_address_updated[connection_id] = True
# Resolve all futures that are waiting for this message
for system_id, params, future in self._matchers[type]:
# Check system ID early on and skip if it does not match
if system_id is not None and message.get_srcSystem() != system_id:
continue
if future.done():
# This may happen if we get multiple matching messages in
# quick succession before the task waiting for the result
# gets a chance of responding to them; in this case, we
# have to ignore the message, otherwise we would be resolving
# the future twice
continue
elif callable(params):
matched = params(message)
elif params is None:
matched = True
else:
matched = all(
getattr(message, param_name, None) == param_value
for param_name, param_value in params.items()
)
if matched:
future.set_result(message)
# Call the message handler if we have one
handler = handlers.get(type)
if handler:
try:
handler(message, connection_id=connection_id, address=address)
except Exception:
self.log.exception(
f"Error while handling MAVLink message of type {type}"
)
else:
self.log.warning(
f"Unhandled MAVLink message type: {type}",
extra=self._log_extra_from_message(message),
)
handlers[type] = nop
def _handle_message_autopilot_version(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_autopilot_version(message)
def _handle_message_command_long(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink COMMAND_LONG message."""
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_command_long(message)
def _handle_message_data(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
if message.type == DroneShowStatus.TYPE:
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_drone_show_status(message)
def _handle_message_global_position_int(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_global_position_int(message)
def _handle_message_gps_raw_int(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_gps_raw_int(message)
def _handle_message_heartbeat(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink HEARTBEAT message."""
if not MAVType(message.type).is_vehicle:
# Ignore non-vehicle heartbeats
return
# Forward heartbeat to the appropriate UAV
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_heartbeat(message)
def _handle_message_log_data(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink LOG_DATA message."""
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_log_data(message)
def _handle_message_log_entry(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink LOG_ENTRY message."""
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_log_entry(message)
def _handle_message_mag_cal_progress(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink MAG_CAL_PROGRESS message."""
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_mag_cal_progress(message)
def _handle_message_mag_cal_report(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink MAG_CAL_REPORT message."""
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_mag_cal_report(message)
def _handle_message_radio_status(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink RADIO_STATUS message."""
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_radio_status(message)
def _handle_message_statustext(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink STATUSTEXT message and forwards it to the
log console or to the GCS, depending on how the network is set up.
"""
text = message.text
if not text:
return
uav = self._find_uav_from_message(message, address)
if uav and uav._autopilot.is_prearm_error_message(text):
uav.notify_prearm_failure(uav._autopilot.process_prearm_error_message(text))
if not self._statustext_targets.log_prearm:
return
severity: int = message.severity
if severity <= self._statustext_targets.server:
extra = self._log_extra_from_message(message, uav)
extra["telemetry"] = "ignore"
self.log.log(
python_log_level_from_mavlink_severity(message.severity),
text,
extra=extra,
)
if severity <= self._statustext_targets.client and uav is not None:
uav.send_log_message_to_gcs(
text,
severity=flockwave_severity_from_mavlink_severity(message.severity),
)
def _handle_message_sys_status(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
uav = self._find_uav_from_message(message, address)
if uav:
uav.handle_message_sys_status(message)
def _handle_message_timesync(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink TIMESYNC message."""
if message.tc1 != 0:
now = time_ns() // 1000
self.log.info(f"Roundtrip time: {(now - message.ts1) // 1000} msec")
else:
# Timesync request, ignore it.
pass
def _handle_message_v2_extension(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Handles an incoming MAVLink V2_EXTENSION message, currently used to
carry debug information injected dynamically by a Skybrush MAVLink
proxy. Not used in production.
"""
if message.message_type != 42424:
# Not for us
return
length = message.payload[0]
payload = message.payload[1 : (length + 1)]
self.log.info(repr(payload))
def _log_message(
self, message: MAVLinkMessage, *, connection_id: str, address: Any
):
"""Logs an incoming MAVLink message for debugging purposes."""
self.log.debug(str(message))
def _log_extra_from_message(
self, message: MAVLinkMessage, uav: MAVLinkUAV | None = None
) -> dict[str, Any]:
if uav:
return {"id": log_id_for_uav(uav)}
else:
return {"id": log_id_from_message(message, self.id)}
def _register_connection_aliases(
self,
manager: CommunicationManager,
connection_names: Sequence[str],
stack: ExitStack,
log,
) -> None:
"""Registers some connection aliases in the given communication manager
object so we can simply send RTK packets to an `Channel.RTK` alias and
ensure that it ends up at the correct connection.
This method is called automatically from the main task of this network
during initialization; no need to call it manually.
"""
if not self._connections:
return
def register_by_index(
alias: str, index: Optional[Union[int, Iterable[int]]]
) -> list[str]:
if index is None:
index = 0
indices = [index] if isinstance(index, int) else list(index)
# If the list contains a channel that does not exist, replace it
# with channel 0 instead. Also filter duplicates.
targets = [
connection_names[index]
for index in sorted(
{
index if index >= 0 and index < len(connection_names) else 0
for index in indices
}
)
]
stack.enter_context(manager.with_alias(alias, targets=targets))
return targets
extra = {"id": self.id}
# Register the first connection as the primary
channels = register_by_index(Channel.PRIMARY, 0)
secondary_channels = register_by_index(Channel.SECONDARY, 1)
if channels != secondary_channels:
log.info(
f"Routing primary traffic to {format_channel_ids(channels)}, "
f"falling back to {format_channel_ids(secondary_channels)}",
extra=extra,
)
else:
log.info(
f"Routing primary traffic to {format_channel_ids(channels)}",
extra=extra,
)
# Register the RTK channel according to the routing setup
channels = format_channel_ids(
register_by_index(Channel.RTK, self._routing.get("rtk"))
)
if channels:
log.info(f"Routing RTK corrections to {channels}", extra=extra)
# Register the RTK channel according to the routing setup
channels = format_channel_ids(
register_by_index(Channel.RC, self._routing.get("rc"))
)
if channels:
log.info(f"Routing RC overrides to {channels}", extra=extra)
async def _update_broadcast_address_of_channel_to_subnet(
self, connection_id: str, address: tuple[str, int], timeout: float = 1
) -> None:
"""Updates the broadcast address of the connection with the given ID to the
subnet-specific broadcast address of the network interface that received
a packet from the given address.
"""
if isinstance(address, tuple):
ip, port = address
else:
# Not a wireless network
return
subnets = None
with move_on_after(timeout):
subnets = await to_thread.run_sync(find_interfaces_with_address, ip)
success = False
if subnets:
interface, subnet = subnets[0]
# HACK HACK HACK this is an ugly temporary fix; we are reaching into
# the internals of self.manager, which we shouldn't do
for entries in self.manager._entries_by_name.values():
for entry in entries:
if entry.channel and entry.name == connection_id:
broadcast_address = subnet.broadcast_address
if str(broadcast_address) == "127.255.255.255":
# We are on localhost, so just keep on using localhost
broadcast_address = "127.0.0.1"
# Try to figure out the current broadcast address of the channel
# TODO(ntamas): this should be sorted out in the future
old_broadcast_address = getattr(
entry.channel, "broadcast_address", None
)
if old_broadcast_address is None:
connection = getattr(entry, "connection", None)
if connection:
old_broadcast_address = getattr(
connection, "broadcast_address", None
)
if (
old_broadcast_address
and isinstance(old_broadcast_address, tuple)
and len(old_broadcast_address) == 2
):
# Keep the port, update the address only
broadcast_port = old_broadcast_address[1]
else:
# Hmmm, let's just assume that the source port of this packet is okay
broadcast_port = port
conn = entry.connection
if isinstance(conn, UDPListenerConnection):
conn.set_user_defined_broadcast_address(
(
str(broadcast_address),
broadcast_port,
)
)
self.log.info(
f"Broadcast address updated to {broadcast_address}:{broadcast_port} "
f"({interface})",
extra={"id": connection_id},
)
success = True
if not success:
self.log.warning(
"Failed to update broadcast address to a subnet-specific one"
)
def format_channel_ids(ids: Sequence[str]) -> str:
"""Formats a list of communication channel IDs in a way that is suitable for
printing in human-readable logs.
"""
parts: list[str] = []
for index, id in enumerate(ids):
parts.append(id)
if index < len(ids) - 1:
parts.append(", " if index < len(ids) - 2 else " and ")
return "".join(parts)