Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / http_server / extension.py
Size: Mime:
"""Extension that extends the Skybrush server with an HTTP server listening
on a specific port.

This server can then be used by other extensions to implement HTTP-specific
functionality such as a web-based debug page (`flockwave.ext.debug`) or a
Socket.IO-based channel.
"""

import logging

from contextlib import contextmanager
from dataclasses import dataclass
from heapq import heapify, heappush
from hypercorn.config import Config as HyperConfig
from hypercorn.trio import serve
from pathlib import Path
from quart import abort, Blueprint, Quart, redirect, request, url_for
from quart_trio import QuartTrio
from trio import current_time, sleep
from typing import Iterable, Optional

from flockwave.ext.manager import ExtensionManager
from flockwave.networking import can_bind_to_tcp_address, format_socket_address
from flockwave.server.ports import suggest_port_number_for_service, use_port
from flockwave.server.types import Disposer
from flockwave.server.utils.networking import get_known_apps_for_port
from flockwave.server.utils.packaging import is_oxidized

from .routing import RoutingMiddleware

__all__ = ("exports", "load", "unload", "schema")

PACKAGE_NAME = __name__.rpartition(".")[0]

SERVICE: str = "http"
"""Name of the service that we use to derive a default port number."""

############################################################################

proposed_index_pages: list["ProposedIndexPage"] = []
quart_app: Optional[Quart] = None
got_first_request: bool = False
ext_manager: Optional[ExtensionManager] = None

############################################################################


@dataclass(order=True)
class ProposedIndexPage:
    priority: int = 0
    route: str = "/"


############################################################################


def create_app():
    """Creates the ASGI web application provided by this extension."""

    global quart_app, got_first_request

    if quart_app is not None:
        raise RuntimeError("App is already created")

    # root_path set to Path.cwd() because we won't have any resources here and
    # we want to be compatible with PyOxidizer where everything preferably
    # runs from memory
    if is_oxidized():
        cwd = str(Path.cwd())
        quart_app = app = QuartTrio(PACKAGE_NAME, instance_path=cwd, root_path=cwd)
    else:
        quart_app = app = QuartTrio(PACKAGE_NAME)

    # Track whether the first request was processed
    got_first_request = False

    @app.before_request
    async def track_first_request():
        global got_first_request
        got_first_request = True

    # Set up the default index route
    @app.route("/")
    async def index():
        index_url = get_index_url()
        if index_url:
            if request.query_string:
                index_url += "?" + request.query_string.decode("utf-8")
            return redirect(index_url)
        else:
            abort(404)

    router = RoutingMiddleware()
    router.add(app, scopes=("http", "websocket"))
    return router


def get_index_url() -> Optional[str]:
    """Get the index URL with the highest priority that was proposed by
    other extensions.

    Returns:
        the URL of the best proposed index page or ``None`` if no index page has
        been proposed
    """
    global proposed_index_pages
    if proposed_index_pages:
        return url_for(proposed_index_pages[0].route)
    else:
        return None


def mount(
    app, *, path: str, scopes: Optional[Iterable[str]] = None, priority: int = 0
) -> Optional[Disposer]:
    """Mounts the given ASGI web application or Quart blueprint at the
    given path.

    Parameters:
        app: the ASGI web application or Quart blueprint to mount
        path: the path to mount the web application or blueprint at
        scopes: when the app is an ASGI web application, specifies the ASGI
            scopes that the web application should respond to. `None` means
            to respond to all scopes. Ignored for blueprints.
        priority: the priority of the route if the app is an ASGI web
            application. Web applications with higher priorities take precedence
            over lower ones. Ignored for blueprints.

    Returns:
        a function that can be called to unmount the application. Unmounting
        of blueprints is not supported yet.
    """
    global ext_manager, exports, quart_app, got_first_request

    if isinstance(app, Blueprint):
        # Blueprints can only be registered if the app has not served its first
        # request yet
        assert quart_app is not None
        if got_first_request:
            if ext_manager:
                ext_manager.request_host_app_restart("http_server")
        else:
            quart_app.register_blueprint(app, url_prefix=path)
    else:
        return exports["asgi_app"].add(app, scopes=scopes, path=path, priority=priority)


@contextmanager
def mounted(
    app, *, path: str, scopes: Optional[Iterable[str]] = None, priority: int = 0
):
    """Context manager that mounts the given ASGI web application or Quart
    blueprint at the given path, and unmounts it when the context is exited.

    Parameters:
        app: the ASGI web application or Quart blueprint to mount
        path: the path to mount the web application or blueprint at
        scopes: when the app is an ASGI web application, specifies the ASGI
            scopes that the web application should respond to. `None` means
            to respond to all scopes. Ignored for blueprints.
        priority: the priority of the route if the app is an ASGI web
            application. Web applications with higher priorities take precedence
            over lower ones. Ignored for blueprints.
    """
    remover = mount(app, path=path, scopes=scopes, priority=priority)
    try:
        yield
    finally:
        if remover:
            remover()


def propose_index_page(route: str, priority: int = 0) -> Disposer:
    """Proposes the given route as a potential index page for the
    Skybrush server. This method can be called from the ``load()``
    functions of extensions when they want to propose one of their own
    routes as an index page. The server will select the index page with
    the highest priority when all the extensions have been loaded.

    Parameters:
        route: name of a route to propose as the index page, in the form of
            ``blueprint.route`` (e.g., ``webui.index``)
        priority: the priority of the proposed route.

    Returns:
        a function that can be called with no arguments to reovke the proposed
        index page
    """
    global proposed_index_pages

    page = ProposedIndexPage(priority=-priority, route=route)

    def disposer():
        index = proposed_index_pages.index(page)
        if index >= 0:
            proposed_index_pages[index] = proposed_index_pages[-1]
            proposed_index_pages.pop()
            heapify(proposed_index_pages)

    heappush(proposed_index_pages, page)

    return disposer


@contextmanager
def proposed_index_page(route: str, priority: int = 0):
    """Context manager that adds the given route as a potential index page for
    the Skybrush server as long as the execution is within the context, and
    revokes it when the context is exited.

    Parameters:
        route: name of a route to propose as the index page, in the form of
            ``blueprint.route`` (e.g., ``webui.index``)
        priority: the priority of the proposed route.
    """
    disposer = propose_index_page(route, priority)
    try:
        yield
    finally:
        disposer()


############################################################################


def load(app, configuration):
    """Loads the extension."""
    global exports, ext_manager

    address = (
        configuration.get("host", "localhost"),
        configuration.get("port", suggest_port_number_for_service(SERVICE)),
    )
    ext_manager = app.extension_manager

    exports.update(address=address, asgi_app=create_app())


def unload(app):
    """Unloads the extension."""
    global exports, ext_manager, quart_app, got_first_request

    quart_app = None
    ext_manager = None
    got_first_request = False
    exports.update(address=None, asgi_app=None)


async def run(app, configuration, logger):
    global exports, KNOWN_PORTS

    address = exports.get("address")
    if address is None:
        logger.warn("HTTP server address is not specified in configuration")
        return

    host, port = address

    # Parse startup delay from configuration. This is a horrible hack, but it
    # can be used to work around problems with certain setups where the server
    # responds to incoming HTTP requests _before_ all the extensions had a
    # chance to load. If one of these extensions wants to register a blueprint
    # on the HTTP server, the incoming request will prevent the server from
    # registering the blueprint because blueprints cannot be registered in Flask
    # after the first request. Increasing the delay gives a chance for the
    # extensions to load before the HTTP server starts responding to requests.
    maybe_startup_delay = configuration.get("startup_delay")
    startup_delay = 0.0
    if maybe_startup_delay is not None:
        try:
            startup_delay = float(maybe_startup_delay)
            if startup_delay < 0:
                raise ValueError
        except ValueError:
            logger.warn(f"Ignoring invalid startup delay: {maybe_startup_delay!r}")

    # Don't show info messages by default (unless the app is in debug mode),
    # show warnings and errors only
    server_log = logger.getChild("hypercorn")
    if not app.debug:
        server_log.setLevel(logging.WARNING)

    # Create configuration for Hypercorn
    config = HyperConfig()
    config.accesslog = server_log
    config.bind = [f"{host}:{port}"]
    config.certfile = configuration.get("certfile") or None
    config.errorlog = server_log
    config.keyfile = configuration.get("keyfile") or None
    config.use_reloader = False

    secure = bool(config.ssl_enabled)

    # Test quickly whether we can bind to the given host and port; if we cannot,
    # chances are that something else is using the port
    if not await can_bind_to_tcp_address(address):
        protocol = "HTTPS" if secure else "HTTP"
        formatted_address = format_socket_address(address)
        message = f"Cannot bind {protocol} server to {formatted_address}; is the port already in use?"
        apps = get_known_apps_for_port(port)

        if apps:
            message += "\nThe following application(s) may be using this port:\n"
            message += "\n".join(f"  - {app}" for app in apps)
            message += "\nAlternatively, you might have another instance of Skybrush Server running."

        logger.error(message, extra={"telemetry": "ignore"})
        return

    # Port seems to be available so try to start the "proper" server
    retries = 0
    max_retries = 3

    # Make sure to respect the prescribed startup delay
    await sleep(startup_delay)

    while True:
        logger.info(
            "Starting {1} server on {0}".format(
                format_socket_address(address), "HTTPS" if secure else "HTTP"
            )
        )

        started_at = current_time()

        try:
            with use_port(SERVICE, port):
                await serve(exports["asgi_app"], config)
        except Exception as ex:
            print(repr(ex))
            print(getattr(ex, "__cause__", None))
            # Server crashed -- maybe a change in IP address? Let's try again
            # if we have not reached the maximum retry count.
            if current_time() - started_at >= 5:
                retries = 0

            if retries < max_retries:
                logger.error(
                    "Server stopped unexpectedly, retrying...",
                    extra={"telemetry": "ignore"},
                )
                await sleep(1)
                retries += 1
            else:
                # Re-raise the exception; the extension manager will take care
                # of logging it nicely
                raise
        else:
            break


description = "HTTP server that listens on a specific port"

exports = {
    "address": None,
    "asgi_app": None,
    "mount": mount,
    "mounted": mounted,
    "propose_index_page": propose_index_page,
    "proposed_index_page": proposed_index_page,
}

schema = {
    "properties": {
        "host": {
            "type": "string",
            "title": "Host",
            "description": (
                "IP address of the host that the server should listen on. Use "
                "an empty string to listen on all interfaces, or 127.0.0.1 to "
                "listen on localhost only"
            ),
            "default": "127.0.0.1",
            "propertyOrder": 10,
        },
        "port": {
            "type": "integer",
            "title": "Port",
            "description": (
                "Port that the server should listen on. Untick the checkbox to "
                "let the server derive the port number from its own base port."
            ),
            "minimum": 1,
            "maximum": 65535,
            "default": suggest_port_number_for_service("http"),
            "required": False,
            "propertyOrder": 20,
        },
        "certfile": {
            "type": "string",
            "title": "Certificate file",
            "description": "Full path to the certificate file that the server should use for HTTPS connections",
            "required": False,
        },
        "keyfile": {
            "type": "string",
            "title": "Private key file",
            "description": "Full path to the private key file corresopnding to the certificate",
            "required": False,
        },
    }
}