Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / show / trajectory.py
Size: Mime:
"""Temporary place for functions that are related to the processing of
Skybrush-related trajectories, until we find a better place for them.
"""

from dataclasses import dataclass
from itertools import chain
from math import ceil, inf
from typing import Iterable, Optional, Sequence

from .utils import BoundingBoxCalculator, Point

# Public API of this module for star-imports. Only the specification class is
# listed; TrajectorySegment is defined in this module as well and can still be
# imported explicitly by name.
__all__ = ("TrajectorySpecification",)


@dataclass(frozen=True)
class TrajectorySegment:
    """A single segment in a trajectory specification.

    A segment is described by its start time, its duration and a list of
    Bezier control points; the first and last points are the endpoints of
    the segment.
    """

    t: float
    """The start time of the segment, relative to the takeoff time of the
    trajectory.
    """

    duration: float
    """The total duration of the segment."""

    points: list[Point]
    """The control points of the segment, including the start and end point."""

    @property
    def has_control_points(self) -> bool:
        """Returns whether the segment has any control points besides its
        start and end point.
        """
        return len(self.points) > 2

    @property
    def start(self) -> Point:
        """Returns the start point of the segment."""
        return self.points[0]

    @property
    def end(self) -> Point:
        """Returns the end point of the segment."""
        return self.points[-1]

    @property
    def start_time(self) -> float:
        """Returns the start time of the segment, relative to the takeoff time."""
        return self.t

    @property
    def end_time(self) -> float:
        """Returns the end time of the segment, relative to the takeoff time."""
        return self.t + self.duration

    def split_at(
        self, fraction: float
    ) -> tuple["TrajectorySegment", "TrajectorySegment"]:
        """Splits the segment into two pieces at the given relative fraction.

        Splitting at 0 or 1 returns the segment itself as one of the pieces,
        paired with a zero-duration, single-point segment at the start or at
        the end, respectively.

        Parameters:
            fraction: the fraction to split the segment at

        Returns:
            the two smaller pieces that the segment was split into

        Raises:
            ValueError: if the fraction is not between 0 and 1 (inclusive);
                this includes NaN
        """
        # The chained comparison is false for NaN, so a NaN fraction is
        # rejected here instead of silently producing NaN coordinates.
        if not 0 <= fraction <= 1:
            raise ValueError("fraction must be between 0 and 1")

        if fraction == 0:
            return TrajectorySegment(self.t, 0, [self.start]), self

        if fraction == 1:
            return self, TrajectorySegment(self.end_time, 0, [self.end])

        first_points, second_points = self._split_helper(fraction, self.points)
        first_duration = self.duration * fraction
        first = TrajectorySegment(self.t, first_duration, first_points)
        second = TrajectorySegment(
            self.t + first_duration, self.duration - first_duration, second_points
        )
        return first, second

    def split_to_max_duration(
        self, max_duration: float
    ) -> Iterable["TrajectorySegment"]:
        """Splits the segment into smaller pieces such that the duration of
        each piece is less than or equal to the given maximum duration.

        This is a generator, so the argument is validated only when the
        iteration starts.

        Parameters:
            max_duration: the maximum allowed duration of a single piece

        Raises:
            ValueError: if the maximum duration is not positive
        """
        if max_duration <= 0:
            raise ValueError("maximum duration must be positive")

        # The segment is cut into (num_splits + 1) pieces of equal duration.
        num_splits = self.duration // max_duration
        current = self
        while num_splits > 0:
            # Cutting 1 / (k + 1) off the front of what is left, for
            # k = num_splits, num_splits - 1, ..., 1, yields equal pieces.
            head, current = current.split_at(1 / (num_splits + 1))
            num_splits -= 1
            yield head
        yield current

    @staticmethod
    def _split_helper(
        t: float, points: Sequence[Point]
    ) -> tuple[list[Point], list[Point]]:
        """Helper function for splitting the segment at a given fraction.
        See https://pomax.github.io/bezierinfo/#splitting for more details.

        Parameters:
            t: the fraction to split at
            points: the control points of the curve to split; only the first
                three coordinates of each point are used

        Returns:
            the control points of the piece before the split and the control
            points of the piece after the split
        """
        left: list[Point] = []
        right: list[Point] = []
        s = 1 - t

        while True:
            # The first and last point of every interpolation level become
            # control points of the left and right piece, respectively.
            left.append(points[0])
            right.append(points[-1])
            if len(points) <= 1:
                break

            # Next level: interpolate linearly between consecutive points.
            points = [
                (
                    s * p[0] + t * q[0],
                    s * p[1] + t * q[1],
                    s * p[2] + t * q[2],
                )
                for p, q in zip(points, points[1:])
            ]

        # The right piece was collected from its end towards the split point.
        right.reverse()
        return left, right


class TrajectorySpecification:
    """Class representing a Skybrush trajectory specification received from the
    client during a show upload.

    The specification wraps the raw trajectory dictionary. Its ``points``
    entry is read as a list of ``(time, point, control_points)`` keyframes.
    """

    def __init__(self, data: dict):
        """Constructor.

        Parameters:
            data: the raw JSON trajectory dictionary in the show specification

        Raises:
            RuntimeError: if the trajectory has no version number or if the
                version is not 1
        """
        self._data = data

        version = self._data.get("version")
        if version is None:
            raise RuntimeError("trajectory must have a version number")
        if version != 1:
            raise RuntimeError("only version 1 trajectories are supported")

    @property
    def bounding_box(self) -> tuple[Point, Point]:
        """Returns the coordinates of the opposite corners of the axis-aligned
        bounding box of the trajectory, without any padding.

        The first point will contain the minimum coordinates, the second will
        contain the maximum coordinates.

        Raises:
            ValueError: if the trajectory has no points (expected to be
                raised by the bounding box calculator)
        """
        return self.get_padded_bounding_box()

    @property
    def duration(self) -> float:
        """Returns the total duration of the trajectory in seconds, _excluding_
        the time spent on the ground before takeoff. In other words, the returned
        duration is the time elapsed between the takeoff and the landing.
        """
        points = self._data.get("points")
        if not points:
            return 0.0

        # The timestamp of the last keyframe is the duration
        last_t, _, _ = points[-1]
        return float(last_t)

    @property
    def is_empty(self) -> bool:
        """Returns whether the trajectory is empty (i.e. has no points)."""
        return not self._data.get("points")

    @property
    def home_position(self) -> Point:
        """Returns the home position of the drone within the show. Units are
        in meters.

        Falls back to the position of the first keyframe if there is no
        explicit home position, and to the origin if no usable
        three-dimensional position is found.
        """
        # TODO(ntamas): I think the 'home' is not here by default but one level
        # higher in the original JSON structure. I think it's time we created a
        # formal specification and stick to it. :-/
        home = self._data.get("home")
        if not home:
            points = self._data.get("points")
            if points:
                _, home, _ = points[0]

        if home and len(home) == 3:
            return float(home[0]), float(home[1]), float(home[2])
        return 0.0, 0.0, 0.0

    @property
    def landing_height(self) -> float:
        """Returns the height of the last point of the show, in meters.

        An explicit ``landingHeight`` entry takes precedence; otherwise the
        Z coordinate of the last keyframe is used, or zero if the trajectory
        has no points.

        TODO(ntamas): this is correct only as long as the trajectory is
        pre-processed when we receive it and the last segment is cut. Fix this
        when we finally migrate to sending the entire trajectory from the client
        to the server.
        """
        height = self._data.get("landingHeight")
        if height is None:
            points = self._data.get("points")
            if not points:
                return 0.0
            _, last_pos, _ = points[-1]
            height = last_pos[2]

        # Always convert so the declared return type holds even when the raw
        # data stores the height as an integer
        return float(height)

    @property
    def takeoff_time(self) -> float:
        """Returns the takeoff time of the drone within the show, in seconds."""
        return float(self._data.get("takeoffTime", 0.0))

    def get_padded_bounding_box(self, margin: float = 0) -> tuple[Point, Point]:
        """Returns the coordinates of the opposite corners of the axis-aligned
        bounding box of the trajectory, optionally padded with the given margin.

        Both the keyframe positions and their control points are included in
        the bounding box.

        The first point will contain the minimum coordinates, the second will
        contain the maximum coordinates.

        Parameters:
            margin: the margin to apply on each side of the bounding box

        Raises:
            ValueError: if the margin is negative or if the trajectory has no
                points
        """
        # Enforce the documented contract up front; a negative margin used to
        # be ignored silently
        if margin < 0:
            raise ValueError("margin must be non-negative")

        keyframes = self._data.get("points") or []

        bbox = BoundingBoxCalculator(dim=3)
        for _, point, control_points in keyframes:
            bbox.add(point)
            for control_point in control_points:
                bbox.add(control_point)

        if margin > 0:
            bbox.pad(margin)

        return bbox.get_corners()  # type: ignore

    def iter_segments(
        self, max_length: float = inf, absolute: bool = False
    ) -> Iterable["TrajectorySegment"]:
        """Iterates over the segments of the trajectory.

        This is a generator, so errors in the trajectory are reported only
        when the iteration reaches the offending keyframe.

        Args:
            max_length: maximum allowed length of a single segment, in seconds.
                Segments longer than this will be split as needed.
            absolute: whether to use absolute timestamps in the returned
                segments. When ``False``, the timestamp in each segment will be
                relative to the takeoff time of the trajectory. When ``True``,
                timestamps will be expressed relative to T=0 and an optional
                constant segment will be inserted in front of the first segment
                if the takeoff time is positive.

        Raises:
            ValueError: if the first keyframe has control points or if the
                timestamps of the keyframes are not strictly increasing
        """
        keyframes: Optional[Iterable[tuple[float, Point, list[Point]]]] = (
            self._data.get("points")
        )
        if not keyframes:
            return

        if absolute:
            time_offset = self.takeoff_time
            if time_offset > 0:
                # We need to add a stationary segment before the takeoff.
                # NOTE(review): the synthetic keyframe becomes the "first"
                # keyframe, so the control points of the first real keyframe
                # are no longer validated below -- confirm that this is
                # intended.
                keyframes = chain(
                    [(-time_offset, self.home_position, [])], keyframes
                )
        else:
            time_offset = 0.0

        prev_t: Optional[float] = None
        start: Optional[Point] = None

        for t, point, control in keyframes:
            if prev_t is None:
                # This is the first keyframe so we simply make sure that there
                # are no control points
                if control:
                    raise ValueError("first keyframe must have no control points")
            else:
                # We have to be careful and round dt to three digits (i.e.
                # milliseconds), otherwise floating-point errors will slowly
                # accumulate. For instance, if every segment is 0.2 sec, we might
                # drift by ~160 msec over a minute or so
                dt = round(t - prev_t, 3)
                if dt < 0:
                    raise ValueError(f"time should not move backwards at t = {t}")
                if dt == 0:
                    raise ValueError(f"time should not stand still at t = {t}")

                segment = TrajectorySegment(
                    t=prev_t + time_offset,
                    duration=dt,
                    points=[start, *control, point],  # type: ignore
                )
                if dt > max_length:
                    yield from segment.split_to_max_duration(max_length)
                else:
                    yield segment

            prev_t, start = t, point

    def propose_scaling_factor(self) -> int:
        """Proposes a scaling factor to use in a Skybrush binary show file when
        storing the trajectory.

        Returns:
            the proposed scaling factor; 1 if the trajectory is empty
        """
        if self.is_empty:
            return 1

        mins, maxs = self.bounding_box

        # Largest absolute coordinate of the bounding box, multiplied by 1000
        # and rounded up to an integer
        extremum = ceil(max(abs(x) for x in (*mins, *maxs)) * 1000)

        # With scale=1, we can fit values from 0 to 32767 into the binary show
        # file, so we basically need to divide (extremum+1) by 32768 and round
        # up. This gives us scale = 1 for extrema in [0; 32767],
        # scale = 2 for extrema in [32768; 65535] and so on. Integer division
        # gives the same result without going through floats.
        return extremum // 32768 + 1