Repository URL to install this package:
|
Version:
0.4.5 ▾
|
hzg-sos-connect
/
push_data.py
|
|---|
# -*- coding: utf-8 -*-
"""Handling sensordata
This module handles communication with cosyna server.
Example query
http://tsimp:qgz32pu@surveydata.hzg.de/import/import.cgi?op=add_value;station=Meteor;parameter=PSAL;time=2018-08-06%2015:52:53;value=36;latitude=54;longitude=12
"""
from .sensordata import SensorData
import pandas as pd
import requests
from . import Config
cfg = Config
known_stations = {
'meteor': 'Meteor',
'm': 'Meteor'
}
known_parameters = {
'Meteor':
{
'watersalinity': 'PSAL',
'salinity': 'PSAL',
'sal': 'PSAL',
'psal': 'PSAL',
'watertemperature': 'TEMP',
'temperature': 'TEMP',
'temp': 'TEMP'
}
}
class HzgSosConnectPushError(Exception):
"""Exception for occuring errors during connection to HZG SOS"""
def get_valid_station_name(offering):
"""Selects 'official' station name used in tsimport for a device/ ship. Searches a list of pre-define aliases """
ret = known_stations.get(offering.lower().strip())
if not ret:
raise HzgSosConnectPushError(f"Unknown offering '{offering}'. Choose from {known_stations.keys()}")
return ret
def get_valid_parameter_name(station, property):
"""Selects 'official' parameter name for use in surveydate import. Searches a list of pre-define aliases
:param station: Station name. See @get_get_valid_station_name
:param property: name of observed property
"""
v_station = get_valid_station_name(station)
assert v_station in known_parameters.keys(), f"Did not find paraneters for station {station}"
params = known_parameters.get(v_station)
ret = params.get(property.lower().strip())
#if not ret:
# raise HzgSosConnectPushError(f"Unknown parameter '{property}' for station {station}. Choose from {params.keys()}")
return ret
def _check_input_data(data_frame):
required_columns = ['lat', 'lon', 'obs_timestamp']
for col in required_columns:
assert col in data_frame.columns, f"Did not find column '{col}' in input data!"
def create_sensor_date(offering, data_frame):
"""
Creates SensorData objects from data frame.
:param offering the offering/station/ship/device
:param data_frame: The data for which items are to be created
:returns List of SensorData objects, one for each identified observation in input data
"""
_check_input_data(data_frame)
station = get_valid_station_name(offering)
ret = []
for i, data_row in data_frame.iterrows():
time = data_row['obs_timestamp']
latitude = data_row['lat']
longitude = data_row['lon']
for property in data_row.index:
# try to find a mapping in known praneters
parameter = get_valid_parameter_name(station, property)
if not parameter:
continue # no known parameter
value = data_row[property]
if pd.isnull(value):
continue # no valid value
sd = SensorData(offering=station, observed_property=parameter, value=value,
time=time, data_dict={'latitude': latitude, 'longitude': longitude},
quality_flag=0)
ret.append(sd)
return ret
def push_sensor_data(sd: SensorData):
"""Attempts to write data to surveydata via http inerface"""
request_url = 'http://surveydata.hzg.de/import/import.cgi'
request_params = {
'op': 'add_value',
'station': sd.offering,
'parameter': sd.observed_property,
'time': sd.time,
'value': sd.value,
'latitude': sd.data_dict['latitude'],
'longitude': sd.data_dict['longitude']
}
request_auth = (cfg.import_user, cfg.import_pwd)
response = requests.get(url=request_url, params=request_params, auth=request_auth)
if response.ok:
return response.text
else:
raise HzgSosConnectPushError(f'Failed write to surveydata. {response.status_code}: {response.text}')
def write_to_surveydata(offering, data_frame):
"""
Creates SensorData objects from data frame and pushes them to SurveyData using http interface
:param offering the offering/station/ship/device
:param data_frame: The data for which items are to be created
:returns List of responses from SurveyData
"""
ret = []
for sd in create_sensor_date(offering, data_frame):
ret.append(push_sensor_data(sd))
return ret