Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
trident-helix4-parser / helix4_parser.py
Size: Mime:
#!/usr/bin/env python
# TridentSensors Helix 4 Tracker Iridium SBD Message Parser
#
# Parses binary SBD files and byte streams to TridentHelix4Message,
# which contains 5 GPS locations, battery level and temperature
#
# Author: Patrick Leibold
# Last change: 2019-12-06
# Copyright: GEOMAR Helmholtz Centre for Ocean Research Kiel, 2019

import io
from datetime import datetime
from datetime import timedelta
from dataclasses import dataclass


@dataclass
class GpsLocation:
    """
    Single GPS fix: a point in time together with a signed position
    """
    # Point in time the fix belongs to
    timestamp: datetime
    # Latitude in decimal degrees; the parser in this file emits negative values for South
    latitude: float
    # Longitude in decimal degrees; sign as computed by the parser in this file
    longitude: float


@dataclass
class TridentHelix4Message:
    """
    Decoded Helix 4 tracker message: a list of GPS locations plus the
    battery voltage and temperature reported with them
    """
    # Locations in the order they were added (oldest first, latest last)
    locations: list
    # Battery voltage as decoded from the message
    battery_voltage: float
    # Temperature as decoded from the message
    temperature: float

    def append_location(self, timestamp: datetime, location: "GpsLocation"):
        """
        Appends a location to the end of the locations list
        :param timestamp: Kept for interface compatibility only; it is not stored
                          separately because GpsLocation carries its own timestamp
        :param location: GpsLocation to append
        """
        self.locations.append(location)

    def get_latest_location(self):
        """
        Returns the location that was added last
        :return: Last element of the locations list
        :raises IndexError: If the message contains no locations
        """
        return self.locations[-1]


class TridentHelix4MessageException(Exception):
    """
    Error type used by the Helix 4 parser to report invalid input,
    e.g. a missing reference date or a file of unexpected length
    """


class TridentHelix4MessageParser:
    # Day to use for timestamp generation
    # (the message itself only carries HH:MM:SS, the date comes from the constructor)
    __date: datetime

    # Message format constants
    # Expected size of a complete message in bytes
    MESSAGE_LENGTH = 50
    # Slice bounds of the start time (END is exclusive): bytes 0..2
    TIMESTAMP_BYTE_START = 0
    TIMESTAMP_BYTE_END = 3
    # Range bounds of the location records (END is the exclusive range stop);
    # with a record size of 9 the records start at bytes 3, 12, 21, 30 and 39
    LOCATIONS_BYTE_START = 3
    LOCATIONS_BYTE_END = 47
    # Size of a single location record in bytes
    LOCATIONS_BYTES_COUNT = 9
    # Index of the battery voltage byte
    BATTERY_LEVEL_BYTE = 48
    # Index of the temperature byte
    TEMPERATURE_BYTE = 49

    # Byte constants
    # Bit 0 of the last location byte selects the latitude sign
    BIT0_MASK = 0x01
    # Bit 1 of the last location byte selects the longitude sign
    BIT1_MASK = 0x02
    # NOTE(review): 0x08 selects bit 3, not the most significant bit (0x80) that
    # BIT8_MASK_NEG clears; not referenced by the methods visible in this file - confirm intent
    BIT8_MASK = 0x08
    # Clears the most significant bit of a byte (applied to the hour byte)
    BIT8_MASK_NEG = 0x7F

    # Other constants
    # Minutes added to the timestamp for each subsequent location in a message
    TIME_BETWEEN_GPSFIXES_MIN = 2
    # Divisor applied to the raw battery byte to obtain the voltage
    VOLTAGE_FACTOR = 10

    def __init__(self, date: datetime):
        """
        Initializes the parser. The date should be taken from the SBD email,
        as it will be required to specify the full GPS location timestamps
        """
        if not date:
            raise TridentHelix4MessageException('You have to provide a valid timestamp!')
        self.__date = date

    def parse_file(self, path) -> object:
        """
        Parses a file containing a binary encoded TridentHelix4Message
        :param path: Absolute or relative path to the file
        :return: TridentHelix4Message object (see parse_byte_array for details)
        :raises TridentHelix4MessageException: If the file cannot be read or
                does not contain exactly MESSAGE_LENGTH bytes
        """
        # Only the file access itself is guarded, so that errors raised
        # further down are never mistaken for I/O problems
        try:
            with io.open(path, "rb") as file:
                file_bytes = file.read()
        except IOError as err:
            raise TridentHelix4MessageException(f'Failed to open file {path}') from err

        if len(file_bytes) != self.MESSAGE_LENGTH:
            raise TridentHelix4MessageException(
                f'File {path} contains {len(file_bytes)} bytes, expected {self.MESSAGE_LENGTH}')

        return self.parse_byte_array(file_bytes)

    def parse_byte_array(self, byte_array: bytes) -> object:
        """
        Parses an array of bytes to a TridentHelix4Message object
        :param byte_array: Byte array containing a TridentHelix4Message
        :return: TridentHelix4Message object, or False if the leading
                 timestamp bytes do not form a valid time
        :raises TridentHelix4MessageException: If the array is too short to
                contain all locations, the battery byte and the temperature byte
        """
        # Read start timestamp from byte array
        timestamp = self._parse_timestamp(byte_array[self.TIMESTAMP_BYTE_START:self.TIMESTAMP_BYTE_END])
        if not timestamp:
            return False

        # Checked after the timestamp so that an unusable time still yields False as before;
        # a truncated message would otherwise fail with an obscure TypeError / IndexError below
        if len(byte_array) < self.MESSAGE_LENGTH:
            raise TridentHelix4MessageException(
                f'Message contains {len(byte_array)} bytes, expected {self.MESSAGE_LENGTH}')

        # Parse beacon locations
        locations = []
        for offset in range(self.LOCATIONS_BYTE_START, self.LOCATIONS_BYTE_END, self.LOCATIONS_BYTES_COUNT):
            latitude, longitude = self._parse_coordinates(byte_array[offset:offset + self.LOCATIONS_BYTES_COUNT])
            locations.append(GpsLocation(timestamp, latitude, longitude))
            # Increase location timestamp by number of minutes between GPS fixes
            timestamp += timedelta(minutes=self.TIME_BETWEEN_GPSFIXES_MIN)

        # Parse battery voltage and temperature
        battery_voltage = self._parse_battery_voltage(byte_array[self.BATTERY_LEVEL_BYTE])
        temperature = self._parse_temperature(byte_array[self.TEMPERATURE_BYTE])

        return TridentHelix4Message(locations, battery_voltage, temperature)

    def _parse_timestamp(self, byte_array: bytes) -> datetime:
        """
        Parses a 3-bytes array into a time (HH:MM:SS). The date previously specified
        in the constructor is taken to complete the timestamp
        :param byte_array: 3 bytes containing the time information HH:MM:SS
        :return: timestamp
        """
        if len(byte_array) == 3 and \
                0 <= (self.BIT8_MASK_NEG & byte_array[0]) <= 23 and \
                0 <= byte_array[1] <= 60 and \
                0 <= byte_array[2] <= 60:   # First byte: ID, not used anymore (Refer to mail from Helen)
            timestamp = datetime(self.__date.year, self.__date.month, self.__date.day,
                                 (self.BIT8_MASK_NEG & byte_array[0]), byte_array[1], byte_array[2])

            if self._has_dateskip(timestamp):
                adjusted_date = self.__date - timedelta(days=1)
                timestamp = datetime(adjusted_date.year, adjusted_date.month, adjusted_date.day,
                                 (self.BIT8_MASK_NEG & byte_array[0]), byte_array[1], byte_array[2])

            assert (timestamp - self.__date) < timedelta(hours=1), f'Parsed timestamp {timestamp} more than one hour ' \
                f'in the past future w/ parser initialisation timestamp {self.__date}'
            assert (timestamp - self.__date) > timedelta(hours=-2), f'Parsed timestamp {timestamp} more than two hours ' \
                f'in the past compared w/ parser initialisation timestamp {self.__date}'

            return timestamp

        return False


    def _has_dateskip(self, proposed_timestamp):
        """
        Check for midnight skip: since mail generation can take a few minutes,
        sometimes, close to midnight  the *next* day is written in mail body as reference day
        which results in timestamps which are off by 24hours (shifted to future)
        try to catch and correct these cases here
        :param proposed_timestamp: timestamp parsed with original data
        :return: True if dateskip was detected, False if everything seems to be allright
        """
        # assume that the gps times are within one hour of the time the parser was initialized in
        return proposed_timestamp > self.__date + timedelta(hours=1)


    def _parse_coordinates(self, byte_array: bytes):
        """
        Parses a 9-bytes array into a location (latitude/longitude)
        :param byte_array: 9 bytes containing the latitude / longitude information
        :return: Latitude and longitude
        """
        if len(byte_array) != 9:
            return False

        # Decode coordinates
        lat_deg = byte_array[0]
        lat_min = float(str(int(byte_array[1])) + "." + str(int((byte_array[2] << 8) | byte_array[3])))
        lon_deg = byte_array[4]
        lon_min = float(str(int(byte_array[5])) + "." + str(int((byte_array[6] << 8) | byte_array[7])))
        if (self.BIT0_MASK & byte_array[8]) == 0:   # First 6 Bytes: device ID (Refer to mail from Helen)
            north_south = 1     # Latitude North
        else:
            north_south = -1    # Latitude South
        if (self.BIT1_MASK & byte_array[8]) == 0:   # First 6 Bytes: device ID (Refer to mail from Helen)
            west_east = -1      # Longitude East    # East / West are swapped in manual! (Refer to mail from Helen)
        else:
            west_east = 1       # Longitude West
        latitude = north_south * (lat_deg + (lat_min / 60))
        longitude = west_east * (lon_deg + (lon_min / 60))

        return latitude, longitude

    def _parse_battery_voltage(self, byte: int) -> float:
        """
        Parses a single byte to battery voltage
        :param byte: Byte containing battery voltage x 10
        :return: Battery voltage in decimal format
        """
        return byte / self.VOLTAGE_FACTOR

    # noinspection PyMethodMayBeStatic
    def _parse_temperature(self, byte: int) -> int:
        """
        Parses a single byte to temperature
        :param byte: Byte containing temperature in two's complement format
        :return: Temperature in integer format
        """
        if (byte & (1 << 7)) != 0:  # If sign bit is set
            return byte - (1 << 7)  # Compute negative value
        return byte