Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""
MAVLink protocol implementation (auto-generated by mavgen.py)

Generated from: ('icarous.xml',)

Note: this file has been auto-generated. DO NOT EDIT
"""

import hashlib
import json
import logging
import os
import struct
import sys
import time
from builtins import object, range
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
)

# Wire protocol version this dialect was generated for; the pack routines
# compare it against 2.0 to decide whether MAVLink 2 framing is emitted.
WIRE_PROTOCOL_VERSION = "1.0"
# Name of the dialect this module was generated from (icarous.xml).
DIALECT = "icarous"

# First byte of a frame for each protocol version, and the matching header
# sizes (marker byte included).
PROTOCOL_MARKER_V1 = 0xFE
PROTOCOL_MARKER_V2 = 0xFD
HEADER_LEN_V1 = 6
HEADER_LEN_V2 = 10

# Bytes appended to a signed frame: link id (1) + timestamp (6) + signature (6).
MAVLINK_SIGNATURE_BLOCK_LEN = 13

# incompat_flags bit that marks a frame as carrying a signature block.
MAVLINK_IFLAG_SIGNED = 0x01

logger = logging.getLogger(__name__)

# allow MAV_IGNORE_CRC=1 to ignore CRC, allowing some
# corrupted msgs to be seen
# NOTE(review): the raw environment string is used as a truth value, so any
# non-empty setting (including "0") disables the CRC check.
MAVLINK_IGNORE_CRC = os.environ.get("MAV_IGNORE_CRC", 0)

# some base types from mavlink_types.h
MAVLINK_TYPE_CHAR = 0
MAVLINK_TYPE_UINT8_T = 1
MAVLINK_TYPE_INT8_T = 2
MAVLINK_TYPE_UINT16_T = 3
MAVLINK_TYPE_INT16_T = 4
MAVLINK_TYPE_UINT32_T = 5
MAVLINK_TYPE_INT32_T = 6
MAVLINK_TYPE_UINT64_T = 7
MAVLINK_TYPE_INT64_T = 8
MAVLINK_TYPE_FLOAT = 9
MAVLINK_TYPE_DOUBLE = 10

# CRC calculation using fastcrc, falling back to a pure Python implementation
# if fastcrc is not available
try:
    import fastcrc

    mcrf4xx = fastcrc.crc16.mcrf4xx
except Exception:
    # Any failure while importing or resolving fastcrc leaves mcrf4xx unset,
    # which makes the x25crc alias below select the pure Python class.
    mcrf4xx = None  # type: ignore


# Inputs accepted by the CRC classes: integer sequences, byte strings, or str
# (str input is encoded to bytes before being accumulated).
BytesLike = Union[List[int], Tuple[int], bytes, bytearray, str]


class _x25crc_slow(object):
    """CRC-16/MCRF4XX - based on checksum.h from mavlink library"""

    crc: int

    def __init__(self, buf: Optional[BytesLike] = None):
        self.crc = 0xFFFF
        if buf is not None:
            self.accumulate(buf)

    def accumulate(self, buf: BytesLike) -> None:
        """add in some more bytes (it also accepts strings)"""
        if isinstance(buf, str):
            buf = buf.encode()

        accum = self.crc
        for b in buf:
            tmp = b ^ (accum & 0xFF)
            tmp = (tmp ^ (tmp << 4)) & 0xFF
            accum = (accum >> 8) ^ (tmp << 8) ^ (tmp << 3) ^ (tmp >> 4)
        self.crc = accum


class _x25crc_fast(object):
    """CRC-16/MCRF4XX accumulator that delegates the computation to ``mcrf4xx``.

    Offers the same ``crc`` attribute and ``accumulate`` method as the pure
    Python implementation.
    """

    def __init__(self, buf: Optional[BytesLike] = None):
        # The checksum register starts at 0xFFFF.
        self.crc = 0xFFFF
        if buf is None:
            return
        self.accumulate(buf)

    def accumulate(self, buf: BytesLike) -> None:
        """Fold ``buf`` into the running checksum.

        Lists, tuples and bytearrays are converted to ``bytes`` and ``str`` is
        encoded; any other input is handed to ``mcrf4xx`` unchanged.
        """
        if isinstance(buf, (list, tuple, bytearray)):
            payload = bytes(buf)
        elif isinstance(buf, str):
            payload = buf.encode()
        else:
            payload = buf
        # The previous checksum is passed in as the starting value.
        self.crc = mcrf4xx(payload, self.crc)


# CRC class used by the rest of the module: the fastcrc-backed implementation
# when mcrf4xx could be resolved, otherwise the pure Python one.
x25crc = _x25crc_fast if mcrf4xx is not None else _x25crc_slow


class MAVLink_header(object):
    """MAVLink message header.

    Holds the per-frame header fields and serializes them with :meth:`pack`.
    """

    def __init__(
        self,
        msgId: int,
        incompat_flags: int = 0,
        compat_flags: int = 0,
        mlen: int = 0,
        seq: int = 0,
        srcSystem: int = 0,
        srcComponent: int = 0,
    ) -> None:
        """Store the header fields.

        Args:
            msgId: numeric message id.
            incompat_flags: MAVLink 2 incompatibility flags (only packed in
                the MAVLink 2 layout).
            compat_flags: MAVLink 2 compatibility flags (only packed in the
                MAVLink 2 layout).
            mlen: payload length in bytes.
            seq: packet sequence number.
            srcSystem: id of the sending system.
            srcComponent: id of the sending component.
        """
        self.mlen = mlen
        self.seq = seq
        self.srcSystem = srcSystem
        self.srcComponent = srcComponent
        self.msgId = msgId
        self.incompat_flags = incompat_flags
        self.compat_flags = compat_flags

    def pack(self, force_mavlink1: bool = False) -> bytes:
        """Serialize the header, including the leading protocol marker.

        The 10-byte MAVLink 2 layout is produced only when the module's wire
        protocol version is 2.0 and ``force_mavlink1`` is false; otherwise the
        6-byte MAVLink 1 layout is produced.

        Raises:
            struct.error: if a field does not fit its wire width (for example
                a message id above 255 in the MAVLink 1 layout).
        """
        if float(WIRE_PROTOCOL_VERSION) == 2.0 and not force_mavlink1:
            return struct.pack(
                "<BBBBBBBHB",
                # A v2-layout header must start with the v2 marker; the
                # parser picks the 10-byte header only for that marker.
                PROTOCOL_MARKER_V2,
                self.mlen,
                self.incompat_flags,
                self.compat_flags,
                self.seq,
                self.srcSystem,
                self.srcComponent,
                # The 24-bit message id is split into low 16 and high 8 bits.
                self.msgId & 0xFFFF,
                self.msgId >> 16,
            )
        return struct.pack(
            "<BBBBBB",
            PROTOCOL_MARKER_V1,
            self.mlen,
            self.seq,
            self.srcSystem,
            self.srcComponent,
            self.msgId,
        )


class MAVLink_message(object):
    """Base MAVLink message class.

    Concrete message classes override the class-level metadata below and
    implement :meth:`pack`. Instances carry the header, the raw frame buffer,
    the payload and the checksum of the message they represent.
    """

    # Class-level message metadata; the values here are empty defaults.
    id = 0
    msgname = ""
    fieldnames: List[str] = []
    ordered_fieldnames: List[str] = []
    fieldtypes: List[str] = []
    fielddisplays_by_name: Dict[str, str] = {}
    fieldenums_by_name: Dict[str, str] = {}
    fieldunits_by_name: Dict[str, str] = {}
    native_format = bytearray(b"")
    orders: List[int] = []
    lengths: List[int] = []
    array_lengths: List[int] = []
    crc_extra = 0
    unpacker = struct.Struct("")
    instance_field: Optional[str] = None
    instance_offset = -1

    def __init__(self, msgId: int, name: str) -> None:
        """Create an empty message with id ``msgId`` and type name ``name``."""
        self._header = MAVLink_header(msgId)
        self._payload: Optional[bytes] = None
        self._msgbuf = bytearray(b"")
        self._crc: Optional[int] = None
        self._fieldnames: List[str] = []
        self._type = name
        self._signed = False
        self._link_id: Optional[int] = None
        self._instances: Optional[Dict[str, str]] = None
        self._instance_field: Optional[str] = None

    def format_attr(self, field: str) -> Union[str, float, int]:
        """Return attribute ``field`` in display form.

        ``bytes`` values are decoded (undecodable bytes are backslash-escaped)
        and stripped of trailing NUL characters; other values are returned
        unchanged.
        """
        raw_attr: Union[bytes, float, int] = getattr(self, field)
        if isinstance(raw_attr, bytes):
            return raw_attr.decode(errors="backslashreplace").rstrip("\x00")
        return raw_attr

    def get_msgbuf(self) -> bytearray:
        """Return the raw frame buffer."""
        return self._msgbuf

    def get_header(self) -> "MAVLink_header":
        """Return the message header."""
        return self._header

    def get_payload(self) -> Optional[bytes]:
        """Return the payload bytes, or ``None`` if not packed or decoded yet."""
        return self._payload

    def get_crc(self) -> Optional[int]:
        """Return the frame checksum, or ``None`` if not packed or decoded yet."""
        return self._crc

    def get_fieldnames(self) -> List[str]:
        """Return the instance's field names."""
        return self._fieldnames

    def get_type(self) -> str:
        """Return the message type name."""
        return self._type

    def get_msgId(self) -> int:
        """Return the message id from the header."""
        return self._header.msgId

    def get_srcSystem(self) -> int:
        """Return the source system id from the header."""
        return self._header.srcSystem

    def get_srcComponent(self) -> int:
        """Return the source component id from the header."""
        return self._header.srcComponent

    def get_seq(self) -> int:
        """Return the sequence number from the header."""
        return self._header.seq

    def get_signed(self) -> bool:
        """Return the signed flag of this message."""
        return self._signed

    def get_link_id(self) -> Optional[int]:
        """Return the signing link id, or ``None`` if none was recorded."""
        return self._link_id

    def __str__(self) -> str:
        """Render as ``TYPE {field : value, ...}``.

        A message without fields renders as ``TYPE {}``.
        """
        # Joining the parts (instead of trimming a trailing ", ") keeps the
        # opening brace intact when there are no fields at all.
        body = ", ".join(
            "%s : %s" % (a, self.format_attr(a)) for a in self._fieldnames
        )
        return "%s {%s}" % (self._type, body)

    def __ne__(self, other: object) -> bool:
        return not self.__eq__(other)

    def __eq__(self, other: object) -> bool:
        """Compare type, checksum, sequence, source ids and all field values."""
        if other is None:
            return False

        if not isinstance(other, MAVLink_message):
            return False

        if self.get_type() != other.get_type():
            return False

        if self.get_crc() != other.get_crc():
            return False

        if self.get_seq() != other.get_seq():
            return False

        if self.get_srcSystem() != other.get_srcSystem():
            return False

        if self.get_srcComponent() != other.get_srcComponent():
            return False

        for a in self._fieldnames:
            if self.format_attr(a) != other.format_attr(a):
                return False

        return True

    def to_dict(self) -> Dict[str, Union[str, float, int]]:
        """Return the fields as a dict, with the type name under ``mavpackettype``."""
        d: Dict[str, Union[str, float, int]] = {}
        d["mavpackettype"] = self._type
        for a in self._fieldnames:
            d[a] = self.format_attr(a)
        return d

    def to_json(self) -> str:
        """Return :meth:`to_dict` serialized as JSON."""
        return json.dumps(self.to_dict())

    def sign_packet(self, mav: "MAVLink") -> None:
        """Append the signature block to the frame buffer.

        The block is the link id (1 byte), the low 6 bytes of the signing
        timestamp, and the first 6 bytes of SHA-256 over the secret key
        followed by the frame so far. The signing timestamp of ``mav`` is
        incremented afterwards.
        """
        assert mav.signing.secret_key is not None

        h = hashlib.new("sha256")
        # "<BQ" packs 9 bytes; the first 7 are the link id plus a 48-bit timestamp.
        self._msgbuf += struct.pack("<BQ", mav.signing.link_id, mav.signing.timestamp)[
            :7
        ]
        h.update(mav.signing.secret_key)
        h.update(self._msgbuf)
        sig = h.digest()[:6]
        self._msgbuf += sig
        mav.signing.timestamp += 1

    def _pack(
        self,
        mav: "MAVLink",
        crc_extra: int,
        payload: bytes,
        force_mavlink1: bool = False,
    ) -> bytes:
        """Build the complete frame for ``payload`` and return it as bytes.

        Updates the header, payload, checksum and frame buffer stored on the
        message. The checksum covers the frame after the marker byte plus the
        ``crc_extra`` byte. A signature block is appended when ``mav`` signs
        outgoing frames and MAVLink 1 is not forced.
        """
        plen = len(payload)
        if float(WIRE_PROTOCOL_VERSION) == 2.0 and not force_mavlink1:
            # in MAVLink2 we can strip trailing zeros off payloads. This allows for simple
            # variable length arrays and smaller packets
            nullbyte = 0
            while plen > 1 and payload[plen - 1] == nullbyte:
                plen -= 1
        self._payload = payload[:plen]
        incompat_flags = 0
        if mav.signing.sign_outgoing:
            incompat_flags |= MAVLINK_IFLAG_SIGNED
        self._header = MAVLink_header(
            self._header.msgId,
            incompat_flags=incompat_flags,
            compat_flags=0,
            mlen=len(self._payload),
            seq=mav.seq,
            srcSystem=mav.srcSystem,
            srcComponent=mav.srcComponent,
        )
        self._msgbuf = bytearray(self._header.pack(force_mavlink1=force_mavlink1))
        self._msgbuf += self._payload
        # The marker byte is excluded from the checksum.
        crc = x25crc(self._msgbuf[1:])
        # we are using CRC extra
        crc.accumulate(struct.pack("B", crc_extra))
        self._crc = crc.crc
        self._msgbuf += struct.pack("<H", self._crc)
        if mav.signing.sign_outgoing and not force_mavlink1:
            self.sign_packet(mav)
        return bytes(self._msgbuf)

    def pack(self, mav: "MAVLink", force_mavlink1: bool = False) -> bytes:
        """Serialize the message; the base class always raises NotImplementedError."""
        raise NotImplementedError("MAVLink_message cannot be serialized directly")

    def __getitem__(self, key: str) -> str:
        """support indexing, allowing for multi-instance sensors in one message

        Raises:
            IndexError: if no instances are recorded or ``key`` is not among them.
        """
        if self._instances is None:
            raise IndexError()
        if key not in self._instances:
            raise IndexError()
        return self._instances[key]


class mavlink_msg_deprecated_name_property(object):
    """
    Descriptor covering the rename of the class variable ``name`` to
    ``msgname`` on subclasses of MAVLink_message during a transition period.

    Install it by assigning `mavlink_msg_deprecated_name_property()` to the
    class variable.
    """

    def __get__(
        self, instance: Optional[MAVLink_message], owner: Type[MAVLink_message]
    ) -> str:
        # Access through the class: still allowed, but warn and forward to
        # the new attribute.
        if instance is None:
            logger.warning(
                """Using .name on a MAVLink_message class is deprecated, consider using .msgname instead.
Note that if compatibility with pymavlink 2.4.30 and earlier is desired, use something like this:

msg_name =  msg.msgname if hasattr(msg, "msgname") else msg.name"""
            )
            return owner.msgname

        # Access through an instance: log and behave as if the attribute
        # did not exist.
        logger.error(
            "Using .name on a MAVLink_message is not supported, use .get_type() instead."
        )
        raise AttributeError("Class {} has no attribute 'name'".format(owner.__name__))


# enums


class EnumEntry(object):
    """One named value of a MAVLink enum, with its description."""

    def __init__(self, name: str, description: str) -> None:
        self.name, self.description = name, description
        # Starts empty; nothing in this class fills it in.
        self.param: Dict[int, str] = {}
        self.has_location = False


class Enum(Dict[int, EnumEntry]):
    """Mapping from integer enum value to its EnumEntry, plus a ``bitmask`` flag."""

    def __init__(self) -> None:
        # The mapping itself starts empty; only the flag needs initialising.
        super().__init__()
        self.bitmask = False


# Registry of the dialect's enums, keyed by enum name. Each enum maps an
# integer value to its EnumEntry, and every value is also exported as a
# module-level constant of the same name. The *_ENUM_END entry is one past
# the last defined value.
enums: Dict[str, Enum] = {}

# ICAROUS_TRACK_BAND_TYPES
enums["ICAROUS_TRACK_BAND_TYPES"] = Enum()
enums["ICAROUS_TRACK_BAND_TYPES"].bitmask = False
ICAROUS_TRACK_BAND_TYPE_NONE = 0
enums["ICAROUS_TRACK_BAND_TYPES"][0] = EnumEntry("ICAROUS_TRACK_BAND_TYPE_NONE", """""")
ICAROUS_TRACK_BAND_TYPE_NEAR = 1
enums["ICAROUS_TRACK_BAND_TYPES"][1] = EnumEntry("ICAROUS_TRACK_BAND_TYPE_NEAR", """""")
ICAROUS_TRACK_BAND_TYPE_RECOVERY = 2
enums["ICAROUS_TRACK_BAND_TYPES"][2] = EnumEntry(
    "ICAROUS_TRACK_BAND_TYPE_RECOVERY", """"""
)
ICAROUS_TRACK_BAND_TYPES_ENUM_END = 3
enums["ICAROUS_TRACK_BAND_TYPES"][3] = EnumEntry(
    "ICAROUS_TRACK_BAND_TYPES_ENUM_END", """"""
)

# ICAROUS_FMS_STATE
enums["ICAROUS_FMS_STATE"] = Enum()
enums["ICAROUS_FMS_STATE"].bitmask = False
ICAROUS_FMS_STATE_IDLE = 0
enums["ICAROUS_FMS_STATE"][0] = EnumEntry("ICAROUS_FMS_STATE_IDLE", """""")
ICAROUS_FMS_STATE_TAKEOFF = 1
enums["ICAROUS_FMS_STATE"][1] = EnumEntry("ICAROUS_FMS_STATE_TAKEOFF", """""")
ICAROUS_FMS_STATE_CLIMB = 2
enums["ICAROUS_FMS_STATE"][2] = EnumEntry("ICAROUS_FMS_STATE_CLIMB", """""")
ICAROUS_FMS_STATE_CRUISE = 3
enums["ICAROUS_FMS_STATE"][3] = EnumEntry("ICAROUS_FMS_STATE_CRUISE", """""")
ICAROUS_FMS_STATE_APPROACH = 4
enums["ICAROUS_FMS_STATE"][4] = EnumEntry("ICAROUS_FMS_STATE_APPROACH", """""")
ICAROUS_FMS_STATE_LAND = 5
enums["ICAROUS_FMS_STATE"][5] = EnumEntry("ICAROUS_FMS_STATE_LAND", """""")
ICAROUS_FMS_STATE_ENUM_END = 6
enums["ICAROUS_FMS_STATE"][6] = EnumEntry("ICAROUS_FMS_STATE_ENUM_END", """""")

# message IDs
# Negative pseudo-ids carried by the MAVLink_bad_data and MAVLink_unknown
# placeholder messages.
MAVLINK_MSG_ID_BAD_DATA = -1
MAVLINK_MSG_ID_UNKNOWN = -2


# Maps a message id to its message class. The decoder looks ids up here and
# returns a MAVLink_unknown for ids that are missing. No entries are
# registered at this point in the file.
mavlink_map: Dict[int, Type[MAVLink_message]] = {}


class MAVError(Exception):
    """Error raised by the MAVLink code; the text is also kept in ``message``."""

    def __init__(self, msg: str) -> None:
        super().__init__(msg)
        # Kept as an attribute so handlers can read the text directly.
        self.message = msg


class MAVLink_bad_data(MAVLink_message):
    """
    Placeholder message wrapping a piece of bad data found in a mavlink
    stream, together with the reason it was rejected.
    """

    def __init__(self, data: bytes, reason: str) -> None:
        MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, "BAD_DATA")
        self._fieldnames = ["data", "reason"]
        self.data, self.reason = data, reason
        # The frame buffer holds a copy of the offending bytes.
        self._msgbuf = bytearray(data)
        self._instance_field = None

    def __str__(self) -> str:
        """Render the data as hex strings: non-printable bytes are common here and are often why this message exists."""
        hex_bytes = [format(octet, "x") for octet in self.data]
        return "%s {%s, data:%s}" % (self._type, self.reason, hex_bytes)


class MAVLink_unknown(MAVLink_message):
    """
    Placeholder for a message whose id is not in the XML this module was
    built from. The type name is ``UNKNOWN_<msgid>``.
    """

    def __init__(self, msgid: int, data: bytes) -> None:
        type_name = "UNKNOWN_%u" % msgid
        MAVLink_message.__init__(self, MAVLINK_MSG_ID_UNKNOWN, type_name)
        self._fieldnames = ["data"]
        self.data = data
        # The frame buffer holds a copy of the raw bytes.
        self._msgbuf = bytearray(data)
        self._instance_field = None

    def __str__(self) -> str:
        """Render the data as hex strings because non-printable characters are common."""
        hex_bytes = [format(octet, "x") for octet in self.data]
        return "%s {data:%s}" % (self._type, hex_bytes)


class MAVLinkSigning(object):
    """Signing state of one MAVLink connection: key, timestamps and counters."""

    def __init__(self) -> None:
        # Key material and values placed in outgoing signature blocks.
        self.secret_key: Optional[bytes] = None
        self.timestamp = self.link_id = 0
        self.sign_outgoing = False
        # Optional hook consulted when a frame is unsigned or fails verification.
        self.allow_unsigned_callback: Optional[Callable[["MAVLink", int], bool]] = None
        # Last accepted timestamp per (link id, system id, component id).
        self.stream_timestamps: Dict[Tuple[int, int, int], int] = {}
        # Statistics counters, all starting at zero.
        self.sig_count = self.badsig_count = self.goodsig_count = 0
        self.unsigned_count = self.reject_count = 0


# Shapes of the tuples produced when unpacking a 6-byte MAVLink 1 header
# ("<cBBBBB") and a 10-byte MAVLink 2 header ("<cBBBBBBHB"); the leading
# element is the marker as a single-byte bytes object.
MAVLinkV1Header = Tuple[bytes, int, int, int, int, int]
MAVLinkV2Header = Tuple[bytes, int, int, int, int, int, int, int, int]


class MAVLink(object):
    """MAVLink protocol handling class"""

    def __init__(
        self,
        file: Any,
        srcSystem: int = 0,
        srcComponent: int = 0,
        use_native: bool = False,
    ) -> None:
        """Set up sender identity, parser state, statistics and signing state.

        Args:
            file: object whose ``write`` method receives packed frames.
            srcSystem: system id placed in outgoing headers.
            srcComponent: component id placed in outgoing headers.
            use_native: accepted but not used by this initializer.
        """
        # Outgoing side: sequence counter, sink and sender identity.
        self.seq = 0
        self.file = file
        self.srcSystem, self.srcComponent = srcSystem, srcComponent

        # Receive and send hooks; unset until the set_*callback methods run.
        self.callback: Optional[Callable[..., None]] = None
        self.callback_args: Optional[Iterable[Any]] = None
        self.callback_kwargs: Optional[Mapping[str, Any]] = None
        self.send_callback: Optional[Callable[..., None]] = None
        self.send_callback_args: Optional[Iterable[Any]] = None
        self.send_callback_kwargs: Optional[Mapping[str, Any]] = None

        # Parser state: receive buffer, read position and the number of
        # bytes needed before the next parsing step (header plus checksum).
        self.buf = bytearray()
        self.buf_index = 0
        self.expected_length = HEADER_LEN_V1 + 2
        self.have_prefix_error = self.robust_parsing = False

        # Protocol settings.
        self.protocol_marker = 254
        self.little_endian = self.crc_extra = self.sort_fields = True

        # Traffic statistics.
        self.total_packets_sent = self.total_bytes_sent = 0
        self.total_packets_received = self.total_bytes_received = 0
        self.total_receive_errors = 0
        self.startup_time = time.time()

        self.signing = MAVLinkSigning()

        # Precompiled struct layouts used while parsing and decoding.
        self.mav20_unpacker = struct.Struct("<cBBBBBBHB")
        self.mav10_unpacker = struct.Struct("<cBBBBB")
        self.mav20_h3_unpacker = struct.Struct("BBB")
        self.mav_csum_unpacker = struct.Struct("<H")
        self.mav_sign_unpacker = struct.Struct("<IH")

    def set_callback(
        self, callback: Callable[..., None], *args: Any, **kwargs: Any
    ) -> None:
        self.callback = callback
        self.callback_args = args
        self.callback_kwargs = kwargs

    def set_send_callback(
        self, callback: Callable[..., None], *args: Any, **kwargs: Any
    ) -> None:
        self.send_callback = callback
        self.send_callback_args = args
        self.send_callback_kwargs = kwargs

    def send(self, mavmsg: MAVLink_message, force_mavlink1: bool = False) -> None:
        """send a MAVLink message"""
        buf = mavmsg.pack(self, force_mavlink1=force_mavlink1)
        self.file.write(buf)
        self.seq = (self.seq + 1) % 256
        self.total_packets_sent += 1
        self.total_bytes_sent += len(buf)
        if (
            self.send_callback is not None
            and self.send_callback_args is not None
            and self.send_callback_kwargs is not None
        ):
            self.send_callback(
                mavmsg, *self.send_callback_args, **self.send_callback_kwargs
            )

    def buf_len(self) -> int:
        return len(self.buf) - self.buf_index

    def bytes_needed(self) -> int:
        """return number of bytes needed for next parsing stage"""
        ret = self.expected_length - self.buf_len()

        if ret <= 0:
            return 1
        return ret

    def __callbacks(self, msg: MAVLink_message) -> None:
        """this method exists only to make profiling results easier to read"""
        if (
            self.callback is not None
            and self.callback_args is not None
            and self.callback_kwargs is not None
        ):
            self.callback(msg, *self.callback_args, **self.callback_kwargs)

    def parse_char(self, c: Sequence[int]) -> Optional[MAVLink_message]:
        """input some data bytes, possibly returning a new message"""
        self.buf.extend(c)

        self.total_bytes_received += len(c)

        m = self.__parse_char_legacy()

        if m is not None:
            self.total_packets_received += 1
            self.__callbacks(m)
        else:
            # XXX The idea here is if we've read something and there's nothing left in
            # the buffer, reset it to 0 which frees the memory
            if self.buf_len() == 0 and self.buf_index != 0:
                self.buf = bytearray()
                self.buf_index = 0

        return m

    def __parse_char_legacy(self) -> Optional[MAVLink_message]:
        """Try to extract one message from the unread part of the receive buffer.

        Returns:
            A decoded message, a MAVLink_bad_data placeholder (robust parsing
            only), or ``None`` when more bytes are needed.

        Raises:
            MAVError: when robust parsing is off and the buffer starts with an
                invalid prefix (reported once per run of bad bytes), a
                MAVLink 2 frame carries unsupported incompat_flags, or
                decoding fails.
        """
        # The first unread byte selects the header size for this frame.
        header_len = HEADER_LEN_V1
        if self.buf_len() >= 1 and self.buf[self.buf_index] == PROTOCOL_MARKER_V2:
            header_len = HEADER_LEN_V2

        m: Optional[MAVLink_message] = None
        if (
            self.buf_len() >= 1
            and self.buf[self.buf_index] != PROTOCOL_MARKER_V1
            and self.buf[self.buf_index] != PROTOCOL_MARKER_V2
        ):
            # The next byte is not a frame marker: consume it.
            magic = self.buf[self.buf_index]
            self.buf_index += 1
            if self.robust_parsing:
                # Swallow the whole run of non-marker bytes and report it as
                # a single bad-data message.
                invalid_prefix_start = self.buf_index - 1
                while (
                    self.buf_len() >= 1
                    and self.buf[self.buf_index] != PROTOCOL_MARKER_V1
                    and self.buf[self.buf_index] != PROTOCOL_MARKER_V2
                ):
                    self.buf_index += 1
                m = MAVLink_bad_data(
                    self.buf[invalid_prefix_start : self.buf_index], "Bad prefix"
                )
                self.expected_length = header_len + 2
                self.total_receive_errors += 1
                return m
            # Strict mode: raise for the first bad byte only, then drop
            # following bad bytes silently until a valid marker shows up.
            if self.have_prefix_error:
                return None
            self.have_prefix_error = True
            self.total_receive_errors += 1
            raise MAVError("invalid MAVLink prefix '%s'" % magic)
        self.have_prefix_error = False
        if self.buf_len() >= 3:
            # Peek at the first three bytes: marker, payload length and, for
            # MAVLink 2, incompat_flags. The third byte is only interpreted
            # when the marker is the MAVLink 2 one.
            sbuf = self.buf[self.buf_index : 3 + self.buf_index]
            unpacked_h3: Tuple[int, int, int] = self.mav20_h3_unpacker.unpack(sbuf)
            magic, self.expected_length, incompat_flags = unpacked_h3
            if magic == PROTOCOL_MARKER_V2 and (incompat_flags & MAVLINK_IFLAG_SIGNED):
                self.expected_length += MAVLINK_SIGNATURE_BLOCK_LEN
            # Total frame size: payload (+ signature) + header + 2-byte checksum.
            self.expected_length += header_len + 2
        if (
            self.expected_length >= (header_len + 2)
            and self.buf_len() >= self.expected_length
        ):
            # A whole frame is buffered (so the three-byte peek above ran):
            # cut it out and reset the expectation for the next frame.
            mbuf = self.buf[self.buf_index : self.buf_index + self.expected_length]
            self.buf_index += self.expected_length
            self.expected_length = header_len + 2
            if self.robust_parsing:
                # Robust mode converts decode errors into bad-data messages.
                try:
                    if (
                        magic == PROTOCOL_MARKER_V2
                        and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0
                    ):
                        # NOTE(review): expected_length was already reset
                        # above, so the value reported here is header_len + 2
                        # rather than the size of the rejected frame.
                        raise MAVError(
                            "invalid incompat_flags 0x%x 0x%x %u"
                            % (incompat_flags, magic, self.expected_length)
                        )
                    m = self.decode(mbuf)
                except MAVError as reason:
                    m = MAVLink_bad_data(mbuf, reason.message)
                    self.total_receive_errors += 1
            else:
                # Strict mode lets MAVError propagate to the caller.
                if (
                    magic == PROTOCOL_MARKER_V2
                    and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0
                ):
                    raise MAVError(
                        "invalid incompat_flags 0x%x 0x%x %u"
                        % (incompat_flags, magic, self.expected_length)
                    )
                m = self.decode(mbuf)
            return m
        return None

    def parse_buffer(self, s: Sequence[int]) -> Optional[List[MAVLink_message]]:
        """input some data bytes, possibly returning a list of new messages"""
        m = self.parse_char(s)
        if m is None:
            return None
        ret = [m]
        while True:
            m = self.parse_char(b"")
            if m is None:
                return ret
            ret.append(m)

    def check_signature(
        self, msgbuf: bytearray, srcSystem: int, srcComponent: int
    ) -> bool:
        """check signature on incoming message"""
        assert self.signing.secret_key is not None

        timestamp_buf = msgbuf[-12:-6]
        link_id = msgbuf[-13]
        tbytes: Tuple[int, int] = self.mav_sign_unpacker.unpack(timestamp_buf)
        tlow, thigh = tbytes
        timestamp = tlow + (thigh << 32)

        # see if the timestamp is acceptable
        stream_key = (link_id, srcSystem, srcComponent)
        if stream_key in self.signing.stream_timestamps:
            if timestamp <= self.signing.stream_timestamps[stream_key]:
                # reject old timestamp
                logger.info("old timestamp")
                return False
        else:
            # a new stream has appeared. Accept the timestamp if it is at most
            # one minute behind our current timestamp
            if timestamp + 6000 * 1000 < self.signing.timestamp:
                logger.info(
                    "bad new stream %s %s",
                    timestamp / (100.0 * 1000 * 60 * 60 * 24 * 365),
                    self.signing.timestamp / (100.0 * 1000 * 60 * 60 * 24 * 365),
                )
                return False

        # set the streams timestamp so we reject timestamps that go backwards
        self.signing.stream_timestamps[stream_key] = timestamp

        h = hashlib.new("sha256")
        h.update(self.signing.secret_key)
        h.update(msgbuf[:-6])
        sig1 = h.digest()[:6]
        sig2 = msgbuf[-6:]
        if sig1 != sig2:
            logger.info("sig mismatch")
            return False

        # the timestamp we next send with is the max of the received timestamp and
        # our current timestamp
        self.signing.timestamp = max(self.signing.timestamp, timestamp)
        return True

    def decode(self, msgbuf: bytearray) -> MAVLink_message:
        """decode a buffer as a MAVLink message

        Args:
            msgbuf: one complete frame: header, payload, checksum and, for
                signed frames, the 13-byte signature block.

        Returns:
            An instance of the class registered in ``mavlink_map`` for the
            message id, or a MAVLink_unknown when the id is not registered
            (in which case neither checksum nor signature is checked).

        Raises:
            MAVError: on a malformed header, invalid marker, length mismatch,
                checksum mismatch (unless MAVLINK_IGNORE_CRC is set),
                rejected signature, or a payload that cannot be unpacked or
                used to construct the message.
        """
        # decode the header
        if msgbuf[0] != PROTOCOL_MARKER_V1:
            headerlen = 10
            try:
                header_v2: MAVLinkV2Header = self.mav20_unpacker.unpack(
                    msgbuf[:headerlen]
                )
            except struct.error as emsg:
                raise MAVError("Unable to unpack MAVLink header: %s" % emsg) from emsg
            (
                magic,
                mlen,
                incompat_flags,
                compat_flags,
                seq,
                srcSystem,
                srcComponent,
                msgIdlow,
                msgIdhigh,
            ) = header_v2
            msgId = msgIdlow | (msgIdhigh << 16)
        else:
            headerlen = 6
            try:
                header_v1: MAVLinkV1Header = self.mav10_unpacker.unpack(
                    msgbuf[:headerlen]
                )
            except struct.error as emsg:
                raise MAVError("Unable to unpack MAVLink header: %s" % emsg) from emsg
            magic, mlen, seq, srcSystem, srcComponent, msgId = header_v1
            incompat_flags = 0
            compat_flags = 0
        mapkey = msgId
        if (incompat_flags & MAVLINK_IFLAG_SIGNED) != 0:
            signature_len = MAVLINK_SIGNATURE_BLOCK_LEN
        else:
            signature_len = 0

        # magic was unpacked with "c", so it is a single-byte bytes object.
        if ord(magic) != PROTOCOL_MARKER_V1 and ord(magic) != PROTOCOL_MARKER_V2:
            raise MAVError("invalid MAVLink prefix '{}'".format(hex(ord(magic))))
        if mlen != len(msgbuf) - (headerlen + 2 + signature_len):
            raise MAVError(
                "invalid MAVLink message length. Got %u expected %u, msgId=%u headerlen=%u"
                % (
                    len(msgbuf) - (headerlen + 2 + signature_len),
                    mlen,
                    msgId,
                    headerlen,
                )
            )

        if mapkey not in mavlink_map:
            return MAVLink_unknown(msgId, msgbuf)

        # decode the payload
        msgtype = mavlink_map[mapkey]
        order_map = msgtype.orders
        len_map = msgtype.lengths
        has_array = msgtype.has_array
        has_bytes = msgtype.has_bytes
        clen_map = msgtype.cumulative_lengths
        crc_extra = msgtype.crc_extra

        # decode the checksum
        try:
            crc: int = self.mav_csum_unpacker.unpack(
                msgbuf[-(2 + signature_len) :][:2]
            )[0]
        except struct.error as emsg:
            raise MAVError("Unable to unpack MAVLink CRC: %s" % emsg) from emsg
        # The checksum covers everything after the marker byte up to the
        # checksum itself, followed by the message type's CRC extra byte.
        crcbuf = msgbuf[1 : -(2 + signature_len)]
        crcbuf.append(crc_extra)
        crc2 = x25crc(crcbuf)
        if crc != crc2.crc and not MAVLINK_IGNORE_CRC:
            raise MAVError(
                "invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x"
                % (msgId, crc, crc2.crc)
            )

        sig_ok = False
        if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN:
            self.signing.sig_count += 1
        if self.signing.secret_key is not None:
            # With a key configured, a frame is accepted only if its
            # signature verifies or the allow_unsigned callback permits it.
            accept_signature = False
            if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN:
                sig_ok = self.check_signature(msgbuf, srcSystem, srcComponent)
                accept_signature = sig_ok
                if sig_ok:
                    self.signing.goodsig_count += 1
                else:
                    self.signing.badsig_count += 1
                if (
                    not accept_signature
                    and self.signing.allow_unsigned_callback is not None
                ):
                    accept_signature = self.signing.allow_unsigned_callback(self, msgId)
                    if accept_signature:
                        self.signing.unsigned_count += 1
                    else:
                        self.signing.reject_count += 1
            elif self.signing.allow_unsigned_callback is not None:
                accept_signature = self.signing.allow_unsigned_callback(self, msgId)
                if accept_signature:
                    self.signing.unsigned_count += 1
                else:
                    self.signing.reject_count += 1
            if not accept_signature:
                raise MAVError("Invalid signature")

        csize = msgtype.unpacker.size
        mbuf = msgbuf[headerlen : -(2 + signature_len)]
        if len(mbuf) < csize:
            # zero pad to give right size
            mbuf.extend([0] * (csize - len(mbuf)))
        # Longer payloads are cut down to the size the unpacker expects.
        mbuf = mbuf[:csize]
        try:
            t: Tuple[Union[bytes, int, float], ...] = msgtype.unpacker.unpack(mbuf)
        except struct.error as emsg:
            raise MAVError(
                "Unable to unpack MAVLink payload type=%s payloadLength=%u: %s"
                % (msgtype, len(mbuf), emsg)
            ) from emsg

        tlist: List[Union[bytes, float, int, Sequence[Union[bytes, float, int]]]] = (
            list(t)
        )
        # handle sorted fields
        if not has_array:
            # message has no arrays in it
            for i in range(0, len(tlist)):
                tlist[i] = t[order_map[i]]
        else:
            # message has some arrays
            tlist = []
            for i in range(0, len(order_map)):
                order = order_map[i]
                L = len_map[order]
                tip = clen_map[order]
                field = t[tip]
                if L == 1 or isinstance(field, bytes):
                    tlist.append(field)
                else:
                    tlist.append(list(t[tip : (tip + L)]))

        # terminate any strings
        for i, elem in enumerate(tlist) if has_bytes else ():
            if isinstance(elem, bytes):
                tlist[i] = elem.rstrip(b"\x00")

        # construct the message object
        try:
            # Note that initializers don't follow the Liskov Substitution Principle
            # therefore it can't be typechecked
            m = msgtype(*tlist)  # type: ignore
        except Exception as emsg:
            raise MAVError(
                "Unable to instantiate MAVLink message of type %s : %s"
                % (msgtype, emsg)
            ) from emsg
        m._signed = sig_ok
        if m._signed:
            m._link_id = msgbuf[-13]
        m._msgbuf = msgbuf
        # Slice from the actual header length so that MAVLink 2 frames
        # (10-byte header) do not leak header bytes into the payload.
        m._payload = msgbuf[headerlen : -(2 + signature_len)]
        m._crc = crc
        m._header = MAVLink_header(
            msgId, incompat_flags, compat_flags, mlen, seq, srcSystem, srcComponent
        )
        return m