Repository URL to install this package:
|
Version:
0.1.6 ▾
|
trident-helix4-parser
/
helix4_parser.py
|
|---|
#!/usr/bin/env python
# TridentSensors Helix 4 Tracker Iridium SBD Message Parser
#
# Parses binary SBD files and byte streams to TridentHelix4Message,
# which contains 5 GPS locations, battery level and temperature
#
# Author: Patrick Leibold
# Last change: 2019-12-06
# Copyright: GEOMAR Helmholtz Centre for Ocean Research Kiel, 2019
import io
from datetime import datetime
from datetime import timedelta
from dataclasses import dataclass
@dataclass
class GpsLocation:
    """
    A single GPS fix: the time it was taken and its position.

    Latitude and longitude are signed floats as produced by the parser
    (degrees plus minutes / 60, multiplied by a hemisphere sign).
    """

    # Time of the GPS fix
    timestamp: datetime
    # Signed latitude
    latitude: float
    # Signed longitude
    longitude: float
@dataclass
class TridentHelix4Message:
    """
    Decoded content of one Helix 4 tracker message: the list of GPS
    locations plus the reported battery voltage and temperature.
    """

    # GPS locations in the order they were parsed (oldest first)
    locations: list
    # Battery voltage
    battery_voltage: float
    # Temperature as reported by the device
    temperature: float

    def append_location(self, timestamp: datetime, location: "GpsLocation"):
        """
        Placeholder that currently does nothing and returns None.

        NOTE(review): this was never implemented. The intended role of the
        separate ``timestamp`` argument (``location`` already carries one) is
        unclear, so the no-op behaviour is kept -- confirm and implement.
        :param timestamp: Unused
        :param location: Unused
        """
        return

    def get_latest_location(self):
        """
        Returns the last entry of ``locations``
        :return: The most recently appended location
        :raises IndexError: If the message contains no locations
        """
        # Fixed: previously read the non-existent attribute ``positions``
        return self.locations[-1]
class TridentHelix4MessageException(Exception):
    """Error raised by the Helix 4 message parser for invalid input."""
class TridentHelix4MessageParser:
    """
    Parser for binary TridentSensors Helix 4 tracker messages.

    Message layout as encoded by the constants below: bytes 0-2 hold the
    start time, bytes 3-47 hold five 9-byte GPS fixes, byte 48 holds the
    battery voltage and byte 49 the temperature.
    """

    # Day to use for timestamp generation
    __date: datetime

    # Message format constants
    MESSAGE_LENGTH = 50
    TIMESTAMP_BYTE_START = 0
    TIMESTAMP_BYTE_END = 3
    LOCATIONS_BYTE_START = 3
    LOCATIONS_BYTE_END = 47
    LOCATIONS_BYTES_COUNT = 9
    BATTERY_LEVEL_BYTE = 48
    TEMPERATURE_BYTE = 49

    # Byte constants
    BIT0_MASK = 0x01
    BIT1_MASK = 0x02
    BIT8_MASK = 0x80  # Fixed: was 0x08, which is not the complement of BIT8_MASK_NEG
    BIT8_MASK_NEG = 0x7F

    # Other constants
    TIME_BETWEEN_GPSFIXES_MIN = 2
    VOLTAGE_FACTOR = 10

    def __init__(self, date: datetime):
        """
        Initializes the parser. The date should be taken from the SBD email,
        as it will be required to specify the full GPS location timestamps
        :param date: Reference timestamp used to complete the parsed times
        :raises TridentHelix4MessageException: If no date is given
        """
        if not date:
            raise TridentHelix4MessageException('You have to provide a valid timestamp!')
        self.__date = date

    def parse_file(self, path) -> object:
        """
        Parses a file containing a binary encoded TridentHelix4Message
        :param path: Absolute or relative path to the file
        :return: TridentHelix4Message object, or False if the timestamp is invalid
        :raises TridentHelix4MessageException: If the file cannot be read or
            does not contain exactly MESSAGE_LENGTH bytes
        """
        try:
            with io.open(path, "rb") as file:
                file_bytes = file.read()
        except IOError as err:
            # Fixed: previously tried to raise the parser class itself
            raise TridentHelix4MessageException(f'Failed to open file {path}') from err
        if len(file_bytes) != self.MESSAGE_LENGTH:
            raise TridentHelix4MessageException(
                f'File {path} contains {len(file_bytes)} bytes, expected {self.MESSAGE_LENGTH}')
        return self.parse_byte_array(file_bytes)

    def parse_byte_array(self, byte_array: bytes) -> object:
        """
        Parses a array of bytes to a TridentHelix4Message object
        :param byte_array: Byte array containing a TridentHelix4Message
        :return: TridentHelix4Message object, or False if the timestamp is invalid
        :raises TridentHelix4MessageException: If the timestamp is valid but the
            array is shorter than MESSAGE_LENGTH
        """
        # Read start timestamp from byte array
        timestamp = self._parse_timestamp(byte_array[self.TIMESTAMP_BYTE_START:self.TIMESTAMP_BYTE_END])
        if not timestamp:
            return False

        # A truncated message used to fail with a TypeError / IndexError further down
        if len(byte_array) < self.MESSAGE_LENGTH:
            raise TridentHelix4MessageException(
                f'Message contains {len(byte_array)} bytes, expected {self.MESSAGE_LENGTH}')

        # Parse beacon locations
        locations = []
        for start in range(self.LOCATIONS_BYTE_START, self.LOCATIONS_BYTE_END, self.LOCATIONS_BYTES_COUNT):
            # Parse location and add to message
            latitude, longitude = self._parse_coordinates(byte_array[start:start + self.LOCATIONS_BYTES_COUNT])
            locations.append(GpsLocation(timestamp, latitude, longitude))
            # Increase location timestamp by number of minutes between GPS fixes
            timestamp += timedelta(minutes=self.TIME_BETWEEN_GPSFIXES_MIN)

        # Parse battery voltage and temperature
        battery_voltage = self._parse_battery_voltage(byte_array[self.BATTERY_LEVEL_BYTE])
        temperature = self._parse_temperature(byte_array[self.TEMPERATURE_BYTE])
        return TridentHelix4Message(locations, battery_voltage, temperature)

    def _parse_timestamp(self, byte_array: bytes) -> datetime:
        """
        Parses a 3-bytes array into a time (HH:MM:SS). The date previously specified
        in the constructor is taken to complete the timestamp
        :param byte_array: 3 bytes containing the time information HH:MM:SS
        :return: timestamp, or False if the bytes do not form a valid time
        :raises AssertionError: If the timestamp is not within (-2h, +1h) of the
            date given to the constructor
        """
        if len(byte_array) != 3:
            return False

        # Highest bit of first byte: ID, not used anymore (Refer to mail from Helen)
        hour = self.BIT8_MASK_NEG & byte_array[0]
        minute = byte_array[1]
        second = byte_array[2]
        # Fixed: 60 was accepted for minute / second and made datetime() raise ValueError
        if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
            return False

        # Carry over tzinfo so that a timezone-aware reference date stays comparable
        timestamp = datetime(self.__date.year, self.__date.month, self.__date.day,
                             hour, minute, second, tzinfo=self.__date.tzinfo)
        if self._has_dateskip(timestamp):
            # Same time of day, one day earlier
            timestamp -= timedelta(days=1)

        # Raised explicitly (instead of using assert statements) so that the
        # plausibility checks are not stripped when running with -O
        offset = timestamp - self.__date
        if not offset < timedelta(hours=1):
            raise AssertionError(f'Parsed timestamp {timestamp} more than one hour '
                                 f'in the future compared w/ parser initialisation timestamp {self.__date}')
        if not offset > timedelta(hours=-2):
            raise AssertionError(f'Parsed timestamp {timestamp} more than two hours '
                                 f'in the past compared w/ parser initialisation timestamp {self.__date}')
        return timestamp

    def _has_dateskip(self, proposed_timestamp):
        """
        Check for midnight skip: since mail generation can take a few minutes,
        sometimes, close to midnight the *next* day is written in mail body as reference day
        which results in timestamps which are off by 24hours (shifted to future)
        try to catch and correct these cases here
        :param proposed_timestamp: timestamp parsed with original data
        :return: True if dateskip was detected, False if everything seems to be alright
        """
        # assume that the gps times are within one hour of the time the parser was initialized in
        return proposed_timestamp > self.__date + timedelta(hours=1)

    def _parse_coordinates(self, byte_array: bytes):
        """
        Parses a 9-bytes array into a location (latitude/longitude)
        :param byte_array: 9 bytes containing the latitude / longitude information
        :return: Tuple (latitude, longitude), or False if not exactly 9 bytes are given
        """
        if len(byte_array) != 9:
            return False

        # Decode coordinates: one byte of degrees, one byte of whole minutes and
        # a 16 bit big-endian value used as the decimal digits of the minutes.
        # NOTE(review): the digits are appended without zero padding (a value of 5
        # becomes ".5") -- confirm against the device specification.
        lat_deg = byte_array[0]
        lat_min = float(str(int(byte_array[1])) + "." + str(int((byte_array[2] << 8) | byte_array[3])))
        lon_deg = byte_array[4]
        lon_min = float(str(int(byte_array[5])) + "." + str(int((byte_array[6] << 8) | byte_array[7])))

        # Only the two lowest bits of the last byte are used here; the remaining
        # bits belong to the device ID (Refer to mail from Helen)
        flags = byte_array[8]
        # Bit 0 clear: latitude north (positive), set: latitude south (negative)
        north_south = 1 if (self.BIT0_MASK & flags) == 0 else -1
        # Bit 1 clear: negative longitude, set: positive longitude.
        # East / West are swapped in manual! (Refer to mail from Helen)
        west_east = -1 if (self.BIT1_MASK & flags) == 0 else 1

        latitude = north_south * (lat_deg + (lat_min / 60))
        longitude = west_east * (lon_deg + (lon_min / 60))
        return latitude, longitude

    def _parse_battery_voltage(self, byte: int) -> float:
        """
        Parses a single byte to battery voltage
        :param byte: Byte containing battery voltage x 10
        :return: Battery voltage in decimal format
        """
        return byte / self.VOLTAGE_FACTOR

    # noinspection PyMethodMayBeStatic
    def _parse_temperature(self, byte: int) -> int:
        """
        Parses a single byte to temperature
        :param byte: Byte (0-255) containing temperature in two's complement format
        :return: Temperature in integer format (-128 to 127)
        """
        if (byte & self.BIT8_MASK) != 0:  # If sign bit is set
            # Fixed: subtracted 128 before, which never produced a negative value
            return byte - (1 << 8)
        return byte