Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-gps / gps / ntrip / replay.py
Size: Mime:
"""Replaying recorded NTRIP streams from JSON files."""

from __future__ import annotations

import sys
from base64 import b64decode
from typing import AsyncIterator

import click


@click.command()
@click.argument("file")
@click.option(
    "-p",
    "--port",
    metavar="PORT",
    default=5555,
    help="the port to listen on; zero or negative if no TCP port is needed",
)
@click.option(
    "--stdout/--no-stdout",
    default=False,
    help="dump the recorded NTRIP stream to the standard output",
)
def ntrip_replayer(file, port: int = 5555, stdout: bool = False):
    """Replays a recorded NTRIP stream from JSON format to clients connecting
    to the given TCP port, looped infinitely.
    """
    from json import loads

    try:
        from trio import (
            BrokenResourceError,
            open_file,
            open_nursery,
            run,
            serve_tcp,
            sleep,
        )
    except ImportError:
        raise ImportError(
            "You need to install 'trio' to use the NTRIP replayer"
        ) from None

    def log(msg: str) -> None:
        print(msg, file=sys.stderr)

    async def iter_contents_of(file: str) -> AsyncIterator[bytes]:
        while True:
            async with await open_file(file, "r") as fp:
                async for line in fp:
                    obj = loads(line)
                    await sleep(obj["dt"] / 1000)
                    yield b64decode(obj["data"])

    async def handle_request(stream):
        log("Connection open")
        async for _chunk in iter_contents_of(file):
            try:
                await stream.send_all(_chunk)
            except BrokenResourceError:
                break
        log("Connection closed")

    async def handle_tcp_socket():
        host = "0.0.0.0"  # noqa: S104
        log(f"Listening on {host}:{port}...")
        await serve_tcp(handle_request, port=port, host=host)

    async def handle_stdout():
        log("Dumping stream to standard output...")
        async for chunk in iter_contents_of(file):
            sys.stdout.buffer.write(chunk)
            sys.stdout.buffer.flush()

    async def main():
        async with open_nursery() as nursery:
            if port > 0:
                nursery.start_soon(handle_tcp_socket)
            if stdout:
                nursery.start_soon(handle_stdout)

    run(main)


if __name__ == "__main__":
    ntrip_replayer()