Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
saildrone-connect / pull_data.py
Size: Mime:
from saildrone_connect import Config as cfg

import pandas as pd
import requests, datetime

import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Saildrone API endpoints queried by the functions of this module.
url_endpoint_access = 'https://developer-mission.saildrone.com/v1/auth/access'
url_endpoint_timeseries = 'https://developer-mission.saildrone.com/v1/timeseries/'

# Maps drone ids to the platform short name attached to their records; drones
# whose id is not listed here are skipped when timeseries data is polled.
known_ids = {1030: 'SD-1030', 1053: 'SD-1053'}

class SaildroneConnectPullError(Exception):
    """Exception occurring while pulling and processing data from the Saildrone API."""

def get_access_data(timeout=30):
    """Read the data served by the 'access' endpoint and return it.

    :param timeout: seconds to wait for the server before giving up; handed
        to ``requests.get`` unchanged. Without a timeout an unresponsive
        server would block the caller indefinitely.
    :return: decoded JSON body of the response
    :raises SaildroneConnectPullError: if the response status is not OK.
        Exceptions raised by ``requests`` itself (connection errors,
        timeouts) are not caught here and propagate to the caller.
    """
    response = requests.get(
        url_endpoint_access,
        headers={"authorization": cfg.api_token},
        timeout=timeout,
    )
    if not response.ok:
        raise SaildroneConnectPullError(
            f"get access data request failed with status {response.status_code}; {response.text}")
    return response.json()

def get_drone_ids():
    """Return the list of ids of the saildrones which can be accessed.

    :return: list holding the 'drone_id' of every entry found under
        ``['data']['access']`` of the access data
    :raises SaildroneConnectPullError: if the access data does not have the
        expected structure
    """
    ad = get_access_data()
    try:
        return [d['drone_id'] for d in ad['data']['access']]
    except (KeyError, TypeError) as exc:
        # KeyError: an expected key is missing.
        # TypeError: an expected dict/list is something else (e.g. None).
        # Both mean the payload layout changed, so report them the same way.
        raise SaildroneConnectPullError(f'Could not unpack access data, keys changed??\n{ad}') from exc

def poll_all_timeseries_data():
    """Read all available data from the Saildrone API timeseries endpoint (data_set 'vehicle').

    Drones whose id is not contained in ``known_ids`` are skipped with a
    warning; records rejected by ``_is_record_vaild`` are discarded.

    :return: pandas DataFrame built from the accepted records of all known
        drones, each record extended by a 'platform_shortname' entry
    """
    records = []
    for d_id in get_drone_ids():
        # test if this drone is known or not
        platform_shortname = known_ids.get(d_id)
        if not platform_shortname:
            # Log through the module logger rather than the root logger so the
            # message is attributed to this module.
            logger.warning(f'Drone with id {d_id} is not a known platform. Skipping.')
            continue

        for record in get_timeseries_data_json(d_id)['data']:
            if _is_record_vaild(record):
                # add platform_shortname to record
                record['platform_shortname'] = platform_shortname
                records.append(record)

    return pd.DataFrame(records)

def _is_record_vaild(record):
    """Check whether the passed record is valid.

    Very rarely, the saildrone API returns records with essential data
    (lat, lon, timestamp) missing. A record is accepted only if all three
    mandatory fields are present, the timestamp converts to a UTC datetime
    and both coordinates convert to actual numbers (neither None nor NaN).

    :param record: a single record; a dict, or another object offering
        ``columns`` or ``keys()`` for the field names and ``get()`` for values
    :return: True if the record can be used, False otherwise (the reason is
        logged as a warning)
    """
    mandatory_cols = ['gps_time', 'gps_lat', 'gps_lng']
    try:
        found_cols = record.columns
    except AttributeError:  # hit dict?
        found_cols = record.keys()

    # Report every missing column, not only the first one.
    missing = [col for col in mandatory_cols if col not in found_cols]
    for col in missing:
        logger.warning(f'Did nor find required column "{col}" in saildrone data, discarding dataset!')
    if missing:
        return False

    try:
        datetime.datetime.fromtimestamp(record.get('gps_time'), tz=datetime.timezone.utc)
        lat = pd.to_numeric(record.get('gps_lat'), errors="raise")
        lng = pd.to_numeric(record.get('gps_lng'), errors="raise")
        # pd.to_numeric does not reject NaN (and may map None to NaN), yet
        # such a coordinate is still missing data - check for it explicitly.
        usable = not (pd.isna(lat) or pd.isna(lng))
    except Exception:
        # Deliberately broad, since the conversions can fail in several ways
        # (TypeError, ValueError, OverflowError, OSError, ...). Unlike a bare
        # `except`, this lets KeyboardInterrupt and SystemExit through.
        usable = False

    if not usable:
        logger.warning(f"Failed to convert data in saildrone record: {record}, discarding dataset!")
    return usable


def _process_data(df):
    """Make sure that the data conforms to the expectations of ingest/geoserver.

    Works in place. Adds 'obs_timestamp' (UTC datetime derived from
    'gps_time'), 'lat' and 'lon' (numeric versions of 'gps_lat' and
    'gps_lng') and, if the respective '*_mean' source column exists,
    'heading' and 'speed_over_ground'.

    :param df: DataFrame holding the polled records; modified in place
    :return: None
    :raises ValueError: if a coordinate cannot be converted to a number
    """
    if df.empty:
        # A frame built from an empty record list has no columns at all, so
        # df['gps_time'] would raise KeyError. There is nothing to convert
        # either; only create the mandatory output columns, empty.
        df['obs_timestamp'] = pd.Series(dtype='datetime64[ns, UTC]')
        df['lat'] = pd.Series(dtype='float64')
        df['lon'] = pd.Series(dtype='float64')
    else:
        # extra keyword arguments of Series.apply are forwarded to the function
        df['obs_timestamp'] = df['gps_time'].apply(datetime.datetime.fromtimestamp, tz=datetime.timezone.utc)
        df['lat'] = pd.to_numeric(df['gps_lat'], errors="raise")
        df['lon'] = pd.to_numeric(df['gps_lng'], errors="raise")

    if 'heading_mean' in df.columns:
        df['heading'] = df['heading_mean']
    if 'speed_over_ground_mean' in df.columns:
        df['speed_over_ground'] = df['speed_over_ground_mean']


def _drop_unused_columns(df):
    """Strip every column the result data set no longer needs (in place).

    Only 'platform_shortname', 'obs_timestamp', 'lat', 'lon',
    'speed_over_ground' and 'heading' are kept.
    """
    wanted = ('platform_shortname', 'obs_timestamp', 'lat', 'lon', 'speed_over_ground', 'heading')
    surplus = [name for name in df if name not in wanted]
    df.drop(columns=surplus, inplace=True)


def get_saildrone_data():
    """Fetch all available saildrone data and prepare it for ingest in geoserver.

    :return: the DataFrame delivered by ``poll_all_timeseries_data`` after
        ``_process_data`` and ``_drop_unused_columns`` were applied to it
    """
    frame = poll_all_timeseries_data()

    # both steps modify the frame in place, in this order
    for step in (_process_data, _drop_unused_columns):
        step(frame)

    return frame


def get_timeseries_data_json(drone_id, timeout=30):
    """
    Reads data from saildrone APIs timeseries endpoint with data_set 'vehicle'.
    :param drone_id: id of drone to read data from
    :param timeout: seconds to wait for the server before giving up; handed to
        requests.get unchanged. Without a timeout an unresponsive server would
        block the caller indefinitely.
    :return json object from endpoint
    :raises SaildroneConnectPullError: if the response status is not OK.
        Exceptions raised by requests itself (connection errors, timeouts)
        are not caught here and propagate to the caller.
    """

    # set startdate to one hour ago, enddate to now (both UTC). Needed by API
    now = datetime.datetime.now(datetime.timezone.utc)
    sooner = now - datetime.timedelta(hours=1)

    # No timezone designator is appended: the strings sent are UTC wall-clock
    # times of the form 2024-01-31T12:00:00.
    time_format = '%Y-%m-%dT%H:%M:%S'

    # The params below return the latest (i.e. newest) record from the last hour
    payload = {
        'data_set': 'vehicle',
        'interval': 1,  # one minute
        'limit': 1,  # return at most one record
        'order_by': 'desc',  # newest first
        'start_date': sooner.strftime(time_format),
        'end_date': now.strftime(time_format),
    }

    response = requests.get(
        f'{url_endpoint_timeseries}{drone_id}',
        headers={"authorization": cfg.api_token},
        params=payload,
        timeout=timeout,
    )

    if not response.ok:
        raise SaildroneConnectPullError(f"get timeseries data request failed for drone {drone_id} "
                                        f"with status {response.status_code}; {response.text}")
    return response.json()