Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / crazyflie / led_lights.py
Size: Mime:
from __future__ import annotations

from struct import Struct
from typing import Optional

from flockwave.server.tasks.led_lights import (
    LEDLightConfigurationManagerBase,
    LightConfiguration,
    LightEffectType,
)

from flockwave.server.ext.crazyflie.crtp_extensions import (
    DRONE_SHOW_PORT,
    DroneShowCommand,
)

from .connection import BroadcasterFunction

__all__ = ("CrazyflieLEDLightConfigurationManager",)


_light_control_packet_struct = Struct("<BBBBB")


class CrazyflieLEDLightConfigurationManager(LEDLightConfigurationManagerBase[bytes]):
    """Class that manages the state of the LED lights on the drones when they
    are controlled by commands from the GCS.
    """

    _broadcaster: BroadcasterFunction

    def __init__(self, broadcaster: BroadcasterFunction):
        """Constructor."""
        super().__init__()
        self._broadcaster = broadcaster

    def _create_light_control_packet(
        self, config: LightConfiguration
    ) -> Optional[bytes]:
        """Creates a CRTP message payload for the message that we need to
        send to all the drones in order to instruct them to do the current light
        effect.
        """
        return _light_control_packet_struct.pack(
            DroneShowCommand.TRIGGER_GCS_LIGHT_EFFECT,
            1 if config.effect == LightEffectType.SOLID else 0,
            config.color[0],
            config.color[1],
            config.color[2],
        )

    async def _send_light_control_packet(self, packet: bytes) -> None:
        self._broadcaster(DRONE_SHOW_PORT, 0, packet)