Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
flockwave-server / server / ext / crazyflie / trajectory.py
Size: Mime:
"""Classes and functions for modeling trajectories in a format that is
understood by the Crazyflies.
"""

from dataclasses import dataclass
from enum import IntEnum
from struct import Struct
from typing import Sequence

from flockwave.server.show.formats import SegmentEncoder
from flockwave.server.show.trajectory import TrajectorySpecification

from .math import get_poly_degree, to_bernstein_form


__all__ = ("encode_trajectory", "Poly4D", "to_poly4d_sequence")


class TrajectoryEncoding(IntEnum):
    """Enum representing the various trajectory encodings that the Crazyflie
    supports.

    The members are integers (``IntEnum``), so they compare equal to their
    raw numeric values.
    """

    # Uncompressed representation: a sequence of Poly4D segments, each
    # serialized with ``Poly4D.encode()``
    POLY4D = 0

    # Compressed representation of the trajectory segments
    COMPRESSED = 1


@dataclass
class Poly4D:
    """Four-dimensional 7-th degree polynomial, used by the Crazyflie
    to represent trajectory segments. The four dimensions are: x, y, z and
    yaw.

    Every coordinate is described by eight coefficients. The compressed
    encoder multiplies the i-th coefficient by ``duration**i``, i.e. it
    treats the coefficients as being listed in increasing order of degree.

    Attributes:
        duration: duration of the segment (presumably in seconds; the
            compressed encoding stores it multiplied by 1000 -- confirm
            against the callers)
        xs: coefficients of the polynomial along the X axis
        ys: coefficients of the polynomial along the Y axis
        zs: coefficients of the polynomial along the Z axis
        yaws: coefficients of the polynomial describing the yaw
    """

    duration: float
    xs: tuple[float, ...] = (0.0,) * 8
    ys: tuple[float, ...] = (0.0,) * 8
    zs: tuple[float, ...] = (0.0,) * 8
    yaws: tuple[float, ...] = (0.0,) * 8

    # Pre-compiled little-endian layouts shared by all instances. They are
    # deliberately left without type annotations so the dataclass machinery
    # does not treat them as fields.

    # Eight 32-bit floats: the coefficients of one coordinate
    _float_coords_struct = Struct("<ffffffff")
    # One 32-bit float: the duration in the uncompressed format
    _duration_struct = Struct("<f")

    # Compressed segment header: one byte holding the four 2-bit compression
    # schemes, followed by the duration as an unsigned 16-bit integer
    _compressed_header_struct = Struct("<BH")
    # One signed 16-bit integer: a single scaled control point
    _short_coord_struct = Struct("<h")
    # Four signed 16-bit integers: the scaled start point (x, y, z, yaw)
    _short_coords_struct = Struct("<hhhh")

    def encode(self) -> bytes:
        """Encodes this Poly4D instance into a raw byte-level representation
        understood by the Crazyflie in its uncompressed trajectory format.

        The layout is eight little-endian floats for each of x, y, z and yaw
        (in this order), followed by the duration as a single little-endian
        float; 132 bytes in total.

        Returns:
            the uncompressed representation of this trajectory segment

        Raises:
            struct.error: if any of the coordinates does not have exactly
                eight coefficients
        """
        return b"".join(
            [
                self._float_coords_struct.pack(*self.xs),
                self._float_coords_struct.pack(*self.ys),
                self._float_coords_struct.pack(*self.zs),
                self._float_coords_struct.pack(*self.yaws),
                self._duration_struct.pack(self.duration),
            ]
        )

    def encode_compressed(self, with_start_point: bool = False) -> bytes:
        """Encodes this Poly4D instance into the compressed trajectory
        representation of the Crazyflie.

        The output consists of an optional start point (four signed 16-bit
        integers), a header (one byte with a 2-bit compression scheme for
        each coordinate, x in the lowest bits and yaw in the highest ones,
        plus the duration multiplied by 1000 as an unsigned 16-bit integer),
        and then the encoded control points of x, y, z and yaw.

        Parameters:
            with_start_point: whether to prepend the Poly4D data with the
                start point; this must be used for the first Poly4D segment
                in the encoding

        Returns:
            the compressed representation of this trajectory segment

        Raises:
            ValueError: if a coordinate has nonzero coefficients above the
                7th degree
            struct.error: if the scaled duration or a scaled coordinate does
                not fit into its 16-bit slot
        """
        polys_and_scales = (
            (self.xs, 1000),  # scaling factor: 1m = 1000 units
            (self.ys, 1000),
            (self.zs, 1000),
            (self.yaws, 10),  # scaling factor: 1 degree = 10 units
        )

        schemes = []
        chunks = []

        for poly, scale in polys_and_scales:
            # Rescale the argument of the parametric curve to the [0; 1] range
            if self.duration != 1:
                poly = [x * (self.duration**i) for i, x in enumerate(poly)]

            # Encode polynomial coefficients in Bernstein form
            scheme, data = self._encode_polynomial_compressed(poly, scale)

            schemes.append(scheme)
            chunks.append(data)

        # Pack the four 2-bit compression schemes into a single header byte
        header = 0
        for index, scheme in enumerate(schemes):
            header |= scheme << (2 * index)

        duration = int(round(self.duration * 1000))
        prefix = [self._compressed_header_struct.pack(header, duration)]

        if with_start_point:
            # The constant term is not affected by the rescaling above, so
            # the start point can be taken from the original coefficients.
            # int() keeps this consistent with the other packed values and
            # guarantees that the struct packer receives plain integers.
            start_point = [
                int(round(poly[0] * scale)) for poly, scale in polys_and_scales
            ]
            prefix.insert(0, self._short_coords_struct.pack(*start_point))

        return b"".join(prefix + chunks)

    @classmethod
    def _encode_polynomial_compressed(
        cls, coeffs: Sequence[float], scale: int = 1000, *, eps: float = 1e-7
    ) -> tuple[int, bytes]:
        """Encodes the coefficients of the given polynomial into the compressed
        byte-level representation of the Crazyflie, returning the chosen
        compression scheme and the raw bytes.

        The polynomial is converted to Bernstein form first. The first
        control point will _not_ be encoded; we don't need it when we encode
        continuous curves as the start of a segment is the same as the end
        of the previous segment, which we know. The remaining control points
        will be encoded as little-endian signed 16-bit integers such that
        the raw value is multiplied by ``scale`` and then rounded to the
        nearest integer.

        Parameters:
            coeffs: raw coefficients of the polynomial to encode
            scale: scaling factor to use when encoding the coordinates of
                the control points in Bernstein form as integers
            eps: threshold below which a coefficient is treated as zero

        Returns:
            a tuple consisting of the chosen compression scheme (0 = constant,
            1 = linear, 2 = cubic, 3 = 7-th degree polynomial), and the raw
            bytes that encode the control points.

        Raises:
            ValueError: if the polynomial has nonzero coefficients above the
                7th degree
            struct.error: if a scaled control point does not fit into a
                signed 16-bit integer
        """
        degree = get_poly_degree(coeffs, eps=eps)

        # Work on a private list so the padding below neither depends on the
        # concrete sequence type returned by the helper nor mutates an object
        # that the helper may still own.
        control_points = list(to_bernstein_form(coeffs, eps=eps))
        missing = degree + 1 - len(control_points)
        if missing > 0:
            control_points.extend([0] * missing)

        # NOTE(review): the number of values emitted below follows the length
        # of the Bernstein form, while the scheme is bucketed into constant /
        # linear / cubic / 7-th degree -- confirm that to_bernstein_form()
        # raises e.g. quadratics to the degree implied by the bucket.
        if degree <= 0:
            scheme = 0
        elif degree <= 1:
            scheme = 1
        elif degree <= 3:
            scheme = 2
        elif degree <= 7:
            scheme = 3
        else:
            raise ValueError(
                "polynomials with nonzero coefficients above the 7th degree "
                "are not supported"
            )

        data = b"".join(
            cls._short_coord_struct.pack(int(round(point * scale)))
            for point in control_points[1:]
        )
        return scheme, data


def encode_trajectory(
    trajectory: TrajectorySpecification,
    *,
    encoding: TrajectoryEncoding = TrajectoryEncoding.POLY4D,
) -> bytes:
    """Returns a byte-level representation of the given trajectory.

    Parameters:
        trajectory: the trajectory to encode
        encoding: the encoding format to use; any value that is not equal to
            ``TrajectoryEncoding.POLY4D`` selects the compressed format

    Returns:
        the encoded trajectory

    Raises:
        ValueError: if the uncompressed format is requested and the
            trajectory contains segments with control points
    """
    # Compare by value instead of identity: TrajectoryEncoding is an IntEnum,
    # so a raw integer such as 0 is equal to the corresponding member but is
    # not the same object; an identity check would silently send it to the
    # compressed branch.
    if encoding == TrajectoryEncoding.POLY4D:
        return b"".join(
            polynomial.encode() for polynomial in to_poly4d_sequence(trajectory)
        )

    # Segments are limited to 65 time units; presumably this keeps each
    # duration within the 16-bit millisecond field of the compressed format
    # -- confirm against SegmentEncoder.
    encoder = SegmentEncoder(scale=1)
    encoded = encoder.iter_encode_multiple_segments(
        trajectory.iter_segments(max_length=65)
    )

    # NOTE(review): the three trailing zero bytes look like an
    # end-of-trajectory marker (same size as an all-zero compressed segment
    # header) -- confirm against the firmware documentation.
    return b"".join(encoded) + b"\x00\x00\x00"


def to_poly4d_sequence(trajectory: TrajectorySpecification) -> Sequence[Poly4D]:
    """Converts the given trajectory into a sequence of Poly4D segments.

    The trajectory is split into segments of at most 65 time units. Each
    segment becomes a linear polynomial per spatial axis: the constant term
    is the start coordinate and the first-order term is the displacement
    divided by the duration of the segment. The yaw coefficients are left at
    their defaults.

    Parameters:
        trajectory: the trajectory to convert

    Returns:
        the Poly4D segments, in the order they appear in the trajectory

    Raises:
        ValueError: if a segment of the trajectory has control points
    """
    polynomials: list[Poly4D] = []

    # All the coefficients above the first degree are zero
    higher_order_terms = (0,) * 6

    for segment in trajectory.iter_segments(max_length=65):
        if segment.has_control_points:
            raise ValueError("control points are not implemented yet")

        origin, target = segment.start, segment.end
        duration = segment.duration

        # One coefficient tuple for each of the X, Y and Z axes
        xs, ys, zs = (
            (origin[axis], (target[axis] - origin[axis]) / duration)
            + higher_order_terms
            for axis in range(3)
        )

        polynomials.append(Poly4D(duration=duration, xs=xs, ys=ys, zs=zs))

    return polynomials