Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / rc_udp.py
Size: Mime:
"""Extension that accepts UDP packets on a specific port and treats them as
RC channel values for a (virtual or real) RC transmitter.
"""

from __future__ import annotations

from logging import Logger
from trio import fail_after, TooSlowError
from typing import Any, Callable, Sequence, Optional, TYPE_CHECKING

from flockwave.connections import create_connection
from flockwave.connections.socket import UDPListenerConnection
from flockwave.networking import format_socket_address
from flockwave.server.ports import suggest_port_number_for_service, use_port

if TYPE_CHECKING:
    from flockwave.server.app import SkybrushServer

# Signature shared by all decoder functions in this module. The decoders
# defined below yield one integer per channel; the built-in ones produce values
# in the 16-bit range 0..65535 (one-byte samples are multiplied by 257, two-byte
# samples are used as-is, and rescale() clamps its output to 0..65535).
Decoder = Callable[[bytes], Sequence[int]]
"""Type specification for decoder functions that take a raw packet and return a
list of RC channel values
"""

# Passed to suggest_port_number_for_service() to obtain the default port and to
# use_port() while the extension is running.
SERVICE: str = "rcin"
"""Name of the service that we use to derive a default port number."""


async def run(app: SkybrushServer, configuration, log):
    """Runs the extension: listens for UDP datagrams and forwards the decoded
    RC channel values to the ``rc`` API of the application.

    The following configuration keys are read: ``host``, ``port``,
    ``endianness``, ``bytesPerChannel``, ``timeout`` and ``range``. When the
    endianness, the number of bytes per channel or the channel range is
    invalid, an error is logged and the function returns without creating the
    connection.

    Parameters:
        app: the application; its ``import_api()``, ``connection_registry``
            and ``supervise()`` members are used
        configuration: the configuration mapping of the extension
        log: the logger to write messages to
    """
    host = configuration.get("host", "127.0.0.1")
    port = configuration.get("port", suggest_port_number_for_service(SERVICE))
    formatted_address = format_socket_address((host, port))

    endianness = str(configuration.get("endianness", "little")).lower()
    bytes_per_channel = int(configuration.get("bytesPerChannel", 2))
    if endianness not in ("big", "little"):
        log.error("Endianness must be 'big' or 'little', disabling extension")
        return
    if bytes_per_channel not in (1, 2):
        log.error("Bytes per RC channel must be 1 or 2, disabling extension")
        return

    # Zero or negative timeouts mean "never time out"
    timeout = float(configuration.get("timeout", 1))
    if timeout <= 0:
        timeout = float("inf")

    decoder: Decoder
    if bytes_per_channel == 1:
        decoder = decode_one_byte_per_channel
    elif endianness == "big":
        decoder = decode_two_bytes_per_channel_big_endian
    else:
        decoder = decode_two_bytes_per_channel_little_endian

    # An invalid range is a configuration error like the ones above, so it is
    # reported the same way instead of letting the exception escape. A
    # zero-width range surfaces as a division by zero while the scaling
    # ratio is calculated.
    try:
        value_range = parse_range(configuration.get("range"))
        if value_range is not None:
            decoder = rescale(decoder, value_range)
    except (ValueError, ZeroDivisionError) as ex:
        log.error(f"Invalid channel range ({ex}), disabling extension")
        return

    rc = app.import_api("rc")
    connection = create_connection(f"udp-listen://{host}:{port}")

    async def handler(connection):
        await handle_udp_datagrams(
            connection,
            log=log,
            address=formatted_address,
            decoder=decoder,
            on_changed=rc.notify,
            on_lost=rc.notify_lost,
            timeout=timeout,
        )

    with (
        use_port(SERVICE, port),
        app.connection_registry.use(connection, name="UDP RC input"),
    ):
        await app.supervise(connection, task=handler)


def parse_range(value: Any) -> Optional[tuple[int, int]]:
    """Converts the 'range' parameter of the configuration into a pair holding
    the lower and the upper bound of each channel.

    Parameters:
        value: the raw configuration value; ``None`` or a list or tuple with
            exactly two items. The items themselves are not validated.

    Returns:
        ``None`` if the input was ``None``, otherwise the two items as a tuple

    Raises:
        ValueError: if the input is neither ``None`` nor a list or tuple of
            length 2
    """
    if value is None:
        return None

    is_pair = isinstance(value, (list, tuple)) and len(value) == 2
    if not is_pair:
        raise ValueError("range must be None or an array of length 2")

    lower, upper = value
    return (lower, upper)  # type: ignore


def decode_one_byte_per_channel(data: bytes) -> Sequence[int]:
    """Decodes a datagram in which every byte holds the value of one channel.

    Each byte is copied into both the upper and the lower half of the result
    (equivalent to multiplying by 257), so 0x00 becomes 0 and 0xFF becomes
    65535.
    """
    return [(sample << 8) + sample for sample in data]


def decode_two_bytes_per_channel_big_endian(data: bytes) -> Sequence[int]:
    """Decodes a datagram in which every channel takes two bytes, most
    significant byte first.

    A trailing byte that does not belong to a complete pair is ignored.
    """
    # Even offsets carry the high bytes, odd offsets the low bytes; zip()
    # stops at the shorter slice so an unpaired last byte is dropped
    high_bytes = data[0::2]
    low_bytes = data[1::2]
    return [(high << 8) + low for high, low in zip(high_bytes, low_bytes)]


def decode_two_bytes_per_channel_little_endian(data: bytes) -> Sequence[int]:
    """Decodes a datagram in which every channel takes two bytes, least
    significant byte first.

    A trailing byte that does not belong to a complete pair is ignored.
    """
    # Even offsets carry the low bytes, odd offsets the high bytes; zip()
    # stops at the shorter slice so an unpaired last byte is dropped
    low_bytes = data[0::2]
    high_bytes = data[1::2]
    return [(high << 8) + low for low, high in zip(low_bytes, high_bytes)]


def rescale(decoder: Decoder, bounds: tuple[int, int]) -> Decoder:
    """Wraps a decoder such that the channel values it returns are mapped
    linearly from the given bounds to the range 0..65535.

    Values below the lower bound become 0 and values above the upper bound
    become 65535. Values in between are scaled and rounded to the nearest
    integer.

    Parameters:
        decoder: the decoder whose output is to be rescaled
        bounds: the lower and upper bound of the raw channel values; the
            lower bound maps to 0 and the upper bound maps to 65535

    Returns:
        a new decoder that calls the original one and rescales its output

    Raises:
        ValueError: if the two bounds are equal, as no scaling ratio can be
            derived from a zero-width range
    """
    min_value, max_value = bounds
    if max_value == min_value:
        raise ValueError("lower and upper bounds of the range must differ")

    ratio = 65535.0 / (max_value - min_value)

    def scale(value: int) -> int:
        if value < min_value:
            return 0
        if value > max_value:
            return 65535
        # Round the *scaled* value; rounding the integer difference before
        # multiplying would be a no-op and int() would then truncate
        return int(round((value - min_value) * ratio))

    def scaled_decoder(data: bytes) -> Sequence[int]:
        return [scale(value) for value in decoder(data)]

    return scaled_decoder


async def handle_udp_datagrams(
    connection: UDPListenerConnection,
    log: Logger,
    address: str,
    decoder: Decoder,
    on_changed: Callable[[Sequence[int]], None],
    on_lost: Callable[[], None],
    timeout: float = 1,
) -> None:
    """Reads datagrams from the given connection in an endless loop, decodes
    them and reports the decoded channel values.

    The first datagram is awaited without a deadline. After that, every read
    must complete within the given timeout; when it does not, the input is
    considered lost and the loop goes back to waiting without a deadline.

    Parameters:
        connection: the connection to read the datagrams from
        log: the logger to write messages to
        address: formatted address of the listener; used in log messages only
        decoder: function that turns the payload of a datagram into channel
            values. Datagrams for which it raises an exception are dropped.
        on_changed: called with the decoded channel values of each datagram
        on_lost: called when the timeout expires, and also when the function
            exits while the input is still considered active
        timeout: maximum number of seconds to wait for the next datagram
            while the input is active
    """
    log.info(f"Listening for UDP RC input on {address}")

    receiving = False

    try:
        while True:
            if not receiving:
                # No active input: wait for a datagram without any deadline
                payload, _ = await connection.read()
                receiving = True
                log.info("UDP RC connection established")
            else:
                try:
                    with fail_after(timeout):
                        payload, _ = await connection.read()
                except TooSlowError:
                    # Clear the flag first so the "finally" clause below does
                    # not report the loss again if on_lost() raises
                    receiving = False
                    log.warning("UDP RC input lost")
                    on_lost()
                    payload = None

            if not payload:
                continue

            try:
                channels = decoder(payload)
            except Exception:
                # probably a malformed packet; drop it
                continue

            on_changed(channels)
    finally:
        if receiving:
            on_lost()
        log.info(f"UDP RC input closed on {address}")


# Module-level metadata of the extension.
# NOTE(review): presumably these names are picked up by the extension loader of
# the server -- confirm against the extension manager.

# run() imports the "rc" API from the application, hence the dependency
dependencies = ("rc",)
description = "RC input source using UDP datagrams"
tags = "experimental"

# Schema of the configuration object of the extension. The property names
# match the keys that run() reads with configuration.get(), and the defaults
# listed here mirror the fallback values used there.
schema = {
    "properties": {
        "host": {
            "type": "string",
            "title": "Host",
            "description": (
                "IP address of the host that the server should listen for incoming "
                "UDP datagrams. Use an empty string to listen on all interfaces, or "
                "127.0.0.1 to listen on localhost only"
            ),
            "default": "127.0.0.1",
            "propertyOrder": 10,
        },
        "port": {
            "type": "integer",
            "title": "Port",
            "description": (
                "Port that the server should listen on. Untick the checkbox to "
                "let the server derive the port number from its own base port."
            ),
            "minimum": 1,
            "maximum": 65535,
            # Evaluated once, when this module is imported
            "default": suggest_port_number_for_service(SERVICE),
            "required": False,
            "propertyOrder": 20,
        },
        "bytesPerChannel": {
            "type": "integer",
            "title": "Bytes per channel",
            "minimum": 1,
            "maximum": 2,
            "default": 2,
            "description": "Number of bytes per channel in each UDP packet",
        },
        "endianness": {
            "type": "string",
            "title": "Endianness",
            "description": "Endianness of each incoming packet",
            "default": "little",
            "enum": ["big", "little"],
            "options": {
                # Listed in the same order as the "enum" values above
                "enum_titles": [
                    "Big endian (network byte order, MSB first)",
                    "Little endian (LSB first)",
                ]
            },
        },
        # Optional pair of bounds; when given, run() wraps the decoder with
        # rescale() so the bounds are mapped to 0 and 65535
        "range": {
            "title": "Override channel range",
            "type": "array",
            "format": "table",
            "minItems": 2,
            "maxItems": 2,
            "items": {"type": "integer"},
            "required": False,
        },
        "timeout": {
            "title": "Timeout",
            "type": "number",
            "minimum": 0,
            "default": 1,
            "description": (
                "Number of seconds that must pass without receiving a UDP "
                "packet to consider the RC connection as lost. Zero means "
                "to disable the timeout."
            ),
        },
    }
}