Repository URL to install this package:
|
Version:
1.9.2 ▾
|
dship2postgis
/
core.py
|
|---|
# -*- coding: utf-8 -*-
import os
from . import osis_service, geometry_generator, db_service, check_mk_service
import gmr_underway_connect.underway_data as underway_data
from gmr_underway_connect.config import Config as udw_mail_cfg
import dshipparser.parser
from dshipparser.parser import helpers
from .config import Config as cfg
import click
import logging
import pandas as pd
import datetime
from pathlib import Path
def underwaydaily2geodataframe(platform, days_in_the_past=0):
    """
    Reads daily underway dm-data mails, detaches the attachments, parses/imports them and generates a geodataframe.
    Will open DB connection and query OSIS for leg label, leg id, platform id for each row.
    **Requires working db config**
    :param platform: The platform of interest (alkor, poseidon, meteor,...)
    :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
    :return: GeoDataFrame with extracted information, sorted by its index and de-duplicated,
        or None if no unpacked folder could be imported
    """
    logging.debug(f'underwaydaily2geodataframe(platform={platform}, days_in_the_past={days_in_the_past})')
    logging.info(f'Importing daily data found in from underway mail for {platform}, '
                 f'using mailserver: {udw_mail_cfg.MAIL_SERVER}, user: {udw_mail_cfg.MAIL_USER}')
    dh = underway_data.UnderwayDailyHandler(platform)
    dh.unzip_daily_mails(days_in_the_past)
    gdfs = []
    try:
        to_import = list(Path(dh.unpacked_base_dir).glob('*'))
        logging.info(f"processing {len(to_import)} folders: {[str(x) for x in to_import]}")
        for entry in to_import:
            try:
                # Pass only the entry *name*: _gdf_from_daily joins it onto unpacked_base_dir itself.
                # Passing the full glob result would double the base dir if unpacked_base_dir is relative.
                gdfs.append(_gdf_from_daily(entry.name, dh.unpacked_base_dir))
            except Exception:
                # best effort: one broken daily export must not prevent importing the others
                logging.exception(f"Import of {entry} failed!")
    finally:
        # at this point, the tmp files are no longer needed - also clean up if the loop was aborted
        dh.clean_temp_dirs()
    if not gdfs:
        # consistent with nmea2geodataframe: signal "no data" with None instead of crashing in pd.concat
        logging.warning('Could not create geodataframe, no data!')
        return None
    gdf = pd.concat(gdfs)
    # fix cases where mails did not arrive in the correct order. see #22
    if not gdf.index.is_monotonic_increasing:
        gdf.sort_index(inplace=True)
    gdf = helpers.de_duplicate_index(gdf)
    return gdf
def _gdf_from_daily(dirname, unpacked_base_dir, freq='60s'):
    """
    Builds a geodataframe for one unpacked daily underway export, thinned out to ``freq``.
    Cannot just call dship2postgis on each directory here since the data is reduced from 1s to ``freq``
    before the OSIS info columns are added.
    :param dirname: name of the export folder, joined onto ``unpacked_base_dir``
    :param unpacked_base_dir: directory the daily mails were unpacked into
    :param freq: pandas frequency string passed to ``asfreq``. Default: '60s'
    :return: GeoDataFrame without rows lacking lat or lon, with OSIS info columns added
    """
    base_dir = os.path.join(unpacked_base_dir, dirname)
    # generate gdf. Skip query of osis db here, wait till the data has been thinned out
    gdf = dhsip2geodataframe(base_dir, add_osis_info_cols=False)
    # asfreq keeps the rows on the freq grid; grid points without a matching timestamp become all-NaN rows
    gdf = gdf.asfreq(freq)
    gdf = gdf.dropna(axis=0, how='all')
    # a position is useless without both coordinates
    gdf = gdf.dropna(axis=0, subset=[dshipparser.parser.LAT_COL, dshipparser.parser.LON_COL])
    # add columns w/ OSIS info now
    logging.info("Adding column with leg info from OSIS")
    _add_osis_info_columns(gdf)
    return gdf
def import_daily_data(platform, days_in_the_past=0, import_label=None):
    """
    Reads daily underway dm-data mails and imports them to postgis
    :param platform: The platform of interest (alkor, poseidon, meteor,...)
    :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically as follows:
        daily_import_{platform}_{timestamp}
    """
    if not import_label:
        ts_label = _generate_timestamp_label()
        import_label = f"daily_import_{platform}_{ts_label}"
    logging.info(f'Importing daily underway data for {platform} into postgresql://{cfg.DB_USER}:XXXX@{cfg.DB_HOST}:5432/{cfg.DB_NAME} '
                 f'{cfg.DB_SCHEMA}.{cfg.DB_POSITIONS_TABLE}')
    gdf = underwaydaily2geodataframe(platform, days_in_the_past=days_in_the_past)
    # same guard as import_nmea_data: never hand an empty/missing frame to the DB layer
    if gdf is None or len(gdf) < 1:
        logging.warning('No daily data received, nothing to persist!')
        return
    persist_data(gdf, import_label)
def nmea2geodataframe(platform, days_in_the_past=0, add_osis_info_columns=True):
    """
    Reads nmea telegrams from dm-data mails, parses them and generates a geodataframe
    :param platform: The platform of interest (alkor, poseidon, meteor,...)
    :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
    :param add_osis_info_columns: if True, will open DB connection and query OSIS for leg label, leg id and
        platform id for each row.
        **Requires working db config**
    :return: GeoDataFrame with extracted information, or None if no nmea data was received
    """
    logging.info(f'Parsing nmea-data found in from underway mail for {platform}, '
                 f'using mailserver: {udw_mail_cfg.MAIL_SERVER}, user: {udw_mail_cfg.MAIL_USER}')
    nmea_handler = underway_data.UnderwayNmeaHandler(platform)
    df = nmea_handler.get_nmea_data(days_in_the_past)
    if df is None or len(df) < 1:
        logging.warning('Could not create geodataframe, no data!')
        return None
    df = helpers.de_duplicate_index(df)
    if add_osis_info_columns:
        _add_osis_info_columns(df)
    else:
        logging.info('Skipping generation of osis info columns for each row from OSIS')
    logging.info('Generating geodataframe from parsed data')
    gdf = geometry_generator._make_geodataframe(df)
    return gdf
def import_nmea_data(platform, days_in_the_past=0, import_label=None):
    """
    Reads in nmea mails and imports the contents to postgis DB
    :param platform: The platform of interest (alkor, meteor,...)
    :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        - If no value is provided, the import label will be generated automatically as follows:
          nmea_import_{platform}_{date_of_nmea_email}
          (rows without an email timestamp get nmea_import_{platform}_{timestamp_of_this_import})
    """
    logging.info(f'Importing nmea data for {platform} into postgresql://{cfg.DB_USER}:XXXX@{cfg.DB_HOST}:5432/{cfg.DB_NAME} '
                 f'{cfg.DB_SCHEMA}.{cfg.DB_POSITIONS_TABLE}')
    gdf = nmea2geodataframe(platform, days_in_the_past=days_in_the_past, add_osis_info_columns=True)
    if gdf is None or len(gdf) < 1:
        logging.warning('No data received, not persisting nothing!')
        return
    if import_label:
        # persist_data is called with 'PROVIDED' below, so a caller-supplied label must be stored per row too
        gdf['import_label'] = import_label
    else:
        ts_label = _generate_timestamp_label()
        default_label = f"nmea_import_{platform}_{ts_label}"
        if 'timestamp_email' in gdf.columns:
            gdf['import_label'] = gdf.timestamp_email.apply(lambda x: f"nmea_import_{platform}_{x}" if not pd.isnull(x) else default_label)
        else:  # column 'timestamp_email' not found
            logging.warning("Expected to find column 'timestamp_email' in data from NMEA mails. Update gmr_underway_connect!")
            gdf['import_label'] = default_label
    # labels live in the 'import_label' column of each row
    persist_data(gdf, 'PROVIDED')
def dhsip2geodataframe(base_dir, add_osis_info_cols=True):
    """
    Parses the dship export found in ``base_dir`` and returns it as a GeoDataFrame.
    A dship3 export directory has to contain
     - a *.sys file with meta-information
     - an order_*.xml file with format and column specification
     - a *.dat file with the actual data (headerless)
    The export **has to contain columns for lat/ lon** (see config.py for recognized columns) and a *timestamp*.
    :param base_dir: string with directory base path
    :param add_osis_info_cols: if True, will open DB connection and query OSIS for leg label, leg id and
        platform id for each row.
        **Requires working db config**
    :return: GeoDataFrame with extracted information
    """
    logging.info(f'Parsing dship-data found in {base_dir}')
    # standardized data frame from the raw export, with a unique index
    parsed = helpers.de_duplicate_index(dshipparser.parser.parse_data(base_dir))
    if not add_osis_info_cols:
        logging.info('Skipping generation of osis info columns for each row from OSIS')
    else:
        _add_osis_info_columns(parsed)
    return geometry_generator._make_geodataframe(parsed)
def import_dship_data(base_dir, import_label=None):
    """
    Reads in dship export located in the base directory and imports it to postgis DB
    For dship3 exports, the directory has to contain
     - *.sys file with meta-information
     - order_*.xml file with format and column specification
     - *.dat file the actual data
    The export **has to contain columns for lat/ lon** (see config.py for recognized columns) and a *timestamp*.
    The .dat file is expected to be headerless.
    :param base_dir: string with directory base path
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically as follows:
        dship_import_{dir_label}_{timestamp}
        where dir_label is the last path component of base_dir with '-' replaced by '_'
    """
    if not import_label:
        dir_label = _generate_import_dir_label(base_dir)
        ts_label = _generate_timestamp_label()
        import_label = f'dship_import_{dir_label}_{ts_label}'
    logging.info(f'Importing data from {base_dir} into postgresql://{cfg.DB_USER}:XXXX@{cfg.DB_HOST}:5432/{cfg.DB_NAME} '
                 f'{cfg.DB_SCHEMA}.{cfg.DB_POSITIONS_TABLE}')
    gdf = dhsip2geodataframe(base_dir, add_osis_info_cols=True)
    persist_data(gdf, import_label)
def persist_data(geo_df, import_label):
    """
    Writes geodataframe to DB: the rows are written to a temporary table which is then merged into the
    positions table; the temporary table(s) are dropped afterwards.
    :param geo_df: the (geo)dataframe to persist
    :param import_label: import_label to attach to all data rows. If string 'PROVIDED' is passed, expects to
        find a column 'import_label' in passed data
    """
    logging.info('Writing data to database')
    dbs = db_service.DbService()
    dbs.create_positions_table()  # creates table if not exists
    tmp_table = dbs.create_temp_table(geo_df, import_label)
    logging.info(f'wrote data to temporary table {tmp_table}')
    try:
        dbs.merge_temp_table(tmp_table)
        logging.info(f'merged temporary table into {cfg.DB_POSITIONS_TABLE}')
    finally:
        # clean up even if the merge failed, so a failed import does not skip dropping the temp table(s)
        dbs.drop_temp_tables()
        logging.info(f'dropped temporary table {dbs.temp_tables}')
@click.command(name='underwaydaily2postgis', short_help="Reads in daily underway mails imports data to postgis DB")
@click.option('-l', '--import_label', help='Label that gets attached to imported rows. If nothing is passed, label is automatically generated.')
# default=0: without it click passes None, while 0 is the documented "read all mails" value
@click.option('-d', '--days_in_the_past', type=int, default=0, help='Number of days in the past to be considered. Passing nothing or 0 will process all available mails')
@click.argument('Platform')
def cli_underwaydaily2postgis(platform, days_in_the_past=0, import_label=None):
    """
    Reads in daily underway mails and imports data to postgis DB
    :param platform: platform to consider, e.g. alkor, meteor, ...
    :param days_in_the_past: Number of days in the past to be considered. Passing nothing or 0 will process all available mails.
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically.
    """
    # setup logging to stdout
    _setup_logging()
    logging.debug(f'days_in_the_past received: {days_in_the_past}, type:{type(days_in_the_past)}')
    import_daily_data(platform, days_in_the_past, import_label)
@click.command(name='nmea2postgis', short_help="Reads in nmea telegrams received by mail and imports data to postgis DB")
@click.option('-l', '--import_label', help='Label that gets attached to imported rows. If nothing is passed, label is automatically generated.')
# default=0: without it click passes None, while 0 is the documented "read all mails" value
@click.option('-d', '--days_in_the_past', type=int, default=0, help='Number of days in the past to be considered. Passing nothing or 0 will process all available mails')
@click.argument('Platform')
def cli_nmea2postgis(platform, days_in_the_past=0, import_label=None):
    """
    Reads in nmea telegrams received by mail and imports data to postgis DB
    :param platform: platform to consider, e.g. alkor, meteor, ...
    :param days_in_the_past: Number of days in the past to be considered. Passing nothing or 0 will process all available mails.
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically.
    """
    # setup logging to stdout
    _setup_logging()
    import_nmea_data(platform, days_in_the_past, import_label)
@click.command(name='dhsip2postgis', short_help="Reads in dship export imports it to postgis DB")
@click.option('-l', '--import_label', help='Label that gets attached to imported rows. If nothing is passed, label is automatically generated.')
@click.argument('base_dir')
def cli_dhsip2postgis(base_dir, import_label=None):
    """
    Reads in dship export located in the base directory and imports it to postgis DB
    For dship3 exports, the directory has to contain
     - *.sys file with meta-information
     - order_*.xml file with format and column specification
     - *.dat file the actual data
    The export **has to contain columns for lat/ lon** (see config.py for recognized columns) and a *timestamp*.
    The .dat file is expected to be headerless.
    :param base_dir: string with directory base path
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically.
    """
    # NOTE: the command name keeps its historic spelling ('dhsip') for backward compatibility
    # setup logging to stdout
    _setup_logging()
    import_dship_data(base_dir, import_label)
# NOTE: command name keeps its historic spelling ('uderway') so existing check_mk configs keep working
@click.command(name='uderway_nmea_age_check', short_help="Checks age of received NMEA underway data ")
@click.option('-w', '--warn', help='Optional: Threshold for warning age in hours', type=int, default=check_mk_service.default_warn_nmea)
@click.option('-c', '--crit', help='Optional: Threshold for critical age in hours', type=int, default=check_mk_service.default_crit_nmea)
@click.argument('platform')
def cli_underway_nmea_age_check(platform, warn, crit):
    """
    Checks the age of the NMEA underway data received for PLATFORM and prints the check_mk result.
    :param platform: platform to consider, e.g. alkor, meteor, ...
    :param warn: age threshold in hours for a warning
    :param crit: age threshold in hours for a critical state
    """
    print(check_mk_service.do_check_age_nmea(platform, warn, crit))
# NOTE: command name keeps its historic spelling ('uderway') so existing check_mk configs keep working
@click.command(name='uderway_daily_age_check', short_help="Checks age of received daily underway data ")
@click.option('-w', '--warn', help='Optional: Threshold for warning age in hours', type=int, default=check_mk_service.default_warn_daily)
@click.option('-c', '--crit', help='Optional: Threshold for critical age in hours', type=int, default=check_mk_service.default_crit_daily)
@click.argument('platform')
def cli_underway_daily_age_check(platform, warn, crit):
    """
    Checks the age of the daily underway data received for PLATFORM and prints the check_mk result.
    :param platform: platform to consider, e.g. alkor, meteor, ...
    :param warn: age threshold in hours for a warning
    :param crit: age threshold in hours for a critical state
    """
    print(check_mk_service.do_check_age_daily(platform, warn, crit))
def _add_osis_info_columns(df):
    """
    Adds columns with OSIS info (leg label, leg id, platform id) to dataframe.
    Each column is attempted independently: a failure is logged and the remaining columns are still tried.
    :param df: dataframe to extend; the osis_service helpers are expected to add the columns in place
        (their return values are ignored)
    :return: the same dataframe object
    """
    # (message logged before the attempt, osis_service helper, message logged on failure)
    steps = (
        ('Getting leg-label for each row from OSIS', osis_service.add_leg_column,
         'Could not create leg info column! Parse error??'),
        ("Adding column with leg id from OSIS", osis_service.add_leg_id_column,
         'Could not create leg id column! Parse error??'),
        ("Adding column with platform id from OSIS", osis_service.add_platform_id_column,
         'Could not create platform id column! Parse error??'),
    )
    for start_msg, add_column, error_msg in steps:
        logging.info(start_msg)
        try:
            add_column(df)
        except Exception:
            # best effort: a missing OSIS column must not abort the whole import
            logging.exception(error_msg)
    return df
def _setup_logging():
# setup logging to stdout
# TODO configure log level using params (-v, -vv)
import sys
root = logging.getLogger()
root.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
def _generate_timestamp_label():
"""
generates string from current time to be used in import labels
:return:
"""
now = (datetime.datetime.utcnow() - datetime.datetime.fromtimestamp(0))
now = round(now.total_seconds())
return f"{now}"
def _generate_import_dir_label(base_dir):
"""
Generate label from import base dir for ise as import label in dsjip imports
:param base_dir:
:return:
"""
# use only topmost folder name
dir_name = os.path.normpath(base_dir)
dir_name = dir_name.split(os.path.sep)[-1]
dir_name = dir_name.replace('-', '_')
return f'{dir_name}'
def get_hmm():
    """Return the canned 'thought' string."""
    thought = 'hmmm...'
    return thought
def hmm():
    """Print a thought, but only when helpers.get_answer() returns something truthy."""
    if not helpers.get_answer():
        return
    print(get_hmm())