Repository URL to install this package:
|
Version:
2.38.1 ▾
|
"""Driver class for virtual drones."""
from __future__ import annotations
from colour import Color
from enum import Enum
from hashlib import sha1 as firmware_hash
from math import atan2, cos, hypot, radians, sin
from random import random, randint, choice
from time import monotonic
from trio import CancelScope, sleep
from trio_util import periodic
from typing import Any, Callable, Collection, NoReturn, Optional, Union
from flockwave.concurrency import delayed
from flockwave.ext.manager import ExtensionAPIProxy
from flockwave.gps.vectors import (
FlatEarthCoordinate,
GPSCoordinate,
FlatEarthToGPSCoordinateTransformation,
Vector3D,
VelocityNED,
)
from flockwave.server.command_handlers import (
create_calibration_command_handler,
create_color_command_handler,
create_parameter_command_handler,
create_test_command_handler,
create_version_command_handler,
)
from flockwave.server.errors import NotSupportedError
from flockwave.server.model.commands import (
Progress,
Suspend,
ProgressEventsWithSuspension,
)
from flockwave.server.model.devices import ObjectNode
from flockwave.server.model.gps import GPSFixType
from flockwave.server.model.log import FlightLog, FlightLogKind, FlightLogMetadata
from flockwave.server.model.mission import MissionItemBundle
from flockwave.server.model.preflight import PreflightCheckResult, PreflightCheckInfo
from flockwave.server.model.transport import TransportOptions
from flockwave.server.model.uav import VersionInfo, UAVBase, UAVDriver
from flockwave.server.show import (
get_coordinate_system_from_show_specification,
get_light_program_from_show_specification,
TrajectoryPlayer,
TrajectorySpecification,
)
from flockwave.server.utils import color_to_rgb565
from flockwave.spec.errors import FlockwaveErrorCode
from .battery import VirtualBattery
from .fw_upload import FIRMWARE_UPDATE_TARGET_ID
from .lights import DefaultLightController
__all__ = ("VirtualUAVDriver",)
class VirtualUAVState(Enum):
"""Enum class that represents the possible states of a virtual UAV."""
LANDED = 0
TAKEOFF = 1
AIRBORNE = 2
LANDING = 3
#: Dummy preflight check information object returned from all virtual UAVs
_dummy_preflight_check_info = PreflightCheckInfo()
_dummy_preflight_check_info.add_item("accel", "Accelerometer")
_dummy_preflight_check_info.add_item("compass", "Compass")
_dummy_preflight_check_info.add_item("ekf", "EKF")
_dummy_preflight_check_info.add_item("gps", "GPS fix")
_dummy_preflight_check_info.add_item("gyro", "Gyroscope")
_dummy_preflight_check_info.add_item("pressure", "Pressure sensor")
for item in _dummy_preflight_check_info.items:
item.result = PreflightCheckResult.PASS
_dummy_preflight_check_info.update_summary()
class VirtualUAV(UAVBase):
"""Model object representing a virtual UAV provided by this extension.
The virtual UAV will follow a given target position by default. The UAV may
be given a new target position (and a new altitude), in which case it will
approach the new target with a reasonable maximum velocity. No attempts are
made to make the acceleration realistic.
"""
_armed: bool
_autopilot_initializing: bool
_light_controller: DefaultLightController
_mission_started_at: Optional[float]
_motors_running: bool
_parameters: dict[str, str]
_position_xyz: Vector3D
_position_flat: FlatEarthCoordinate
_request_shutdown: Optional[Callable[[], None]]
_shutdown_reason: Optional[str]
_sleeping: bool
_trajectory_transformation: Optional[FlatEarthToGPSCoordinateTransformation]
_trans: FlatEarthToGPSCoordinateTransformation
_velocity_xyz: Vector3D
_velocity_ned: VelocityNED
_state: VirtualUAVState
"""The state of the UAV."""
_target: Optional[GPSCoordinate]
"""The target coordinates of the UAV, in geodetic coordinates."""
_target_xyz: Optional[Vector3D]
"""The target coordinates of the UAV, in topocentric coordinates."""
_user_defined_error: Optional[int]
"""User defined simulated error code, if any."""
_version: int
"""Simulated version number of the UAV, used in the output of the `version`
command.
"""
_version_hash: bytes = b"\x00" * 32
"""Hash of the last (simulated) firmware that was uploaded to the drone."""
boots_armed: bool
"""Whether the drone boots in an armed state."""
max_acceleration_xy: float
"""The maximum acceleration of the UAV in the X-Y plane (parallel to the
surface of the Earth), in m/s/s. To simplify the simulation a bit, the
virtual UAVs are capable of infinite deceleration.
"""
max_acceleration_z: float
"""The maximum acceleration of the UAV in the Z direction (perpendicular to
the surface of the Earth), in m/s/s. To simplify the simulation a bit, the
virtual UAVs are capable of infinite deceleration.
"""
max_velocity_xy: float
"""The maximum velocity of the UAV along the X-Y plane (parallel to the
surface of the Earth), in m/s.
"""
max_velocity_z: float
"""The maximum velocity of the UAV along the Z direction (perpendicular to
the surface of the Earth), in m/s.
"""
takeoff_altitude: float
use_battery_percentage: bool
"""Whether the virtual battery of the UAV reports percentages (True) or only
voltages (False).
"""
radiation_ext: Optional[ExtensionAPIProxy] = None
def __init__(self, *args, use_battery_percentage: bool, **kwds):
self.use_battery_percentage = use_battery_percentage
super().__init__(*args, **kwds)
self._version = 1
self._armed = True # will be disarmed when booting if needed
self._autopilot_initializing = False
self._home_amsl = None
self._light_controller = DefaultLightController(self)
self._mission_items = None
self._mission_started_at = None
self._motors_running = False
self._parameters = {}
self._position_xyz = Vector3D()
self._position_flat = FlatEarthCoordinate()
self._sleeping = False
self._state = None # type: ignore
self._target_xyz = None
self._trajectory = None
self._trajectory_player = None
self._trajectory_transformation = None
self._trans = FlatEarthToGPSCoordinateTransformation(
origin=GPSCoordinate(lat=0, lon=0)
)
self._user_defined_error = None
self._velocity_xyz = Vector3D()
self._velocity_ned = VelocityNED()
self._request_shutdown = None
self._shutdown_reason = None
self.boots_armed = False
self.max_acceleration_xy = 4
self.max_acceleration_z = 1
self.max_velocity_z = 2
self.max_velocity_xy = 10
self.radiation_ext = None
self.state = VirtualUAVState.LANDED
self.takeoff_altitude = 3
self.target = None
self.step(0)
@property
def armed(self) -> bool:
"""Returns whether the drone is armed."""
return self._armed
@armed.setter
def armed(self, value: bool) -> None:
self._armed = value
self.ensure_error(FlockwaveErrorCode.DISARMED, present=not self._armed)
def arm_if_on_ground(self) -> bool:
"""Arms the virtual drone if it is standing on the ground.
Returns:
whether the operation has succeeded
"""
if self.sleeping:
return False
if self.state is VirtualUAVState.LANDED:
self.armed = True
return True
else:
return False
def disarm_if_on_ground(self) -> bool:
"""Disarms the virtual drone if it is standing on the ground.
Returns:
whether the operation has succeeded
"""
if self.sleeping:
return False
if self.state is VirtualUAVState.LANDED:
self.armed = False
return True
else:
return False
@property
def autopilot_initializing(self) -> bool:
"""Returns whether the simulated autopilot is currently initializing."""
return self._autopilot_initializing
@autopilot_initializing.setter
def autopilot_initializing(self, value: bool) -> None:
value = bool(value)
if self._autopilot_initializing == value:
return
self._autopilot_initializing = value
self.ensure_error(
FlockwaveErrorCode.AUTOPILOT_INITIALIZING,
present=self._autopilot_initializing,
)
async def calibrate_component(
self, component: str
) -> ProgressEventsWithSuspension[None, str]:
"""Calibrates a component of the UAV.
Parameters:
component: the component to calibrate; we plan to support
``accel``, ``baro``, ``compass``, ``gyro`` and ``level``.
Raises:
NotImplementedError: if the calibration of the given component is
not yet implemented on this UAV
NotSupportedError: if the calibration of the given component is not
supported on this UAV
RuntimeError: if the UAV rejected to calibrate the component
"""
match component:
case "compass":
for i in range(10):
yield Progress(percentage=i * 10)
await sleep(0.5 + random() * 1.5)
yield Progress(percentage=100)
case "accel" | "baro" | "gyro" | "level":
raise NotImplementedError
case _:
raise NotSupportedError
# TODO: Dynamically set preflight results and implement other components
success = random() < 0.9
if not success:
raise RuntimeError(f"Failed to calibrate component: {component!r}")
def can_handle_firmware_update_target(self, target_id: str) -> bool:
"""Returns whether the virtual UAV can handle uploads with the given
target.
"""
return target_id == FIRMWARE_UPDATE_TARGET_ID
@property
def elapsed_time_in_mission(self) -> Optional[float]:
"""Returns the number of seconds elapsed in the execution of the current
mission (trajectory) or `None` if no mission is running yet.
"""
return (
monotonic() - self._mission_started_at
if self._mission_started_at is not None
else None
)
@property
def errors(self) -> Collection[int]:
return self._status.errors
async def get_parameter(self, name: str, fetch: bool = False) -> str:
if fetch:
# Simulate a bit of delay to make it more realistic
await sleep(0.05)
return self._parameters.get(name, "unset")
def get_version_info(self) -> VersionInfo:
from flockwave.server.version import __version__ as server_version
return {
"server": server_version,
"firmware": f"v{self._version}-{self._version_hash[:4].hex()}",
}
async def handle_firmware_update(self, target_id: str, blob: bytes):
assert self.can_handle_firmware_update_target(target_id)
new_hash = firmware_hash(blob).digest()
# Calculate the hash of the received blob and wait for a random time
# interval between 3 and 5 seconds, posting a fake progress update
# periodically
iterations = randint(30, 50)
for i in range(iterations):
await sleep(0.1)
yield Progress(percentage=100 * i // iterations)
yield Progress(percentage=100)
self._version_hash = new_hash
@property
def has_trajectory(self) -> bool:
return self._trajectory is not None
@property
def has_mission_items(self) -> bool:
return self._mission_items is not None
@property
def has_user_defined_error(self) -> bool:
return bool(self._user_defined_error)
@property
def home(self) -> GPSCoordinate:
coord = self._trans.origin.copy()
if self._home_amsl:
coord.amsl = self._home_amsl
return coord
@home.setter
def home(self, value: GPSCoordinate) -> None:
self._trans.origin = value
if value.amsl is not None:
self._home_amsl = float(value.amsl)
@property
def motors_running(self) -> bool:
"""Returns whether motors of the drone are running."""
return self._motors_running
def send_log_message_to_gcs(self, *args, **kwds):
kwds["sender"] = self.id
assert self.driver.app is not None
return self.driver.app.request_to_send_SYS_MSG_message(*args, **kwds)
async def set_parameter(self, name: str, value: Any) -> None:
# Simulate a bit of delay to make it more realistic
await sleep(0.05)
self._parameters[name] = str(value)
@property
def sleeping(self) -> bool:
"""Returns whether the drone is sleeping."""
return self._sleeping
@sleeping.setter
def sleeping(self, value: bool) -> None:
self._sleeping = value
self.ensure_error(FlockwaveErrorCode.SLEEPING, present=self._sleeping)
@property
def state(self) -> VirtualUAVState:
"""The state of the UAV; one of the constants from the VirtualUAVState_
enum class.
"""
return self._state
@state.setter
def state(self, value: VirtualUAVState) -> None:
if self._state == value:
return
old_state = self._state
self._state = value
self.ensure_error(FlockwaveErrorCode.RETURN_TO_HOME, present=False)
self.ensure_error(
FlockwaveErrorCode.TAKEOFF, present=self._state is VirtualUAVState.TAKEOFF
)
self.ensure_error(
FlockwaveErrorCode.LANDING, present=self._state is VirtualUAVState.LANDING
)
# Motors must be running if the UAV is not on the ground
if self._state is not VirtualUAVState.LANDED:
self.start_motors()
if self._state is VirtualUAVState.TAKEOFF:
if old_state is VirtualUAVState.LANDED:
# Start following the trajectory if we have one
if self._trajectory is not None:
self._trajectory_player = TrajectoryPlayer(
TrajectorySpecification(self._trajectory)
)
# Start the light program
self._light_controller.play_light_program()
# TODO: add yaw control player
elif self._state is VirtualUAVState.AIRBORNE:
if old_state is not VirtualUAVState.TAKEOFF:
# Stop following the trajectory, just in case
self.stop_trajectory()
elif self._state is VirtualUAVState.LANDED:
# Mission ended, stop playing the light program and stop the motors
self._light_controller.stop_light_program()
self.stop_motors()
@property
def target(self) -> Optional[GPSCoordinate]:
"""The target coordinates of the UAV in GPS coordinates."""
return self._target
@target.setter
def target(self, value: Optional[GPSCoordinate]) -> None:
self._target = value
# Clear the "return to home" error code (if any)
self.ensure_error(FlockwaveErrorCode.RETURN_TO_HOME, present=False)
if value is None:
self._target_xyz = None
else:
# Calculate the real altitude component of the target
if value.ahl is None:
if value.amsl is None or self._home_amsl is None:
new_altitude = self._position_xyz.z
else:
new_altitude = value.amsl - self._home_amsl
else:
new_altitude = value.ahl
# Update the target and its XYZ representation
assert self._target is not None
self._target.update(ahl=new_altitude)
flat = self._trans.to_flat_earth(value)
self._target_xyz = Vector3D(x=flat.x, y=flat.y, z=new_altitude)
@property
def target_xyz(self) -> Optional[Vector3D]:
"""The target coordinates of the UAV in flat Earth coordinates around
its home.
"""
return self._target_xyz
@target_xyz.setter
def target_xyz(self, value):
if value is None:
self.target = None
else:
x, y, z = value
amsl = self._home_amsl + z if self._home_amsl is not None else None
flat_earth = FlatEarthCoordinate(x=x, y=y, amsl=amsl, ahl=z)
self.target = self._trans.to_gps(flat_earth)
async def test_component(self, component: str):
"""Tests a component of the UAV.
Parameters:
component: the component to test; currently we support ``motor`` and
``led``
"""
if component == "motor":
self.start_motors()
await sleep(3)
self.stop_motors()
elif component == "led":
color_sequence = [
Color(name)
for name in "red lime blue yellow cyan magenta white".split()
]
for index, color in enumerate(color_sequence):
if index > 0:
await sleep(1)
self.set_led_color(color)
yield Progress(
percentage=int((index + 1) * (100 / len(color_sequence)))
)
else:
raise NotSupportedError
@property
def user_defined_error(self) -> Optional[int]:
"""Returns the single user-defined error code or `None` if the UAV
is currently not simulating any error condition.
"""
return self._user_defined_error
@user_defined_error.setter
def user_defined_error(self, value: Optional[int]) -> None:
if value is not None:
value = int(value)
if self._user_defined_error == value:
return
if self._user_defined_error is not None:
self.ensure_error(self._user_defined_error, present=False)
self._user_defined_error = value
if self._user_defined_error is not None:
self.ensure_error(self._user_defined_error, present=True)
def handle_mission_upload(self, bundle: MissionItemBundle, format: str) -> None:
"""Handles the upload of a waypoint based mission.
Parameters:
bundle: the mission bundle
format: the mission format
Raises:
RuntimeError: if the drone is not on the ground
"""
if self.state is not VirtualUAVState.LANDED:
raise RuntimeError("Cannot upload a mission while the drone is airborne")
self._mission_items = bundle.get("items", None)
self.update_status(mode="mission" if self.has_mission_items else "stab")
def handle_show_upload(self, show) -> None:
"""Handles the upload of a full drone show (trajectory + light program).
Parameters:
show: the uploaded show in Skybrush format
Raises:
RuntimeError: if the drone is not on the ground
"""
if self.state is not VirtualUAVState.LANDED:
raise RuntimeError("Cannot upload a show while the drone is airborne")
self._trajectory_transformation = get_coordinate_system_from_show_specification(
show
)
self._trajectory = show.get("trajectory", None)
self._light_controller.load_light_program(
get_light_program_from_show_specification(show)
)
self.update_status(mode="mission" if self.has_trajectory else "stab")
def handle_where_are_you(self, duration: float) -> None:
"""Handles a 'where are you' command.
Parameters:
duration: duration of the signal in seconds.
"""
self._light_controller.where_are_you(duration)
def hold_position(self) -> None:
"""Requests the UAV to hold its current position if it is flying,
otherwise do nothing.
"""
if self.state is VirtualUAVState.LANDED:
# Do nothing
return
self.stop_trajectory()
self._target_xyz = self._position_xyz.copy()
self.state = VirtualUAVState.AIRBORNE
def land(self) -> None:
"""Starts a simulated landing with the virtual UAV."""
if self.state != VirtualUAVState.AIRBORNE:
return
if self._target_xyz is None:
self._target_xyz = self._position_xyz.copy()
self._target_xyz.z = 0
self.state = VirtualUAVState.LANDING
def set_led_color(self, color: Optional[Color]) -> None:
"""Overrides the current color of the simulated light on the drone."""
self._light_controller.override = color
def reset(self) -> None:
"""Requests the UAV to reset itself if it is currently running."""
if self._request_shutdown:
self._shutdown_reason = "reset"
self._request_shutdown()
async def run_single_boot(
self,
delay: float,
*,
mutate: Callable,
notify: Callable[[], None],
spawn: Callable,
) -> str:
"""Simulates a single boot session of the virtual UAV.
Parameters:
delay: number of seconds to wait between consecutive status updates
notify: function to call when new status information should be
dispatched about the UAV
spawn: function to call when the UAV wishes to spawn a background
task
Returns:
`"shutdown"` if the user requested the UAV to shut down;
`"reset"` if the user requested the UAV to reset itself.
"""
# Booting takes a bit of time; we simulate this with a random delay
await sleep(random() + 1)
# Now we enter the main control loop of the UAV. We assume that the
# autopilot initialization takes about 2 seconds.
self._notify_booted()
spawn(
delayed(
random() * 0.5 + 2,
self._notify_autopilot_initialized,
ensure_async=True,
)
)
try:
with CancelScope() as scope:
self._request_shutdown = scope.cancel
async for _ in periodic(delay):
with mutate() as mutator:
self.step(mutator=mutator, dt=delay)
notify()
return self._shutdown_reason or "shutdown"
finally:
self._notify_shutdown()
def shutdown(self) -> None:
"""Requests the UAV to shutdown if it is currently running."""
if self._request_shutdown:
self._shutdown_reason = "shutdown"
self._request_shutdown()
def start_motors(self) -> None:
"""Starts the motors of the UAV if they are not running yet."""
self._motors_running = True
def stop_motors(self) -> None:
"""Stop the motors of the UAV if they are running and the UAV has
landed.
"""
if self.state is VirtualUAVState.LANDED:
self._motors_running = False
def step(self, dt: float, mutator=None) -> None:
"""Simulates a single step of the trajectory of the virtual UAV based
on its state and the amount of time that has passed.
Parameters:
dt (float): the time that has passed, in seconds.
mutator (DeviceTreeMutator): the mutator object that should be
used by the UAV to update its channel nodes
"""
state = self._state
# Update the target of the drone if it is currently following a
# predefined trajectory and it is not landing or landed
if state is VirtualUAVState.TAKEOFF or state is VirtualUAVState.AIRBORNE:
if self._trajectory_player and self._mission_started_at is not None:
self._update_target_from_trajectory()
# Do we have a target?
if self._target_xyz is not None:
# We aim for the target in the XY plane only if we are airborne
if state is VirtualUAVState.AIRBORNE:
dx = self._target_xyz.x - self._position_xyz.x
dy = self._target_xyz.y - self._position_xyz.y
else:
dx, dy = 0, 0
# During the takeoff phase, if we are flying a mission and the
# takeoff time has not been reached yet, we are not allowed to
# move in the Z direction either
dz = self._target_xyz.z - self._position_xyz.z
if state is VirtualUAVState.TAKEOFF and self._trajectory_player:
t = self.elapsed_time_in_mission
if t is not None and self._trajectory_player.is_before_takeoff(t):
dz = 0
angle = atan2(dy, dx)
dist_xy = hypot(dx, dy)
if dist_xy < 1e-6:
dist_xy = 0
dist_z = abs(dz)
reachable_velocity_xy = min(
hypot(self._velocity_xyz.x, self._velocity_xyz.y)
+ self.max_acceleration_xy * dt,
self.max_velocity_xy,
)
displacement_xy = min(dist_xy, dt * reachable_velocity_xy)
displacement_x = cos(angle) * displacement_xy
displacement_y = sin(angle) * displacement_xy
if dz < 0:
# Descending
reachable_velocity_z = max(
self._velocity_xyz.z - self.max_acceleration_z * dt,
-self.max_velocity_z,
)
displacement_z = max(
dz, dt * reachable_velocity_z, -self._position_xyz.z
)
elif dz == 0:
displacement_z = 0
else:
# Ascending
reachable_velocity_z = min(
self._velocity_xyz.z + self.max_acceleration_z * dt,
self.max_velocity_z,
)
displacement_z = min(dz, dt * reachable_velocity_z)
self._velocity_xyz.x = displacement_x / dt if dt > 0 else 0.0
self._velocity_xyz.y = displacement_y / dt if dt > 0 else 0.0
self._velocity_xyz.z = displacement_z / dt if dt > 0 else 0.0
self._position_xyz.x += displacement_x
self._position_xyz.y += displacement_y
self._position_xyz.z += displacement_z
# If we are above the takeoff altitude minus some threshold and
# we are in the TAKEOFF stage, move to being airborne. Also, if
# we are landing and we are very close to the ground, consider
# ourselves as landed.
eps = 0.2
if state is VirtualUAVState.TAKEOFF:
if self._position_xyz.z > max(eps, self.takeoff_altitude - eps):
self.state = VirtualUAVState.AIRBORNE
elif state is VirtualUAVState.LANDING:
if dist_z < eps * 0.5:
self.state = VirtualUAVState.LANDED
self._mission_started_at = None
self.target = None
elif state is VirtualUAVState.AIRBORNE:
# If we have reached the target, we can clear it
if dist_xy < eps and dist_z < eps:
self.target = None
# Calculate our coordinates in flat Earth
self._position_flat.x = self._position_xyz.x
self._position_flat.y = self._position_xyz.y
self._position_flat.ahl = self._position_xyz.z
self._position_flat.amsl = (
self._position_xyz.z + self._home_amsl
if self._home_amsl is not None
else None
)
# Transform the flat Earth coordinates to GPS around our
# current position as origin
position = self._trans.to_gps(self._position_flat)
# Calculate the velocity in NED
s_o = sin(radians(-self._trans.orientation))
c_o = cos(radians(-self._trans.orientation))
self._velocity_ned.update(
north=(c_o * self._velocity_xyz.x) + (-s_o * self._velocity_xyz.y),
east=(-s_o * self._velocity_xyz.x) + (-c_o * self._velocity_xyz.y),
down=-self._velocity_xyz.z,
)
# Discharge the battery
load = 0.01 if self.state is VirtualUAVState.LANDED else 1.0
self.battery.discharge(dt, load, mutator=mutator)
# Update the error code based on the battery status
self.ensure_errors(
{
FlockwaveErrorCode.BATTERY_CRITICAL: self.battery.is_critical,
FlockwaveErrorCode.BATTERY_LOW_ERROR: self.battery.is_very_low,
FlockwaveErrorCode.BATTERY_LOW_WARNING: self.battery.is_low,
}
)
# Update the error codes when the UAV is still on the ground, based on
# whether its motors are running or not
self.ensure_errors(
{
FlockwaveErrorCode.ON_GROUND: self.state is VirtualUAVState.LANDED
and not self.motors_running,
FlockwaveErrorCode.MOTORS_RUNNING_WHILE_ON_GROUND: self.state
is VirtualUAVState.LANDED
and self.motors_running,
}
)
# Update the UAV status
updates = {
"position": position,
"velocity": self._velocity_ned,
"battery": self.battery.status,
"light": color_to_rgb565(self._light_controller.evaluate(monotonic())),
}
self.update_status(**updates)
# Measure radiation if possible
# TODO(ntamas): calculate radiation halfway between the current
# position and the previous one instead
if self.radiation_ext is not None and self.radiation_ext.loaded:
observed_count = self.radiation_ext.measure_at(position, seconds=dt)
# Okay, now we extrapolate from the observed count to the
# per-second intensity. This should be made smarter; for
# instance, we should report a new value only if we have
# observed enough data
radiation_intensity_estimate = observed_count / dt
else:
observed_count = 0
radiation_intensity_estimate = 0
# Also update our sensors
if mutator is not None:
mutator.update(
self.thermometer,
{"lat": position.lat, "lon": position.lon, "value": 24.0},
)
mutator.update(
self.geiger_counter["averaged"],
{
"lat": position.lat,
"lon": position.lon,
"value": radiation_intensity_estimate,
},
)
mutator.update(
self.geiger_counter["raw"],
{"lat": position.lat, "lon": position.lon, "value": observed_count},
)
def stop_trajectory(self) -> None:
"""Prevents the UAV from following its pre-defined trajectory if it is
currently following one. No-op if the UAV is not following a predefined
trajectory.
Also makes the UAV "forget" its current trajectory.
"""
if self._trajectory_player:
self._trajectory = None
self._trajectory_player = None
self._trajectory_transformation = None
def takeoff(self) -> None:
"""Starts a simulated take-off with the virtual UAV."""
if self.sleeping:
return
if self.state != VirtualUAVState.LANDED:
return
if not self.armed:
return
self._mission_started_at = monotonic()
if self._target_xyz is None:
self._target_xyz = self._position_xyz.copy()
self._target_xyz.z = self.takeoff_altitude
self.state = VirtualUAVState.TAKEOFF
def _initialize_device_tree_node(self, node: ObjectNode) -> None:
self.battery = VirtualBattery(report_percentage=self.use_battery_percentage)
self.battery.register_in_device_tree(node)
device = node.add_device("thermometer")
self.thermometer = device.add_channel(
"temperature", type=object, unit="\u00b0C"
)
device = node.add_device("geiger_counter")
self.geiger_counter = {
"raw": device.add_channel("raw_measurement", type=object, unit="counts"),
"averaged": device.add_channel(
"measurement", type=object, unit="counts/sec"
),
}
def _notify_booted(self) -> None:
"""Notifies the virtual UAV that the boot process has ended."""
self._request_shutdown = None
self._shutdown_reason = None
self.armed = bool(self.boots_armed)
self.autopilot_initializing = True
self.sleeping = False
self._trajectory = None
self._trajectory_player = None
self._trajectory_transformation = None
self.update_status(
gps=GPSFixType.DGPS, mode="mission" if self.has_trajectory else "stab"
)
def _notify_autopilot_initialized(self) -> None:
"""Notifies the virtual UAV that the autopilot has initialized."""
self.autopilot_initializing = False
def _notify_shutdown(self) -> None:
"""Notifies the virtual UAV that it is about to shut down."""
self._request_shutdown = None
self._shutdown_reason = None
self.autopilot_initializing = False
def _update_target_from_trajectory(self) -> None:
"""Updates the target of the UAV based on the time elapsed since takeoff
and the trajectory that it needs to follow.
"""
t = self.elapsed_time_in_mission
if t is None:
return
assert self._trajectory_player is not None
assert self._trajectory_transformation is not None
if self._trajectory_player.ended:
# Trajectory ended, land the drone
self.land()
return
if not self._trajectory_player.is_before_takeoff(t):
# Time is after the start of the trajectory so evaluate it
x, y, z = self._trajectory_player.position_at(t)
self.target = self._trajectory_transformation.to_gps(
FlatEarthCoordinate(x=x, y=y, ahl=z)
)
_LOGS: dict[str, FlightLog] = {
"1": FlightLog.create(
id="1",
kind=FlightLogKind.TEXT,
body="test flight log 1",
timestamp=1688328000,
),
"2": FlightLog.create(
id="2",
kind=FlightLogKind.TEXT,
body="test flight log 2",
timestamp=1688328900,
),
}
"""Fake logs for the virtual UAVs"""
class VirtualUAVDriver(UAVDriver[VirtualUAV]):
"""Virtual UAV driver that manages a group of virtual UAVs provided by this
extension.
"""
uavs_armed_after_boot: bool
use_battery_percentages: bool
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
self.uavs_armed_after_boot = False
self.use_battery_percentages = False
def create_uav(
self, id: str, home: GPSCoordinate, heading: float = 0
) -> VirtualUAV:
"""Creates a new UAV that is to be managed by this driver.
Parameters:
id: the identifier of the UAV to create
home: the home position of the UAV
Returns:
an appropriate virtual UAV object
"""
uav = VirtualUAV(
id, driver=self, use_battery_percentage=self.use_battery_percentages
)
uav.boots_armed = bool(self.uavs_armed_after_boot)
uav.takeoff_altitude = 3
uav.home = home.copy()
uav.home.amsl = None
uav.home.agl = None
uav.home.ahl = 0
uav.target = home.copy()
uav.update_status(heading=heading)
return uav
async def handle_command_arm(self, uav: VirtualUAV) -> str:
"""Command that arms the virtual drone if it is on the ground."""
if uav.arm_if_on_ground():
return "Armed"
else:
return "Failed to arm"
async def handle_command_async_exception(self, uav: VirtualUAV) -> NoReturn:
"""Throws an synchronous exception."""
await sleep(0.2)
raise RuntimeError("Async exception raised")
async def handle_command_battery(self, uav: VirtualUAV, value: str) -> str:
"""Command that sets the battery voltage to a given value."""
if hasattr(value, "endswith") and value.endswith("%"):
uav.battery.percentage = float(value[:-1])
if uav.battery.percentage is not None:
return f"Battery percentage set to {uav.battery.percentage:.2f}%"
else:
# This may happen if the UAV is configured not to report percentages
return f"Voltage set to {uav.battery.voltage:.2f}V"
else:
uav.battery.voltage = float(value)
return f"Voltage set to {uav.battery.voltage:.2f}V"
handle_command_calib = create_calibration_command_handler(
("accel", "baro", "compass", "gyro", "level")
)
async def handle_command_disarm(self, uav: VirtualUAV) -> str:
"""Command that disarms the virtual drone if it is on the ground."""
if uav.disarm_if_on_ground():
return "Disarmed"
else:
return "Failed to disarm"
def handle_command_echo(self, uav: VirtualUAV, message: str) -> str:
"""Echoes a message back to the client using a SYS-MSG message and as
normal response as well.
"""
uav.send_log_message_to_gcs(message)
return message
def handle_command_error(self, uav: VirtualUAV, value: Union[str, int] = 0) -> str:
"""Sets or clears the error code of the virtual drone."""
value = int(value)
uav.user_defined_error = value
return (
f"Error code set to {uav.user_defined_error}"
if uav.has_user_defined_error
else "Error code cleared"
)
def handle_command_exception(self, uav: VirtualUAV) -> NoReturn:
"""Throws a synchronous exception."""
raise RuntimeError("Sync exception raised")
async def handle_command_progress(self, uav: VirtualUAV):
"""Dummy command that can be used to test progress reports sent
during the execution of a command.
The execution of this command takes five seconds. A progress report
is sent every 500 milliseconds.
"""
for i in range(10):
yield Progress(percentage=i * 10)
await sleep(0.5)
yield Progress(percentage=100)
yield "Result."
async def handle_command_progress_and_suspend(self, uav: VirtualUAV):
"""Dummy command that can be used to test progress reports sent
during the execution of a command, with suspension support.
The execution of this command takes five seconds. A progress report
is sent every 500 milliseconds. After the fifth progress report, the
execution is suspended; the client is expected to send back a truthy
value to resume the execution. Sending back a falsy value will make
the operation throw an exception.
"""
yield Progress(percentage=0)
for i in range(5):
await sleep(0.5)
yield Progress(percentage=(i + 1) * 10)
value = yield Suspend(
message="Send a truthy value back", object={"expected": True}
)
if not value:
raise RuntimeError("falsy value received")
for i in range(6, 11):
await sleep(0.5)
yield Progress(percentage=i * 10)
yield "Result."
handle_command_test = create_test_command_handler(("motor", "led"))
async def handle_command_timeout(self, uav: VirtualUAV) -> None:
"""Dummy command that does not respond in a reasonable amount of time.
Can be used on the client side to test response timeouts.
"""
await sleep(1000000)
async def handle_command___mission_upload(
self, uav: VirtualUAV, bundle: MissionItemBundle, format: str
) -> None:
"""Handles a drone mission upload request for the given UAV.
This is a temporary solution until we figure out something that is
more sustainable in the long run.
Parameters:
bundle: the mission bundle
format: the mission format
"""
uav.handle_mission_upload(bundle, format)
await sleep(0.25 + random() * 0.5)
async def handle_command___show_upload(self, uav: VirtualUAV, *, show) -> None:
"""Handles a drone show upload request for the given UAV.
This is a temporary solution until we figure out something that is
more sustainable in the long run.
Parameters:
show: the show data
"""
uav.handle_show_upload(show)
await sleep(0.25 + random() * 0.5)
async def handle_command_yo(self, uav: VirtualUAV) -> str:
await sleep(0.5 + random())
return "yo" + choice("?!.")
handle_command_color = create_color_command_handler()
handle_command_param = create_parameter_command_handler()
handle_command_version = create_version_command_handler()
async def get_log(self, uav: VirtualUAV, log_id: str) -> FlightLog:
# Simulate a bit of delay to make it more realistic
await sleep(0.5)
try:
return _LOGS[log_id]
except KeyError:
raise RuntimeError(f"no such log: {log_id}") from None
async def _enter_low_power_mode_single(
self, uav: VirtualUAV, *, transport: Optional[TransportOptions] = None
) -> None:
if uav.motors_running:
raise RuntimeError(
"Cannot bring UAV to sleep mode while motors are running"
)
uav.sleeping = True
async def _get_log_list_single(self, uav: VirtualUAV) -> list[FlightLogMetadata]:
# Simulate a bit of delay to make it more realistic
await sleep(0.2)
return [log.get_metadata() for log in _LOGS.values()]
async def _get_parameter_single(self, uav: VirtualUAV, name: str) -> Any:
return await uav.get_parameter(name)
async def _resume_from_low_power_mode_single(
self, uav: VirtualUAV, *, transport: Optional[TransportOptions] = None
) -> None:
uav.sleeping = False
def _request_preflight_report_single(self, uav: VirtualUAV) -> PreflightCheckInfo:
return _dummy_preflight_check_info
def _request_version_info_single(self, uav: VirtualUAV) -> VersionInfo:
return uav.get_version_info()
def _send_fly_to_target_signal_single(
self, uav: VirtualUAV, target: GPSCoordinate
) -> None:
if uav.state == VirtualUAVState.LANDED:
uav.takeoff()
if target.ahl is None and target.amsl is None:
target.ahl = uav.takeoff_altitude
uav.stop_trajectory()
uav.target = target
async def _send_hover_signal_single(
self, uav: VirtualUAV, *, transport: Optional[TransportOptions] = None
) -> None:
# Make the hover signal async to simulate how it works for "real" drones
await sleep(0.2)
uav.hold_position()
async def _send_landing_signal_single(
self, uav: VirtualUAV, *, transport: Optional[TransportOptions] = None
) -> None:
# Make the landing signal async to simulate how it works for "real" drones
await sleep(0.2)
uav.land()
def _send_light_or_sound_emission_signal_single(
self,
uav: VirtualUAV,
signals: list[str],
duration: float,
*,
transport: Optional[TransportOptions] = None,
) -> None:
if "light" in signals:
uav.handle_where_are_you(duration)
def _send_motor_start_stop_signal_single(
self,
uav: VirtualUAV,
start: bool,
force: bool = False,
*,
transport: Optional[TransportOptions] = None,
) -> None:
if start:
uav.start_motors()
else:
uav.stop_motors()
def _send_reset_signal_single(
self,
uav: VirtualUAV,
component: str,
*,
transport: Optional[TransportOptions] = None,
) -> None:
if not component:
# Resetting the whole UAV, this is supported
uav.reset()
else:
# No components on this UAV
raise RuntimeError(f"Resetting {component!r} is not supported")
def _send_return_to_home_signal_single(
self, uav: VirtualUAV, *, transport=None
) -> None:
if uav.state == VirtualUAVState.AIRBORNE:
target = uav.home.copy()
target.ahl = uav.status.position.ahl
uav.stop_trajectory()
uav.target = target
uav.ensure_error(FlockwaveErrorCode.RETURN_TO_HOME)
else:
raise RuntimeError("UAV is not airborne, cannot start RTH")
def _send_shutdown_signal_single(self, uav: VirtualUAV, *, transport=None) -> None:
uav.shutdown()
async def _send_takeoff_signal_single(
self, uav: VirtualUAV, *, scheduled: bool = False, transport=None
) -> None:
await sleep(0.2)
uav.takeoff()
async def _set_parameter_single(
self, uav: VirtualUAV, name: str, value: Any
) -> None:
await uav.set_parameter(name, value)