Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
gmr-ingest / ingestctl.py
Size: Mime:
import click
import wgc
import po_minijson_connect
import eddy_cv_connect
import hzg_sos_connect
import gmr_underway_connect
import saildrone_connect.pull_data
from gmr_ingest import fixed_point_observatories
import xpd_mails_connect
from pgimport import pgimport
import logging
from connectors.noaa_erddap_connect import NoaaErddapConnect
from connectors.glider_estimated_positions_connector import GliderEstimatedPositionConnect
from connectors.argo_float_ftp_connect import ArgoFloatFtpConnect
from connectors.argo_fleetmonitoring_connect import ArgoFleetmonitoringConnect
from connectors.sy_eugen_seibold_connect import SyEugenSeiboldConnect


# Names of the data sources that may be passed as --data-source.
known_sources = [
    'WaveGlider',
    'SlocumGlider',
    'Ships',
    'EddyCV',
    'HZGSOS',
    'UnderwayNMEA',
    'PoMiniJson',
    'Saildrone',
    'FixedPointObservatories',
    'XpdMails',
    'NoaaErddap',
    'GliderEstimatedPositions',
    'ArgoFloatFtp',
    'ArgoFleetmonitoring',
    'EugenSeibold',
]

# Names of the data sinks that may be passed as --data-destination.
known_sinks = [
    'OceanEddiesPostGIS',
    'HZGSurveyData',
]

# click context settings shared by the commands in this module: every token
# click parses is normalised to lower case before it is matched.
CONTEXT_SETTINGS = {'token_normalize_func': str.lower}

@click.command(context_settings=CONTEXT_SETTINGS)  # enable lowercase option values
@click.option('--data-source', type=click.Choice(known_sources), help='Source of data', required=True)
@click.option('--data-destination', type=click.Choice(known_sinks), help='Destination of data', required=True)
def ingest(data_source, data_destination):
    """Read data from the given source and write it to the given destination.

    :param data_source: one of the names in known_sources
    :param data_destination: one of the names in known_sinks
    """
    # Named `job` (not `ingest`) so the local does not shadow this command function.
    job = DataIngest(data_source, data_destination)
    job.do_ingest()

@click.command(context_settings=CONTEXT_SETTINGS)  # enable lowercase option values
def ingest_wrapup():
    """Takes care of some post-ingest stuff, e.g. calculating missing heading fields.

        Should be called after all ingest calls have finished.
    """
    # The docstring above is left verbatim: click uses a command's docstring
    # as its --help text, so it is part of the program's output.
    log = logging.getLogger(__name__)

    log.info('Calculating missing nav info (heading)...')
    # The single wrap-up step: delegate to pgimport to add the nav info.
    pgimport.add_navinfo()
    log.info('... Done! (Calculating missing nav info)')


class DataIngestError(Exception):
    """Signals that an ingest step could not be carried out.

    Used for unimplemented sources or destinations and for attempts to write
    before any data has been fetched.
    """

class DataIngest(object):
    """Reads data from one configured source and writes it to one configured destination."""

    # Only 30 placeholders for eddies exist in the platform DB, so EddyCV
    # imports are capped at this many rows.
    MAX_EDDY_ROWS = 30

    def __init__(self, data_source, data_destination):
        """Handles reading data from a defined source and writing it to a defined destination

        :param data_source: Source of data, specified as device type e.g. WaveGlider
        :param data_destination: Destination of data, specified by system name, e.g. OceanEddiesPostGIS
        :raises AssertionError: if either name is not listed in known_sources / known_sinks
            (names are compared case-insensitively)
        """
        # NOTE(review): asserts are skipped under ``python -O``. They are kept
        # so the exception type seen by callers does not change; unknown names
        # are still rejected later by get_data() / write_data().
        assert data_source.lower() in [s.lower() for s in known_sources], f'Unknown data source: {data_source}. Currently implemented: {known_sources}'
        assert data_destination.lower() in [s.lower() for s in known_sinks], f'Unknown data destination: {data_destination}. Currently implemented: {known_sinks}'

        self.logger = logging.getLogger(__name__)
        self.source = data_source
        self.destination = data_destination
        self.data = None  # filled by get_data()

    def do_ingest(self):
        """Performs ingest by getting data from source and writing it to destination

        :raises DataIngestError: if the source or destination is not implemented
        """
        self.logger.info(f'Getting data for {self.source} ...')
        # get_data() stores its result on self.data itself, so it is simply
        # called here (comparing self.data against its return value did nothing useful).
        self.get_data()
        self.logger.info(f'...done! (Getting data for {self.source})')

        self.logger.info(f'Writing data to {self.destination} ...')
        try:
            self.write_data()
        except DataIngestError:
            # Log through the instance logger, like every other message of this class.
            self.logger.exception(f'Error writing data for {self.source}:')
            raise
        else:
            self.logger.info(f'... done! (Writing data to {self.destination})')

    def get_data(self):
        """Fetches data from the configured source and stores it in ``self.data``.

        :return: the fetched data (the same object as ``self.data``)
        :raises DataIngestError: if no connector is implemented for the source
        """
        source = self.source.lower()  # source names are matched case-insensitively

        if source == 'waveglider':
            self.data = wgc.get_waveglider_data()
        elif source == 'eddycv':
            self.data = eddy_cv_connect.get_eddy_cv_data()
            # Only MAX_EDDY_ROWS placeholders for eddies in the platform DB
            # --> import at most that many rows. This happens rarely and
            # should make no practical difference.
            if len(self.data) > self.MAX_EDDY_ROWS:
                self.data = self.data.iloc[:self.MAX_EDDY_ROWS]
        elif source == 'hzgsos':
            self.data = hzg_sos_connect.get_hzg_sos_data()
        elif source == 'underwaynmea':
            self.data = gmr_underway_connect.get_underway_nmea_data(['meteor'])
            # TODO add more data sources? Make configurable?
        elif source == 'pominijson':
            self.data = po_minijson_connect.get_po_minijson_data()
        elif source == 'fixedpointobservatories':
            self.data = fixed_point_observatories.get_fixed_point_onservatories_data()
        elif source.startswith('saildrone'):
            self.data = saildrone_connect.pull_data.get_saildrone_data()
        elif source.startswith('xpdmail'):
            # The argument is the number of days in the past to fetch:
            # 0 -> all mails (was used for the initial test),
            # 1 -> current day only.
            self.data = xpd_mails_connect.get_xpd_mail_data(1)
        elif source.startswith('noaaerddap'):
            self.data = NoaaErddapConnect.get_all_erddap_tabledap_data()
        elif source.startswith('gliderestimatedpositions'):
            self.data = GliderEstimatedPositionConnect.get_all_estimated_glider_data()
        elif source.startswith('argofloatftp'):
            self.data = ArgoFloatFtpConnect.get_all_float_data()
        elif source.startswith('argofleetmonitoring'):
            self.data = ArgoFleetmonitoringConnect.get_all_argo_fleetmonitoring_data()
        elif source.startswith('eugenseibold'):
            self.data = SyEugenSeiboldConnect.get_data_eugen_seibold()
        else:
            raise DataIngestError(f'Data source {self.source} not implemented yet. Sorry.')

        return self.data

    def write_data(self):
        """Writes ``self.data`` to the configured destination.

        :raises DataIngestError: if no data has been fetched yet or the
            destination is not implemented
        """
        if self.data is None:
            raise DataIngestError('Attempted to write data before getting data. Call get_data() first!')

        destination = self.destination.lower()  # matched case-insensitively
        if destination == 'oceaneddiespostgis':
            pgimport.write_to_postgis(self.data)
        elif destination == 'hzgsurveydata':
            hzg_sos_connect.write_to_surveydata('Meteor', self.data)  # TODO fix in hzgconnect, having to pass 'meteor' here is not cool
        else:
            raise DataIngestError(f'Data destination {self.destination} not implemented yet. Sorry.')