Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / rtk / preset.py
Size: Mime:
from dataclasses import dataclass, field
from urllib.parse import urlencode
from typing import Any, Callable, Iterable, Optional, Sequence, Type, TypeVar

from flockwave.channels.types import Encoder, Parser
from flockwave.gps.encoder import create_gps_encoder
from flockwave.gps.nmea import NMEAPacket
from flockwave.gps.parser import create_gps_parser
from flockwave.gps.rtcm import create_rtcm_encoder
from flockwave.gps.rtcm.packets import RTCMPacket, RTCMV2Packet, RTCMV3Packet
from flockwave.server.utils.serial import (
    describe_serial_port,
    describe_serial_port_configuration,
)

from .enums import RTKConfigurationPresetType
from .types import GPSPacket

__all__ = ("RTKConfigurationPreset",)

GPSPacketFilter = Callable[[GPSPacket], bool]
"""Type specification for a GPS packet filter function."""

ALLOWED_FORMATS: Sequence[str] = "auto rtcm2 rtcm3 ubx".split()
"""Allowed packet formats in RTK streams. "auto" attempts to parse RTCM3, UBX
and NMEA messages. "ubx" attempts to parse RTCM3 and UBX messages.
"rtcm3" is RTCM3-only, and "rtcm2" is RTCM2-only.

Do not use a set here; the order represents the order in which the options
should appear in the configuration UI.
"""


def describe_format(format: str) -> str:
    """Returns a human-readable description of a format from the
    ``ALLOWED_FORMATS`` set.
    """
    if format == "auto":
        return "Detect automatically"
    if format == "rtcm2":
        return "RTCM2"
    if format == "rtcm3":
        return "RTCM3 only"
    if format == "ubx":
        return "U-Blox and RTCM3"
    return format


C = TypeVar("C", bound="RTKConfigurationPreset")


@dataclass
class RTKConfigurationPreset:
    """Data class representing an RTK configuration preset consisting of one or
    more RTK data sources and an optional packet filter to be executed on
    every received packet.
    """

    id: str
    """The unique ID of the preset"""

    title: Optional[str] = None
    """A human-readable title of the preset"""

    type: RTKConfigurationPresetType = RTKConfigurationPresetType.BUILTIN
    """Type of the preset."""

    format: str = "auto"
    """Format of the GPS messages arriving in this configuration"""

    sources: list[str] = field(default_factory=list)
    """List of source connections where this preset collects messages from"""

    init: Optional[bytes] = None
    """Optional data to send on the connection before starting to read the
    RTCM messages. Can be used for source-specific initialization.
    """

    filter: Optional[GPSPacketFilter] = None
    """List of filters that the messages from the sources must pass through"""

    auto_survey: bool = False
    """Whether switching to this preset will automatically start a survey
    attempt on the remote device (if the device supports survey).
    """

    auto_select: bool = False
    """Whether this preset will automatically be selected after loading the
    extension if the user has not made an explicit selection yet.
    """

    @classmethod
    def from_json(
        cls,
        spec: dict[str, Any],
        *,
        id: Optional[str] = None,
        type: RTKConfigurationPresetType = RTKConfigurationPresetType.BUILTIN,
    ):
        """Creates an RTK configuration preset object from its JSON
        representation used in configuration files.

        Parameters:
            spec: the JSON specification in the configuration file
            id: the ID of the preset, used when the preset is registered in a
                registry. It is also used as a fallback when no title is
                specified for the preset. When ``None``, it means that the
                JSON specification is expected to provide an ID for the preset.
            type: the type of the preset. Typically BUILTIN for presets that are
                added in the configuration of the extension and USER for presets
                that are stored in a separate configuration file meant for
                presets defined by the user
        """
        if id is None:
            try:
                id = str(spec.pop("id"))
            except KeyError:
                raise RuntimeError("RTK configuration preset needs an ID") from None

        result = cls(
            id=id, title=str(spec["title"] if "title" in spec else id), type=type
        )
        return result.update_from(spec)

    def update_from(self: C, updates: dict[str, Any]) -> C:
        """Updates the preset from a JSON specification in the configuration
        file. The specification can be partial; fields not listed explicitly in
        the specification are left at their current values.

        Args:
            updates: the JSON specification of the updates to apply on the
                preset

        Returns:
            the preset itself for easy method chaining
        """
        if "title" in updates:
            self.title = str(updates["title"])

        if "format" in updates:
            format = str(updates["format"])
            if format not in ALLOWED_FORMATS:
                raise ValueError(f"Invalid RTK packet format: {format!r}")
            self.format = format

        sources: Optional[list[Any]]
        if "sources" in updates:
            sources = updates["sources"]
        elif "source" in updates:
            # source is an alias to sources
            sources = updates["source"]
        else:
            sources = None

        if sources is not None:
            if not isinstance(sources, list):
                sources = [sources]

            self.remove_all_sources()
            for source in sources:
                self.add_source(source)

        if "init" in updates:
            init = updates["init"]
            self.init = init if isinstance(init, bytes) else str(init).encode("utf-8")

        if "filter" in updates:
            self.filter = create_filter_function(**updates["filter"])

        if "auto_survey" in updates:
            self.auto_survey = bool(updates["auto_survey"])

        if "auto_select" in updates:
            self.auto_select = bool(updates["auto_select"])

        return self

    @classmethod
    def from_serial_port(
        cls,
        port,
        configuration: dict[str, Any],
        *,
        id: str,
        use_configuration_in_title: bool = True,
    ):
        """Creates an RTK configuration preset object from a serial port
        descriptor and a configuration dictionary for the serial port with
        things like baud rate and the number of stop bits.

        Parameters:
            port: the serial port descriptor from the `list_serial_ports()`
                method
            configuration: dictionary providing additional key-value pairs
                that will be passed on to the constructor of a
                SerialPortConnection when the port is opened
            id: the ID of the preset
            use_configuration_in_title: whether to include information
                gathered from the configuration in the title of the newly
                created preset
        """
        label = describe_serial_port(port)
        spec = (
            describe_serial_port_configuration(configuration, only=("baud", "stopbits"))
            if use_configuration_in_title
            else None
        )
        title = f"{label} ({spec})" if spec else label

        result = cls(id=id, title=title, type=RTKConfigurationPresetType.DYNAMIC)
        result.format = "auto"
        result.auto_survey = True

        source = f"serial:{port.device}"
        if configuration:
            args = urlencode(configuration)
            source = f"{source}?{args}"

        result.add_source(source)

        return result

    def add_source(self, source: str) -> None:
        """Adds a new RTK data source to this preset.

        Parameters:
            source: the RTK data source; anything that is accepted by
                ``create_connection()``.
        """
        self.sources.append(source)

    def accepts(self, packet: GPSPacket) -> bool:
        """Returns whether the given GPS packet would be accepted by the filters
        specified in this preset.
        """
        if isinstance(packet, (RTCMV2Packet, RTCMV3Packet)):
            return self.filter is None or self.filter(packet)
        else:
            return False

    def create_gps_parser(self) -> Parser[bytes, GPSPacket]:
        """Creates a GPS message parser for this preset."""
        if self.format == "auto":
            formats = ["rtcm3", "ubx", "nmea"]
        elif self.format == "ubx":
            formats = ["rtcm3", "ubx"]
        elif self.format == "rtcm3":
            formats = ["rtcm3"]
        elif self.format == "rtcm2":
            formats = ["rtcm2"]
        else:
            raise ValueError(f"unknown format: {self.format}")
        return create_gps_parser(formats)

    def create_nmea_encoder(self) -> Encoder[NMEAPacket, bytes]:
        """Creates an NMEA message encoder for this preset, used for injecting
        NMEA GGA messages into NTRIP streams of VRS (Virtual Reference Station)
        networks.
        """
        return create_gps_encoder("nmea")

    def create_rtcm_encoder(self) -> Encoder[RTCMPacket, bytes]:
        """Creates an RTCM message encoder for this preset."""
        return create_rtcm_encoder("rtcm2" if self.format == "rtcm2" else "rtcm3")

    @property
    def dynamic(self) -> bool:
        """Returns whether the preset is dynamic. For backward compatibility
        purposes only.
        """
        return self.type is RTKConfigurationPresetType.DYNAMIC

    @property
    def json(self) -> Any:
        """Returns a JSON object representing this preset, in a format suitable
        for an RTK-INF message. Not all the fields are included, only the ones
        that are mandated by the RTK-INF message specification.
        """
        return {
            "id": self.id,
            "title": self.title,
            "type": self.type.value,
            "format": self.format,
            "sources": self.sources,
        }

    def remove_all_sources(self) -> None:
        """Removes all sources from the RTH preset."""
        self.sources.clear()


def _is_rtcm_packet(packet: GPSPacket) -> bool:
    return isinstance(packet, (RTCMV2Packet, RTCMV3Packet))


def _process_rtcm_packet_id_list(
    id_list: Optional[Iterable[str]],
) -> Optional[dict[Type[RTCMPacket], set[int]]]:
    if id_list is None:
        return None

    result: dict[Type[RTCMPacket], set[int]] = {
        RTCMV2Packet: set(),
        RTCMV3Packet: set(),
    }
    for spec in id_list:
        if spec.startswith("rtcm2/"):
            result[RTCMV2Packet].add(int(spec[6:]))
        elif spec.startswith("rtcm3/"):
            result[RTCMV3Packet].add(int(spec[6:]))

    return result


def create_filter_function(
    accept: Optional[Iterable[str]] = None, reject: Optional[Iterable[str]] = None
) -> Callable[[GPSPacket], bool]:
    """Creates a filtering function that takes GPS packets and returns whether
    the filter would accept the packet, based on a list of acceptable RTCM
    packet identifiers and a list of rejected RTCM packet identifiers.

    Non-RTCM packets are always rejected at the moment.

    Each RTCM packet identifier consists of a prefix (``rtcm2/`` or ``rtcm3/``)
    and a numeric RTCM packet ID (e.g., ``rtcm3/1020`` identifies RTCMv3 packets
    of type 1020).

    Rejections are processed first, followed by the "accept" directives. A
    missing "accept" argument means that all packets are accepted (except the
    ones rejected explicitly). A missing "reject" argument also means that
    all packets are accepted by default.

    Parameters:
        accept: the list of RTCM packets to accept
        reject: the list of RTCM packets to reject

    Returns:
        an appropriate filter function
    """

    if accept is None and reject is None:
        return _is_rtcm_packet

    accept_by_class = _process_rtcm_packet_id_list(accept)
    reject_by_class = _process_rtcm_packet_id_list(reject)

    def filter(packet: GPSPacket) -> bool:
        if isinstance(packet, RTCMV2Packet):
            cls = RTCMV2Packet
        elif isinstance(packet, RTCMV3Packet):
            cls = RTCMV3Packet
        else:
            return False

        if reject_by_class and packet.packet_type in reject_by_class[cls]:
            return False

        if accept_by_class and packet.packet_type not in accept_by_class[cls]:
            return False

        return True

    return filter