Repository URL to install this package:
|
Version:
0.17.0 ▾
|
gmr-ingest
/
ingestctl.py
|
|---|
import click
import wgc
import po_minijson_connect
import eddy_cv_connect
import hzg_sos_connect
import gmr_underway_connect
import saildrone_connect.pull_data
from gmr_ingest import fixed_point_observatories
import xpd_mails_connect
from pgimport import pgimport
import logging
from connectors.noaa_erddap_connect import NoaaErddapConnect
from connectors.glider_estimated_positions_connector import GliderEstimatedPositionConnect
from connectors.argo_float_ftp_connect import ArgoFloatFtpConnect
from connectors.argo_fleetmonitoring_connect import ArgoFleetmonitoringConnect
from connectors.sy_eugen_seibold_connect import SyEugenSeiboldConnect
# Names accepted by the --data-source option (and checked again by DataIngest).
known_sources = [
    'WaveGlider',
    'SlocumGlider',
    'Ships',
    'EddyCV',
    'HZGSOS',
    'UnderwayNMEA',
    'PoMiniJson',
    'Saildrone',
    'FixedPointObservatories',
    'XpdMails',
    'NoaaErddap',
    'GliderEstimatedPositions',
    'ArgoFloatFtp',
    'ArgoFleetmonitoring',
    'EugenSeibold',
]

# Names accepted by the --data-destination option (and checked again by DataIngest).
known_sinks = [
    'OceanEddiesPostGIS',
    'HZGSurveyData',
]

# click context settings shared by the commands below: every token is
# lower-cased before it is matched.
CONTEXT_SETTINGS = {'token_normalize_func': lambda token: token.lower()}
@click.command(context_settings=CONTEXT_SETTINGS)  # enable lowercase option values
@click.option('--data-source', type=click.Choice(known_sources), help='Source of data', required=True)
@click.option('--data-destination', type=click.Choice(known_sinks), help='Destination of data', required=True)
def ingest(data_source, data_destination):
    """Read data from one source and write it to one destination."""
    # NOTE: click uses the docstring above as the command's --help description,
    # so it is deliberately kept to a single user-facing sentence.
    #
    # data_source / data_destination arrive already validated against
    # known_sources / known_sinks by click.Choice.
    # The local is not called `ingest` so it does not shadow this command.
    job = DataIngest(data_source, data_destination)
    job.do_ingest()
@click.command(context_settings=CONTEXT_SETTINGS)  # same context settings as the ingest command
def ingest_wrapup():
    """Takes care of some post-ingest stuff, e.g. calculating missing heading fields.
    Should be called after all ingest calls have finished.
    """
    # The docstring above doubles as the click help text, so its wording is
    # left exactly as it was.
    log = logging.getLogger(__name__)

    log.info('Calculating missing nav info (heading)...')
    # All of the actual work is delegated to pgimport.
    pgimport.add_navinfo()
    log.info('... Done! (Calculating missing nav info)')
class DataIngestError(Exception):
    """Error raised by DataIngest when data cannot be fetched or written.

    Used for unimplemented sources/destinations and for attempts to write
    before any data has been fetched.
    """
class DataIngest:
    """Moves data from one known source to one known destination.

    Typical use is ``DataIngest(source, destination).do_ingest()``, which runs
    :meth:`get_data` followed by :meth:`write_data`. The fetched payload is
    kept on ``self.data`` in between.
    """

    # Only 30 placeholders for eddies exist in the platform DB, so at most
    # this many EddyCV rows are imported.
    _MAX_EDDY_CV_ROWS = 30

    def __init__(self, data_source, data_destination):
        """Handles reading data from a defined source and writing it to a defined destination

        :param data_source: Source of data, specified as device type e.g. WaveGlider
        :param data_destination: Destination of data, specified by system name, e.g. OceanEddiesPostGIS
        :raises AssertionError: if either name is not listed in ``known_sources`` /
            ``known_sinks`` (names are compared case-insensitively)
        """
        # Raised explicitly instead of via ``assert`` so the validation also
        # runs under ``python -O``; the exception type is unchanged.
        if data_source.lower() not in [s.lower() for s in known_sources]:
            raise AssertionError(
                f'Unknown data source: {data_source}. Currently implemented: {known_sources}')
        if data_destination.lower() not in [s.lower() for s in known_sinks]:
            raise AssertionError(
                f'Unknown data destination: {data_destination}. Currently implemented: {known_sinks}')

        self.logger = logging.getLogger(__name__)
        self.source = data_source
        self.destination = data_destination
        self.data = None  # filled by get_data(), consumed by write_data()

    def do_ingest(self):
        """Performs ingest by getting data from source and writing it to destination

        :raises DataIngestError: if the source or destination is not implemented.
            Only a DataIngestError raised while writing is logged here before
            being re-raised; any other exception propagates unlogged.
        """
        self.logger.info(f'Getting data for {self.source} ...')
        # get_data() stores its result on self.data. The previous
        # ``self.data == self.get_data()`` was a discarded comparison that
        # needlessly invoked ``__eq__`` on whatever payload was held before.
        self.get_data()
        self.logger.info(f'...done! (Getting data for {self.source})')

        self.logger.info(f'Writing data to {self.destination} ...')
        try:
            self.write_data()
            self.logger.info(f'... done! (Writing data to {self.destination})')
        except DataIngestError:
            # Use the instance logger, like every other message in this class.
            self.logger.exception(f'Error writing data for {self.source}:')
            raise

    def get_data(self):
        """Fetches data from the connector that belongs to ``self.source``.

        The result is stored on ``self.data`` and also returned for convenience.
        Matching is case-insensitive. 'SlocumGlider' and 'Ships' are listed in
        ``known_sources`` but have no connector branch here, so they end in the
        DataIngestError below.

        :return: the fetched data (same object as ``self.data``)
        :raises DataIngestError: if no connector is implemented for the source
        """
        source = self.source.lower()

        if source == 'waveglider':
            self.data = wgc.get_waveglider_data()
        elif source == 'eddycv':
            self.data = eddy_cv_connect.get_eddy_cv_data()
            # Truncation happens rarely and should make no practical difference.
            # (.iloc suggests a pandas object -- TODO confirm against the connector.)
            if len(self.data) > self._MAX_EDDY_CV_ROWS:
                self.data = self.data.iloc[:self._MAX_EDDY_CV_ROWS]
        elif source == 'hzgsos':
            self.data = hzg_sos_connect.get_hzg_sos_data()
        elif source == 'underwaynmea':
            self.data = gmr_underway_connect.get_underway_nmea_data(['meteor'])
            # TODO add more data sources? Make configurable?
        elif source == 'pominijson':
            self.data = po_minijson_connect.get_po_minijson_data()
        elif source == 'fixedpointobservatories':
            # NOTE(review): "onservatories" is the spelling used by the callee;
            # it cannot be corrected from this file alone.
            self.data = fixed_point_observatories.get_fixed_point_onservatories_data()
        elif source.startswith('saildrone'):
            self.data = saildrone_connect.pull_data.get_saildrone_data()
        elif source.startswith('xpdmail'):
            # Per the original note: the argument is the number of days in the
            # past; 1 means current day only (0 fetched all mails during the
            # initial test).
            self.data = xpd_mails_connect.get_xpd_mail_data(1)
        elif source.startswith('noaaerddap'):
            self.data = NoaaErddapConnect.get_all_erddap_tabledap_data()
        elif source.startswith('gliderestimatedpositions'):
            self.data = GliderEstimatedPositionConnect.get_all_estimated_glider_data()
        elif source.startswith('argofloatftp'):
            self.data = ArgoFloatFtpConnect.get_all_float_data()
        elif source.startswith('argofleetmonitoring'):
            self.data = ArgoFleetmonitoringConnect.get_all_argo_fleetmonitoring_data()
        elif source.startswith('eugenseibold'):
            self.data = SyEugenSeiboldConnect.get_data_eugen_seibold()
        else:
            raise DataIngestError(f'Data source {self.source} not implemented yet. Sorry.')

        return self.data

    def write_data(self):
        """Writes ``self.data`` to the system named by ``self.destination``.

        :raises DataIngestError: if no data has been fetched yet, or if the
            destination is not implemented
        """
        if self.data is None:
            raise DataIngestError('Attempted to write data before getting data. Call get_data() first!')

        destination = self.destination.lower()
        if destination == 'oceaneddiespostgis':
            pgimport.write_to_postgis(self.data)
        elif destination == 'hzgsurveydata':
            # TODO fix in hzgconnect, having to pass 'meteor' here is not cool
            hzg_sos_connect.write_to_surveydata('Meteor', self.data)
        else:
            raise DataIngestError(f'Data destination {self.destination} not implemented yet. Sorry.')