Repository URL to install this package:
|
Version:
2.39.1 ▾
|
"""Model classes related to a single UAV."""
from __future__ import annotations
from abc import ABC, abstractmethod
from inspect import isawaitable
from typing import (
Any,
Awaitable,
Callable,
Generic,
Iterable,
Optional,
TypedDict,
Union,
TypeVar,
TYPE_CHECKING,
)
from flockwave.gps.vectors import GPSCoordinate, PositionXYZ, VelocityNED, VelocityXYZ
from flockwave.server.errors import NotSupportedError
from flockwave.server.logger import log as base_log
from flockwave.spec.schema import get_complex_object_schema
from .attitude import Attitude
from .battery import BatteryInfo
from .commands import Progress, ProgressEvents
from .devices import ObjectNode
from .error_set import ErrorSet
from .gps import GPSFix, GPSFixLike
from .log import FlightLogMetadata
from .metamagic import ModelMeta
from .mixins import TimestampLike, TimestampMixin
from .object import ModelObject, register
from .preflight import PreflightCheckInfo
from .transport import TransportOptions
from .utils import as_base64, scaled_by
if TYPE_CHECKING:
from flockwave.server.app import SkybrushServer
__all__ = (
"is_uav",
"PassiveUAVDriver",
"UAV",
"UAVBase",
"UAVDriver",
"UAVStatusInfo",
)
log = base_log.getChild("uav")
VersionInfo = dict[str, str]
"""Type alias for version information objects returned from UAVDriver, mapping
component names to version numbers
"""
TUAV = TypeVar("TUAV", bound="UAV")
"""Type variable that represents a UAV object."""
TDriver = TypeVar("TDriver", bound="UAVDriver")
"""Type variable that represents a UAV driver object."""
TResult = TypeVar("TResult")
"""Type variable that represents some unspecified result object."""
class BulkParameterUploadResponse(TypedDict, total=False):
"""Typed dictionary that is returned as a response for a PRM-SET-MANY
request.
"""
success: bool
"""Whether the bulk parameter upload succeeded. Always present."""
failed: list[str]
"""List of parameter names for which the bulk upload failed. Omitted if
the caller cannot provide the exact list of parameter names where the
upload failed.
"""
class UAVStatusInfo(TimestampMixin, metaclass=ModelMeta):
"""Class representing the status information available about a single
UAV.
"""
class __meta__:
schema = get_complex_object_schema("uavStatusInfo")
mappers = {"heading": scaled_by(10), "debug": as_base64}
debug: bytes
errors: ErrorSet
gps: GPSFix
heading: float
attitude: Optional[Attitude]
id: str
light: int
mode: str
position: GPSCoordinate
positionXYZ: Optional[PositionXYZ]
velocity: VelocityNED
velocityXYZ: Optional[VelocityXYZ]
battery: BatteryInfo
rssi: list[int]
def __init__(
self, id: Optional[str] = None, timestamp: Optional[TimestampLike] = None
):
"""Constructor.
Parameters:
id: ID of the UAV
timestamp: time when the status information was received. ``None``
means to use the current date and time. Integers represent
milliseconds elapsed since the UNIX epoch.
"""
TimestampMixin.__init__(self, timestamp)
self.debug = b""
self.errors = ErrorSet()
self.gps = GPSFix()
self.heading = 0.0
self.attitude = None
self.id = id # type: ignore
self.light = 0 # black
self.mode = ""
self.position = GPSCoordinate()
self.velocity = VelocityNED()
self.positionXYZ = None
self.velocityXYZ = None
self.battery = BatteryInfo()
self.rssi = []
@property
def position_xyz(self) -> Optional[PositionXYZ]:
return self.positionXYZ
@position_xyz.setter
def position_xyz(self, value: Optional[PositionXYZ]) -> None:
self.positionXYZ = value
@property
def velocity_xyz(self) -> Optional[VelocityXYZ]:
return self.velocityXYZ
@velocity_xyz.setter
def velocity_xyz(self, value: Optional[VelocityXYZ]) -> None:
self.velocityXYZ = value
@register("uav")
class UAV(ModelObject, ABC):
"""Abstract object that defines the interface of objects representing
UAVs.
"""
@property
@abstractmethod
def driver(self) -> "UAVDriver":
"""Returns the UAVDriver_ object that is responsible for handling
communication with this UAV.
"""
raise NotImplementedError
@property
@abstractmethod
def id(self) -> str:
"""A unique identifier for the UAV, assigned at construction
time.
"""
raise NotImplementedError
@property
@abstractmethod
def status(self) -> UAVStatusInfo:
"""Returns an UAVStatusInfo_ object representing the status of the
UAV.
"""
raise NotImplementedError
class UAVBase(UAV, Generic[TDriver]):
"""Base object for UAV implementations. Provides a default implementation
of the methods required by the UAV_ interface.
"""
def __init__(self, id: str, driver: TDriver):
"""Constructor.
Parameters:
id: the unique identifier of the UAV
driver: the driver that is responsible for handling communication
with this UAV.
"""
self._device_tree_node = ObjectNode()
self._driver = driver
self._id = id
self._status = UAVStatusInfo(id=id)
self._initialize_device_tree_node(self._device_tree_node)
@property
def device_tree_node(self) -> ObjectNode:
"""Returns the ObjectNode object that represents the root of the
device tree corresponding to the UAV.
Returns:
the node in the device tree where the subtree of the devices and
channels of the UAV is rooted
"""
return self._device_tree_node
@property
def driver(self) -> TDriver:
"""Returns the UAVDriver_ object that is responsible for handling
communication with this UAV.
"""
return self._driver
@property
def id(self) -> str:
"""A unique identifier for the UAV, assigned at construction
time.
"""
return self._id
@property
def status(self) -> UAVStatusInfo:
"""Returns an UAVStatusInfo_ object representing the status of the
UAV.
This property should be manipulated via the ``update_status()``
method.
"""
return self._status
def _initialize_device_tree_node(self, node: ObjectNode) -> None:
"""Initializes the device tree node of the UAV when it is
constructed.
This method will be called from the constructor. Subclasses may
override this method to provide a set of default devices for the
UAV.
Parameters:
node: the tree node whose subtree this call should initialize
"""
pass
def clear_errors(self) -> None:
"""Clears the error codes of the UAV."""
return self.update_status(errors=())
def clear_errors_up_to_and_including(self, code: int) -> None:
"""Clears all the error codes of the UAV that are less than or equal
to the given error code.
"""
if self._status.errors:
self.update_status(errors=(x for x in self._status.errors if x > code))
def convert_ahl_to_amsl(
self, altitude: float, *, current_ahl: Optional[float] = None
) -> float:
"""Converts an altitude given as altitude above home level to altitude
above mean sea level.
This function requires the drone to know its current AHL and AMSL so it
can calculate an offset between them. Alternatively, if the `current_ahl`
argument is not `None`, the given value is used as the current AHL.
Returns:
the given AHL altitude converted to AMSL
Raises:
RuntimeError: if the position of the UAV is not known yet
"""
if self._status is None:
raise RuntimeError("UAV status not known yet")
# TODO(ntamas): maybe we should use the position only if it has been
# updated recently. Or maybe not.
pos = self._status.position
if pos is None:
raise RuntimeError(
"Cannot convert AHL to AMSL, current position not known yet"
)
if pos.amsl is None:
raise RuntimeError("Cannot convert AHL to AMSL, current AMSL not known yet")
ahl = current_ahl if current_ahl is not None else pos.ahl
if ahl is None:
raise RuntimeError("Cannot convert AHL to AMSL, current AHL not known yet")
return altitude - ahl + pos.amsl
def ensure_error(self, code: int, present: bool = True) -> None:
"""Ensures that the given error code is present (or not present) in the
error code list.
This function does _not_ update the timestamp of the status information;
you need to do it on your own by calling `update_status()`.
Parameters:
code: the code to add or remove
present: whether to add the code (True) or remove it (False)
"""
self._status.errors.ensure(code, present)
def ensure_errors(self, codes: dict[int, bool]) -> None:
"""Updates multiple error codes with a single function call.
Parameters:
codes: dictionary mapping error codes to a boolean specifying
whether the error code should be present or absent
"""
self._status.errors.ensure_many(codes)
def touch_status(self) -> None:
"""Updates the timestamp of the status information of the UAV without
updating any of its fields.
This function can be used when we receive a message that is semantically
equivalent to a previous message but we want to remember that the UAV is
alive.
"""
self._status.update_timestamp()
def update_rssi(self, *, index: int, value: Optional[int] = None) -> None:
"""Updates the RSSI value of the UAV for the channel with the given
index.
Parameters:
index: the index of the channel
value: the new RSSI value in the range 0-100; -1 means "unknown",
and so is ``None``.
"""
value = min(100, max(-1, int(value))) if value is not None else -1
rssi = self._status.rssi
if len(rssi) <= index:
rssi.extend([-1] * (index - len(rssi) + 1))
rssi[index] = value
self._status.update_timestamp()
def update_status(
self,
*,
position: Optional[GPSCoordinate] = None,
position_xyz: Optional[PositionXYZ] = None,
velocity: Optional[VelocityNED] = None,
velocity_xyz: Optional[VelocityXYZ] = None,
heading: Optional[float] = None,
attitude: Optional[Attitude] = None,
mode: Optional[str] = None,
gps: Optional[GPSFixLike] = None,
battery: Optional[BatteryInfo] = None,
light: Optional[int] = None,
errors: Optional[Union[int, Iterable[int]]] = None,
debug: Optional[bytes] = None,
rssi: Optional[Union[int, Iterable[int]]] = None,
):
"""Updates the status information of the UAV.
Parameters with values equal to ``None`` are ignored.
Parameters:
position: the global (GPS) position of the UAV. It will be cloned to
ensure that modifying this position object from the caller will
not affect the UAV itself.
position_xyz: the position of the UAV in some local coordinate system.
It will be cloned to ensure that modifying this position object
from the caller will not affect the UAV itself.
velocity: the global (NED) velocity of the UAV. It will be cloned to
ensure that modifying this velocity object from the caller will
not affect the UAV itself.
velocity_xyz: the velocity of the UAV in some local coordinate system.
It will be cloned to ensure that modifying this position object
from the caller will not affect the UAV itself.
heading: the heading of the UAV, in degrees.
attitude: the attitude (roll, pitch, yaw) of the UAV, in degrees.
mode: the flight mode that the UAV is currently operating in
gps: information about the GPS fix of the UAV
battery: information about the status of the battery on the UAV.
It will be cloned to ensure that modifying this object from
the caller will not affect the UAV itself.
light: the color of the primary light of the UAV, in RGB565
encoding.
errors: the error code or error codes of the UAV; use an empty list
or tuple if the UAV has no errors
debug: additional debug information to store
rssi: the measured RSSI values for each of the channels the UAV is
accessible on.
"""
if position is not None:
self._status.position.update_from(position, precision=7)
if position_xyz is not None:
if self._status.position_xyz is None:
self._status.position_xyz = PositionXYZ()
self._status.position_xyz.update_from(position_xyz, precision=3)
if heading is not None:
# Heading is rounded to 2 digits; it is unlikely that more
# precision is needed and it saves space in the JSON
# representation
self._status.heading = round(heading % 360, 2)
if attitude is not None:
if self._status.attitude is None:
self._status.attitude = Attitude()
self._status.attitude.update_from(attitude)
if velocity is not None:
self._status.velocity.update_from(velocity, precision=2)
if velocity_xyz is not None:
if self._status.velocity_xyz is None:
self._status.velocity_xyz = VelocityXYZ()
self._status.velocity_xyz.update_from(velocity_xyz, precision=2)
if mode is not None:
self._status.mode = mode
if battery is not None:
self._status.battery.update_from(battery)
if light is not None:
self._status.light = int(light)
if errors is not None:
if isinstance(errors, int):
self._status.errors.set([errors] if errors > 0 else [])
else:
self._status.errors.set(code for code in errors if code > 0)
if rssi is not None:
if isinstance(rssi, int):
rssi = [rssi]
else:
rssi = list(rssi)
self._status.rssi = rssi
if gps is not None:
self._status.gps.update_from(gps)
if debug is not None:
self._status.debug = debug
self._status.update_timestamp()
class UAVDriver(Generic[TUAV], ABC):
"""Interface specification for UAV drivers that are responsible for
handling communication with a given group of UAVs via a common
communication channel (e.g., a radio or a wireless network).
Many of the methods in this class take a list of UAVs as an argument.
These lists contain the UAVs to address with a specific request from
the server, and it is the responsibility of the driver to translate
these requests to actual commands that the UAVs understand, and
transmit these commands to the UAVs. Implementors of methods receiving
a list of UAVs may reasonably assume that all the UAVs are managed by
this driver; it is the responsibility of the caller to ensure this.
The UAV lists are assumed to contain UAVs with unique IDs only.
These methods return a dictionary mapping UAVs to the results of
executing the operation on the UAV. The result should be ``True`` if
the operation succeeded, an object of type CommandExecutionStatus_ if
the operation has started executing but has not been finished yet;
anything else means failure. Failures should be denoted by strings
explaining the reason of the failure.
It is the responsibility of the implementor of these methods to ensure
that all the UAVs that appeared in the input UAV list are also mentioned
in the dictionary that is returned from the method.
"""
app: "SkybrushServer"
"""The Skybrush server application that hosts the driver."""
@staticmethod
def _execute(func, *args, **kwds):
"""Executes the given function with the given positional and keyword
arguments. When the function throws an exception, catches the exception
and returns it as an object instead of raising it. When the function is
asynchronous, returns the awaitable returned by the function.
"""
try:
return func(*args, **kwds)
except Exception as ex:
return ex
def __init__(self):
"""Constructor."""
self.app = None # type: ignore
def calibrate_component(self, uavs: list[TUAV], component: str):
"""Asks the driver to calibrate the given component on the given UAVs.
Typically, you don't need to override this method when implementing
a driver; override ``_calibrate_component_single()`` instead.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"calibration request",
self._calibrate_component_single,
component=component,
)
def enter_low_power_mode(
self, uavs: list[TUAV], transport: Optional[TransportOptions] = None
):
"""Asks the driver to send a signal to the given UAVs to enter low-power
mode. Each of the UAVs are assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_enter_low_power_mode_single()`` and
optionally ``_enter_low_power_mode_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request.
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"low-power mode request",
self._enter_low_power_mode_single,
getattr(self, "_enter_low_power_mode_broadcast", None),
transport=transport,
)
def get_log(self, uav: TUAV, log_id: str):
"""Asks the driver to retrieve the log with the given ID from the
given UAV.
Returns:
the log contents along with its metadata
"""
raise NotImplementedError
def get_log_list(self, uavs: list[TUAV]):
"""Asks the driver to retrieve the list of available logs from the
given UAVs.
Typically, you don't need to override this method when implementing
a driver; override ``_get_log_list_single()`` instead.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs, "log listing request", self._get_log_list_single
)
def get_parameter(self, uavs: list[TUAV], name: str):
"""Asks the driver to retrieve the current value of a parameter from
the given UAVs.
Typically, you don't need to override this method when implementing
a driver; override ``_get_parameter_single()`` instead.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs, "parameter retrieval", self._get_parameter_single, name=name
)
def request_preflight_report(self, uavs: list[TUAV]):
"""Asks the driver to request a detailed report about the status of
preflight checks on the given UAVs.
Typically, you don't need to override this method when implementing
a driver; override ``_request_preflight_report_single()`` instead.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"preflight report request",
self._request_preflight_report_single,
)
def request_version_info(self, uavs: list[TUAV]):
"""Asks the driver to request detailed version information from the
given UAVs.
Typically, you don't need to override this method when implementing
a driver; override ``_request_version_info_single()`` instead.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs, "version info request", self._request_version_info_single
)
def resume_from_low_power_mode(
self, uavs: list[TUAV], transport: Optional[TransportOptions] = None
):
"""Asks the driver to send a signal to the given UAVs to resume normal
operation from low-power mode. Each of the UAVs are assumed to be
managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_resume_from_low_power_mode_single()`` and
optionally ``_resume_from_low_power_mode_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request.
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"wakeup request",
self._resume_from_low_power_mode_single,
getattr(self, "_resume_from_low_power_mode_broadcast", None),
transport=transport,
)
def send_command(self, uavs: list[TUAV], command: str, args=None, kwds=None):
"""Asks the driver to send a direct command to the given UAVs, each
of which are assumed to be managed by this driver.
The default implementation of this method passes on each command
to the ``handle_multi_command_{command}()`` method where ``{command}``
is replaced by the command argument. The method will be called with
the command manager and the list of UAVs, further extended with the
given positional and keyword arguments. When such a method does not
exist, the handling of the command is forwarded to the
``handle_command_{command}()`` method instead, one by one for each UAV
that is targeted. If this method does not exist either, the implementation
will call the ``handle_multi_generic_command()`` method or the
``handle_generic_command()`` method instead, whose signatures should
match the signature of ``send_command()`` (but of course the latter
variant will receive a single UAV only). When none of these four
methods exist, the default implementation simply throws a
NotSupportedError_ exception.
This function will return immediately, but the return value of the
handler methods described above may be awaitables for some UAVs if the
execution of the command takes a longer time. Awaitables should be
handled by the caller appropriately. In particular, each awaitable
must be awaited for by the caller.
Parameters:
uavs: the UAVs to address with this request.
command: the command to send to the UAVs
args (list): the list of positional arguments for the command
(if the driver supports positional arguments)
kwds (dict): the keyword arguments for the command (if the
driver supports keyword arguments)
Returns:
dict[UAV,object]: dict mapping UAVs to the corresponding results.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
args = [] if args is None else args
kwds = {} if kwds is None else kwds
# Validate the command and the arguments. If the driver knows that
# it won't be able to execute the command, it may return an error
# message here
error = self.validate_command(command, args, kwds)
if error:
return dict.fromkeys(uavs, error)
# Figure out whether we will execute the commands for all the UAVs
# at the same time, or one by one, depending on what is implemented
# by the driver or not
handlers = [
(f"handle_multi_command_{command}", False, True),
(f"handle_command_{command}", False, False),
("handle_generic_multi_command", True, True),
("handle_generic_command", True, False),
]
for func_name, generic, multi in handlers: # noqa: B007
func = getattr(self, func_name, None)
if func is not None:
break
else:
raise NotSupportedError
if multi:
# Driver knows how to execute the command for multiple UAVs
# at the same time
if generic:
return self._execute(func, uavs, command, args, kwds)
else:
return self._execute(func, uavs, *args, **kwds)
else:
# Driver can execute the command for a single UAV only so we need
# to loop
result = {}
if generic:
result = {
uav: self._execute(func, uav, command, args, kwds) for uav in uavs
}
else:
result = {uav: self._execute(func, uav, *args, **kwds) for uav in uavs}
return result
def send_fly_to_target_signal(self, uavs: list[TUAV], target: GPSCoordinate):
"""Asks the driver to send a signal to the given UAVs that makes them
fly to a given target coordinate. Every UAV passed as an argument is
assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_send_fly_to_target_signal_single()`` instead.
Parameters:
uavs: the UAVs to address with this request
target: the target to fly to; the altitude above
home level may be set to `None` to indicate the current
altitude of UAVs. Altitude above ground is not supported yet.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"fly to target signal",
self._send_fly_to_target_signal_single,
target=target,
)
def send_hover_signal(
self,
uavs: list[TUAV],
*,
transport: Optional[TransportOptions] = None,
):
"""Asks the driver to send a signal to the given UAVs in order to
request them to hover in place as soon as possible.
Typically, you don't need to override this method when implementing
a driver; override ``_send_hover_signal_single()`` and optionally
``_send_hover_signal_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"position hold signal",
self._send_hover_signal_single,
getattr(self, "_send_hover_signal_broadcast", None),
transport=transport,
)
def send_landing_signal(
self, uavs: list[TUAV], transport: Optional[TransportOptions] = None
):
"""Asks the driver to send a landing signal to the given UAVs, each
of which are assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_send_landing_signal_single()`` and optionally
``_send_landing_signal_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request.
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"landing signal",
self._send_landing_signal_single,
getattr(self, "_send_landing_signal_broadcast", None),
transport=transport,
)
def send_light_or_sound_emission_signal(
self,
uavs: list[TUAV],
signals: list[str],
duration: int,
transport: Optional[TransportOptions] = None,
):
"""Asks the driver to send a light or sound emission signal to the
given UAVs, each of which are assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_send_light_or_sound_emission_signal_single()``
and optionally ``_send_light_or_sound_emission_signal_broadcast()``
instead.
Parameters:
uavs: the UAVs to address with this request.
signals: the list of signal types that the targeted UAVs should
emit (e.g., 'sound', 'light')
duration: the duration of the required signal in milliseconds
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"light or sound emission signal",
self._send_light_or_sound_emission_signal_single,
getattr(self, "_send_light_or_sound_emission_signal_broadcast", None),
signals=signals,
duration=duration,
transport=transport,
)
def send_motor_start_stop_signal(
self,
uavs: list[TUAV],
start: bool = False,
force: bool = False,
transport: Optional[TransportOptions] = None,
):
"""Asks the driver to send a signal to start or stop the motors of the
given UAVs, each of which are assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_send_motor_start_stop_signal_single()`` and
optionally ``_send_motor_start_stop_signal_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request.
start: whether the motors should be started (`True`) or stopped
(`False`)
force: whether to force the execution of the command even if it is
unsafe (e.g., stopping the motors while airborne)
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"motor start signal" if start else "motor stop signal",
self._send_motor_start_stop_signal_single,
getattr(self, "_send_motor_start_stop_signal_broadcast", None),
start=start,
force=force,
transport=transport,
)
def send_reset_signal(
self,
uavs: list[TUAV],
*,
component: Optional[str] = None,
transport: Optional[TransportOptions] = None,
):
"""Asks the driver to send a reset signal to the given UAVs in order
to restart some component of the UAV or the whole UAV itself.
Typically, you don't need to override this method when implementing
a driver; override ``_send_reset_signal_single()`` and optionally
``_send_reset_signal_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request.
component: the component to reset. ``None`` or an empty string means
to reset the entire UAV.
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"reset signal",
self._send_reset_signal_single,
getattr(self, "_send_reset_signal_broadcast", None),
component=str(component or ""),
transport=transport,
)
def send_return_to_home_signal(
self, uavs: list[TUAV], transport: Optional[TransportOptions] = None
):
"""Asks the driver to send a return-to-home signal to the given
UAVs, each of which are assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_send_return_to_home_signal_single()`` and
optionally ``_send_return_to_home_signal_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request.
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"return to home signal",
self._send_return_to_home_signal_single,
getattr(self, "_send_return_to_home_signal_broadcast", None),
transport=transport,
)
def send_shutdown_signal(
self, uavs: list[TUAV], transport: Optional[TransportOptions] = None
):
"""Asks the driver to send a shutdown signal to the given UAVs, each
of which are assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_send_shutdown_signal_single()`` and optionally
``_send_shutdown_signal_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"shutdown signal",
self._send_shutdown_signal_single,
getattr(self, "_send_shutdown_signal_broadcast", None),
transport=transport,
)
def send_takeoff_signal(
self,
uavs: list[TUAV],
*,
scheduled: bool = False,
transport: Optional[TransportOptions] = None,
):
"""Asks the driver to send a takeoff signal to the given UAVs, each
of which are assumed to be managed by this driver.
Typically, you don't need to override this method when implementing
a driver; override ``_send_takeoff_signal_single()`` and optionally
``_send_takeoff_signal_broadcast()`` instead.
Parameters:
uavs: the UAVs to address with this request
scheduled: whether the takeoff signal was scheduled earlier and is
now issued autonomously by the server
transport: transport options for sending the signal
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"takeoff signal",
self._send_takeoff_signal_single,
getattr(self, "_send_takeoff_signal_broadcast", None),
scheduled=scheduled,
transport=transport,
)
def set_parameter(self, uavs: list[TUAV], name: str, value: Any):
"""Asks the driver to set the value of a parameter on the given UAVs.
Typically, you don't need to override this method when implementing
a driver; override ``_set_parameter_single()`` instead.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"parameter upload",
self._set_parameter_single,
name=name,
value=value,
)
def set_parameters(self, uavs: list[TUAV], parameters: dict[str, Any]):
"""Asks the driver to set the value of multiple parameters on the given UAVs.
Typically, you don't need to override this method when implementing
a driver; override ``_set_parameters_single()`` instead.
Returns:
dict mapping UAVs to the corresponding results (which may also be
errors or awaitables; it is the responsibility of the caller to
evaluate errors and wait for awaitables)
"""
return self._dispatch_request(
uavs,
"bulk parameter upload",
self._set_parameters_single,
parameters=parameters,
)
def validate_command(self, command: str, args, kwds) -> Optional[str]:
"""Checks whether the driver could execute the command on the UAVs
_in principle_, without knowing which UAVs the command will be sent to.
This function can be used to bail out early from sending commands if
we know already that the driver will not support the command.
For instance, if the UAV commands do not support keyword arguments,
you can check that here and bail out early if keyword arguments were
supplied.
The default implementation does nothing; no need to call it from
subclasses.
Parameters:
command: the command to send to the UAVs
args: the list of positional arguments for the command
(if the driver supports positional arguments)
kwds: the keyword arguments for the command (if the
driver supports keyword arguments)
Returns:
an error message if the command cannot be executed, or `None` if
the command can be executed
"""
pass
def _dispatch_request(
self,
uavs: list[TUAV],
request_name: str,
handler: Callable[..., TResult],
broadcaster: Optional[Callable[..., TResult]] = None,
**kwds,
) -> Union[TResult, Exception, dict[TUAV, Union[Exception, TResult]]]:
"""Common implementation for the body of several ``send_*_signal()``
and similar methods in this class.
The primary purpose of this function is to handle the most common case
when a single operation has to be performed for a list of UAVs. The
function assumes that there is a dedicated _handler_ function (either
sync or async) that can perform the operation for a _single_ UAV, and
optionally another, _broadcaster_ function that can perform the operation
by broadcasting a message to all affected UAVs. The results from the
individual handlers are then merged into a dictionary consisting of
results (for UAVs with sync handlers) and awaitables (for UAVs with
async handlers).
Arguments:
uavs: the list of UAVs to dispatch the request to
request_name: name of the request (operation) for logging purposes,
also to be used in error messages
handler: the handler function that can perform the operation for a
single UAV
broadcaster: the broadcaster function that can perform the operation
by broadcasting a message
Returns:
dictionary mapping UAVs to the corresponding result objects or
awaitables, or a single result object if broadacsting was used
"""
result = {}
# Determine whether we need to broadcast this signal
transport = kwds.get("transport")
if transport:
should_broadcast = TransportOptions.is_broadcast(transport)
else:
should_broadcast = False
if should_broadcast:
# We need to broadcast. Do we have a separate function for broadcasting?
if broadcaster is not None:
try:
outcome = broadcaster(**kwds)
except NotImplementedError:
outcome = NotImplementedError(
f"Broadcasting {request_name} not implemented yet"
)
except NotSupportedError as ex:
outcome = NotSupportedError(
str(ex) or f"Broadcasting {request_name} not supported"
)
except RuntimeError as ex:
outcome = RuntimeError(
f"Error while broadcasting {request_name}: {str(ex)}"
)
except Exception as ex:
log.exception(ex)
outcome = ex.__class__(
f"Unexpected error while broadcasting {request_name}: {ex!r}"
)
return outcome
# No separate function for broadcasting. Can we replace the call
# with unicast calls?
elif TransportOptions.should_ignore_ids(transport):
# No, because we must ignore whatever IDs were submitted. In
# this case we should return a "not supported" error
return NotSupportedError(f"Broadcasting {request_name} not supported")
# Fallthrough, continuing with unicast messages
# We need to send this command one by one to all the UAVs
for uav in uavs:
try:
outcome = handler(uav, **kwds)
except NotImplementedError:
outcome = NotImplementedError(f"{request_name} not implemented yet")
except NotSupportedError as ex:
outcome = NotSupportedError(str(ex) or f"{request_name} not supported")
except RuntimeError as ex:
outcome = RuntimeError(f"Error while sending {request_name}: {str(ex)}")
except Exception as ex:
log.exception(ex)
outcome = ex.__class__(
f"Unexpected error while sending {request_name}: {ex!r}"
)
result[uav] = outcome
return result
def _calibrate_component_single(self, uav: TUAV, component: str):
"""Asks the driver to calibrate a component of a single UAV managed by
this driver.
May return an awaitable if preparing the result takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
component: the component to calibrate.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
if hasattr(uav, "calibrate_component"):
return uav.calibrate_component(component) # type: ignore
else:
raise NotSupportedError
def _enter_low_power_mode_single(
self, uav: TUAV, *, transport: Optional[TransportOptions] = None
) -> Union[None, Awaitable[None]]:
"""Asks the driver to request a single UAV to switch to low-power mode.
May return an awaitable if sending the request takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
# Default is NotSupportedError because it is not that common for UAVs
# to support low-power mode
raise NotSupportedError
def _get_log_list_single(
self, uav: TUAV
) -> Union[list[FlightLogMetadata], Awaitable[list[FlightLogMetadata]]]:
"""Asks the driver to retrieve the list of flight logs from a single
UAV managed by this driver.
May return an awaitable if preparing the result takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _get_parameter_single(self, uav: TUAV, name: str) -> Any:
"""Asks the driver to retrieve the value of a parameter with the given
name from a single UAV managed by this driver.
May return an awaitable if preparing the result takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _resume_from_low_power_mode_single(
self, uav: TUAV, *, transport: Optional[TransportOptions] = None
) -> Union[None, Awaitable[None]]:
"""Asks the driver to resume normal operation for a a single UAV that is
now in low-power mode.
May return an awaitable if sending the request takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
# Default is NotSupportedError because it is not that common for UAVs
# to support low-power mode
raise NotSupportedError
def _request_preflight_report_single(
self, uav: TUAV
) -> Union[PreflightCheckInfo, Awaitable[PreflightCheckInfo]]:
"""Asks the driver to return a detailed report about the results of the
preflight checks for a single UAV managed by this driver.
May return an awaitable if preparing the result takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _request_version_info_single(
self, uav: TUAV
) -> Union[VersionInfo, Awaitable[VersionInfo]]:
"""Asks the driver to return a mapping from component names to the
corresponding version numbers for a single UAV managed by this driver.
May return an awaitable if preparing the result takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_fly_to_target_signal_single(
self, uav: TUAV, target: GPSCoordinate
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a "fly to target" signal to a single UAV
managed by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
target: the target to fly to; the altitude above
home level may be set to `None` to indicate the current
altitude. Altitude above ground is not supported yet.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_hover_signal_single(
self, uav: TUAV, *, transport: Optional[TransportOptions] = None
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a position hold signal to a single UAV
managed by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request
transport: transport options for sending the signal
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_landing_signal_single(
self, uav: TUAV, *, transport: Optional[TransportOptions] = None
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a landing signal to a single UAV managed
by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_light_or_sound_emission_signal_single(
self,
uav: TUAV,
signals: list[str],
duration: int,
*,
transport: Optional[TransportOptions] = None,
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a light or sound emission signal to a
single UAV managed by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all", with one exception. The specification says that signal
names may be defined and extended by the user without modifying the
formal protocol specification, and implementations should accept any
signal name and simply not respond to signals that they do not know.
Therefore, it is valid to return even if no visible light or audio
signal was emitted by the UAV as long as no other errors happened.
Raise an exception if the operation cannot be executed for any other
reason; a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
signals: the list of signal types that the targeted UAV should emit
(e.g., 'sound', 'light')
duration: the duration of the required signal in milliseconds
transport: transport options for sending the signal
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_motor_start_stop_signal_single(
self,
uav: TUAV,
start: bool,
force: bool = False,
*,
transport: Optional[TransportOptions] = None,
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a signal to start or stop the motors of the
given UAVs, each of which are assumed to be managed by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
start: whether the motors should be started (`True`) or stopped
(`False`)
force: whether to force the execution of the command even if it is
unsafe (e.g., stopping the motors while airborne)
transport: transport options for sending the signal
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_reset_signal_single(
self, uav: TUAV, component: str, *, transport: Optional[TransportOptions] = None
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a reset signal to a single UAV managed by
this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
component: the component to reset; an empty string means that the
entire UAV should be reset.
transport: transport options for sending the signal
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_return_to_home_signal_single(
self, uav: TUAV, *, transport: Optional[TransportOptions] = None
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a return-to-home signal to a single UAV
managed by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
transport: transport options for sending the signal
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_shutdown_signal_single(
self, uav: TUAV, *, transport: Optional[TransportOptions] = None
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a shutdown signal to a single UAV managed
by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
transport: transport options for sending the signal
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _send_takeoff_signal_single(
self,
uav: TUAV,
*,
scheduled: bool = False,
transport: Optional[TransportOptions] = None,
) -> Union[None, Awaitable[None]]:
"""Asks the driver to send a takeoff signal to a single UAV managed
by this driver.
May return an awaitable if sending the signal takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Parameters:
uav: the UAV to address with this request.
transport: transport options for sending the signal
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
def _set_parameter_single(
self, uav: TUAV, name: str, value: Any
) -> Union[None, Awaitable[None]]:
"""Asks the driver to set the value of a parameter with the given
name for a single UAV managed by this driver.
May return an awaitable if preparing the result takes a longer time.
The function follows the "samurai principle", i.e. "return victorious,
or not at all". It means that if it returns, the operation succeeded.
Raise an exception if the operation cannot be executed for any reason;
a RuntimeError is typically sufficient.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
"""
raise NotImplementedError
async def _set_parameters_single(
self, uav: TUAV, parameters: dict[str, Any]
) -> ProgressEvents[BulkParameterUploadResponse]:
"""Asks the driver to set the values of multiple parameters for a single
UAV managed by this driver.
May return an awaitable if preparing the result takes a longer time.
Since we are dealing with multiple parameters here that may be changed
independently, the function attempts to return even if some of the
parameters have not been uploaded successfully. The result conveys
more detailed information about whether the upload succeeded or not.
The default implementation falls back to multiple calls to
`_set_parameter_single()` in a sequential manner, sorted by keys in
alphabetical order. Override this function in concrete driver
implementations if you have a more efficient method for bulk parameter
uploads.
The default implementation always returns an awaitable as we have no
way of knowing whether `_set_parameter_single()` is sync or async
without calling it.
Raises:
NotImplementedError: if the operation is not supported by the
driver yet, but there are plans to implement it
NotSupportedError: if the operation is not supported by the
driver and will not be supported in the future either
Returns:
a dictionary with a `success` key that contains whether all the
parameters have been uploaded successfully, and an optional `failed`
key that contains the list of parameter names where the upload
failed.
"""
failed: list[str] = []
num_params = len(parameters)
last_percentage = -1
for index, name in enumerate(sorted(parameters.keys())):
value = parameters[name]
try:
result = self._set_parameter_single(uav, name, value)
if isawaitable(result):
await result
except Exception:
failed.append(name)
percentage = int((index + 1) / num_params * 100)
if percentage > last_percentage:
yield Progress(percentage=percentage)
last_percentage = percentage
yield {"success": not failed, "failed": failed}
class PassiveUAV(UAVBase):
pass
class PassiveUAVDriver(UAVDriver[PassiveUAV]):
"""Implementation of an UAVDriver_ for passive UAV objects that do not
support responding to commands.
"""
def __init__(self, uav_factory: Callable[..., PassiveUAV] = PassiveUAV):
"""Constructor.
Parameters:
uav_factory (callable): callable that creates a new UAV managed by
this driver.
"""
super(PassiveUAVDriver, self).__init__()
self._uav_factory = uav_factory
def _create_uav(self, id: str) -> PassiveUAV:
"""Creates a new UAV that is to be managed by this driver.
Parameters:
id: the string identifier of the UAV to create
Returns:
an appropriate UAV object
"""
return self._uav_factory(id, driver=self)
def get_or_create_uav(self, id: str) -> PassiveUAV:
"""Retrieves the UAV with the given ID, or creates one if
the driver has not seen a UAV with the given ID.
Parameters:
id: the identifier of the UAV to retrieve
Returns:
an appropriate UAV object
"""
assert self.app is not None
return self.app.object_registry.add_if_missing(id, factory=self._create_uav)
def _dispatch_request(
self, uavs: Iterable[UAV], request_name: str, handler, broadcaster=None, **kwds
) -> dict[UAV, Any]:
error = RuntimeError("{0} not supported".format(request_name))
return dict.fromkeys(uavs, error)
def is_uav(x: Any) -> bool:
"""Returns whether the given object is a UAV."""
return isinstance(x, UAV)