Repository URL to install this package:
|
Version:
0.3.2 ▾
|
saildrone-connect
/
pull_data.py
|
|---|
from saildrone_connect import Config as cfg
import pandas as pd
import requests, datetime
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Saildrone API endpoint that get_access_data() queries.
url_endpoint_access = 'https://developer-mission.saildrone.com/v1/auth/access'
# Base URL of the timeseries endpoint; get_timeseries_data_json() appends the
# drone id to it.
url_endpoint_timeseries = 'https://developer-mission.saildrone.com/v1/timeseries/'
# Maps drone id -> platform short name. poll_all_timeseries_data() skips every
# drone whose id is not listed here.
known_ids = {1030: 'SD-1030', 1053: 'SD-1053'}
class SaildroneConnectPullError(Exception):
    """Raised when pulling or processing data from the Saildrone API fails."""
def get_access_data(timeout=30):
    """Read the payload of the Saildrone 'access' endpoint and return it.

    :param timeout: seconds ``requests`` waits for the server before giving up.
        Without a timeout a stalled connection would block the caller forever.
    :return: the decoded JSON body of the response
    :raises SaildroneConnectPullError: if the endpoint answers with an error status
    """
    response = requests.get(
        url_endpoint_access,
        headers={"authorization": cfg.api_token},
        timeout=timeout,
    )
    if not response.ok:
        raise SaildroneConnectPullError(
            f"get access data request failed with status {response.status_code}; {response.text}"
        )
    return response.json()
def get_drone_ids():
    """Return the ids of all saildrones listed in the access data.

    :return: list with the ``drone_id`` of every entry under ``data -> access``
    :raises SaildroneConnectPullError: if the access payload does not have the
        expected layout
    """
    ad = get_access_data()
    try:
        return [entry['drone_id'] for entry in ad['data']['access']]
    except (KeyError, TypeError) as exc:
        # KeyError: an expected key is missing. TypeError: a level that should be
        # a dict/list is something else (e.g. null), which is just as much a
        # layout change and must not escape as a bare TypeError.
        raise SaildroneConnectPullError(f'Could not unpack access data, keys changed??\n{ad}') from exc
def poll_all_timeseries_data():
    """Poll the timeseries endpoint (data_set 'vehicle') for every accessible, known drone.

    For each drone id reported by ``get_drone_ids`` that is listed in
    ``known_ids``, the records returned by ``get_timeseries_data_json`` are
    collected. Unknown drones are skipped with a warning and records rejected by
    ``_is_record_vaild`` are dropped. Every kept record gets an additional
    ``platform_shortname`` entry.

    :return: pandas DataFrame with one row per kept record (no rows and no
        columns if nothing was kept)
    """
    records = []
    for d_id in get_drone_ids():
        # only drones with a known platform short name are of interest
        platform_shortname = known_ids.get(d_id)
        if not platform_shortname:
            # Use the module logger rather than the root logger so the message is
            # attributed to this module and no implicit basicConfig() is triggered.
            logger.warning(f'Drone with id {d_id} is not a known platform. Skipping.')
            continue
        for record in get_timeseries_data_json(d_id)['data']:
            if _is_record_vaild(record):
                # tag the record with the platform it belongs to
                record['platform_shortname'] = platform_shortname
                records.append(record)
    return pd.DataFrame(records)
def _is_record_vaild(record):
    """Check whether a record returned by the saildrone API is usable.

    Very rarely the API returns records with essential data (timestamp, lat,
    lon) missing. A record is accepted only if it has the keys ``gps_time``,
    ``gps_lat`` and ``gps_lng`` and their values can be converted to a UTC
    datetime / numbers.

    :param record: mapping-like record (anything offering ``keys()``/``get()``)
    :return: True if the record is usable, False otherwise (a warning is logged)
    """
    mandatory_cols = ('gps_time', 'gps_lat', 'gps_lng')
    try:
        found_cols = record.columns
    except AttributeError:  # plain dict (or similar): fall back to its keys
        found_cols = record.keys()

    missing_cols = [col for col in mandatory_cols if col not in found_cols]
    for col in missing_cols:
        logger.warning(f'Did nor find required column "{col}" in saildrone data, discarding dataset!')
    if missing_cols:
        return False

    try:
        datetime.datetime.fromtimestamp(record.get('gps_time'), tz=datetime.timezone.utc)
        pd.to_numeric(record.get('gps_lat'), errors="raise")
        pd.to_numeric(record.get('gps_lng'), errors="raise")
    except Exception:
        # Any conversion problem makes the record unusable. Deliberately not a
        # bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
        logger.warning(f"Failed to convert data in saildrone record: {record}, discarding dataset!")
        return False
    return True
def _process_data(df):
    """Add the columns expected by ingest/geoserver to ``df`` in place.

    Creates ``obs_timestamp`` (timezone-aware UTC datetime built from
    ``gps_time``), ``lat`` and ``lon`` (numeric versions of ``gps_lat`` and
    ``gps_lng``) and, where the corresponding ``*_mean`` column is present,
    ``heading`` and ``speed_over_ground``.
    """
    utc = datetime.timezone.utc

    def to_utc_datetime(epoch):
        return datetime.datetime.fromtimestamp(epoch, tz=utc)

    df['obs_timestamp'] = df['gps_time'].apply(to_utc_datetime)

    for target, source in (('lat', 'gps_lat'), ('lon', 'gps_lng')):
        df[target] = pd.to_numeric(df[source], errors="raise")

    # optional columns are only copied over when the API delivered them
    optional = (
        ('heading_mean', 'heading'),
        ('speed_over_ground_mean', 'speed_over_ground'),
    )
    for source, target in optional:
        if source in df.columns:
            df[target] = df[source]
def _drop_unused_columns(df):
    """Drop, in place, every column of ``df`` that is not part of the result set."""
    wanted = ('platform_shortname', 'obs_timestamp', 'lat', 'lon', 'speed_over_ground', 'heading')
    unwanted = [name for name in df if name not in wanted]
    df.drop(columns=unwanted, inplace=True)
def get_saildrone_data():
    """Return all polled saildrone data, prepared for ingest in geoserver.

    :return: pandas DataFrame produced by ``poll_all_timeseries_data``, run
        through ``_process_data`` and ``_drop_unused_columns``. If polling
        yielded no records the empty DataFrame is returned unchanged.
    """
    ret = poll_all_timeseries_data()
    if ret.empty:
        # Nothing was polled: the frame has no columns at all, so _process_data
        # would fail with a KeyError on 'gps_time'. There is nothing to prepare.
        return ret
    _process_data(ret)
    _drop_unused_columns(ret)
    return ret
def get_timeseries_data_json(drone_id, timeout=30):
    """
    Reads data from saildrone APIs timeseries endpoint with data_set 'vehicle'.

    The request asks for the newest record within the last hour.

    :param drone_id: id of drone to read data from
    :param timeout: seconds ``requests`` waits for the server before giving up.
        Without a timeout a stalled connection would block the caller forever.
    :return: json object from endpoint
    :raises SaildroneConnectPullError: if the endpoint answers with an error status
    """
    # The API needs an explicit time window: start one hour ago, end now (UTC).
    now = datetime.datetime.now(datetime.timezone.utc)
    sooner = now - datetime.timedelta(hours=1)
    # Timestamps are sent without a zone suffix. This matches what the previous
    # naive utcnow() + '%Z' combination produced ('%Z' is empty for naive
    # datetimes), so the wire format is unchanged.
    timestamp_format = '%Y-%m-%dT%H:%M:%S'
    # The params below return the latest (i.e. newest) record from the last hour
    payload = {
        'data_set': 'vehicle',
        'interval': 1,  # one minute
        'limit': 1,  # return at most one record
        'order_by': 'desc',  # newest first
        'start_date': sooner.strftime(timestamp_format),
        'end_date': now.strftime(timestamp_format),
    }
    response = requests.get(
        f'{url_endpoint_timeseries}{drone_id}',
        headers={"authorization": cfg.api_token},
        params=payload,
        timeout=timeout,
    )
    if not response.ok:
        raise SaildroneConnectPullError(f"get timeseries data request failed for drone {drone_id} "
                                        f"with status {response.status_code}; {response.text}")
    return response.json()