Repository URL to install this package:
|
Version:
2.40.0 ▾
|
"""Skybrush server extension that adds debugging tools and a test page to
the Skybrush server.
"""
from __future__ import annotations
import json
import threading
from contextlib import ExitStack
from dataclasses import dataclass, field
from functools import wraps
from logging import Logger
from operator import attrgetter
from quart import abort, make_response, redirect, render_template, request, url_for
from trio import sleep_forever
from trio.lowlevel import current_root_task
from typing import Any, Awaitable, Callable, Optional, TYPE_CHECKING
from flockwave.ext.errors import NotSupportedError
from flockwave.server.utils import overridden
from flockwave.server.utils.quart import make_blueprint
from .utils import (
can_save_server_configuration,
get_server_configuration_as_json,
save_server_configuration,
)
if TYPE_CHECKING:
from flockwave.ext.manager import ExtensionManager
from flockwave.server.app import SkybrushServer
from semver import Version
__all__ = ("index", "run")
# Blueprint that groups the routes, templates and static files of the web UI.
# It is mounted on the HTTP server by run() for as long as the extension runs.
blueprint = make_blueprint(
    "webui",
    __name__,
    static_folder="static",
    template_folder="templates",
    static_url_path="/static",
)

# Application object that the route handlers operate on. run() overrides this
# global for the duration of the extension; the handlers treat a falsy value
# as "no application available".
app: Optional["SkybrushServer"] = None

# Whether the web UI may be accessed from hosts other than localhost. Set by
# run() from the "public" configuration key and consulted before each request.
is_public: bool = False

# Logger of the extension; run() overrides this global together with `app`.
log: Optional[Logger] = None
async def run(app, configuration, logger):
    """Entry point of the extension.

    Mounts the blueprint of the web UI on the HTTP server, registers a
    "Configure" link on the front page and then sleeps until the task is
    cancelled. Everything that was set up is torn down in reverse order when
    the function exits.

    Parameters:
        app: the application object; must provide the ``frontend`` and
            ``http_server`` APIs via ``import_api()``
        configuration: the configuration of the extension; the ``route`` key
            (default: ``/webui``) and the ``public`` key are consulted
        logger: the logger that the route handlers should use
    """
    global is_public

    frontend_api = app.import_api("frontend")
    http_server_api = app.import_api("http_server")

    mount_point = configuration.get("route", "/webui")
    is_public = bool(configuration.get("public"))

    # The context managers are entered one by one, in this exact order, and
    # are exited in reverse order when the task is cancelled
    with overridden(globals(), app=app, log=logger):
        with http_server_api.mounted(blueprint, path=mount_point):
            with frontend_api.use_link_on_front_page(
                f"{blueprint.name}.index", "Configure"
            ):
                await sleep_forever()
#############################################################################
@dataclass
class ExtensionInfo:
    """Summary of a single extension as shown on the pages of the web UI."""

    # Name of the extension
    name: str

    # Description of the extension; empty string if the extension manager
    # reports none
    description: str = ""

    # Whether the extension is currently loaded
    loaded: bool = False

    # Tags of the extension, in sorted order
    tags: list[str] = field(default_factory=list)

    # Dependencies of the extension, in sorted order; filled only when
    # details are requested
    dependencies: list[str] = field(default_factory=list)

    # Reverse dependencies of the extension, in sorted order; filled only
    # when details are requested
    dependents: list[str] = field(default_factory=list)

    # Whether the extension manager reports that this extension requested a
    # restart of the application
    restart_requested: bool = False

    # Version of the extension as reported by the extension manager
    version: Optional[Version] = None

    @classmethod
    def for_extension(
        cls, name: str, ext_manager: "ExtensionManager", *, details: bool = False
    ):
        """Collects the information about the extension with the given name
        from the given extension manager.

        Parameters:
            name: name of the extension
            ext_manager: the extension manager to query
            details: whether to query the dependencies and the reverse
                dependencies of the extension as well

        Returns:
            the newly created info object
        """
        # Keyword arguments are evaluated left to right, so the extension
        # manager is queried in the order in which the fields are listed here
        info = cls(
            name=name,
            loaded=ext_manager.is_loaded(name),
            description=ext_manager.get_description_of_extension(name) or "",
            tags=sorted(ext_manager.get_tags_of_extension(name)),
            restart_requested=ext_manager.was_app_restart_requested_by(name),
            version=ext_manager.get_version_of_extension(name),
        )
        if not details:
            return info

        info.dependencies = sorted(ext_manager.get_dependencies_of_extension(name))
        info.dependents = sorted(
            ext_manager.get_reverse_dependencies_of_extension(name)
        )
        return info
#############################################################################
# Helper functions for routes
def is_debugging() -> bool:
    """Returns whether the application has an extension manager in which the
    ``debug`` extension is loaded.
    """
    if not app:
        return False

    manager = app.extension_manager
    if manager is None:
        return False

    return manager.is_loaded("debug")
def only_when_debugging(func: Callable[..., Awaitable[Any]]):
    """Decorator for route handlers that makes the route respond with
    HTTP 404 unless the debug extension is loaded at the time of the request.
    """

    @wraps(func)
    async def decorated(*args, **kwds):
        # The check is performed on every request, not at decoration time
        if not is_debugging():
            abort(404)
        return await func(*args, **kwds)

    return decorated
def _get_extension_by_name(name: str) -> tuple[ExtensionInfo, "ExtensionManager"]:
    """Looks up the extension with the given name and returns its detailed
    info object together with the extension manager of the application.

    Aborts the request with HTTP 404 if there is no application or extension
    manager, if the extension is not known to the extension manager or if the
    module of the extension cannot be found.
    """
    manager = app.extension_manager if app else None

    info = None
    if manager and name in manager.known_extensions:
        try:
            info = ExtensionInfo.for_extension(name, manager, details=True)
        except ModuleNotFoundError:
            # Known by name but its module is missing; treated as not found
            pass

    if info is None:
        abort(404)

    # An info object can exist only if there was an extension manager
    assert manager is not None
    return info, manager
async def _configure_extension_from_request_body_if_needed(
    name: str,
) -> "ExtensionManager":
    """Configures the extension with the given name from the ``config`` key
    of the JSON body of the current request, if the body is a JSON object and
    the value of the key is an object as well.

    Aborts the request with HTTP 404 if the extension cannot be looked up.

    Returns:
        the extension manager of the application
    """
    _, manager = _get_extension_by_name(name)

    payload = await request.get_json()

    # An empty object has no "config" key, so it needs no separate check
    new_config = payload.get("config") if isinstance(payload, dict) else None
    if isinstance(new_config, dict):
        manager.configure(name, new_config)

    return manager
async def _to_json(
func: Callable[..., Awaitable[Any]], *args, on_success: Any = None
) -> dict[str, Any]:
"""Calls the given function and returns its result, wrapped in an appropriate
JSON object. Catches any exceptions raised from the function and also wraps
them in an appropriate JSON object.
"""
global log
try:
result = await func(*args)
except Exception as ex:
message: str
if isinstance(ex, NotSupportedError):
# handled silently
message = "Operation not supported"
elif isinstance(ex, OSError):
message = ex.strerror
if log:
log.error(ex.strerror)
else:
message = str(ex)
if log:
log.exception(ex)
return {"error": message}
return {"result": on_success} if result is None else {"result": result}
#############################################################################
# Route definitions
def _is_loopback_address(address: Optional[str]) -> bool:
    """Returns whether the given textual IP address is a loopback address.

    IPv4 loopback addresses, the IPv6 loopback address and IPv4-mapped IPv6
    forms of IPv4 loopback addresses are accepted. Missing or unparseable
    addresses are not considered loopback addresses.
    """
    # Imported locally because only this check needs it
    from ipaddress import ip_address

    if not address:
        return False

    try:
        parsed = ip_address(address.strip())
    except ValueError:
        return False

    # A peer may be reported in IPv4-mapped IPv6 form (::ffff:127.0.0.1);
    # judge such an address by the IPv4 address that it embeds
    mapped = getattr(parsed, "ipv4_mapped", None)
    if mapped is not None:
        parsed = mapped

    return parsed.is_loopback


@blueprint.before_request
def fail_if_not_localhost() -> None:
    """Checks the environment of the current request being served and aborts
    the request with an HTTP 403 Forbidden if it is not coming from localhost.

    The check is skipped when the web UI is configured to be public.
    """
    if is_public:
        return

    # The peer that connected to the server must be on this machine
    if not _is_loopback_address(request.remote_addr):
        abort(403)

    # The access route is derived from the X-Forwarded-For header when one is
    # present. A single proxy hop yields a route with one entry only (the
    # original client), so the length of the route is not enough to detect
    # proxies; every entry must be a loopback address instead.
    if not all(_is_loopback_address(hop) for hop in request.access_route):
        abort(403)
@blueprint.context_processor
def inject_debug_variable() -> dict[str, Any]:
    """Provides the `can_save_config`, `debug` and `restart_requested`
    variables to every template rendered by the blueprint.
    """
    context: dict[str, Any] = {}
    context["can_save_config"] = can_save_server_configuration(app)
    context["debug"] = is_debugging()
    if app:
        context["restart_requested"] = app.extension_manager.app_restart_requested
    else:
        context["restart_requested"] = False
    return context
@blueprint.route("/")
async def index():
    """Redirects the index page of the extension to the list of extensions."""
    target = url_for(".list_extensions")
    return redirect(target)
@blueprint.route("/config", defaults={"as_attachment": False, "compact": False})
@blueprint.route("/config.json", defaults={"as_attachment": True, "compact": False})
@blueprint.route("/config/full", defaults={"as_attachment": False, "compact": False})
@blueprint.route(
    "/config/full.json", defaults={"as_attachment": True, "compact": False}
)
@blueprint.route("/config/compact", defaults={"as_attachment": False, "compact": True})
@blueprint.route(
    "/config/compact.json", defaults={"as_attachment": True, "compact": True}
)
async def get_configuration(as_attachment: bool = False, compact: bool = False):
    """Responds with the current configuration of the server as an indented
    JSON document with sorted keys.

    Parameters:
        as_attachment: whether to mark the response as a downloadable
            attachment named ``config.json``
        compact: forwarded to the configuration exporter as is

    Aborts the request with HTTP 403 if there is no application object.
    """
    if app is None:
        abort(403)

    body = json.dumps(
        get_server_configuration_as_json(app, compact=compact),
        indent=2,
        sort_keys=True,
    )

    response = await make_response(body, 200)
    headers = response.headers
    headers["Content-type"] = "application/json"
    if as_attachment:
        headers["Content-disposition"] = 'attachment; filename="config.json"'

    return response
@blueprint.route("/config/save", methods=["POST"])
async def save_configuration():
    """Saves the current configuration of the server and reports the outcome
    as a JSON object.

    Aborts the request with HTTP 403 if there is no application object.
    """
    if app is None:
        abort(403)

    outcome = await _to_json(save_server_configuration, app, on_success=True)
    return outcome
@blueprint.route("/extensions")
async def list_extensions():
    """Renders the page that lists the extensions known to the extension
    manager of the application. The list is empty when there is no
    application or extension manager.
    """
    manager = app.extension_manager if app else None
    names = manager.known_extensions if manager else ()

    extensions: list[ExtensionInfo] = []
    for name in names:
        try:
            extensions.append(ExtensionInfo.for_extension(name, manager))
        except ModuleNotFoundError:
            # The configuration somehow refers to an extension that does not
            # exist; this is okay, we just ignore it
            continue
        except Exception:
            # Error while importing the extension; log it and ignore it
            if log:
                log.warning(f"Error while importing extension: {name!r}")

    return await render_template(
        "extensions.html.j2", title="Extensions", extensions=extensions
    )
@blueprint.route("/version-info")
async def version_info():
    """Renders the page that lists the Python distributions found in the
    environment of the server, sorted by name.
    """
    from importlib import metadata

    distributions = list(metadata.distributions())
    distributions.sort(key=attrgetter("name"))

    return await render_template(
        "version_info.html.j2", title="Version Info", distributions=distributions
    )
@blueprint.route("/messages")
@only_when_debugging
async def send_messages():
    """Renders the page from which messages can be sent to the server.
    Available only while the debug extension is loaded.
    """
    page = await render_template("messages.html.j2", title="Messages")
    return page
@blueprint.route("/threads")
@only_when_debugging
async def list_threads():
    """Renders the page that lists the threads that are currently alive in
    the server. Available only while the debug extension is loaded.
    """
    active_threads = threading.enumerate()
    return await render_template(
        "threads.html.j2", threads=active_threads, title="Threads"
    )
@blueprint.route("/tasks")
@only_when_debugging
async def list_tasks():
    """Renders the page that lists the Trio tasks of the server as a tree
    rooted at the root task. Available only while the debug extension is
    loaded.

    Each entry passed to the template is a pair of an indentation string (one
    space per nesting level) and the task itself, in depth-first order.
    """
    tasks: list[tuple[str, Any]] = []

    # Explicit stack for the depth-first traversal: (nesting level, task)
    pending: list[tuple[int, Any]] = [(0, current_root_task())]
    while pending:
        depth, current = pending.pop()
        tasks.append((" " * depth, current))

        for nursery in current.child_nurseries:
            # Pushed in descending order of names so that they are popped,
            # and hence listed, in ascending order
            children = sorted(
                nursery.child_tasks, key=attrgetter("name"), reverse=True
            )
            for child in children:
                pending.append((depth + 1, child))

    return await render_template("tasks.html.j2", title="Tasks", tasks=tasks)
@blueprint.route("/extensions/<name>")
async def show_extension_details(name):
    """Renders the page that shows the details, the configuration snapshot
    and the configuration schema of the extension with the given name.

    Aborts the request with HTTP 404 if the extension cannot be looked up.
    """
    extension, manager = _get_extension_by_name(name)

    config = manager.get_configuration_snapshot(name)
    if isinstance(config, dict) and "enabled" in config:
        # The "enabled" key is not passed on to the template
        del config["enabled"]

    config_schema = manager.get_configuration_schema(name)

    template_args = {
        "title": f"Extension: {name}",
        "extension": extension,
        "config": config,
        "schema": config_schema,
    }
    return await render_template("extension_details.html.j2", **template_args)
@blueprint.route("/extensions/<name>/load", methods=["POST"])
async def load_extension(name):
    """Handles a POST request that asks for the extension with the given name
    to be loaded. The extension is configured first if the request body
    carries a configuration object. The outcome is reported as a JSON object.
    """
    manager = await _configure_extension_from_request_body_if_needed(name)
    loader = manager.load
    return await _to_json(loader, name, on_success=True)
@blueprint.route("/extensions/<name>/unload", methods=["POST"])
async def unload_extension(name):
    """Handles a POST request that asks for the extension with the given name
    to be unloaded. The outcome is reported as a JSON object.
    """
    manager = _get_extension_by_name(name)[1]
    unloader = manager.unload
    return await _to_json(unloader, name, on_success=True)
@blueprint.route("/extensions/<name>/reload", methods=["POST"])
async def reload_extension(name):
    """Handles a POST request that asks for the extension with the given name
    to be reloaded. The extension is configured first if the request body
    carries a configuration object. The outcome is reported as a JSON object.
    """
    # Looked up outside the wrapped call so that an unknown extension yields
    # HTTP 404 instead of an error object
    manager = _get_extension_by_name(name)[1]

    async def reconfigure_and_reload():
        await _configure_extension_from_request_body_if_needed(name)
        await manager.reload(name)
        return True

    return await _to_json(reconfigure_and_reload)
# Names of the extensions whose APIs are imported by run()
dependencies = ("frontend", "http_server")

# Human-readable description of the extension
description = "Adds a web-based configuration user interface to the server."

# Presumably the configuration schema of the extension.
# NOTE(review): run() reads the "route" and "public" configuration keys but
# no properties are declared here -- confirm whether they should be listed.
schema = {}