Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hzg-sos-connect / push_data.py
Size: Mime:
# -*- coding: utf-8 -*-
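"""Push sensor data to the COSYNA SurveyData import service.

This module converts tabular observations into SensorData objects and
sends them to the COSYNA server through its HTTP import interface.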

Example query (user and password are taken from the package ``Config``; do not embed real credentials here)
http://<user>:<password>@surveydata.hzg.de/import/import.cgi?op=add_value;station=Meteor;parameter=PSAL;time=2018-08-06%2015:52:53;value=36;latitude=54;longitude=12

"""
from .sensordata import SensorData
import pandas as pd
import requests

from . import Config
# Short alias for the package-level configuration. This module reads
# `import_user` and `import_pwd` from it to authenticate import requests.
cfg = Config



# Accepted aliases (lower-case, stripped) for a station/offering, mapped to
# the station name that is sent to the import endpoint.
known_stations = dict.fromkeys(('meteor', 'm'), 'Meteor')

# For each station: accepted aliases (lower-case, stripped) of an observed
# property, mapped to the parameter name that is sent to the import endpoint.
known_parameters = {
    'Meteor': {
        **dict.fromkeys(('watersalinity', 'salinity', 'sal', 'psal'), 'PSAL'),
        **dict.fromkeys(('watertemperature', 'temperature', 'temp'), 'TEMP'),
    },
}

class HzgSosConnectPushError(Exception):
    """Error raised while preparing or pushing data to the HZG import service.

    Within this module it is raised for an unknown offering/station name and
    for a non-successful HTTP response from the import endpoint.
    """

def get_valid_station_name(offering):
    """Resolve an offering alias to the 'official' station name.

    The alias is matched case-insensitively and without surrounding
    whitespace against the pre-defined ``known_stations`` table.

    :param offering: name or alias of the device/ship
    :returns: the official station name used for the import
    :raises HzgSosConnectPushError: if the alias is not in ``known_stations``
    """
    alias = offering.lower().strip()
    station = known_stations.get(alias)
    if station:
        return station

    raise HzgSosConnectPushError(f"Unknown offering '{offering}'. Choose from {known_stations.keys()}")

def get_valid_parameter_name(station, property):
    """Resolve an observed-property alias to the 'official' parameter name.

    The alias is matched case-insensitively and without surrounding
    whitespace against the aliases defined for the station in
    ``known_parameters``.

    :param station: station name or alias; resolved via get_valid_station_name
    :param property: name of the observed property, e.g. a data frame column
        label. (The name shadows the builtin but is kept because it is part
        of the public signature.)
    :returns: the official parameter name, or None if ``property`` is not a
        known alias for the station
    :raises HzgSosConnectPushError: if the station is unknown
    """
    v_station = get_valid_station_name(station)
    # Internal consistency check of the lookup tables, not input validation.
    assert v_station in known_parameters, f"Did not find paraneters for station {station}"

    # Labels that are not strings (e.g. integer column labels) cannot match
    # any alias; report "no mapping" instead of failing on `.lower()`.
    if not isinstance(property, str):
        return None

    # Unknown properties deliberately yield None rather than an error:
    # callers iterate over all columns of a data frame and skip those
    # without a mapping.
    return known_parameters[v_station].get(property.lower().strip())

def _check_input_data(data_frame):
    """Verify that the input data has the columns needed to build observations.

    :param data_frame: object exposing a ``columns`` collection (a pandas
        DataFrame in normal use)
    :raises AssertionError: if one of 'lat', 'lon' or 'obs_timestamp' is
        missing; the message names the first missing column
    """
    required_columns = ('lat', 'lon', 'obs_timestamp')
    for col in required_columns:
        if col not in data_frame.columns:
            # Raised explicitly instead of via `assert` so the check is not
            # stripped when Python runs with -O. The exception type is kept
            # as AssertionError so existing callers are unaffected.
            raise AssertionError(f"Did not find column '{col}' in input data!")



def create_sensor_date(offering, data_frame):
    """
    Creates SensorData objects from data frame.

    Every column whose label is a known parameter alias for the station
    yields one SensorData object per row, unless the cell value is null.
    Columns without a known mapping are ignored.

    :param offering: the offering/station/ship/device
    :param data_frame: The data for which items are to be created; must
        contain the columns 'lat', 'lon' and 'obs_timestamp'
    :returns: List of SensorData objects, one for each identified observation
        in input data (row by row, columns in data frame order)
    :raises HzgSosConnectPushError: if the offering is unknown
    :raises AssertionError: if a required column is missing
    """

    _check_input_data(data_frame)
    station = get_valid_station_name(offering)

    # The column -> parameter mapping is identical for every row, so it is
    # resolved once here instead of once per cell.
    column_parameters = {}
    for column in data_frame.columns:
        if not isinstance(column, str):
            continue  # non-string labels cannot match any known alias
        parameter = get_valid_parameter_name(station, column)
        if parameter:
            column_parameters[column] = parameter

    ret = []

    for _, data_row in data_frame.iterrows():
        time = data_row['obs_timestamp']
        latitude = data_row['lat']
        longitude = data_row['lon']
        for column, parameter in column_parameters.items():
            value = data_row[column]
            if pd.isnull(value):
                continue  # no valid value

            sd = SensorData(offering=station, observed_property=parameter, value=value,
                            time=time, data_dict={'latitude': latitude, 'longitude': longitude},
                            quality_flag=0)

            ret.append(sd)

    return ret


def push_sensor_data(sd: SensorData, timeout=30):
    """Attempts to write data to surveydata via http interface.

    Sends one ``add_value`` request for the given observation, authenticated
    with the user and password from the package configuration.

    :param sd: observation to send; its ``offering``, ``observed_property``,
        ``time``, ``value`` and ``data_dict`` ('latitude', 'longitude') are
        used as request parameters
    :param timeout: timeout in seconds handed to ``requests.get`` (a
        ``(connect, read)`` tuple is accepted as well). Without a timeout a
        stalled server would block the caller indefinitely.
    :returns: the response body text on success
    :raises HzgSosConnectPushError: if the server answers with an error status
    :raises requests.exceptions.RequestException: on connection problems or
        when the timeout expires
    """
    # NOTE(review): the endpoint is plain http, so the basic-auth credentials
    # travel unencrypted. Switch to https if the server supports it.
    request_url = 'http://surveydata.hzg.de/import/import.cgi'
    request_params = {
        'op': 'add_value',
        'station': sd.offering,
        'parameter': sd.observed_property,
        'time': sd.time,
        'value': sd.value,
        'latitude': sd.data_dict['latitude'],
        'longitude': sd.data_dict['longitude']
    }
    request_auth = (cfg.import_user, cfg.import_pwd)

    response = requests.get(url=request_url, params=request_params, auth=request_auth,
                            timeout=timeout)

    if response.ok:
        return response.text

    raise HzgSosConnectPushError(f'Failed write to surveydata. {response.status_code}: {response.text}')

def write_to_surveydata(offering, data_frame):
    """
    Builds SensorData objects from the data frame and pushes each of them to
    SurveyData through the http interface.

    :param offering: the offering/station/ship/device
    :param data_frame: The data for which items are to be created
    :returns: List of responses from SurveyData, one per pushed observation
    """
    observations = create_sensor_date(offering, data_frame)
    return [push_sensor_data(observation) for observation in observations]