Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
flockwave-gps / gps / ntrip / client.py
Size: Mime:
"""NTRIP client related classes."""

from __future__ import annotations

import click
import sys

from base64 import b64encode
from dataclasses import dataclass, replace
from typing import Optional, TYPE_CHECKING
from urllib.parse import urlparse

from flockwave.gps.http import Request

if TYPE_CHECKING:
    from flockwave.gps.http import Response

from .errors import InvalidResponseError

# Names exported by ``from <this module> import *``.
__all__ = ("NtripClient", "NtripClientConnectionInfo")


@dataclass
class NtripClientConnectionInfo:
    """Dataclass that holds the parameters required to connect to an
    NTRIP caster on the Internet.

    Attributes:
        host: hostname or IP address of the caster
        port: TCP port of the caster; defaults to 2101
        username: username for authenticated streams, if any
        password: password for authenticated streams, if any
        mountpoint: name of the mountpoint to read from, if any
        version: NTRIP protocol version; ``create_from_uri()`` sets this
            to 1 or 2
    """

    host: str
    port: int = 2101
    username: Optional[str] = None
    password: Optional[str] = None
    mountpoint: Optional[str] = None
    version: Optional[int] = None

    @classmethod
    def create_from_uri(cls, uri: str) -> "NtripClientConnectionInfo":
        """Creates a connection info object from a URI representation of the
        form::

            ntrip://[<username>:<password>@]<host>[:<port>][/<mountpoint>]

        or::

            ntrip1://[<username>:<password>@]<host>[:<port>][/<mountpoint>]

        The ``ntrip1`` scheme selects NTRIP version 1; anything else
        (including a URI without a scheme) selects version 2.

        Parameters:
            uri: the URI to parse

        Returns:
            the connection info object described by the URI

        Raises:
            ValueError: if the port in the URI is not a valid port number
        """
        parts = urlparse(uri)
        if parts.netloc:
            # A real "<scheme>://<authority>" URI: trust the parsed scheme.
            version = 1 if parts.scheme == "ntrip1" else 2
            location = parts.netloc + parts.path
        else:
            # No authority part, so the input is a bare "host[:port][/path]".
            # Do not trust parts.scheme here: urlparse() mistakes the host
            # of "host:2101/MOUNT" for a scheme, and a host name that merely
            # starts with "ntrip1" must not select protocol version 1.
            version = 2
            location = uri

        # Re-parse with a well-known scheme so urlparse() splits the
        # credentials, the host name and the port for us.
        parts = urlparse("http://" + location)

        return cls(
            host=parts.hostname,
            port=parts.port or 2101,
            username=parts.username,
            password=parts.password,
            mountpoint=parts.path[1:] if len(parts.path) > 1 else None,
            version=version,
        )


class NtripClient:
    """An NTRIP client object that reads DGPS correction data from an NTRIP
    caster.
    """

    @classmethod
    def create(
        cls,
        host: str = "www.euref-ip.net",
        port: int = 2101,
        username: Optional[str] = None,
        password: Optional[str] = None,
        mountpoint: Optional[str] = None,
        version: Optional[int] = None,
    ) -> "NtripClient":
        """Convenience constructor.

        Parameters:
            host: the hostname of the server. It may also be a URI if it
                starts with ``ntrip://`` or ``ntrip1://``, or a schemeless
                location of the form
                ``[username[:password]@]hostname[:port][/mountpoint]``.
            port: the port of the server to connect to. A port embedded in
                ``host`` takes precedence over this argument.
            username: the username to use for authenticated streams;
                overrides the one embedded in ``host``
            password: the password to use for authenticated streams;
                overrides the one embedded in ``host``
            mountpoint: the mountpoint to read the RTCM messages from;
                overrides the one embedded in ``host``
            version: the NTRIP protocol version that the server speaks.
                ``None`` means the latest available version (or, for
                ``ntrip://`` / ``ntrip1://`` URIs, the version implied by
                the scheme).

        Returns:
            NtripClient: a configured client object

        Raises:
            ValueError: if ``host`` embeds a port that is not a valid port
                number
        """
        if host.startswith(("ntrip://", "ntrip1://")):
            conn = NtripClientConnectionInfo.create_from_uri(host)
            # Explicitly given arguments win over what the URI contains.
            candidates = (
                ("username", username),
                ("password", password),
                ("mountpoint", mountpoint),
                ("version", version),
            )
            updates = {key: value for key, value in candidates if value is not None}
            if updates:
                conn = replace(conn, **updates)
        else:
            if version is None:
                version = 2
            if "/" in host or "@" in host or ":" in host:
                # Schemeless "[user[:password]@]host[:port][/mountpoint]";
                # borrow the "http" scheme so urlparse() splits it for us.
                parts = urlparse("http://" + host)
                host = parts.hostname or ""
                if parts.port is not None:
                    port = parts.port
                if username is None:
                    username = parts.username
                if password is None:
                    password = parts.password
                if mountpoint is None and len(parts.path) > 1:
                    mountpoint = parts.path[1:]
            conn = NtripClientConnectionInfo(
                host, port, username, password, mountpoint, version
            )
        return cls(connection_info=conn)

    def __init__(self, connection_info: NtripClientConnectionInfo):
        """Constructor.

        In most cases, it is easier to use the ``create()`` class method.

        Parameters:
            connection_info: an object describing how to connect to the server
        """
        self.connection_info = connection_info

    async def get_stream(
        self, mountpoint: Optional[str] = None, timeout: float = 10
    ) -> Response:
        """Returns a file-like object that will stream the DGPS data from
        the NTRIP caster.

        Parameters:
            mountpoint: the mountpoint to connect to. May be
                ``None``, in which case the mountpoint given in the
                connection info object at construction time will be used.
            timeout: timeout to use for the connection attempt, in seconds.
                NOTE: accepted for API compatibility; this method does not
                currently pass it on to the request.

        Returns:
            the response object of the request, with its headers already
            processed

        Raises:
            InvalidResponseError: when the caster returned an invalid
                response
        """
        info = self.connection_info

        request = Request(self._url_for_mountpoint(mountpoint))
        request.add_header("Accept", b"*/*")
        request.add_header("User-Agent", b"NTRIP NtripClientPOSIX/1.50")
        # ``None`` is documented as "latest available version", i.e. v2.
        if info.version is None or info.version == 2:
            request.add_header("Ntrip-Version", b"Ntrip/2.0")

        if info.username is not None:
            # A missing password must be sent as an empty string, not as
            # the literal text "None".
            password = info.password if info.password is not None else ""
            credentials = b64encode(f"{info.username}:{password}".encode("utf-8"))
            request.add_header("Authorization", b"Basic " + credentials)

        response = await request.send()
        await response.ensure_headers_processed()

        # Responses using the "ICY" protocol are accepted without a
        # content type check.
        if response.protocol != b"ICY":
            self._check_header(response, "Content-type", b"gnss/data")

        return response

    def _check_header(self, response: Response, header: str, value: bytes):
        """Checks whether the given HTTP response contains the given header
        with the given value.

        Parameters:
            response: the response whose headers are inspected
            header: name of the header to look up
            value: the exact value the header is expected to have

        Raises:
            InvalidResponseError: when the header is missing or has a
                different value
        """
        observed_value = response.getheader(header)
        if observed_value != value:
            expected = value.decode("ascii", "replace")
            raise InvalidResponseError(
                f"expected {header}: {expected}, got {observed_value!r}"
            )

    def _url_for_mountpoint(self, mountpoint: Optional[str] = None) -> bytes:
        """Returns the URL of the given mountpoint.

        Parameters:
            mountpoint: the mountpoint to connect to. May be None, in which
                case the mountpoint given in the connection info object at
                construction time will be used.

        Returns:
            the URL of the given mountpoint, encoded as ASCII bytes. When no
            mountpoint is known at all, the URL of the server root is
            returned.
        """
        info = self.connection_info
        # Fall back to an empty path so that a missing mountpoint does not
        # end up in the URL as the literal text "None".
        mountpoint = mountpoint or info.mountpoint or ""
        return f"http://{info.host}:{info.port}/{mountpoint}".encode("ascii")


@click.command()
@click.argument("url")
@click.option(
    "-u",
    "--username",
    metavar="USERNAME",
    default=None,
    help="the username to use when connecting",
)
@click.option(
    "-p",
    "--password",
    metavar="PASSWORD",
    default=None,
    help="the password to use when connecting",
)
@click.option(
    "-j",
    "--json/--no-json",
    default=False,
    help=(
        "print the timestamped chunks received from the NTRIP server in JSON "
        "format. Payload will be base64-encoded. Useful for replaying the "
        "stream later."
    ),
)
def ntrip_streamer(url, username, password, json):
    """Copies a stream from an NTRIP server directly into the standard
    output.

    The given URL must adhere to the following format:

        [protocol://][username[:password]@]hostname/mountpoint

    where 'protocol' is either 'ntrip' (for NTRIP v2 casters) or 'ntrip1'
    (for NTRIP v1 casters), and it defaults to 'ntrip'. The username and the
    password is optional.

    Example servers to try (if you have the right username and password):
    www.euref-ip.net/BUTE0, ntrip://ntrip.use-snip.com/RTCM3EPH,
    ntrip1://152.66.6.49/RTCM23
    """
    # NOTE: the docstring above doubles as the --help text of the command.
    from json import dumps
    from time import monotonic

    # trio is only needed by the command line tool, so import it lazily and
    # turn a missing installation into an actionable error message.
    try:
        from trio import run
    except ImportError:
        raise ImportError(
            "You need to install 'trio' to use the NTRIP streamer"
        ) from None

    client = NtripClient.create(url, username=username, password=password)

    async def main():
        """Reads chunks from the caster and copies them to the standard
        output until the stream ends.
        """
        stream = await client.get_stream()
        prev = monotonic()
        while True:
            data = await stream.read()
            if not data:
                # An empty read marks the end of the stream. This must be
                # checked in both output modes; otherwise JSON mode would
                # keep printing empty chunks forever.
                print("Stream ended.", file=sys.stderr)
                break

            if json:
                # One JSON object per line: milliseconds elapsed since the
                # previous chunk plus the base64-encoded payload.
                now = monotonic()
                dt = int((now - prev) * 1000)
                record = {"dt": dt, "data": b64encode(data).decode("ascii")}
                data = dumps(record).encode("ascii") + b"\n"
                prev = now

            # Write raw bytes; the payload is binary in non-JSON mode.
            sys.stdout.buffer.write(data)
            sys.stdout.flush()

    run(main)


# Run the command line streamer when this module is executed as a script;
# click fills in the function arguments from the command line.
if __name__ == "__main__":
    ntrip_streamer()  # type: ignore