Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-gps / src / flockwave / gps / ubx / rtk_config.py
Size: Mime:
"""Asynchronous task that configures a U-blox GPS receiver as an RTK base
station (if the GPS is capable enough).
"""

from struct import pack
from typing import Awaitable, Callable

from flockwave.gps.enums import GNSSType
from flockwave.gps.rtk import RTKBaseConfigurator, RTKMessageSet, RTKSurveySettings

from .encoder import create_ubx_encoder
from .enums import UBXClass, UBXNAVSubclass
from .packet import UBX

__all__ = ("UBXRTKBaseConfigurator",)


class UBXRTKBaseConfigurator(RTKBaseConfigurator):
    """Class that knows how to configure a U-blox GPS receiver as an RTK
    base station, optionally with given survey-in duration and accuracy.
    """

    async def run(
        self,
        write: Callable[[bytes], Awaitable[None]],
        sleep: Callable[[float], Awaitable[None]],
    ):
        """Asynchronous task that configures a U-blox GPS receiver as an RTK base
        station (if the GPS is capable enough).

        Parameters:
            write: a writer function that can be used to send commands to the GPS
                receiver
            sleep: a function that can be called to sleep a bit without blocking
                other tasks. Takes the number of seconds to sleep.
        """
        encoder = create_ubx_encoder()

        async def send(message, delay=0.2):
            await write(encoder(message))
            await sleep(delay)

        async def set_message_rate(class_id, subclass_id, rate):
            # Set message rates for ports 1 and 3; all other ports in the range
            # 0-5 are silenced
            payload = bytes([int(class_id), subclass_id, 0, rate, 0, rate, 0, 0])
            await send(UBX.CFG_MSG(payload), delay=0.01)
            # TODO(ntamas): we should wait for confirmation and re-send if needed;
            # this might be useful on radio links

        # Configure measurement rate to 1 Hz (0x3e8 msec), one solution for one
        # measurement, align measurements to GPS time
        await send(UBX.CFG_RATE(b"\xe8\x03\x01\x00\x01\x00"))

        # Configure navigation engine to stationary
        await send(
            UBX.CFG_NAV5(
                bytes(
                    [
                        # Parameter mask: apply all parameters
                        0xFF,
                        0xFF,
                        # Dynamic platform model: stationary
                        0x02,
                        # Position fixing mode: 3D
                        0x03,
                        # Fixed altitude for 2D mode, ignored
                        0x00,
                        0x00,
                        0x00,
                        0x00,
                        # Fixed altitude variance for 2D mode, ignored
                        0x10,
                        0x27,
                        0x00,
                        0x00,
                        # Minimum elevation of a satellite to be used: 5 degrees
                        0x05,
                        # Reserved
                        0x00,
                        # Position DOP mask: 25 (250 * 0.1)
                        0xFA,
                        0x00,
                        # Time DOP mask: 25 (250 * 01)
                        0xFA,
                        0x00,
                        # Position accuracy mask: 100m
                        0x64,
                        0x00,
                        # Time accuracy mask: 300m
                        0x2C,
                        0x01,
                        # Static hold threshold [cm/s]
                        0x00,
                        # DGNSS timeout [s]
                        0x00,
                        # Number of satellites required to be above the carrier-to-noise
                        # ratio threshold (see next byte) in order to attempt a fix
                        0x00,
                        # Carrier-to-noise ratio threshold for satellites
                        0x00,
                        # Reserved
                        0x10,
                        0x27,
                        # Static hold threshold
                        0x00,
                        0x00,
                        # UTC standard to be used; receiver selects based on GNSS configuration
                        0x00,
                        # Reserved
                        0x00,
                        0x00,
                        0x00,
                        0x00,
                        0x00,
                    ]
                )
            )
        )

        # Silence all NMEA messages except 0x0B, 0x0C and 0x0E
        for i in range(16):
            if i not in (0x0B, 0x0C, 0x0E):
                await set_message_rate(UBXClass.NMEA, i, 0)

        # Request receiver version
        await send(UBX.MON_VER(b""))

        # Request UTC time information so we can warn the user if the clock of
        # the computer is not in sync with GPS time
        await set_message_rate(UBXClass.NAV, UBXNAVSubclass.TIMEUTC, 5)

        # Request survey-in data so we can provide feedback to the user about the
        # survey-in procedure
        await set_message_rate(UBXClass.NAV, UBXNAVSubclass.SVIN, 1)

        # Request RTCM3 antenna position (1005) messages every 5 seconds
        await set_message_rate(UBXClass.RTCM3, 5, 5)

        # High-precision MSM messages are supported on uBlox M8P from revision
        # 130 and on uBlox F9P. We should do autodetection here.
        # TODO(ntamas): find out how to get the firmware revision.
        high_precision_supported = True
        use_high_precision = (
            high_precision_supported and self.settings.message_set is RTKMessageSet.MSM7
        )

        # GPS MSM message
        enabled = self.settings.uses_gnss(GNSSType.GPS)
        await set_message_rate(
            UBXClass.RTCM3, 74, 0 if use_high_precision or not enabled else 1
        )
        await set_message_rate(
            UBXClass.RTCM3, 77, 1 if use_high_precision and enabled else 0
        )

        # GLONASS MSM message
        enabled = self.settings.uses_gnss(GNSSType.GLONASS)
        await set_message_rate(
            UBXClass.RTCM3, 84, 0 if use_high_precision or not enabled else 1
        )
        await set_message_rate(
            UBXClass.RTCM3, 87, 1 if use_high_precision and enabled else 0
        )
        await set_message_rate(UBXClass.RTCM3, 230, 5 if enabled else 0)

        # Galileo MSM message
        enabled = self.settings.uses_gnss(GNSSType.GALILEO)
        await set_message_rate(
            UBXClass.RTCM3, 94, 0 if use_high_precision or not enabled else 1
        )
        await set_message_rate(
            UBXClass.RTCM3, 97, 1 if use_high_precision and enabled else 0
        )

        # BeiDou MSM message
        enabled = self.settings.uses_gnss(GNSSType.BEIDOU)
        await set_message_rate(
            UBXClass.RTCM3, 124, 0 if use_high_precision or not enabled else 1
        )
        await set_message_rate(
            UBXClass.RTCM3, 127, 1 if use_high_precision and enabled else 0
        )

        # No moving baseline messages
        await set_message_rate(UBXClass.RTCM3, 254, 0)

        # Turn off other unneeded UBX messages
        await set_message_rate(UBXClass.NAV, UBXNAVSubclass.PVT, 0)
        await set_message_rate(UBXClass.NAV, UBXNAVSubclass.VELNED, 0)
        await set_message_rate(UBXClass.RXM, 0x13, 0)
        await set_message_rate(UBXClass.RXM, 0x15, 0)
        await set_message_rate(UBXClass.MON, 0x09, 0)

        # Set survey-in duration and accuracy requirements
        await sleep(0.2)

        # CFG-TMODE3 parameter payload assembly
        coords: tuple[int, int, int]
        coords_hp: tuple[int, int, int]
        position = self.settings.position
        if position is not None:
            position_in_one_tenth_of_mm = position * 10000  # [m] --> [0.1 mm]
            coords_in_one_tenth_of_mm = (
                int(round(position_in_one_tenth_of_mm.x)),
                int(round(position_in_one_tenth_of_mm.y)),
                int(round(position_in_one_tenth_of_mm.z)),
            )
            coords, coords_hp = zip(
                *(divmod(coord, 100) for coord in coords_in_one_tenth_of_mm)
            )
            flags = 0x02  # 2 = fixed position
            fixed_accuracy = max(
                int(round(self.settings.accuracy * 10000)), 1
            )  # [m] --> [0.1 mm]
            min_accuracy = 0
        else:
            coords = 0, 0, 0
            coords_hp = 0, 0, 0
            flags = 0x01  # 1 = self-survey
            fixed_accuracy = 0
            min_accuracy = max(
                int(round(self.settings.accuracy * 10000)), 1
            )  # [m] --> [0.1 mm]
        payload = pack(
            "<BBHiiibbbBIIIQ",
            0,  # version
            0,  # reserved 1
            flags,
            coords[0],  # ECEF coordinate X
            coords[1],  # ECEF coordinate Y
            coords[2],  # ECEF coordinate Z
            coords_hp[0],  # ECEF coordinate X (high-precision part)
            coords_hp[1],  # ECEF coordinate Y (high-precision part)
            coords_hp[2],  # ECEF coordinate Z (high-precision part)
            0,  # reserved 2
            fixed_accuracy,  # fixed-pos 3D accuracy
            # Survey-in minimum duration [sec]
            max(int(round(self.settings.duration)), 1),
            # Survey-in position accuracy limit [0.1 mm]
            min_accuracy,
            # Reserved 3 (8 bytes)
            0,
        )
        await send(UBX.CFG_TMODE3(payload))

        # Read the CFG_TMODE3 settings back for confirmation
        await send(UBX.CFG_TMODE3(b""))


def test_rtk_base_configuration() -> None:
    from sys import argv

    try:
        from trio import open_nursery, run, sleep
    except ImportError:
        raise ImportError("You need to install 'trio' to run this test") from None

    async def main() -> None:
        from flockwave.connections.serial import SerialPortStream
        from flockwave.gps.parser import create_gps_parser

        port = await SerialPortStream.create(
            argv[1] if len(argv) > 1 else "/dev/cu.usbmodem14201"
        )
        parser = create_gps_parser()

        async with open_nursery() as nursery:
            settings = RTKSurveySettings(accuracy=10)
            settings.set_gnss_types(["gps", "glonass"])
            config = UBXRTKBaseConfigurator(settings)
            nursery.start_soon(config.run, port.send_all, sleep)

            while True:
                data = await port.receive_some()
                # print("raw:", hexlify(data, sep=" ").decode("ascii"))
                for message in parser(data):  # type: ignore
                    if hasattr(message, "packet_type"):
                        print(f"{message.packet_type}", end=" ")
                    print(message)

    run(main)


if __name__ == "__main__":
    test_rtk_base_configuration()