Repository URL to install this package:
|
Version:
2.40.0 ▾
|
from dataclasses import dataclass, field
from urllib.parse import urlencode
from typing import Any, Callable, Iterable, Optional, Sequence, Type, TypeVar
from flockwave.channels.types import Encoder, Parser
from flockwave.gps.encoder import create_gps_encoder
from flockwave.gps.nmea import NMEAPacket
from flockwave.gps.parser import create_gps_parser
from flockwave.gps.rtcm import create_rtcm_encoder
from flockwave.gps.rtcm.packets import RTCMPacket, RTCMV2Packet, RTCMV3Packet
from flockwave.server.utils.serial import (
describe_serial_port,
describe_serial_port_configuration,
)
from .enums import RTKConfigurationPresetType
from .types import GPSPacket
# Public API of this module: only the preset class is exported.
__all__ = ("RTKConfigurationPreset",)
# Signature shared by packet filters: a callable that takes a single GPS
# packet and returns a boolean.
GPSPacketFilter = Callable[[GPSPacket], bool]
"""Type specification for a GPS packet filter function."""
ALLOWED_FORMATS: Sequence[str] = ["auto", "rtcm2", "rtcm3", "ubx"]
"""Packet formats that may be selected for an RTK stream.

* ``auto`` tries to parse RTCM3, UBX and NMEA messages
* ``rtcm2`` parses RTCM2 messages only
* ``rtcm3`` parses RTCM3 messages only
* ``ubx`` tries to parse RTCM3 and UBX messages

This is deliberately an ordered sequence and not a set: the order of the
items is the order in which the options should be presented in the
configuration UI.
"""
def describe_format(format: str) -> str:
    """Returns a human-readable description of a packet format identifier.

    Parameters:
        format: the format identifier to describe; typically one of the
            entries of the ``ALLOWED_FORMATS`` list

    Returns:
        the label belonging to the format; identifiers that have no
        dedicated label are returned unchanged
    """
    known_labels = (
        ("auto", "Detect automatically"),
        ("rtcm2", "RTCM2"),
        ("rtcm3", "RTCM3 only"),
        ("ubx", "U-Blox and RTCM3"),
    )
    # Fall back to the identifier itself when there is no label for it
    return next(
        (label for identifier, label in known_labels if format == identifier),
        format,
    )
# Type variable bound to RTKConfigurationPreset; used to annotate methods
# that return ``self`` so that the annotation follows the type of the instance.
C = TypeVar("C", bound="RTKConfigurationPreset")
@dataclass
class RTKConfigurationPreset:
    """Data class representing an RTK configuration preset consisting of one or
    more RTK data sources and an optional packet filter to be executed on
    every received packet.
    """

    id: str
    """The unique ID of the preset"""

    title: Optional[str] = None
    """A human-readable title of the preset"""

    type: RTKConfigurationPresetType = RTKConfigurationPresetType.BUILTIN
    """Type of the preset."""

    format: str = "auto"
    """Format of the GPS messages arriving in this configuration; one of the
    entries of ``ALLOWED_FORMATS``.
    """

    sources: list[str] = field(default_factory=list)
    """List of source connections where this preset collects messages from"""

    init: Optional[bytes] = None
    """Optional data to send on the connection before starting to read the
    RTCM messages. Can be used for source-specific initialization.
    """

    filter: Optional[GPSPacketFilter] = None
    """Optional filter function that the messages from the sources must pass
    through; ``None`` means that no filter is configured.
    """

    auto_survey: bool = False
    """Whether switching to this preset will automatically start a survey
    attempt on the remote device (if the device supports survey).
    """

    auto_select: bool = False
    """Whether this preset will automatically be selected after loading the
    extension if the user has not made an explicit selection yet.
    """

    @classmethod
    def from_json(
        cls,
        spec: dict[str, Any],
        *,
        id: Optional[str] = None,
        type: RTKConfigurationPresetType = RTKConfigurationPresetType.BUILTIN,
    ):
        """Creates an RTK configuration preset object from its JSON
        representation used in configuration files.

        The specification object is only read, never modified, so the same
        object can safely be parsed multiple times.

        Parameters:
            spec: the JSON specification in the configuration file
            id: the ID of the preset, used when the preset is registered in a
                registry. It is also used as a fallback when no title is
                specified for the preset. When ``None``, it means that the
                JSON specification is expected to provide an ID for the preset.
            type: the type of the preset. Typically BUILTIN for presets that are
                added in the configuration of the extension and USER for presets
                that are stored in a separate configuration file meant for
                presets defined by the user

        Returns:
            the newly created preset

        Raises:
            RuntimeError: if no ID was given and the specification does not
                contain one either
            ValueError: if the specification contains an invalid packet format
        """
        if id is None:
            try:
                # Do not pop the key: the dictionary belongs to the caller and
                # update_from() ignores keys that it does not know anyway
                id = str(spec["id"])
            except KeyError:
                raise RuntimeError("RTK configuration preset needs an ID") from None

        result = cls(id=id, title=str(spec.get("title", id)), type=type)
        return result.update_from(spec)

    def update_from(self: C, updates: dict[str, Any]) -> C:
        """Updates the preset from a JSON specification in the configuration
        file. The specification can be partial; fields not listed explicitly in
        the specification are left at their current values. Unknown keys are
        ignored.

        Parameters:
            updates: the JSON specification of the updates to apply on the
                preset

        Returns:
            the preset itself for easy method chaining

        Raises:
            ValueError: if the specification contains a packet format that is
                not in ``ALLOWED_FORMATS``
        """
        if "title" in updates:
            self.title = str(updates["title"])

        if "format" in updates:
            format = str(updates["format"])
            if format not in ALLOWED_FORMATS:
                raise ValueError(f"Invalid RTK packet format: {format!r}")
            self.format = format

        sources: Optional[list[Any]]
        if "sources" in updates:
            sources = updates["sources"]
        elif "source" in updates:
            # source is an alias to sources
            sources = updates["source"]
        else:
            sources = None

        if sources is not None:
            # A single source may be given without wrapping it in a list
            if not isinstance(sources, list):
                sources = [sources]

            # The new list replaces the old one instead of extending it
            self.remove_all_sources()
            for source in sources:
                self.add_source(source)

        if "init" in updates:
            init = updates["init"]
            self.init = init if isinstance(init, bytes) else str(init).encode("utf-8")

        if "filter" in updates:
            filter_spec = updates["filter"]
            # An explicit null in the specification removes the filter; it
            # cannot be expanded into keyword arguments
            self.filter = (
                None if filter_spec is None else create_filter_function(**filter_spec)
            )

        if "auto_survey" in updates:
            self.auto_survey = bool(updates["auto_survey"])

        if "auto_select" in updates:
            self.auto_select = bool(updates["auto_select"])

        return self

    @classmethod
    def from_serial_port(
        cls,
        port,
        configuration: dict[str, Any],
        *,
        id: str,
        use_configuration_in_title: bool = True,
    ):
        """Creates an RTK configuration preset object from a serial port
        descriptor and a configuration dictionary for the serial port with
        things like baud rate and the number of stop bits.

        The preset is created with the DYNAMIC type, the "auto" packet format
        and automatic survey enabled.

        Parameters:
            port: the serial port descriptor from the `list_serial_ports()`
                method
            configuration: dictionary providing additional key-value pairs
                that will be passed on to the constructor of a
                SerialPortConnection when the port is opened
            id: the ID of the preset
            use_configuration_in_title: whether to include information
                gathered from the configuration in the title of the newly
                created preset

        Returns:
            the newly created preset
        """
        label = describe_serial_port(port)
        spec = (
            describe_serial_port_configuration(configuration, only=("baud", "stopbits"))
            if use_configuration_in_title
            else None
        )
        title = f"{label} ({spec})" if spec else label

        result = cls(id=id, title=title, type=RTKConfigurationPresetType.DYNAMIC)
        result.format = "auto"
        result.auto_survey = True

        # The configuration is passed on in the query string of the source
        source = f"serial:{port.device}"
        if configuration:
            args = urlencode(configuration)
            source = f"{source}?{args}"

        result.add_source(source)
        return result

    def add_source(self, source: str) -> None:
        """Adds a new RTK data source to this preset.

        Parameters:
            source: the RTK data source; anything that is accepted by
                ``create_connection()``.
        """
        self.sources.append(source)

    def accepts(self, packet: GPSPacket) -> bool:
        """Returns whether the given GPS packet would be accepted by the filter
        specified in this preset.

        Packets that are neither RTCMv2 nor RTCMv3 packets are never accepted.
        RTCM packets are accepted if there is no filter or the filter accepts
        them.
        """
        if isinstance(packet, (RTCMV2Packet, RTCMV3Packet)):
            return self.filter is None or self.filter(packet)
        else:
            return False

    def create_gps_parser(self) -> Parser[bytes, GPSPacket]:
        """Creates a GPS message parser for this preset.

        Raises:
            ValueError: if the format of the preset is not a known format
        """
        if self.format == "auto":
            formats = ["rtcm3", "ubx", "nmea"]
        elif self.format == "ubx":
            formats = ["rtcm3", "ubx"]
        elif self.format == "rtcm3":
            formats = ["rtcm3"]
        elif self.format == "rtcm2":
            formats = ["rtcm2"]
        else:
            raise ValueError(f"unknown format: {self.format}")

        return create_gps_parser(formats)

    def create_nmea_encoder(self) -> Encoder[NMEAPacket, bytes]:
        """Creates an NMEA message encoder for this preset, used for injecting
        NMEA GGA messages into NTRIP streams of VRS (Virtual Reference Station)
        networks.
        """
        return create_gps_encoder("nmea")

    def create_rtcm_encoder(self) -> Encoder[RTCMPacket, bytes]:
        """Creates an RTCM message encoder for this preset. RTCM2 is used when
        the format of the preset is "rtcm2", RTCM3 is used otherwise.
        """
        return create_rtcm_encoder("rtcm2" if self.format == "rtcm2" else "rtcm3")

    @property
    def dynamic(self) -> bool:
        """Returns whether the preset is dynamic. For backward compatibility
        purposes only.
        """
        return self.type is RTKConfigurationPresetType.DYNAMIC

    @property
    def json(self) -> Any:
        """Returns a JSON object representing this preset, in a format suitable
        for an RTK-INF message. Not all the fields are included, only the ones
        that are mandated by the RTK-INF message specification.
        """
        return {
            "id": self.id,
            "title": self.title,
            "type": self.type.value,
            "format": self.format,
            # Copy the list so the returned object does not alias the
            # internal state of the preset
            "sources": list(self.sources),
        }

    def remove_all_sources(self) -> None:
        """Removes all sources from the RTK preset."""
        self.sources.clear()
def _is_rtcm_packet(packet: GPSPacket) -> bool:
    """Returns whether the given GPS packet is an RTCMv2 or an RTCMv3 packet."""
    rtcm_packet_classes = (RTCMV2Packet, RTCMV3Packet)
    return isinstance(packet, rtcm_packet_classes)
def _process_rtcm_packet_id_list(
    id_list: Optional[Iterable[str]],
) -> Optional[dict[Type[RTCMPacket], set[int]]]:
    """Converts a list of RTCM packet identifiers into sets of numeric packet
    IDs, grouped by RTCM packet class.

    Parameters:
        id_list: RTCM packet identifiers of the form ``rtcm2/<number>`` or
            ``rtcm3/<number>``. A single string is treated as a list
            containing that one identifier. ``None`` means that no list was
            specified.

    Returns:
        ``None`` if the input was ``None``; otherwise a dictionary that always
        has an entry for both ``RTCMV2Packet`` and ``RTCMV3Packet``, mapping
        each class to the (possibly empty) set of packet IDs listed for it.
        Identifiers that start with neither prefix are ignored.

    Raises:
        ValueError: if the part after a recognized prefix is not an integer
    """
    if id_list is None:
        return None

    if isinstance(id_list, str):
        # Iterating over a bare string would yield its characters one by one;
        # none of them would match a prefix, so the identifier would be lost
        # silently and the result would contain empty sets only
        id_list = [id_list]

    result: dict[Type[RTCMPacket], set[int]] = {
        RTCMV2Packet: set(),
        RTCMV3Packet: set(),
    }
    prefixes: tuple[tuple[str, Type[RTCMPacket]], ...] = (
        ("rtcm2/", RTCMV2Packet),
        ("rtcm3/", RTCMV3Packet),
    )

    for spec in id_list:
        for prefix, packet_class in prefixes:
            if spec.startswith(prefix):
                result[packet_class].add(int(spec[len(prefix) :]))
                break

    return result
def create_filter_function(
    accept: Optional[Iterable[str]] = None, reject: Optional[Iterable[str]] = None
) -> Callable[[GPSPacket], bool]:
    """Builds a predicate that decides whether a GPS packet passes a filter
    described by RTCM packet identifiers to accept and to reject.

    Packets that are not RTCM packets never pass the filter at the moment.

    An RTCM packet identifier is a prefix (``rtcm2/`` or ``rtcm3/``) followed
    by the numeric ID of the RTCM packet type; for instance, ``rtcm3/1020``
    refers to RTCMv3 packets of type 1020.

    The "reject" directives are evaluated before the "accept" directives.
    When "accept" is omitted, every packet that was not rejected explicitly is
    accepted. When "reject" is omitted, no packet is rejected explicitly.

    Parameters:
        accept: the list of RTCM packets to accept
        reject: the list of RTCM packets to reject

    Returns:
        an appropriate filter function
    """
    if accept is None and reject is None:
        # Nothing to filter on; only the RTCM check remains
        return _is_rtcm_packet

    accepted_ids = _process_rtcm_packet_id_list(accept)
    rejected_ids = _process_rtcm_packet_id_list(reject)

    def packet_filter(packet: GPSPacket) -> bool:
        # RTCMv2 is tested first, then RTCMv3; anything else is rejected
        packet_class = next(
            (
                candidate
                for candidate in (RTCMV2Packet, RTCMV3Packet)
                if isinstance(packet, candidate)
            ),
            None,
        )
        if packet_class is None:
            return False

        is_rejected = (
            bool(rejected_ids) and packet.packet_type in rejected_ids[packet_class]
        )
        if is_rejected:
            return False

        return not accepted_ids or packet.packet_type in accepted_ids[packet_class]

    return packet_filter