Repository URL to install this package:
|
Version:
0.4.5 ▾
|
hzg-sos-connect
/
pull_data.py
|
|---|
# -*- coding: utf-8 -*-
"""Handle sensor data from the COSYNA SOS server.
This module handles communication with the COSYNA SOS (Sensor Observation Service) server.
Example query:
http://sos.hzg.de/sos.py?request=GetObservation&service=SOS&offering=UWN_BoknisEck&observedProperty=PSAL&eventTime=2018-07-13T00:00:00Z/2018-08-15T23:59:59Z
"""
import pandas as pd
import requests
import untangle
import logging
from requests.exceptions import RequestException
from hzg_sos_connect.sensordata import SensorData, SensorDataException
# Format used for log records emitted through the root logger.
# NOTE(review): calling basicConfig from a library module configures the root
# logger of the whole application at import time; consider moving this to the
# application's entry point.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT)

# Module-level logger; used to report devices that could not be polled.
logger = logging.getLogger(name=__name__)
# Device shortname -> (offering, observed_property) on the HZG SOS server.
# The observed_property of each entry is one that is assumed to have a valid
# value (see get_offerning_property).
#
# Not yet available:
#   'OG_Dipsydoodle': ('dipsydoodle', 'salinity'),
# Possible alternative offerings for OG_Comet / OG_Sebastian (unconfirmed):
#   'OG_Comet': ('OceanEddies', 'Comet'),
#   'OG_Sebastian': ('OceanEddies', 'Sebastian'),
known_devices = {
    'OG_Amadeus': ('amadeus', 'salinity'),
    'OG_Sebastian': ('sebastian', 'salinity'),
    'OG_Comet': ('comet', 'salinity'),
    'stemme': ('stemme', 'in_air'),
    'DI_02': ('OceanEddies', 'D02'),
    'DI_03': ('OceanEddies', 'D03'),
    'DI_04': ('OceanEddies', 'D04'),
    'DI_05': ('OceanEddies', 'D05'),
    'DI_06': ('OceanEddies', 'D06'),
    'DI_07': ('OceanEddies', 'D07'),
    'DI_09': ('OceanEddies', 'D09'),
    'DI_10': ('OceanEddies', 'D10'),
    'DI_11': ('OceanEddies', 'D11'),
    'DI_12': ('OceanEddies', 'D12'),
    'DI_13': ('OceanEddies', 'D13'),
    'DI_14': ('OceanEddies', 'D14'),
    'DI_15': ('OceanEddies', 'D15'),
    'DI_17': ('OceanEddies', 'D17'),
    'DI_18': ('OceanEddies', 'D18'),
    'DI_19': ('OceanEddies', 'D19'),
}
#: str: Base URL for COSYNA SOS GetObservation requests; kept separate mostly for readability.
request_base_url = 'http://sos.hzg.de/sos.py?request=GetObservation&Service=SOS'

#: str: Template for a COSYNA SOS request; fill in with ``str.format`` using the keys
#: ``request_base_url``, ``offering`` and ``observed_property``.
request_base_string = '{request_base_url}&offering={offering}&observedProperty={observed_property}'
def request_latest_data(offering, observed_property, timeout=30):
    """Performs request to COSYNA SOS, requesting the most recent data point.

    :param offering: The COSYNA station, e.g. UWN_BoknisEck
    :param observed_property: The measured property of the requested sensor, e.g. PSAL
    :param timeout: Seconds to wait for the server, forwarded to ``requests.get``.
        ``None`` disables the timeout (the behaviour before this parameter existed).
    :return: SensorData built by parse_sensor_data_xml from the response body
    :raises SensorDataException: if the server is unreachable or times out, answers
        with a non-OK status, reports an unknown observedProperty, or the response
        cannot be parsed
    """
    url = request_base_string.format(request_base_url=request_base_url,
                                     offering=offering,
                                     observed_property=observed_property)
    try:
        # Without a timeout, requests can block forever on an unresponsive server.
        r = requests.get(url, timeout=timeout)
    except RequestException as exc:
        # Connection-level failures are raised by requests.get instead of being
        # reported on a response object; re-raise them as SensorDataException.
        message = getattr(exc, 'message', str(exc))
        raise SensorDataException(f'Error connecting to {url}: {message}') from exc

    if not r.ok:
        raise SensorDataException(f'Error connecting to {r.url}, status_code: {r.status_code}')
    # The server reports an unknown property in the body of an otherwise OK response.
    if "ValueError: No observedProperty" in r.text:
        raise SensorDataException(f"HZG SOS service responded with 'ValueError: No observedProperty' "
                                  f"({offering}, {observed_property})")
    return parse_sensor_data_xml(r.text)
def parse_sensor_data_xml(xml_str):
    """Build a SensorData object from a COSYNA SOS GetObservation XML response.

    The observed property is taken from the ``name`` attribute of
    ``swe:components``. For the queries this module sends, the
    DataBlockDefinition is expected to hold exactly one ``swe:components``.

    :param xml_str: XML text of the SOS response
    :return: SensorData with offering, observed_property, time, value and
        quality_flag set, and data_dict holding every parsed field
    :raises SensorDataException: wrapping any error raised while parsing
        (malformed XML, missing elements, missing 'quality flag' field, ...)
    """
    try:
        observation = untangle.parse(xml_str).om_Observation
        offering_name = observation.gml_name.cdata
        block_def = observation.om_resultDefinition.swe_DataBlockDefinition
        components = block_def.swe_components
        field_names = [field.get_attribute('name')
                       for field in components.swe_DataRecord.swe_field]
        separator = block_def.swe_encoding.swe_AsciiBlock.get_attribute('tokenSeparator')
        parsed = parse_result_str(result_str=observation.om_result.cdata,
                                  token_sep=separator,
                                  fields=field_names,
                                  observed_property=components.get_attribute('name'))
        return SensorData(offering=offering_name,
                          observed_property=parsed['observed_property'],
                          time=parsed['time'],
                          value=parsed['value'],
                          quality_flag=parsed['quality flag'],
                          data_dict=parsed)
    except Exception as exc:
        raise SensorDataException("Error parsing xml response") from exc
def parse_result_str(result_str, token_sep, fields, observed_property):
    """Split an SOS result string into a dict keyed by field name.

    :param result_str: text content of the SOS ``om:result`` element
    :param token_sep: token separator declared in ``swe:DataBlockDefinition``
        (``swe:encoding/swe:AsciiBlock/@tokenSeparator``)
    :param fields: field names, in order, from ``swe:DataBlockDefinition/swe:components``
    :param observed_property: name of the observed property; expected to be one of ``fields``
    :return: dict with an ``'observed_property'`` entry, one entry per field (raw
        string tokens, matched to ``fields`` by position), and a ``'value'`` entry
        holding the token of the observed property
    :raises IndexError: if ``result_str`` has fewer tokens than ``fields``
    :raises KeyError: if ``observed_property`` is not one of ``fields``
    """
    tokens = result_str.split(token_sep)
    parsed = {'observed_property': observed_property}
    # Index positionally (rather than zip) so a short result string still raises
    # IndexError; surplus tokens are ignored.
    parsed.update({name: tokens[pos] for pos, name in enumerate(fields)})
    parsed['value'] = parsed[observed_property]
    return parsed
class HzgSosConnectPullError(Exception):
    """Error raised while preparing a data pull from HZG SOS.

    Raised by get_offerning_property for a device shortname that has no entry
    in the device registry.
    """
def get_offerning_property(device_shortname, devices=None):
    """Gets the 'offering' (HZG SOS identifier) for a device identified by a shortname.

    Also returns an observed_property (parameter) which is assumed to have a valid value.

    :param device_shortname: shortname of the device, a key of ``devices``
    :param devices: mapping shortname -> (offering, observed_property) to look the
        device up in; defaults to the module-level ``known_devices``
    :return: tuple with strings (offering, observed_property)
    :raises HzgSosConnectPullError: if ``device_shortname`` is not a key of ``devices``
    """
    registry = known_devices if devices is None else devices
    try:
        return registry[device_shortname]
    except KeyError:
        # ``from None``: the KeyError adds nothing beyond this message. list() avoids
        # leaking a ``dict_keys([...])`` repr into the message.
        raise HzgSosConnectPullError(
            f"Unknown shortname '{device_shortname}'. Choose from {list(registry)}") from None
def _poll_data():
    """Request the latest observation for every device in ``known_devices``.

    Devices whose request fails with SensorDataException are logged as a warning
    and skipped.

    :return: DataFrame with one row per successfully polled device; its columns are
        the keys of each SensorData.data_dict plus 'platform_shortname'. It has no
        columns at all if every request failed.
    """
    rows = []
    for shortname in known_devices:
        # Look up once; reused in the warning below.
        offering_property = get_offerning_property(shortname)
        try:
            row = request_latest_data(*offering_property).data_dict
        except SensorDataException as exc:
            message = getattr(exc, 'message', str(exc))
            logger.warning('Error getting data for %s (%s): %s', shortname, offering_property, message)
            continue
        row['platform_shortname'] = shortname
        rows.append(row)
    return pd.DataFrame(rows)
def _process_data(df):
"""Makes sure data formats and column names match expectations"""
df['obs_timestamp'] = pd.to_datetime(df['time'])
df['lat'] = pd.to_numeric(df['latitude'], errors='raise')
df['lon'] = pd.to_numeric(df['longitude'], errors='raise')
#df['platform_shortname'] = [get_shortname(offering) for offering in df['offering']]
return df
def _drop_unused_columns(df):
"""Drops all columns not needed for import into postgis"""
to_drop = []
for col in df:
if not col in ['platform_shortname', 'obs_timestamp', 'lat', 'lon', 'speed_over_ground', 'heading']:
to_drop.append(col)
df = df.drop(to_drop, axis=1)
return df
def get_hzg_sos_data():
    """Return a well-formatted DataFrame with the latest data of all known HZG devices.

    Assumes the only sink for this data is the PostGIS database: the frame keeps
    only platform and position metadata (platform_shortname, obs_timestamp, lat,
    lon and, when present, speed_over_ground and heading), not the values of the
    observed properties.
    """
    polled = _poll_data()
    return _drop_unused_columns(_process_data(polled))