Repository URL to install this package:
|
Version:
2.39.1 ▾
|
from abc import ABC, abstractmethod
from blinker import Signal
from contextlib import asynccontextmanager
from enum import Enum
from logging import Logger
from trio import (
BrokenResourceError,
Event,
current_time,
move_on_after,
open_nursery,
sleep,
)
from typing import AsyncIterator, Generic, Optional, TypeVar
__all__ = ("LEDLightConfigurationManagerBase",)
#: Type variable representing a packet type that the LED light configuration
#: manager will generate and send
TPacket = TypeVar("TPacket")
#: Type alias for an RGB color
RGBColor = tuple[int, int, int]
class LightEffectType(Enum):
"""Enumeration holding the type of light effects that could be configured
on the drones.
"""
OFF = "off"
"""GCS is not controlling the LED lights on the drones"""
SOLID = "solid"
"""GCS is asking the drones to use a solid LED light"""
class LightConfiguration:
"""LED light related configuration object for the drone show extension."""
updated = Signal(doc="Signal emitted when the configuration is updated")
color: RGBColor
effect: LightEffectType
@classmethod
def create_solid_color(cls, color: RGBColor) -> "LightConfiguration":
result = cls()
result.color = tuple(color) # type: ignore
result.effect = LightEffectType.SOLID
return result
@classmethod
def turn_off(cls) -> "LightConfiguration":
return cls()
def __init__(self):
"""Constructor."""
self.color = (0, 0, 0)
self.effect = LightEffectType.OFF
def clone(self) -> "LightConfiguration":
"""Makes an exact shallow copy of the configuration object."""
result = self.__class__()
result.update_from_json(self.json)
return result
@property
def json(self):
"""Returns the JSON representation of the configuration object."""
return {"color": list(self.color), "effect": str(self.effect.value)}
def update_from_json(self, obj):
"""Updates the configuration object from its JSON representation."""
changed = False
color = obj.get("color")
if color:
if (
isinstance(color, (list, tuple))
and len(color) >= 3
and all(isinstance(x, (int, float)) for x in color)
):
self.color = tuple(int(x) for x in color) # type: ignore
# Send a signal even if the color stayed the same; maybe the
# user sent the same configuration again because some of the
# drones in the show haven't received the previous request
changed = True
effect = obj.get("effect")
if effect:
# Send a signal even if the effect stayed the same; maybe the
# user sent the same configuration again because some of the
# drones in the show haven't received the previous request
self.effect = LightEffectType(effect)
changed = True
if changed:
self.updated.send(self)
class LEDLightConfigurationManagerBase(Generic[TPacket], ABC):
"""Base class for objects that manage the state of the LED lights on a set
of drones when the lights are controlled by commands from the GCS.
The configuration manager may exist in one of two modes: normal or rapid.
Normal mode is the default. Rapid mode is entered when the light
configuration changes; during rapid mode, the messages that instruct the
drones to change their LED colors are fired more frequently than normal to
ensure that all drones get the changes relatively quickly.
Attributes:
message_interval: number of seconds between consecutive messages sent
from the LED light configuration manager if the manager is not in
rapid mode
message_interval_in_rapid_mode: number of seconds between consecutive
messages sent from the LED light configuration manager if the manager
is in rapid mode
rapid_mode_duration: total duration of the rapid mode of the light
configuration manager. The default value is 5 seconds.
"""
_active: bool
_config: Optional[LightConfiguration]
_config_last_updated_at: float
_rapid_mode: bool
_rapid_mode_triggered: Event
_suppress_warnings_until: float
message_interval: float
message_interval_in_rapid_mode: float
rapid_mode_duration: float
def __init__(self):
"""Constructor."""
self.message_interval = 3
self.message_interval_in_rapid_mode = 0.2
self.rapid_mode_duration = 5
self._active = False
self._config = None
self._config_last_updated_at = 0
self._rapid_mode = False
self._rapid_mode_triggered = Event()
self._suppress_warnings_until = 0
def notify_config_changed(self, config: LightConfiguration) -> None:
"""Notifies the manager that the LED light configuration has changed.
This function has to be connected to the `show:lights_updated` signal
of the show extension.
"""
# Store the configuration
self._config = config
self._config_last_updated_at = current_time()
# Note that we need to dispatch messages actively if the mode is not
# "off"
self._active = (
self._config is not None and self._config.effect != LightEffectType.OFF
)
# Trigger rapid mode for the next five seconds so we dispatch commands
# more frequently to ensure that all the drones get it
self._rapid_mode = True
self._rapid_mode_triggered.set()
async def run(self) -> None:
"""Background task that regularly broadcasts messages about the current
status of the LED configuration of the UAVs.
"""
log = self._get_logger()
while True:
try:
await self._run(log)
except Exception:
if log:
log.exception(
"LED light control task stopped unexpectedly, restarting..."
)
await sleep(0.5)
async def _run(self, log: Optional[Logger]) -> None:
while True:
# Note that we might need to send a packet even if we are inactive
# to ensure that the drones are informed when the GCS stops sending
# further LED control commands and switches to "off" mode
packet = (
self._create_light_control_packet(self._config)
if self._config is not None
else None
)
if packet is not None:
try:
await self._send_light_control_packet(packet)
except BrokenResourceError:
# Outbound message queue not open yet
pass
except RuntimeError:
self._send_warning(log, "Failed to broadcast light control packet")
# If the config was updated recently, fire updates in rapid
# succession to ensure that all the drones get them as soon as
# possible. If not, but the current light configuration means that
# we need to control the color from the GCS, wait for at most three
# seconds before sending the next update to the drones. If the
# current light configuration means that we are _not_ controlling
# the lights on the drones, we simply wait until the next
# configuration change
if self._rapid_mode:
await sleep(self.message_interval_in_rapid_mode)
elif self._active:
with move_on_after(self.message_interval):
await self._rapid_mode_triggered.wait()
else:
await self._rapid_mode_triggered.wait()
# Fall back to normal mode a few seconds after the last configuration
# change
if (
self._rapid_mode
and current_time() - self._config_last_updated_at
>= self.rapid_mode_duration
):
self._rapid_mode = False
self._rapid_mode_triggered = Event()
@asynccontextmanager
async def use(self) -> AsyncIterator[None]:
"""Context manager that runs the tasks related to the show manager while
the exeecution is in the context.
"""
async with open_nursery() as nursery:
nursery.start_soon(self.run)
try:
yield
finally:
nursery.cancel_scope.cancel()
@abstractmethod
def _create_light_control_packet(
self, config: LightConfiguration
) -> Optional[TPacket]:
"""Creates a light control packet that must be sent to the group of
drones managed by this extension, assuming the given light
configuration on the GCS.
Returns:
the packet to send to the drones, or `None` if no packet has to be
sent
"""
raise NotImplementedError
def _get_logger(self) -> Optional[Logger]:
"""Returns the logger that the manager can use for logging warning
messages, or `None` if the manager should not use a logger.
The default implementation returns `None`, unconditionally.
"""
return None
@abstractmethod
async def _send_light_control_packet(self, packet: TPacket) -> None:
"""Sends a light control packet to the group of drones that this manager
object is managing.
"""
raise NotImplementedError
def _send_warning(self, log: Optional[Logger], message: str, *args, **kwds) -> None:
"""Prints a warning to the log and suppresses further warnings for the
next five seconds if needed.
"""
now = current_time()
if now < self._suppress_warnings_until:
return
self._suppress_warnings_until = now + 5
if log:
log.warning(message, *args, **kwds)