Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
dship2postgis / core.py
Size: Mime:
# -*- coding: utf-8 -*-
import os

from . import osis_service, geometry_generator, db_service, check_mk_service
import gmr_underway_connect.underway_data as underway_data
from gmr_underway_connect.config import Config as udw_mail_cfg
import dshipparser.parser
from dshipparser.parser import helpers
from .config import Config as cfg
import click
import logging
import pandas as pd
import datetime
from pathlib import Path


def underwaydaily2geodataframe(platform, days_in_the_past=0):
    """
    Reads daily underway dm-data mails, detaches the attachments, parses/imports them and generates a geodataframe.
    Will open DB connection and query OSIS for leg label, leg id, platform id for each row.
        **Requires working db config**

    Folders that fail to import are logged and skipped. The temporary unpack directories are cleaned up
    even if collecting the folders fails.

    :param platform: The platform of interest (alkor, poseidon, meteor,...)
    :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
    :return: GeoDataFrame with extracted information (index sorted, duplicates handled by
        helpers.de_duplicate_index), or None if no folder could be imported
    """
    logging.debug(f'underwaydaily2geodataframe(platform={platform}, days_in_the_past={days_in_the_past})')
    logging.info(f'Importing daily data found in underway mail for {platform}, '
                 f'using mailserver: {udw_mail_cfg.MAIL_SERVER}, user: {udw_mail_cfg.MAIL_USER}')
    dh = underway_data.UnderwayDailyHandler(platform)
    dh.unzip_daily_mails(days_in_the_past)

    gdfs = []
    try:
        # Pass bare entry names: _gdf_from_daily joins them onto unpacked_base_dir itself. Re-joining the
        # full glob results would duplicate the base dir whenever unpacked_base_dir is a relative path.
        # Sorting makes the processing order deterministic.
        to_import = sorted(entry.name for entry in Path(dh.unpacked_base_dir).glob('*'))
        logging.info(f"processing {len(to_import)} folders: {to_import}")

        for it in to_import:
            try:
                gdfs.append(_gdf_from_daily(it, dh.unpacked_base_dir))
            except Exception:
                # best effort: one broken export must not prevent importing the others
                logging.exception(f"Import of {it} failed!")
    finally:
        # at this point, the tmp files are no longer needed
        dh.clean_temp_dirs()

    if not gdfs:
        # pd.concat raises ValueError on an empty list; report "no data" like nmea2geodataframe does
        logging.warning('Could not create geodataframe, no data!')
        return None

    gdf = pd.concat(gdfs)

    # fix cases where mails did not arrive in the correct order. see #22
    if not gdf.index.is_monotonic_increasing:
        gdf = gdf.sort_index()

    gdf = helpers.de_duplicate_index(gdf)

    return gdf


def _gdf_from_daily(dirname, unpacked_base_dir, freq='60s'):
    """
    Builds a GeoDataFrame from one unpacked daily dship export, thinned out to ``freq`` steps, with OSIS info columns.

    Cannot just call dship2postgis on each directory here since the data is resampled (e.g. from 1s to 60s) first.
    The OSIS lookup is deferred until after resampling, so it only runs on the remaining rows.

    :param dirname: name of the export folder below ``unpacked_base_dir``. An absolute path also works,
        because os.path.join then ignores ``unpacked_base_dir``.
    :param unpacked_base_dir: directory the daily mails were unpacked to
    :param freq: pandas frequency passed to ``asfreq``. Default: '60s'
    :return: GeoDataFrame without rows lacking latitude or longitude
    """
    base_dir = os.path.join(unpacked_base_dir, dirname)
    # generate gdf. Skip query of osis db here, wait till resampled
    gdf = dhsip2geodataframe(base_dir, add_osis_info_cols=False)

    # asfreq keeps only the rows that fall exactly on a freq step (no aggregation); steps without data
    # become all-NaN rows, which are dropped right away
    gdf = gdf.asfreq(freq)
    gdf = gdf.dropna(axis=0, how='all')
    # a row without lat or lon cannot be turned into a position
    gdf = gdf.dropna(axis=0, subset=[dshipparser.parser.LAT_COL, dshipparser.parser.LON_COL])

    # add columns w/ OSIS info now
    logging.info("Adding column with leg info from OSIS")
    _add_osis_info_columns(gdf)

    return gdf


def import_daily_data(platform, days_in_the_past=0, import_label=None):
    """
        Reads daily underway dm-data mails and imports them to postgis

        :param platform: The platform of interest (alkor, poseidon, meteor,...)
        :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
        :param import_label: A string that will be written to DB to reference the origin of each imported row.
            If no value is provided, the import label will be generated automatically as follows:
            daily_import_{platform}_{timestamp}
    """
    if not import_label:
        ts_label = _generate_timestamp_label()
        import_label = f"daily_import_{platform}_{ts_label}"

    logging.info(f'Importing daily underway data for {platform} into postgresql://{cfg.DB_USER}:XXXX@{cfg.DB_HOST}:5432/{cfg.DB_NAME} '
                 f'{cfg.DB_SCHEMA}.{cfg.DB_POSITIONS_TABLE}')

    gdf = underwaydaily2geodataframe(platform, days_in_the_past=days_in_the_past)

    # same guard as import_nmea_data: nothing to write if no mail yielded usable data
    if gdf is None or len(gdf) < 1:
        logging.warning('No data received, not persisting anything!')
        return

    persist_data(gdf, import_label)


def nmea2geodataframe(platform, days_in_the_past=0, add_osis_info_columns=True):
    """
    Reads nmea telegrams from dm-data mails, parses them and generates a geodataframe
    :param platform: The platform of interest (alkor, poseidon, meteor,...)
    :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
    :param add_osis_info_columns: if True, will open DB connection and query OSIS for leg label, leg id and
        platform id for each row.
        **Requires working db config**
    :return: GeoDataFrame with extracted information, or None if no nmea data was found
    """
    logging.info(f'Parsing nmea-data found in underway mail for {platform}, '
                 f'using mailserver: {udw_mail_cfg.MAIL_SERVER}, user: {udw_mail_cfg.MAIL_USER}')
    nmea_handler = underway_data.UnderwayNmeaHandler(platform)
    df = nmea_handler.get_nmea_data(days_in_the_past)

    if df is None or len(df) < 1:
        logging.warning('Could not create geodataframe, no data!')
        return None

    df = helpers.de_duplicate_index(df)

    if add_osis_info_columns:
        _add_osis_info_columns(df)
    else:
        logging.info('Skipping generation of osis info columns for each row from OSIS')

    logging.info('Generating geodataframe from parsed data')
    gdf = geometry_generator._make_geodataframe(df)

    return gdf


def import_nmea_data(platform, days_in_the_past=0, import_label=None):
    """
        Reads in nmea mails and imports the contents to postgis DB

        :param platform: The platform of interest (alkor, meteor,...)
        :param days_in_the_past: The number of days in the past to consider. Default:0, read all available mails
        :param import_label: A string that will be written to DB to reference the origin of each imported row.
            - If no value is provided, the import label will be generated automatically per row as follows:
            nmea_import_{platform}_{timestamp_email}. Rows without an email timestamp (or data without a
            'timestamp_email' column) get nmea_import_{platform}_{timestamp} instead.

    """

    logging.info(f'Importing nmea data for {platform} into postgresql://{cfg.DB_USER}:XXXX@{cfg.DB_HOST}:5432/{cfg.DB_NAME} '
                 f'{cfg.DB_SCHEMA}.{cfg.DB_POSITIONS_TABLE}')

    gdf = nmea2geodataframe(platform, days_in_the_past=days_in_the_past, add_osis_info_columns=True)

    if gdf is None or len(gdf) < 1:
        logging.warning('No data received, not persisting anything!')
        return

    # persist_data is called with 'PROVIDED', which makes it read each row's label from the 'import_label'
    # column. That column therefore has to be filled in both cases, including when the caller supplied a label.
    if import_label:
        gdf['import_label'] = import_label
    else:
        ts_label = _generate_timestamp_label()
        default_label = f"nmea_import_{platform}_{ts_label}"
        if 'timestamp_email' in gdf.columns:
            gdf['import_label'] = gdf.timestamp_email.apply(lambda x: f"nmea_import_{platform}_{x}" if not pd.isnull(x) else default_label)
        else:  # column 'timestamp_email' not found
            logging.warning("Expected to find column 'timestamp_email' in data from NMEA mails. Update gmr_underway_connect!")
            gdf['import_label'] = default_label

    persist_data(gdf, 'PROVIDED')


def dhsip2geodataframe(base_dir, add_osis_info_cols=True):
    """
    Parse the DSHIP export found in ``base_dir`` and turn it into a GeoDataFrame.

    A DSHIP3 export directory has to contain
      - a ``*.sys`` file with meta information,
      - an ``order_*.xml`` file with the format and column specification,
      - a ``*.dat`` file with the actual data (expected to be headerless).

    The export **has to contain columns for lat/ lon** (see config.py for recognized columns) and a *timestamp*.

    :param base_dir: string with the directory base path
    :param add_osis_info_cols: when True, open a DB connection and query OSIS for leg label, leg id and
        platform id of every row.
        **Requires working db config**
    :return: GeoDataFrame with the extracted information
    """
    # parse the export into a standardized data frame first
    logging.info(f'Parsing dship-data found in {base_dir}')
    parsed = helpers.de_duplicate_index(dshipparser.parser.parse_data(base_dir))

    if not add_osis_info_cols:
        logging.info('Skipping generation of osis info columns for each row from OSIS')
    else:
        _add_osis_info_columns(parsed)

    return geometry_generator._make_geodataframe(parsed)


def import_dship_data(base_dir, import_label=None):
    """
        Parse the DSHIP export in ``base_dir`` (including the OSIS info columns) and write it to the postgis DB.

        A DSHIP3 export directory has to contain
        - a ``*.sys`` file with meta information
        - an ``order_*.xml`` file with the format and column specification
        - a ``*.dat`` file with the actual data (expected to be headerless)

        The export **has to contain columns for lat/ lon** (see config.py for recognized columns) and a *timestamp*.

        :param base_dir: string with the directory base path
        :param import_label: A string that will be written to DB to reference the origin of each imported row.
            If it is empty or not given, a label is generated as
            dship_import_{last folder of base_dir, '-' replaced by '_'}_{timestamp}
    """
    label = import_label or (f'dship_import_{_generate_import_dir_label(base_dir)}'
                             f'_{_generate_timestamp_label()}')

    logging.info(f'Importing data from {base_dir} into postgresql://{cfg.DB_USER}:XXXX@{cfg.DB_HOST}:5432/{cfg.DB_NAME} '
                 f'{cfg.DB_SCHEMA}.{cfg.DB_POSITIONS_TABLE}')
    persist_data(dhsip2geodataframe(base_dir, add_osis_info_cols=True), label)


def persist_data(geo_df, import_label):
    """
        Write a geodataframe to the positions table in the DB.

        The positions table is created if it does not exist yet. The rows are written to a temporary table,
        which is then merged into the positions table, and the temporary tables are dropped afterwards.

        :param geo_df: geodataframe with the rows to store
        :param import_label: label to attach to all data rows. If the string 'PROVIDED' is passed, a column
            'import_label' is expected in ``geo_df`` instead.
    """
    logging.info('Writing data to database')
    db = db_service.DbService()
    db.create_positions_table()  # no-op if the table already exists

    staging_table = db.create_temp_table(geo_df, import_label)
    logging.info(f'wrote data to temporary table {staging_table}')

    db.merge_temp_table(staging_table)
    logging.info(f'merged temporary table into {cfg.DB_POSITIONS_TABLE}')

    db.drop_temp_tables()
    logging.info(f'dropped temporary table {db.temp_tables}')


@click.command(name='underwaydaily2postgis', short_help="Reads in daily underway mails and imports data to postgis DB")
@click.option('-l', '--import_label', help='Label that gets attached to imported rows. If nothing is passed, label is automatically generated.')
# click passes None for an omitted option unless a default is set; 0 is the documented "all mails" value
@click.option('-d', '--days_in_the_past', type=int, default=0, help='Number of days in the past to be considered. Passing nothing or 0 will process all available mails')
@click.argument('Platform')
def cli_underwaydaily2postgis(platform, days_in_the_past=0, import_label=None):
    """
    Reads in daily underway mails and imports data to postgis DB
    :param platform: platform to consider, e.g. alkor, meteor, ...
    :param days_in_the_past: Number of days in the past to be considered. Passing nothing or 0 will process all available mails.
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically.
    """
    # setup logging to stdout
    _setup_logging()
    logging.debug(f'days_in_the_past received: {days_in_the_past}, type:{type(days_in_the_past)}')
    import_daily_data(platform, days_in_the_past, import_label)


@click.command(name='nmea2postgis', short_help="Reads in nmea telegrams received by mail and imports data to postgis DB")
@click.option('-l', '--import_label', help='Label that gets attached to imported rows. If nothing is passed, label is automatically generated.')
# click passes None for an omitted option unless a default is set; 0 is the documented "all mails" value
@click.option('-d', '--days_in_the_past', type=int, default=0, help='Number of days in the past to be considered. Passing nothing or 0 will process all available mails')
@click.argument('Platform')
def cli_nmea2postgis(platform, days_in_the_past=0, import_label=None):
    """
    Reads in nmea telegrams received by mail and imports data to postgis DB
    :param platform: platform to consider, e.g. alkor, meteor, ...
    :param days_in_the_past: Number of days in the past to be considered. Passing nothing or 0 will process all available mails.
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically.
    """
    # setup logging to stdout
    _setup_logging()
    import_nmea_data(platform, days_in_the_past, import_label)


@click.command(name='dhsip2postgis', short_help="Reads in dship export and imports it to postgis DB")
@click.option('-l', '--import_label', help='Label that gets attached to imported rows. If nothing is passed, label is automatically generated.')
@click.argument('base_dir')
def cli_dhsip2postgis(base_dir, import_label=None):
    """
    Reads in dship export located in the base directory and imports it to postgis DB

    For DSHIP3 exports, the directory has to contain
    - *.sys file with meta-information
    - order_*.xml file with format and column specification
    - *.dat file with the actual data

    The export **has to contain columns for lat/ lon** (see config.py for recognized columns) and a *timestamp*.
    The .dat file is expected to be headerless.

    :param base_dir: string with directory base path
    :param import_label: A string that will be written to DB to reference the origin of each imported row.
        If no value is provided, the import label will be generated automatically as follows:
        dship_import_{base_dir}_{date}
    """
    # setup logging to stdout
    _setup_logging()

    import_dship_data(base_dir, import_label)


@click.command(name='uderway_nmea_age_check', short_help="Checks age of received NMEA underway data ")
@click.option('-w', '--warn', help='Optional: Threshold for warning age in hours', type=int, default=check_mk_service.default_warn_nmea)
@click.option('-c', '--crit', help='Optional: for critical age in hours', type=int, default=check_mk_service.default_crit_nmea)
@click.argument('platform')
def cli_underway_nmea_age_check(platform, warn, crit):
    """
    Prints the result of check_mk_service.do_check_age_nmea for PLATFORM, using WARN/CRIT age thresholds in hours.
    """
    check_result = check_mk_service.do_check_age_nmea(platform, warn, crit)
    print(check_result)


@click.command(name='uderway_daily_age_check', short_help="Checks age of received daily underway data ")
@click.option('-w', '--warn', help='Optional: Threshold for warning age in hours', type=int, default=check_mk_service.default_warn_daily)
@click.option('-c', '--crit', help='Optional: for critical age in hours', type=int, default=check_mk_service.default_crit_daily)
@click.argument('platform')
def cli_underway_daily_age_check(platform, warn, crit):
    """
    Prints the result of check_mk_service.do_check_age_daily for PLATFORM, using WARN/CRIT age thresholds in hours.
    """
    check_result = check_mk_service.do_check_age_daily(platform, warn, crit)
    print(check_result)


def _add_osis_info_columns(df):
    """
    Adds columns with OSIS info (leg label, leg id, platform id) to dataframe.

    Each column is attempted independently on a best-effort basis. A failing lookup is logged with its
    traceback, and the remaining columns are still attempted.

    :param df: dataframe to extend. The osis_service helpers' return values are ignored, so they are
        expected to modify ``df`` in place.
    :return: the same ``df`` object
    """
    # add column with leg label to dataframe
    logging.info('Getting leg-label for each row from OSIS')
    try:
        osis_service.add_leg_column(df)
    except Exception:  # not bare: let KeyboardInterrupt/SystemExit through
        logging.exception('Could not create leg info column! Parse error??')

    # add column with leg id to dataframe
    logging.info("Adding column with leg id from OSIS")
    try:
        osis_service.add_leg_id_column(df)
    except Exception:
        logging.exception('Could not create leg id column! Parse error??')

    # add column with platform id to dataframe
    logging.info("Adding column with platform id from OSIS")
    try:
        osis_service.add_platform_id_column(df)
    except Exception:
        logging.exception('Could not create platform id column! Parse error??')

    return df


def _setup_logging():
    # setup logging to stdout
    # TODO configure log level using params (-v, -vv)
    import sys
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    root.addHandler(ch)


def _generate_timestamp_label():
    """
    generates string from current time to be used in import labels
    :return:
    """
    now = (datetime.datetime.utcnow() - datetime.datetime.fromtimestamp(0))
    now = round(now.total_seconds())

    return f"{now}"


def _generate_import_dir_label(base_dir):
    """
    Generate label from import base dir for ise as import label in dsjip imports
    :param base_dir:
    :return:
    """
    # use only topmost folder name
    dir_name = os.path.normpath(base_dir)
    dir_name = dir_name.split(os.path.sep)[-1]
    dir_name = dir_name.replace('-', '_')

    return f'{dir_name}'


def get_hmm():
    """Return the fixed "thought" string that hmm() prints."""
    thought = 'hmmm...'
    return thought


def hmm():
    """Print the thought from get_hmm(), but only when helpers.get_answer() is truthy."""
    if not helpers.get_answer():
        return
    print(get_hmm())