Repository URL to install this package:
|
Version:
2.33.0 ▾
|
from dataclasses import dataclass
from datetime import timezone
from enum import IntEnum, IntFlag
from functools import lru_cache
from struct import Struct, pack
from time import time
from typing import ClassVar, Optional, Sequence
from flockwave.gps.time import unix_to_gps_time_of_week, gps_time_of_week_to_utc
from flockwave.server.ext.show.config import AuthorizationScope
from flockwave.server.model.gps import GPSFixType as OurGPSFixType
from .enums import GPSFixType
from .types import MAVLinkMessage, MAVLinkMessageSpecification, spec
__all__ = ("DroneShowStatus", "create_led_control_packet")
_EMPTY = b"\x00" * 256
"""Helper constant used when we try to send an empty byte array via MAVLink."""
MSEC_IN_WEEK = 604800000
"""Number of milliseconds in a normal week."""
def authorization_scope_to_int(authorization_scope: AuthorizationScope) -> int:
"""Converts an authorization scope enum value to its integer representation
in the Skybrush firmware.
"""
if authorization_scope is AuthorizationScope.NONE:
return 0
elif authorization_scope is AuthorizationScope.LIVE:
return 1
elif authorization_scope is AuthorizationScope.REHEARSAL:
return 2
elif authorization_scope is AuthorizationScope.LIGHTS_ONLY:
return 3
else:
return 0
def authorization_scope_from_int(value: int) -> AuthorizationScope:
"""Converts the integer representation of an authorization scope enum value
to the enum itself.
"""
if value == 1:
return AuthorizationScope.LIVE
elif value == 2:
return AuthorizationScope.REHEARSAL
elif value == 3:
return AuthorizationScope.LIGHTS_ONLY
else:
return AuthorizationScope.NONE
def create_custom_data_packet(type: int, payload: bytes) -> MAVLinkMessageSpecification:
"""Creates a custom data packet used by our firmware with the given type
and payload.
"""
length = len(payload) + 1
if length <= 16:
padded_length = 16
packet = spec.data16
elif length <= 32:
padded_length = 32
packet = spec.data32
elif length <= 64:
padded_length = 64
packet = spec.data64
elif length <= 96:
padded_length = 96
packet = spec.data96
else:
raise ValueError("payload too long")
return packet(
type=0x5C,
len=length,
data=bytes([type]) + payload + b"\x00" * (padded_length - length),
)
def create_start_time_configuration_packet(
authorization_scope: AuthorizationScope,
start_time: Optional[float] = None,
should_update_takeoff_time: bool = True,
) -> MAVLinkMessageSpecification:
"""Creates a custom command packet used by our firmware that sets the
scheduled takeoff time and the takeoff authorization of the swarm.
Parameters:
start_time: the desired takeoff time of the swarm as a UNIX timestamp;
`None` if it should be cleared
authorized: whether the swarm is authorized to take off
should_update_takeoff_time: whether the desired takeoff time should be
updated on the swarm; set this to `False` if you do not want to
change the start time, only the authorization flag
"""
if not should_update_takeoff_time:
# do not touch; this is expressed by a value larger than 604800 seconds
# on the drone's side.
start_time = 0x7FFFFFFF
msec_until_start = 0x7FFFFFFF
elif start_time is None or start_time < 0:
# clear start time; this is expressed by a value smaller than -604800
# seconds on the drone's side
start_time = -0x80000000
msec_until_start = -0x80000000
else:
# convert from UNIX timestamp to GPS time-of-week
msec_until_start = int(1000 * (start_time - time()))
msec_until_start = min(max(msec_until_start, -MSEC_IN_WEEK), MSEC_IN_WEEK)
_, start_time = unix_to_gps_time_of_week(int(start_time))
scope_value = authorization_scope_to_int(authorization_scope)
return create_custom_data_packet(
type=1, payload=pack("<iBi", start_time, scope_value, msec_until_start)
)
def create_led_control_packet(
data: Optional[Sequence[int]] = None, broadcast: bool = False
) -> MAVLinkMessageSpecification:
"""Creates a special LED light control packet used by our firmware."""
kwds = {
"instance": 42,
"pattern": 42,
"custom_len": len(data) if data else 0,
"custom_bytes": bytes(data) + _EMPTY[len(data) :] if data else _EMPTY,
}
if broadcast:
kwds.update(target_system=0, target_component=0)
return spec.led_control(**kwds)
def format_elapsed_time(value: int) -> str:
"""Formats an elapsed time value in seconds into hour-minute-seconds
format.
"""
if value < 0:
sign = "-"
value = -value
else:
sign = " "
minutes, seconds = divmod(value, 60)
hours, minutes = divmod(minutes, 60)
return f"{sign}{hours:02}:{minutes:02}:{seconds:02}"
@lru_cache(maxsize=128)
def format_gps_time_of_week(value: int) -> str:
# TODO(ntamas): the cache should be invalidated every week, but at the
# moment it is unlikely that the server keeps on running for more than a
# week without interruptions, so let's just ignore this for the time being.
# If we really need to, we can call format_gps_time_of_week.cache_clear()
# regularly.
if value < 0:
return "---:--:--"
else:
dt = gps_time_of_week_to_utc(value)
assert dt.tzinfo is timezone.utc
return dt.astimezone(tz=None).strftime("@%H:%M:%S")
class DroneShowStatusFlag(IntFlag):
"""Status flags used in the Skybrush-specific drone show status packet."""
# Flags from now on come from the third flag byte in the status packet;
# four most significant bits only.
IS_FAR_FROM_EXPECTED_POSITION = 1 << 17
# Flags from now on come from the second flag byte in the status packet;
# four most significant bits only.
IS_MISPLACED_BEFORE_TAKEOFF = 1 << 11
UNUSED_1 = 1 << 10
UNUSED_2 = 1 << 9
UNUSED_3 = 1 << 8
# Flags from now on come from the first flag byte in the status packet
HAS_SHOW_FILE = 1 << 7
HAS_START_TIME = 1 << 6
HAS_ORIGIN = 1 << 5
HAS_ORIENTATION = 1 << 4
HAS_GEOFENCE = 1 << 3
HAS_AUTHORIZATION_TO_START = 1 << 2
IS_GPS_TIME_BAD = 1 << 1
GEOFENCE_BREACHED = 1 << 0
# Dummy member to be used as a default value
OFF = 0
_stage_descriptions = {
0: "",
1: "Initializing...",
2: "Countdown to show start",
3: "Taking off",
4: "Performing show",
5: "Return to launch",
6: "Position hold",
7: "Landing",
8: "Landed",
9: "Error",
10: "Running light program",
}
class DroneShowExecutionStage(IntEnum):
"""Execution stage constants in the Skybrush-specific drone show status
packet.
"""
UNKNOWN = -1
OFF = 0
INIT = 1
WAIT_FOR_START_TIME = 2
TAKEOFF = 3
PERFORMING = 4
RTL = 5
LOITER = 6
LANDING = 7
LANDED = 8
ERROR = 9
TESTING_LIGHTS = 10
@property
def probably_airborne(self) -> bool:
"""Returns whether the drone is probably airborne while performing
this stage.
"""
return 3 <= self <= 7
@property
def has_error(self) -> bool:
"""returns whether the execution stage represents an error condition."""
return self < 0 or self == 9
@property
def description(self) -> str:
"""Returns a human-readable description of the stage."""
description = _stage_descriptions.get(self)
description = (
description if description is not None else f"Unknown stage {self}"
)
return description
@dataclass
class DroneShowStatus:
"""Data class representing a Skybrush-specific drone show status object."""
start_time: int = -1
"""Scheduled start time of the drone show, in GPS seconds of week, negative
if not set.
"""
elapsed_time: int = 0
"""Number of seconds elapsed in the drone show."""
flags: int = DroneShowStatusFlag.OFF.value
"""Various status flags."""
stage: DroneShowExecutionStage = DroneShowExecutionStage.OFF
"""Stage of the drone show execution."""
light: int = 0
"""Current color of the RGB light, in RGB565 encoding."""
gps_fix: OurGPSFixType = OurGPSFixType.NO_GPS
"""Current GPS fix."""
num_satellites: int = 0
"""Number of satellites seen."""
authorization_scope: AuthorizationScope = AuthorizationScope.NONE
"""Authorization scope of the scheduled start of the drone show."""
rtcm_counters: Sequence[Optional[int]] = (None, None)
"""Number of RTCM messages received from the primary and secondary channels
in the last few seconds.
"""
TYPE: ClassVar[int] = 0x5B
"""Identifier of Skybrush-specific DATA16 show status packets."""
_struct: ClassVar[Struct] = Struct("<iHBBBBhBB")
"""Structure of Skybrush-specific DATA16 show status packets."""
@classmethod
def from_bytes(cls, data: bytes):
"""Constructs a DroneShowStatus_ object from the raw body of a MAVLink
DATA16 packet that has already been truncated to the desired length of
the packet.
"""
data_len = len(data)
if data_len < 14:
data = data.ljust(14, b"\x00")
(
start_time,
light,
flags,
flags2,
gps_health,
flags3,
elapsed_time,
primary_rtcm_count_plus_one,
secondary_rtcm_count_plus_one,
) = cls._struct.unpack(data[:14])
# Merge flags, flags2 and flags3 into one byte. Lower 4 bits of flags2
# is the execution stage. Lower 2 bits of flags3 is the boot count
# modulo 4.
flags |= (flags3 & 0xF0) << 10
flags |= (flags2 & 0xF0) << 4
stage = flags2 & 0x0F
if data_len <= 9:
# No "flags3" field in the original packet. "flags3" contains the
# authorization scope and we need to keep it consistent with the
# authorization flag in the "flags" field.
if flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value:
flags3 |= (0x01) << 2
# validate the authorization scope
scope = authorization_scope_from_int((flags3 & 0x0C) >> 2)
# validate the execution stage
try:
stage = DroneShowExecutionStage(stage)
except Exception:
stage = DroneShowExecutionStage.UNKNOWN
# process the RTCM counters
rtcm_counters = (
primary_rtcm_count_plus_one - 1
if primary_rtcm_count_plus_one > 0
else None,
secondary_rtcm_count_plus_one - 1
if secondary_rtcm_count_plus_one > 0
else None,
)
return cls(
start_time=start_time,
elapsed_time=elapsed_time,
light=light,
flags=flags,
stage=stage,
gps_fix=GPSFixType.to_ours(gps_health & 0x07),
num_satellites=gps_health >> 3,
authorization_scope=scope,
rtcm_counters=rtcm_counters,
)
@classmethod
def from_mavlink_message(cls, message: MAVLinkMessage):
"""Constructs a DroneShowStatus_ object from a MAVLink DATA16 packet.
Raises:
ValueError: if the type of the MAVLink DATA16 packet does not match
the expected type of a Skybrush-specific show status packet
"""
if message.type != cls.TYPE:
raise ValueError(
f"type of MAVLink packet is {message.type}, expected {cls.TYPE}"
)
return cls.from_bytes(bytes(message.data[: message.len]))
@property
def has_start_time(self) -> bool:
"""Returns whether there is a valid start time in the drone show status
message.
"""
return self.start_time >= 0
@property
def has_takeoff_authorization(self) -> bool:
"""Returns whether the takeoff authorization flag is set in the drone
show status message.
"""
return bool(self.flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value)
@property
def has_timesync_error(self) -> bool:
"""Returns whether there is probably a time synchronization problem
as we are receiving invalid timestamps from the GPS.
"""
return bool(self.flags & DroneShowStatusFlag.IS_GPS_TIME_BAD.value)
@property
def is_far_from_expected_position(self) -> bool:
"""Returns whether the drone seems to be far from its expected position,
typically during a show.
"""
return bool(
self.flags & DroneShowStatusFlag.IS_FAR_FROM_EXPECTED_POSITION.value
)
@property
def is_geofence_breached(self) -> bool:
"""Returns whether at least one of the geofences is breached, even if
the drone is configured not to act on them (i.e. report only)."""
return bool(self.flags & DroneShowStatusFlag.GEOFENCE_BREACHED.value)
@property
def is_misplaced_before_takeoff(self) -> bool:
"""Returns whether we are currently before the takeoff stage and the
drone seems to be misplaced.
"""
return bool(self.flags & DroneShowStatusFlag.IS_MISPLACED_BEFORE_TAKEOFF.value)
@property
def message(self) -> str:
"""Returns a short status message string that can be used for reporting
the status of the drone show subsystem on the UI.
"""
if self.stage.probably_airborne or self.elapsed_time >= -30:
clock = format_elapsed_time(self.elapsed_time)
else:
clock = format_gps_time_of_week(self.start_time)
message = self._format_message()
return f"[{clock}] {message}"
def _format_message(self) -> str:
"""Formats a status message from the execution stage and flags found in
the drone show status packet.
"""
# This function is called frequently, and Python enums are a bit slow
# so we optimize enum access by using the 'value' property on them
flags = self.flags
stage = self.stage
if not flags & DroneShowStatusFlag.HAS_SHOW_FILE.value:
return "No show data"
# If we are in a stage that implies that we are flying or we have an
# error, then it is more important than any info we could get from the
# flags so we show that
if stage.probably_airborne or stage.has_error:
return stage.description
# Looks like we are on the ground, so show the info that we can gather
# from the flags
if not flags & DroneShowStatusFlag.HAS_ORIGIN.value:
return "Origin not set"
elif not flags & DroneShowStatusFlag.HAS_ORIENTATION.value:
return "Orientation not set"
elif flags & DroneShowStatusFlag.IS_MISPLACED_BEFORE_TAKEOFF.value:
return "Not at takeoff position"
elif (
not flags & DroneShowStatusFlag.HAS_START_TIME.value
and stage != DroneShowExecutionStage.LANDED
):
if flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value:
return "Authorized without start time"
elif self.gps_fix < OurGPSFixType.FIX_3D.value:
# This is needed here to explain why the start time might not
# have been set yet; interpreting the SHOW_START_TIME parameter
# needs GPS fix
return "No 3D GPS fix yet"
elif flags & DroneShowStatusFlag.IS_GPS_TIME_BAD.value:
# If we get here, it means that we _do_ have 3D fix _but_ we
# still don't have a GPS timestamp. This can happen only if the
# GPS is not sending us the full timestamp; e.g., if it sends
# the iTOW but not the GPS week number (as it is on Entron 300
# drones)
return "Invalid GPS timestamp"
elif stage is DroneShowExecutionStage.OFF:
# We are not even in show mode so the start time info is not relevant
return ""
else:
return "Start time not set"
elif (
not flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value
and stage != DroneShowExecutionStage.LANDED
):
if stage is DroneShowExecutionStage.OFF:
# We are not even in show mode so the lack of authorization is not relevant
return ""
else:
return "Not authorized to start"
elif not flags & DroneShowStatusFlag.HAS_GEOFENCE.value:
return "Geofence not set"
elif self.gps_fix < OurGPSFixType.FIX_3D:
return "No 3D GPS fix yet"
# We are on the ground but there's nothing important to report from the
# flags so just show the description of the stage
return stage.description