Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""Types specific to the mission planning and management extension."""

from abc import ABC, abstractmethod
from logging import Logger
from blinker import Signal
from dataclasses import dataclass
from datetime import datetime
from time import time
from trio import Cancelled
from typing import (
    final,
    Any,
    Awaitable,
    ClassVar,
    Generic,
    Optional,
    Union,
    TypeVar,
)

from flockwave.server.errors import NotSupportedError
from flockwave.server.model.object import ModelObject
from flockwave.server.utils import maybe_round
from flockwave.server.utils.validation import Validator, ValidationError

from .types import MissionState

__all__ = ("Mission", "MissionPlan", "MissionType")


class Mission(ModelObject):
    """Representation of a single mission on the server.

    A mission consists of a _type_, an associated set of _parameters_, an
    optional _mission plan_ (generated from the type and planning parameters),
    and a set of drones that the server is allowed to control while the mission
    is running.

    This is an abstract superclass that is meant to serve as a base implementation
    for "real" missions. You will need to override at least the ``_run()`` method
    in subclasses.
    """

    _id: str = ""
    """The unique identifier of the mission."""

    type: str = ""
    """The type of the mission. Must be one of the identifiers from the mission
    type registry.
    """

    parameter_validator: Optional[Validator] = None
    """The JSON schema validator of the mission parameters."""

    state: MissionState = MissionState.NEW
    """The state of the mission."""

    created_at: float
    """Timestamp that stores when the mission was created on the server."""

    authorized_at: Optional[float] = None
    """Timestamp that stores when the mission was authorized to start on the
    server, or ``None`` if the mission was not authorized to start yet.
    """

    starts_at: Optional[float] = None
    """Scheduled start time of the mission (if it is scheduled to start
    automatically). ``None`` if there is no scheduled start time yet.
    """

    started_at: Optional[float] = None
    """Timestamp that stores when the mission has started, or ``None`` if the
    mission has not started yet.
    """

    finished_at: Optional[float] = None
    """Timestamp that stores when the mission has finished, or ``None`` if it
    has not finished yet or has not been started yet.
    """

    log: Optional[Logger] = None
    """Logger to use during the execution of the mission. Not available when
    the mission is not running.
    """

    _cancel_requested: bool = False
    """Stores whether the mission was requested to be cancelled."""

    _is_allowed_to_start_late: bool = False
    """Stores whether the mission is allowed to start even if the current
    timestamp is later than the start time of the mission _when the mission
    is sent to the scheduler_. Typically this property can remain ``False``;
    the only time you need to set it to ``True`` is when you want the mission
    to start immediately. In that case, you can set the start time of the mission
    to the current POSIX timestamp, but you also need to set ``_is_allowed_to_start_late``
    to ``True``, otherwise a few milliseconds will pass until the mission gets
    scheduled in the scheduler, and by that time it would already be late.
    """

    on_authorization_changed: ClassVar[Signal] = Signal(
        doc="Signal that is emitted when the authorization of a mission changes."
    )

    on_cancel_requested: ClassVar[Signal] = Signal(
        doc="Signal that is emitted when the user requests the cancellation of the mission."
    )

    on_start_time_changed: ClassVar[Signal] = Signal(
        doc="Signal that is emitted when the start time of a mission changes."
    )

    on_updated: ClassVar[Signal] = Signal(
        doc="Signal that is emitted when the state of the mission changes in "
        "any way that clients might be interested in."
    )

    def __init__(self):
        """Constructor."""
        self.created_at = time()

    @property
    def cancel_requested(self) -> bool:
        """Returns whether the user requested the mission to be cancelled."""
        return self._cancel_requested

    @property
    def device_tree_node(self) -> None:
        return None

    @property
    def id(self) -> str:
        return self._id

    @property
    def is_allowed_to_start_late(self) -> bool:
        """Returns whether the mission is allowed to start later than its
        scheduled start time. This property is set internally to ``True``
        when ``start_now()`` is called.
        """
        return self._is_allowed_to_start_late

    @property
    def is_authorized_to_start(self) -> bool:
        """Returns whether the mission is authorized to start."""
        return self.authorized_at is not None

    @property
    def is_finished(self) -> bool:
        """Returns whether the mission has finished already (successfully or not)."""
        return self.finished_at is not None

    @property
    def is_started(self) -> bool:
        """Returns whether the mission has been started already."""
        return self.started_at is not None

    @property
    def json(self) -> dict[str, Any]:
        """Returns the JSON representation of the mission."""
        return {
            "id": self.id,
            "type": self.type,
            "state": self.state.value,
            "parameters": self.parameters,
            "createdAt": maybe_round(self.created_at),
            "authorizedAt": maybe_round(self.authorized_at),
            "startsAt": maybe_round(self.starts_at),
            "startedAt": maybe_round(self.started_at),
            "finishedAt": maybe_round(self.finished_at),
        }

    @property
    def parameters(self) -> dict[str, Any]:
        """Returns the parameters of the mission in the format they should be
        serialized when converting into JSON.

        Override this getter in subclasses if your mission uses custom
        parameters that you wish to expose in its JSON representation.
        """
        return {}

    @property
    def was_successful(self) -> bool:
        """Returns whether the mission finished successfully."""
        return self.state is MissionState.SUCCESSFUL

    @final
    def authorize_to_start(self) -> None:
        """Authorizes the mission to start, moving it from its ``NEW`` state to
        ``AUTHORIZED_TO_START``. Parameters of missions that are authorized to
        start can not be modified any more. The scheduled start time may still
        be modified or cleared.

        This function is a no-op if the mission is already authorized with an
        earlier timestamp.

        Do not override this method. If you want to react to changes in the
        authorization state of the mission, update the internal
        `_handle_authorization()` method instead.

        Raises:
            RuntimeError: if the mission is not in the ``NEW`` or
                ``AUTHORIZED_TO_START`` state
        """
        self._ensure_not_started_yet()
        old_authorization_time = self.authorized_at
        old_state = self.state

        if self.authorized_at is not None:
            self.authorized_at = min(self.authorized_at, time())
        else:
            self.authorized_at = time()

        if old_state != self.state or old_authorization_time != self.authorized_at:
            self.state = MissionState.AUTHORIZED_TO_START
            self._handle_authorization()
            self.on_authorization_changed.send(self)
            self.notify_updated()

    def cancel(self) -> None:
        """Cancels the mission if it is running. Skips execution and moves
        straight to the cancelled state if the mission is not running. No-op if
        the mission was already cancelled or terminated.
        """
        self._cancel_requested = True
        if not self.is_started:
            # Start the mission now; the scheduler will call the run() method
            # of the mission, which will return immediately and update the
            # state variables
            self.start_now()
        self.on_cancel_requested.send(self)

    def notify_updated(self) -> None:
        """Notifies all subscribers to the `on_updated()` event that the mission
        was updated.
        """
        self.on_updated.send(self)

    @final
    def revoke_authorization(self) -> None:
        """Revokes the authorization of the mission to start, moving it from
        its ``AUTHORIZED_TO_START`` state back to ``NEW``. Parameters can then
        be modified again. The scheduled start time will not change but the
        mission will not start until authorized again.

        This function is a no-op if the mission is not authorized to start yet.

        Do not override this method. If you want to react to changes in the
        authorization state of the mission, update the internal
        `_handle_authorization()` method instead.

        Raises:
            RuntimeError: if the mission is not in the ``NEW`` or
                ``AUTHORIZED_TO_START`` state
        """
        self._ensure_not_started_yet()
        if self.state != MissionState.NEW or self.authorized_at is not None:
            self.authorized_at = None
            self.state = MissionState.NEW
            self._handle_authorization()
            self.on_authorization_changed.send(self)
            self.notify_updated()

    @final
    async def run(self, log: Optional[Logger]) -> None:
        """Run the task corresponding to the mission. This function will be
        called by the mission scheduler when it is time to start the mission.

        This function must be called only when the mission is in the
        ``AUTHORIZED_TO_START`` state (and the scheduler will ensure this in
        normal operation).

        Do not override this method; override the internal `_run()` method
        instead.

        Parameters:
            log: the logger that the mission should use to log events of interest

        Raises:
            RuntimeError: if the mission is not in the ``AUTHORIZED_TO_START``
                state
        """
        self._ensure_waiting_to_start()

        self.started_at = time()

        if not self._cancel_requested:
            self.state = MissionState.ONGOING
            self.notify_updated()

        self.log = log
        try:
            if not self._cancel_requested:
                await self._run()
        except RuntimeError as ex:
            # Sort-of-expected exception, simply log it as an error
            if log:
                log.error(str(ex), extra={"id": self.id, "telemetry": "ignore"})
            self.state = MissionState.FAILED
        except Exception:
            # More serious exception, log it as a proper exception
            if log:
                log.exception(
                    "Unexpected error while executing mission", extra={"id": self.id}
                )
            self.state = MissionState.FAILED
        except Cancelled:
            # Scheduler cancelled the execution of the mission
            if log:
                log.warning("Mission cancelled", extra={"id": self.id})
            self.state = MissionState.CANCELLED

            # Cancelled exceptions must be re-raised
            raise
        else:
            if self._cancel_requested:
                self.state = MissionState.CANCELLED
            else:
                self.state = MissionState.SUCCESSFUL
        finally:
            self.log = None
            self.finished_at = time()
            self.notify_updated()

    @final
    def start_now(self) -> None:
        """Authorizes the mission to start and starts it as soon as possible.

        Raises:
            RuntimeError: if the mission is not in the ``NEW`` or
                ``AUTHORIZED_TO_START`` states.
        """
        self._ensure_not_started_yet()
        self._is_allowed_to_start_late = True
        self.update_start_time(time())
        self.authorize_to_start()

    @final
    def update_parameters(self, parameters: dict[str, Any]) -> None:
        """Updates one or more parameters of the mission.

        This function must be called only when the mission is in the ``NEW``
        state.

        Do not override this method. If you want to add support for mission
        parameters, update the internal `_update_parameters()` method instead.

        Raises:
            RuntimeError: if the mission is not in the ``NEW`` state or if
                mission parameters cannot be validated according to mission
                parameter schema

        """
        self._ensure_new()
        self._validate_parameters(parameters)
        self._update_parameters(parameters)
        self.notify_updated()

    @final
    def update_start_time(self, start_time: Optional[Union[float, datetime]]):
        """Updates or clears the start time of the missions.

        This function must be called only when the mission is in the ``NEW``
        or ``AUTHORIZED_TO_START`` state.

        Do not override this method. If you want to react to changes in the
        scheduled start time of the mission, update the internal
        `_handle_start_time_change()` method instead.
        """
        self._ensure_not_started_yet()
        timestamp = (
            start_time
            if start_time is None or isinstance(start_time, (int, float))
            else start_time.timestamp()
        )

        if self.starts_at != timestamp:
            self.starts_at = timestamp
            self._handle_start_time_change()
            self.on_start_time_changed.send(self)
            self.notify_updated()

    def _ensure_new(self) -> None:
        """Ensures that the mission is in the ``NEW`` state.

        Raises:
            RuntimeError: if the mission is not in the ``NEW`` state
        """
        if self.state != MissionState.NEW:
            raise RuntimeError("Mission is already finalized")

    def _ensure_not_started_yet(self) -> None:
        """Ensure that the mission has not started yet. This is the case if the
        mission is in the ``NEW`` or ``AUTHORIZED_TO_START`` state.

        Raises:
            RuntimeError: if the mission has already started
        """
        if self.state not in (MissionState.NEW, MissionState.AUTHORIZED_TO_START):
            raise RuntimeError("Mission has started already")

    def _ensure_waiting_to_start(self) -> None:
        """Ensures that the mission is in the ``AUTHORIZED_TO_START`` state.

        Raises:
            RuntimeError: if the mission is not in the ``NEW`` state
        """
        if self.state != MissionState.AUTHORIZED_TO_START:
            raise RuntimeError("Mission is not in the 'authorized to start' state")

    def _handle_authorization(self) -> None:
        """Handles the event when the authorization state of the mission changes.

        Override this method in subclasses if you want to react to changes in
        the authorization state of the mission. The default implementation does
        nothing, therefore it is safe to override without calling the superclass
        method.
        """
        pass

    def _handle_start_time_change(self) -> None:
        """Handles the event when the scheduled start time of the mission
        changes.

        Override this method in subclasses if you want to react to changes in
        the scheduled start time of the mission. The default implementation does
        nothing, therefore it is safe to override without calling the superclass
        method.
        """
        pass

    @abstractmethod
    async def _run(self, log: Optional[Logger] = None) -> None:
        """Async function that runs the task corresponding to the mission.
        This function must be overridden in subclasses to add your own behaviour.

        Parameters:
            log: the logger that the mission should use to log events of interest
        """
        raise NotImplementedError

    def _update_parameters(self, parameters: dict[str, Any]) -> None:
        """Updates one or more parameters of the mission.

        This function _assumes_ that the mission is in the ``NEW`` state. It is
        illegal to call this function when the mission is in any other state.

        Override this method in subclasses if you want to add support for mission
        parameters. The default implementation does nothing, therefore it is
        safe to override without calling the superclass method.
        """
        pass

    def _validate_parameters(self, parameters: dict[str, Any]) -> None:
        """Validates one or more parameters of the mission according to the
        mission parameter validator defined automatically from the parameter
        schema of the given mission type.

        Args:
            parameters: the parameters to validate

        Raises:
            RuntimeError: on validation errors
        """
        if self.parameter_validator is None:
            return
        try:
            self.parameter_validator(parameters)
        except ValidationError as ex:
            raise RuntimeError(f"Mission parameter validation error: {ex}") from None


@dataclass
class MissionPlan:
    """Simple dataclass to model the object that a mission planner should
    return.
    """

    format: str
    """Format of the returned mission. This may indicate a Skybrush show
    file, an ArduPilot-style mission or anything else. It is the responsibility
    of the client to handle the payload depending on the format supplied here.
    """

    payload: Any = None
    """The description of the mission. For Skybrush shows, this can be the
    JSON representation of the show. For ArduPilot-style missions, this can be
    a string in the standard textual mission format supported by Mission Planner
    or QGroundControl.

    It is assumed that the payload is JSON-serializable.
    """

    EMPTY: ClassVar["MissionPlan"]
    """Default, "empty" mission plan that can be returned if no specific plan
    can be created for a mission but you still want to report the mission as
    plannable in advance.
    """

    @property
    def json(self):
        """Returns the JSON representation of the mission plan."""
        return {"format": self.format, "payload": self.payload}


MissionPlan.EMPTY = MissionPlan(format="empty")


T = TypeVar("T", bound=Mission)
"""Type variable representing a subclass of Mission_ that a given MissionType_
creates when asked to create a new mission instance.
"""


class MissionType(Generic[T], ABC):
    """Base class for mission types, i.e. classes that define how to create a
    mission plan to perform a given task and how to execute such a plan.

    New types of missions in the Skybrush server may be implemented by deriving
    a class from this base class and then registering it in the mission type
    registry.
    """

    @property
    @abstractmethod
    def description(self) -> str:
        """A longer, human-readable description of the mission type that can be
        used by clients for presentation purposes.
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def name(self) -> str:
        """A human-readable name of the mission type that can be used by
        clients for presentation purposes.
        """
        raise NotImplementedError

    @abstractmethod
    def create_mission(self) -> T:
        """Creates a new mission with a default parameter set.

        Returns:
            a new mission instance
        """
        raise NotImplementedError

    def create_plan(
        self, parameters: dict[str, Any]
    ) -> Union[MissionPlan, Awaitable[MissionPlan]]:
        """Creates a new mission plan with the given planning parameters.

        Parameters:
            parameters: the parameters of the mission plan

        Returns:
            a mission plan or an awaitable that resolves to a mission plan in
            an asynchronous manner

        Raises:
            NotSupportedError: if the mission type does not support creating a
                mission plan
        """
        raise NotSupportedError("This mission type does not support planning")

    @abstractmethod
    def get_parameter_schema(self) -> dict[str, Any]:
        """Returns the JSON schema associated with general mission parameters.

        If you do not intend to use a schema, simply return an empty dictionary.
        Note that an empty dictionary is not a valid JSON schema; if you want to
        declare that you need no parameters, return ``{ "type": "object" }``
        instead.

        Returns:
            JSON schema of general mission parameters
        """
        raise NotImplementedError

    @abstractmethod
    def get_plan_parameter_schema(self) -> dict[str, Any]:
        """Returns the JSON schema associated with mission planning parameters.

        If you do not intend to use a schema or to support planning at all,
        simply return an empty dictionary. Note that an empty dictionary is not
        a valid JSON schema; if you want to declare that you need no parameters,
        return ``{ "type": "object" }``

        Returns:
            JSON schema of mission planning parameters
        """
        raise NotImplementedError

    def supports_planning(self) -> bool:
        """Returns whether the mission type supports returning a mission plan
        before the mission is executed, to be used by the ground station to
        inform the user what is going to happen during the mission.

        The default implementation of this property calls `create_plan()`
        and checks whether a `NotSupportedError` exception is raised or not.
        If `create_plan()` takes a long time in a subclass, you should override
        this function and return early on your own without calling `create_plan()`.
        """
        try:
            self.create_plan({})
        except NotSupportedError:
            return False
        except Exception:
            # Probably okay, it's just that the parameter schema does not match
            # what the function expects
            return True
        else:
            return True