Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / mavlink / packets.py
Size: Mime:
from __future__ import annotations

from dataclasses import dataclass
from datetime import timezone
from enum import IntEnum, IntFlag
from functools import lru_cache
from struct import Struct, pack
from time import time
from typing import ClassVar, Optional, Sequence

from flockwave.gps.time import unix_to_gps_time_of_week, gps_time_of_week_to_utc
from flockwave.server.ext.show.config import AuthorizationScope
from flockwave.server.model.gps import GPSFixType as OurGPSFixType

from .enums import GPSFixType
from .types import MAVLinkMessage, MAVLinkMessageSpecification, spec

__all__ = ("DroneShowStatus", "create_led_control_packet")


_EMPTY = b"\x00" * 256
"""Helper constant used when we try to send an empty byte array via MAVLink."""

MSEC_IN_WEEK = 604800000
"""Number of milliseconds in a normal week."""


def authorization_scope_to_int(authorization_scope: AuthorizationScope) -> int:
    """Converts an authorization scope enum value to its integer representation
    in the Skybrush firmware.
    """
    if authorization_scope is AuthorizationScope.NONE:
        return 0
    elif authorization_scope is AuthorizationScope.LIVE:
        return 1
    elif authorization_scope is AuthorizationScope.REHEARSAL:
        return 2
    elif authorization_scope is AuthorizationScope.LIGHTS_ONLY:
        return 3
    else:
        return 0


def authorization_scope_from_int(value: int) -> AuthorizationScope:
    """Converts the integer representation of an authorization scope enum value
    to the enum itself.
    """
    if value == 1:
        return AuthorizationScope.LIVE
    elif value == 2:
        return AuthorizationScope.REHEARSAL
    elif value == 3:
        return AuthorizationScope.LIGHTS_ONLY
    else:
        return AuthorizationScope.NONE


def create_custom_data_packet(type: int, payload: bytes) -> MAVLinkMessageSpecification:
    """Creates a custom data packet used by our firmware with the given type
    and payload.
    """
    length = len(payload) + 1
    if length <= 16:
        padded_length = 16
        packet = spec.data16
    elif length <= 32:
        padded_length = 32
        packet = spec.data32
    elif length <= 64:
        padded_length = 64
        packet = spec.data64
    elif length <= 96:
        padded_length = 96
        packet = spec.data96
    else:
        raise ValueError("payload too long")

    return packet(
        type=0x5C,
        len=length,
        data=bytes([type]) + payload + b"\x00" * (padded_length - length),
    )


def create_start_time_configuration_packet(
    authorization_scope: AuthorizationScope,
    start_time: Optional[float] = None,
    should_update_takeoff_time: bool = True,
) -> MAVLinkMessageSpecification:
    """Creates a custom command packet used by our firmware that sets the
    scheduled takeoff time and the takeoff authorization of the swarm.

    Parameters:
        start_time: the desired takeoff time of the swarm as a UNIX timestamp;
            `None` if it should be cleared
        authorized: whether the swarm is authorized to take off
        should_update_takeoff_time: whether the desired takeoff time should be
            updated on the swarm; set this to `False` if you do not want to
            change the start time, only the authorization flag
    """
    if not should_update_takeoff_time:
        # do not touch; this is expressed by a value larger than 604800 seconds
        # on the drone's side.
        start_time = 0x7FFFFFFF
        msec_until_start = 0x7FFFFFFF
    elif start_time is None or start_time < 0:
        # clear start time; this is expressed by a value smaller than -604800
        # seconds on the drone's side
        start_time = -0x80000000
        msec_until_start = -0x80000000
    else:
        # convert from UNIX timestamp to GPS time-of-week
        msec_until_start = int(1000 * (start_time - time()))
        msec_until_start = min(max(msec_until_start, -MSEC_IN_WEEK), MSEC_IN_WEEK)
        _, start_time = unix_to_gps_time_of_week(int(start_time))

    scope_value = authorization_scope_to_int(authorization_scope)
    return create_custom_data_packet(
        type=1, payload=pack("<iBi", start_time, scope_value, msec_until_start)
    )


def create_led_control_packet(
    data: Optional[Sequence[int]] = None, broadcast: bool = False
) -> MAVLinkMessageSpecification:
    """Creates a special LED light control packet used by our firmware."""
    kwds = {
        "instance": 42,
        "pattern": 42,
        "custom_len": len(data) if data else 0,
        "custom_bytes": bytes(data) + _EMPTY[len(data) :] if data else _EMPTY,
    }
    if broadcast:
        kwds.update(target_system=0, target_component=0)
    return spec.led_control(**kwds)


def format_elapsed_time(value: int) -> str:
    """Formats an elapsed time value in seconds into hour-minute-seconds
    format.
    """
    if value < 0:
        sign = "-"
        value = -value
    else:
        sign = " "

    minutes, seconds = divmod(value, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{sign}{hours:02}:{minutes:02}:{seconds:02}"


@lru_cache(maxsize=128)
def format_gps_time_of_week(value: int) -> str:
    # TODO(ntamas): the cache should be invalidated every week, but at the
    # moment it is unlikely that the server keeps on running for more than a
    # week without interruptions, so let's just ignore this for the time being.
    # If we really need to, we can call format_gps_time_of_week.cache_clear()
    # regularly.
    if value < 0:
        return "---:--:--"
    else:
        dt = gps_time_of_week_to_utc(value)
        assert dt.tzinfo is timezone.utc
        return dt.astimezone(tz=None).strftime("@%H:%M:%S")


class DroneShowStatusFlag(IntFlag):
    """Status flags used in the Skybrush-specific drone show status packet."""

    # Flags from now on come from the third flag byte in the status packet;
    # four most significant bits only.
    IS_FAR_FROM_EXPECTED_POSITION = 1 << 17
    HAS_HIGH_ESC_ERROR_RATE = 1 << 16

    # Flags from now on come from the second flag byte in the status packet;
    # four most significant bits only.
    IS_MISPLACED_BEFORE_TAKEOFF = 1 << 11
    UNUSED_1 = 1 << 10
    UNUSED_2 = 1 << 9
    UNUSED_3 = 1 << 8

    # Flags from now on come from the first flag byte in the status packet
    HAS_SHOW_FILE = 1 << 7
    HAS_START_TIME = 1 << 6
    HAS_ORIGIN = 1 << 5
    HAS_ORIENTATION = 1 << 4
    HAS_GEOFENCE = 1 << 3
    HAS_AUTHORIZATION_TO_START = 1 << 2
    IS_GPS_TIME_BAD = 1 << 1
    GEOFENCE_BREACHED = 1 << 0

    # Dummy member to be used as a default value
    OFF = 0


_stage_descriptions = {
    0: "",
    1: "Initializing...",
    2: "Countdown to show start",
    3: "Taking off",
    4: "Performing show",
    5: "Return to launch",
    6: "Position hold",
    7: "Landing",
    8: "Landed",
    9: "Error",
    10: "Running light program",
}


class DroneShowExecutionStage(IntEnum):
    """Execution stage constants in the Skybrush-specific drone show status
    packet.
    """

    UNKNOWN = -1
    OFF = 0
    INIT = 1
    WAIT_FOR_START_TIME = 2
    TAKEOFF = 3
    PERFORMING = 4
    RTL = 5
    LOITER = 6
    LANDING = 7
    LANDED = 8
    ERROR = 9
    TESTING_LIGHTS = 10

    @property
    def probably_airborne(self) -> bool:
        """Returns whether the drone is probably airborne while performing
        this stage.
        """
        return 3 <= self <= 7

    @property
    def has_error(self) -> bool:
        """returns whether the execution stage represents an error condition."""
        return self < 0 or self == 9

    @property
    def description(self) -> str:
        """Returns a human-readable description of the stage."""
        description = _stage_descriptions.get(self)
        description = (
            description if description is not None else f"Unknown stage {self}"
        )
        return description


@dataclass
class DroneShowStatus:
    """Dataclass representing a Skybrush-specific drone show status object.

    This dataclass encapsulates information that can be extracted from the
    standard drone show status packet. Information that can be extracted only
    from the extended status packet is provided in a separate object attached
    to the `extension` property.
    """

    start_time: int = -1
    """Scheduled start time of the drone show, in GPS seconds of week, negative
    if not set.
    """

    elapsed_time: int = 0
    """Number of seconds elapsed in the drone show."""

    flags: int = DroneShowStatusFlag.OFF.value
    """Various status flags."""

    stage: DroneShowExecutionStage = DroneShowExecutionStage.OFF
    """Stage of the drone show execution."""

    light: int = 0
    """Current color of the RGB light, in RGB565 encoding."""

    gps_fix: OurGPSFixType = OurGPSFixType.NO_GPS
    """Current GPS fix."""

    num_satellites: int = 0
    """Number of satellites seen."""

    authorization_scope: AuthorizationScope = AuthorizationScope.NONE
    """Authorization scope of the scheduled start of the drone show."""

    rtcm_counters: Sequence[Optional[int]] = (None, None)
    """Number of RTCM messages received from the primary and secondary channels
    in the last few seconds.
    """

    extension: Optional[DroneShowStatusExtension] = None
    """Extended status information, if provided at construction time."""

    TYPE: ClassVar[int] = 0x5B
    """Identifier of Skybrush-specific DATA* show status packets."""

    _struct: ClassVar[Struct] = Struct("<iHBBBBhBB")
    """Structure of Skybrush-specific DATA* show status packets."""

    @classmethod
    def from_bytes(cls, data: bytes):
        """Constructs a DroneShowStatus_ object from the raw body of a MAVLink
        DATA* packet that has already been truncated to the desired length of
        the packet.
        """
        # Remember the original length so we can decide how much padding was
        # added later
        data_len = len(data)
        is_extended = data_len > 14

        # Length of standard packet is 14 bytes. Pad it to the required length
        # if the data is shorter than 14 bytes.
        if data_len < 14:
            data = data.ljust(14, b"\x00")

        # Unpack the standard section of the packet
        (
            start_time,
            light,
            flags,
            flags2,
            gps_health,
            flags3,
            elapsed_time,
            primary_rtcm_count_plus_one,
            secondary_rtcm_count_plus_one,
        ) = cls._struct.unpack(data[:14])

        # Merge flags, flags2 and flags3 into one byte. Lower 4 bits of flags2
        # is the execution stage. Lower 2 bits of flags3 is the boot count
        # modulo 4.
        flags |= (flags3 & 0xF0) << 10
        flags |= (flags2 & 0xF0) << 4
        stage = flags2 & 0x0F

        if data_len <= 9:
            # No "flags3" field in the original packet. "flags3" contains the
            # authorization scope and we need to keep it consistent with the
            # authorization flag in the "flags" field.
            if flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value:
                flags3 |= (0x01) << 2

        # validate the authorization scope
        scope = authorization_scope_from_int((flags3 & 0x0C) >> 2)

        # validate the execution stage
        try:
            stage = DroneShowExecutionStage(stage)
        except Exception:
            stage = DroneShowExecutionStage.UNKNOWN

        # process the RTCM counters
        rtcm_counters = (
            primary_rtcm_count_plus_one - 1
            if primary_rtcm_count_plus_one > 0
            else None,
            secondary_rtcm_count_plus_one - 1
            if secondary_rtcm_count_plus_one > 0
            else None,
        )

        # if the packet is an extended status packet, parse the extended bits
        extension = (
            DroneShowStatusExtension.from_bytes(data[14:]) if is_extended else None
        )

        return cls(
            start_time=start_time,
            elapsed_time=elapsed_time,
            light=light,
            flags=flags,
            stage=stage,
            gps_fix=GPSFixType.to_ours(gps_health & 0x07),
            num_satellites=gps_health >> 3,
            authorization_scope=scope,
            rtcm_counters=rtcm_counters,
            extension=extension,
        )

    @classmethod
    def from_mavlink_message(cls, message: MAVLinkMessage):
        """Constructs a DroneShowStatus_ object from a MAVLink DATA* packet.

        Raises:
            ValueError: if the type of the MAVLink DATA* packet does not match
                the expected type of a Skybrush-specific show status packet
        """
        if message.type != cls.TYPE:
            raise ValueError(
                f"type of MAVLink packet is {message.type}, expected {cls.TYPE}"
            )

        return cls.from_bytes(bytes(message.data[: message.len]))

    @property
    def has_high_esc_error_rate(self) -> bool:
        """Returns whether at least one of the ESCs on the the drone has a high
        error rate.
        """
        return bool(self.flags & DroneShowStatusFlag.HAS_HIGH_ESC_ERROR_RATE.value)

    @property
    def has_start_time(self) -> bool:
        """Returns whether there is a valid start time in the drone show status
        message.
        """
        return self.start_time >= 0

    @property
    def has_takeoff_authorization(self) -> bool:
        """Returns whether the takeoff authorization flag is set in the drone
        show status message.
        """
        return bool(self.flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value)

    @property
    def has_timesync_error(self) -> bool:
        """Returns whether there is probably a time synchronization problem
        as we are receiving invalid timestamps from the GPS.
        """
        return bool(self.flags & DroneShowStatusFlag.IS_GPS_TIME_BAD.value)

    @property
    def is_far_from_expected_position(self) -> bool:
        """Returns whether the drone seems to be far from its expected position,
        typically during a show.
        """
        return bool(
            self.flags & DroneShowStatusFlag.IS_FAR_FROM_EXPECTED_POSITION.value
        )

    @property
    def is_geofence_breached(self) -> bool:
        """Returns whether at least one of the geofences is breached, even if
        the drone is configured not to act on them (i.e. report only)."""
        return bool(self.flags & DroneShowStatusFlag.GEOFENCE_BREACHED.value)

    @property
    def is_misplaced_before_takeoff(self) -> bool:
        """Returns whether we are currently before the takeoff stage and the
        drone seems to be misplaced.
        """
        return bool(self.flags & DroneShowStatusFlag.IS_MISPLACED_BEFORE_TAKEOFF.value)

    @property
    def message(self) -> str:
        """Returns a short status message string that can be used for reporting
        the status of the drone show subsystem on the UI.
        """
        if self.stage.probably_airborne or self.elapsed_time >= -30:
            clock = format_elapsed_time(self.elapsed_time)
        else:
            clock = format_gps_time_of_week(self.start_time)
        message = self._format_message()
        return f"[{clock}] {message}"

    def _format_message(self) -> str:
        """Formats a status message from the execution stage and flags found in
        the drone show status packet.
        """
        # This function is called frequently, and Python enums are a bit slow
        # so we optimize enum access by using the 'value' property on them

        flags = self.flags
        stage = self.stage

        if not flags & DroneShowStatusFlag.HAS_SHOW_FILE.value:
            return "No show data"

        # If we are in a stage that implies that we are flying or we have an
        # error, then it is more important than any info we could get from the
        # flags so we show that
        if stage.probably_airborne or stage.has_error:
            return stage.description

        # Looks like we are on the ground, so show the info that we can gather
        # from the flags
        if not flags & DroneShowStatusFlag.HAS_ORIGIN.value:
            return "Origin not set"
        elif not flags & DroneShowStatusFlag.HAS_ORIENTATION.value:
            return "Orientation not set"
        elif flags & DroneShowStatusFlag.IS_MISPLACED_BEFORE_TAKEOFF.value:
            return "Not at takeoff position"
        elif (
            not flags & DroneShowStatusFlag.HAS_START_TIME.value
            and stage != DroneShowExecutionStage.LANDED
        ):
            if flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value:
                return "Authorized without start time"
            elif self.gps_fix < OurGPSFixType.FIX_3D.value:
                # This is needed here to explain why the start time might not
                # have been set yet; interpreting the SHOW_START_TIME parameter
                # needs GPS fix
                return "No 3D GPS fix yet"
            elif flags & DroneShowStatusFlag.IS_GPS_TIME_BAD.value:
                # If we get here, it means that we _do_ have 3D fix _but_ we
                # still don't have a GPS timestamp. This can happen only if the
                # GPS is not sending us the full timestamp; e.g., if it sends
                # the iTOW but not the GPS week number (as it is on Entron 300
                # drones)
                return "Invalid GPS timestamp"
            elif stage is DroneShowExecutionStage.OFF:
                # We are not even in show mode so the start time info is not relevant
                return ""
            else:
                return "Start time not set"
        elif (
            not flags & DroneShowStatusFlag.HAS_AUTHORIZATION_TO_START.value
            and stage != DroneShowExecutionStage.LANDED
        ):
            if stage is DroneShowExecutionStage.OFF:
                # We are not even in show mode so the lack of authorization is not relevant
                return ""
            else:
                return "Not authorized to start"
        elif not flags & DroneShowStatusFlag.HAS_GEOFENCE.value:
            return "Geofence not set"
        elif self.gps_fix < OurGPSFixType.FIX_3D:
            return "No 3D GPS fix yet"

        # We are on the ground but there's nothing important to report from the
        # flags so just show the description of the stage
        return stage.description


@dataclass
class DroneShowStatusExtension:
    """Dataclass representing the extended part of a Skybrush-specific drone
    show status object.

    This dataclass encapsulates information that can be extracted from the
    extended drone show status packet only.
    """

    lat: float
    """Latitude of the drone in degrees."""

    lng: float
    """Longitude of the drone in degrees."""

    alt: float
    """Altitude of the drone in meters above sea level."""

    relative_alt: float
    """Altitude of the drone relative to its home altitude, in meters."""

    vx: float
    """X (North) coordinate of the velocity of the drone, in m/s."""

    vy: float
    """Y (East) coordinate of the velocity of the drone, in m/s."""

    vz: float
    """Z (Down) coordinate of the velocity of the drone, in m/s."""

    heading: float
    """Heading of the drone."""

    h_acc: Optional[float] = None
    """Horizontal accuracy as reported by the GPS; ``None`` if not provided."""

    v_acc: Optional[float] = None
    """Vertical accuracy as reported by the GPS; ``None`` if not provided."""

    show_id: Optional[int] = None
    """Unique identifier of the show uploaded to the drone, if known."""

    trajectory_index: Optional[int] = None
    """Index of the trajectory that the drone is going to fly, if known."""

    _struct: ClassVar[Struct] = Struct("<iiiihhhHHHIH")
    """Structure of the packet."""

    @classmethod
    def from_bytes(cls, data: bytes):
        """Constructs a DroneShowStatusExtension_ object from the appropriate
        section of the raw body of a MAVLink DATA* packet.

        Typically you should not need to call this method directly; call
        `DroneShowStatus.from_bytes(...)` on the full packet payload instead.
        """
        (
            lat,
            lng,
            alt,
            relative_alt,
            vx,
            vy,
            vz,
            heading,
            h_acc,
            v_acc,
            show_id,
            trajectory_index,
        ) = cls._struct.unpack(data[: cls._struct.size])

        if abs(lat) <= 900000000:
            # latitude, longitude: convert to degrees
            lat /= 1e7
            lng /= 1e7
            # altitudes: [mm] --> [m]
            alt /= 1e3
            relative_alt /= 1e3
        else:
            # treat invalid latitudes as an indication of "no GPS fix"
            lat = lng = alt = relative_alt = 0.0

        # velocity: [cm/s] --> [m/s]
        vx /= 100.0
        vy /= 100.0
        vz /= 100.0

        # heading: [cdeg] --> [deg]
        heading = heading / 100.0 if abs(heading) <= 36000 else 0.0

        # GPS accuracy
        h_acc = h_acc / 1000.0 if h_acc > 0 else None
        v_acc = v_acc / 1000.0 if v_acc > 0 else None

        # TODO(ntamas): show ID, trajectory index
        return cls(lat, lng, alt, relative_alt, vx, vy, vz, heading, h_acc, v_acc)