Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / rtk / beacon_manager.py
Size: Mime:
from contextlib import ExitStack, contextmanager
from functools import partial
from itertools import count
from typing import Callable, ClassVar, Iterable, Iterator, TYPE_CHECKING, Optional

from flockwave.concurrency import Watchdog
from flockwave.gps.rtcm.packets import RTCMPacket, RTCMV3StationaryAntennaPacket
from flockwave.gps.rtcm.parsers import create_rtcm_parser
from flockwave.gps.vectors import ECEFToGPSCoordinateTransformation

from flockwave.server.ext.signals import SignalsExtensionAPI
from flockwave.server.registries.errors import RegistryFull
from flockwave.server.utils.generic import overridden

if TYPE_CHECKING:
    from flockwave.server.ext.beacon.model import Beacon
    from flockwave.server.ext.rtk.extension import RTKExtension

__all__ = ("RTKBeaconManager",)


class RTKBeaconManager:
    """Class that manages the status of the beacon that represents the position
    of the RTK base station in the server.
    """

    BEACON_ID_TEMPLATE: ClassVar[str] = "rtk::base_{}"

    enabled: bool = False
    timeout: float = 15

    _counter: Iterator[int]
    _parser: Callable[[bytes], Iterable[RTCMPacket]]
    _trans: ECEFToGPSCoordinateTransformation = ECEFToGPSCoordinateTransformation()
    _watchdog: Optional[Watchdog] = None

    def __init__(self) -> None:
        self._counter = count()

    @contextmanager
    def use(self, ext: "RTKExtension", nursery) -> Iterator[None]:
        if not self.enabled:
            return

        app = ext.app
        if app is None:
            return

        beacon_api = app.import_api("beacon")
        signal_api = app.import_api("signals", SignalsExtensionAPI)
        signal = ext.RTK_PACKET_SIGNAL

        self._parser = create_rtcm_parser()

        with ExitStack() as stack:
            # When switching RTK presets, it may happen that the new beacon is
            # created _before_ the previous beacon is removed so we need to use
            # a counter to ensure that the IDs are unique
            try:
                beacon = stack.enter_context(
                    beacon_api.use(self.BEACON_ID_TEMPLATE.format(next(self._counter)))
                )
            except RegistryFull:
                # This may happen if the user reaches the license limits
                app.handle_registry_full_error(self, "RTK base station beacon")
                beacon = None

            if beacon:
                beacon.basic_properties.name = "RTK base"

                stack.enter_context(
                    signal_api.use(
                        {signal: partial(self._on_rtk_packet_received, beacon=beacon)}
                    )
                )

                watchdog = stack.enter_context(
                    Watchdog(
                        self.timeout, partial(self._on_beacon_timed_out, beacon)
                    ).use_soon(nursery)
                )

                stack.enter_context(overridden(self, _watchdog=watchdog))

            yield

    def _on_beacon_timed_out(self, beacon: "Beacon") -> None:
        """Handler called by the watchdog when the beacon did not receive an
        updated position in time.
        """
        beacon.update_status(active=False)

    def _on_rtk_packet_received(self, sender, packet, beacon: "Beacon") -> None:
        """Handler called for each incoming RTK packet if this extension is
        interested in it. The packet is used to update the position of the
        RTK beacon.
        """
        encoded = packet
        try:
            for packet in self._parser(encoded):
                if isinstance(packet, RTCMV3StationaryAntennaPacket):  # type: ignore
                    beacon.update_status(
                        position=self._trans.to_gps(packet.position),  # type: ignore
                        active=True,
                    )
                    if self._watchdog:
                        self._watchdog.notify()
        except Exception:
            # Parse error, ignore
            pass