Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hzg-sos-connect / pull_data.py
Size: Mime:
# -*- coding: utf-8 -*-
"""Handling sensordata

This module handles communication with the COSYNA SOS server.

Example query
http://sos.hzg.de/sos.py?request=GetObservation&service=SOS&offering=UWN_BoknisEck&observedProperty=PSAL&eventTime=2018-07-13T00:00:00Z/2018-08-15T23:59:59Z


"""

import pandas as pd
import requests
import untangle
import logging

from requests.exceptions import RequestException

from hzg_sos_connect.sensordata import SensorData, SensorDataException

# Record layout handed to logging.basicConfig at import time:
# timestamp - logger name - level - message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

logging.basicConfig(format=_LOG_FORMAT)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lookup table: device shortname -> (offering, observed_property).
# The tuple is what get_offerning_property() returns and what is passed on
# to request_latest_data().
known_devices = {
    'OG_Amadeus': ('amadeus', 'salinity'),
    'OG_Sebastian': ('sebastian', 'salinity'),
    'OG_Comet': ('comet', 'salinity'),
    # Not yet available: 'OG_Dipsydoodle': ('dipsydoodle', 'salinity')
    # Open question: alternative offerings for the devices above?
    #   'OG_Comet': ('OceanEddies', 'Comet')
    #   'OG_Sebastian': ('OceanEddies', 'Sebastian')
    'stemme': ('stemme', 'in_air'),
    # Shortnames 'DI_nn' all use the 'OceanEddies' offering with observed
    # property 'Dnn'. Numbers 01, 08 and 16 are intentionally absent, as in
    # the original hand-written table.
    **{
        f'DI_{number:02d}': ('OceanEddies', f'D{number:02d}')
        for number in (2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 18, 19)
    },
}


#: Base URL for COSYNA SOS requests (endpoint plus the fixed query part).
#: Kept separate from the template below mostly for readability.
request_base_url = 'http://sos.hzg.de/sos.py?request=GetObservation&Service=SOS'

#: str: Template the COSYNA SOS request is constructed from. Placeholders are
#: ``request_base_url``, ``offering`` and ``observed_property``.
request_base_string = (
    '{request_base_url}'
    '&offering={offering}'
    '&observedProperty={observed_property}'
)


def request_latest_data(offering, observed_property, timeout=30):
    """Performs request to COSYNA SOS, requesting the most recent data point

    :param offering: The COSYNA station, e.g. UWN_BoknisEck
    :param observed_property: The measured property of the requested sensor, e.g. PSAL
    :param timeout: Seconds to wait for the server, passed to ``requests.get``.
        Without a timeout an unresponsive server would block the caller
        indefinitely.
    :returns: The result of ``parse_sensor_data_xml`` for the response body
    :raises SensorDataException: If the endpoint cannot be reached (timeouts
        included), if it answers with a non-OK status, or if the response
        body contains 'ValueError: No observedProperty'
    """
    # Build the URL once so the request and the error message cannot diverge.
    url = request_base_string.format(request_base_url=request_base_url,
                                     offering=offering,
                                     observed_property=observed_property)

    try:
        r = requests.get(url, timeout=timeout)
    except RequestException as exc:
        # A RequestException occurs if the request cannot reach the endpoint.
        # This error is not stored in a response object; requests.get raises
        # instead. Re-raise as SensorDataException, keeping the cause.
        message = getattr(exc, 'message', None) or str(exc)
        raise SensorDataException(f'Error connecting to {url}: {message}') from exc

    if not r.ok:
        raise SensorDataException(f'Error connecting to {r.url}, status_code: {r.status_code}')

    # The service reports an unknown observed property inside the body of an
    # otherwise successful response.
    if "ValueError: No observedProperty" in r.text:
        raise SensorDataException(f"HZG SOS service responded with 'ValueError: No observedProperty' "
                                  f"({offering, observed_property})")

    return parse_sensor_data_xml(r.text)


def parse_sensor_data_xml(xml_str):
    """Builds a SensorData object from the XML text of a SOS response.

    :param xml_str: XML handed to ``untangle.parse``
    :returns: SensorData filled from the om:Observation element
    :raises SensorDataException: On any error while reading the XML; the
        original exception is chained as the cause
    """
    try:
        observation = untangle.parse(xml_str).om_Observation
        offering = observation.gml_name.cdata

        # For our type of SOS query the DataBlockDefinition should always
        # contain exactly one swe:components element.
        block_definition = observation.om_resultDefinition.swe_DataBlockDefinition
        components = block_definition.swe_components

        observed_property = components.get_attribute('name')

        field_names = []
        for swe_field in components.swe_DataRecord.swe_field:
            field_names.append(swe_field.get_attribute('name'))

        separator = block_definition.swe_encoding.swe_AsciiBlock.get_attribute('tokenSeparator')
        raw_result = observation.om_result.cdata

        parsed = parse_result_str(result_str=raw_result,
                                  token_sep=separator,
                                  fields=field_names,
                                  observed_property=observed_property)

        return SensorData(offering=offering,
                          observed_property=parsed['observed_property'],
                          time=parsed['time'],
                          value=parsed['value'],
                          quality_flag=parsed['quality flag'],
                          data_dict=parsed)

    except Exception as exc:
        raise SensorDataException("Error parsing xml response") from exc



def parse_result_str(result_str, token_sep, fields, observed_property):
    """Splits a SOS result string and maps its tokens to field names.

    :param result_str: The string found in the SOS om:result element
    :param token_sep: Separator. Value found in SOS om:result.swe:DataBlockDefinition
    :param fields: Field names, in the order their values appear in result_str.
        Value found in SOS swe:DataBlockDefinition.swe:components
    :param observed_property: The observed property from om:observedProperty

    :returns: dict with the field names as keys and the matching tokens as
        values. Additionally contains the key 'observed_property' and the key
        'value', which repeats the token of the observed property
    :raises IndexError: If result_str holds fewer tokens than there are fields
    :raises KeyError: If observed_property is not one of the fields
    """
    tokens = result_str.split(token_sep)

    parsed = {'observed_property': observed_property}
    # Positional mapping: the n-th field name gets the n-th token. Indexing
    # is used on purpose so that missing tokens raise instead of being
    # silently dropped.
    parsed.update({name: tokens[position] for position, name in enumerate(fields)})

    parsed['value'] = parsed[observed_property]
    return parsed


class HzgSosConnectPullError(Exception):
    """Exception for errors occurring during connection to HZG SOS.

    Within this module it is raised by get_offerning_property when a device
    shortname is not listed in known_devices.
    """


def get_offerning_property(device_shortname):
    """Looks up the HZG SOS identifiers of a device in known_devices.

    Returns the 'offering' (HZG SOS identifier) of the device together with
    an observed_property (parameter) that is assumed to have a valid value.

    :param device_shortname: shortname of device
    :return: tuple with strings (offering, observed_property)
    :raises HzgSosConnectPullError: If the shortname is not in known_devices
    """
    if device_shortname in known_devices:
        return known_devices[device_shortname]

    raise HzgSosConnectPullError(f"Unknown shortname '{device_shortname}'. Choose from {known_devices.keys()}")

def _poll_data():
    """Requests the latest data point of every device in known_devices.

    A device whose request fails with a SensorDataException is skipped; a
    warning that includes the reason for the failure is logged instead.

    :returns: pandas.DataFrame with one row per successfully polled device.
        Each row is the ``data_dict`` of the returned sensor data, extended
        by the key 'platform_shortname'. The frame has no rows and no
        columns if no device could be polled.
    """
    rows = []
    for shortname, offering_property in known_devices.items():
        try:
            platform_data = request_latest_data(*offering_property).data_dict
        except SensorDataException as exc:
            # Prefer an explicit message attribute, fall back to the
            # exception text, so the log states why the device was skipped.
            message = getattr(exc, 'message', None) or str(exc) or 'unknown error'
            logger.warning('Error getting data for %s (%s): %s',
                           shortname, offering_property, message)
            continue

        platform_data['platform_shortname'] = shortname
        rows.append(platform_data)

    return pd.DataFrame(rows)

def _process_data(df):
    """Makes sure data formats and column names match expectations

    Adds the columns 'obs_timestamp' (parsed from 'time'), 'lat' (numeric
    'latitude') and 'lon' (numeric 'longitude') to ``df`` in place.

    :param df: pandas.DataFrame with the columns 'time', 'latitude' and
        'longitude'; a frame without rows is accepted even if these columns
        are missing
    :returns: the same DataFrame object, with the three columns added
    :raises KeyError: If the frame has rows but lacks one of the source columns
    :raises ValueError: If a latitude or longitude value is not numeric
    """
    if len(df.index) == 0:
        # No device delivered data, so the source columns may not exist at
        # all. Provide empty, correctly typed target columns instead of
        # failing with an opaque KeyError on 'time'.
        df['obs_timestamp'] = pd.Series(dtype='datetime64[ns]')
        df['lat'] = pd.Series(dtype='float64')
        df['lon'] = pd.Series(dtype='float64')
        return df

    df['obs_timestamp'] = pd.to_datetime(df['time'])
    # errors='raise': malformed coordinates must not silently turn into NaN.
    df['lat'] = pd.to_numeric(df['latitude'], errors='raise')
    df['lon'] = pd.to_numeric(df['longitude'], errors='raise')

    return df

def _drop_unused_columns(df):
    """Drops all columns not needed for import into postgis

    :param df: pandas.DataFrame to reduce
    :returns: new DataFrame that keeps only those of the columns
        'platform_shortname', 'obs_timestamp', 'lat', 'lon',
        'speed_over_ground' and 'heading' that are present in ``df``
    """
    wanted = ('platform_shortname', 'obs_timestamp', 'lat', 'lon', 'speed_over_ground', 'heading')
    unwanted = [column for column in df if column not in wanted]

    return df.drop(columns=unwanted)

def get_hzg_sos_data():
    """Returns a well formatted data frame with data for the known HZG devices.

    Runs the three steps of this module in sequence: poll the devices,
    normalise formats and column names, drop every column that is not needed.

    Assumes that the only sink for this data is the postgis db, i.e. the
    frame contains only metadata, not values for observed properties.
    """
    polled = _poll_data()
    processed = _process_data(polled)

    return _drop_unused_columns(processed)