Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
dship2postgis / osis_service.py
Size: Mime:
# -*- coding: utf-8 -*-
from dship2postgis.config import Config as cfg
from dship2postgis.db_service import DbService
from datetime import date, timedelta

from sqlalchemy import text

"""
Collection of service methods which query the metadata/ OSIS DB
"""


class OsisService:
    """Look up leg labels, leg ids and platform ids in the metadata (OSIS) DB.

    The SQL templates below are completed in two steps: ``{db_schema}`` is substituted
    with ``str.format`` (an identifier cannot be a bind parameter), while the values
    ``:pf_token`` and ``:row_ts`` are passed as bind parameters so the DB driver quotes
    them instead of them being pasted into the SQL text.
    """

    sql_query_find_leg_label = """
    SELECT
        leg_departure_return.leg
    FROM
        {db_schema}.leg_departure_return
    WHERE
        leg_departure_return.platform = :pf_token AND
        :row_ts BETWEEN leg_departure_return.date_departure AND leg_departure_return.date_return
    ORDER BY leg DESC
    LIMIT 1;
    """

    sql_query_find_leg_id = """
    SELECT
        leg_departure_return.osis_leg_id
    FROM
        {db_schema}.leg_departure_return
    WHERE
        leg_departure_return.platform = :pf_token AND
        :row_ts BETWEEN leg_departure_return.date_departure AND leg_departure_return.date_return
    ORDER BY leg DESC
    LIMIT 1;
    """

    sql_query_find_platform_id = """
    SELECT
        metadata_platform.id
    FROM
        {db_schema}.metadata_platform
    WHERE
        metadata_platform.shortname = :pf_token
    ORDER BY id
    LIMIT 1;
    """

    def __init__(self):
        pass

    @staticmethod
    def _fetch_first_row(db_connection, query_template, **params):
        """Run ``query_template`` against the configured schema with ``params`` bound.

        Returns the first result row, or ``None`` if the query matched nothing.
        """
        the_query = query_template.format(db_schema=cfg.DB_SCHEMA)
        return db_connection.execute(text(the_query), params).fetchone()

    @staticmethod
    def find_leg_label(db_connection, platform_token, timestamp):
        """Return the leg label for a given platform at a given time, or ``'NA'`` if none matches.

        If several legs match, the one sorting last (``ORDER BY leg DESC``) is returned.

        MAKE SURE TO OPEN DB CONNECTION FIRST
        """
        # str(): the values are bound as the same text the old str.format-based query
        # contained, so the DB-side type coercion is unchanged.
        row = OsisService._fetch_first_row(db_connection, OsisService.sql_query_find_leg_label,
                                           pf_token=str(platform_token), row_ts=str(timestamp))
        if row is None:  # nothing found in db...
            return 'NA'
        return row[0]

    @staticmethod
    def find_leg_id(db_connection, platform_token, timestamp):
        """Return the OSIS leg id (int) for a given platform at a given time, or ``'NA'``.

        ``'NA'`` is also returned when the matching row has a NULL ``osis_leg_id``.

        MAKE SURE TO OPEN DB CONNECTION FIRST
        """
        row = OsisService._fetch_first_row(db_connection, OsisService.sql_query_find_leg_id,
                                           pf_token=str(platform_token), row_ts=str(timestamp))
        if row is None or row[0] is None:  # nothing (usable) found in db...
            return 'NA'
        return int(row[0])

    @staticmethod
    def find_platform_id(db_connection, platform_token):
        """Return the platform id (as str) for a platform shortname, or ``'NA'`` if none matches.

        If several rows match, the lowest id is returned.

        MAKE SURE TO OPEN DB CONNECTION FIRST
        """
        row = OsisService._fetch_first_row(db_connection, OsisService.sql_query_find_platform_id,
                                           pf_token=str(platform_token))
        if row is None or row[0] is None:  # nothing (usable) found in db...
            return 'NA'
        return str(row[0])

    @staticmethod
    def _as_int_string(value):
        """Render an id as a decimal string; ``'NA'``, ``None`` and NaN become ``'NA'``."""
        if isinstance(value, str) and value == 'NA':
            return 'NA'
        if value is None or value != value:  # value != value is only true for NaN
            return 'NA'
        return str(int(value))

    def _fill_daily(self, df, column, lookup):
        """Create ``column`` in ``df`` from one DB lookup per day, forward-filled over the day.

        ``lookup(connection, platform, timestamp)`` is called only for the first row of each
        day found in the (time-sorted) index. Returns the forward-filled column.
        """
        # Performance tweak:
        # - date_departure and date_return for each leg come from a view fed by the
        #   port_call table, whose timestamps are always at 00:00 of a given day
        # - legs therefore change at most once per day, so there is no need to query the
        #   DB for every row
        # - for minute-resolution data this reduces 60*24=1440 DB calls per day to 1
        df[column] = None  # create empty column
        first_of_day = self.get_days_first(df)
        if first_of_day:
            from dship2postgis.db_service import engine
            with engine.connect() as connection:
                df.loc[first_of_day, column] = [lookup(connection, df.loc[ts, 'platform'], ts)
                                                for ts in first_of_day]
        # forward fill: the value of a day's first row is used until the next day's first row
        return df[column].ffill()

    def generate_leg_column(self, df):
        """Add a ``leg`` column (string dtype) holding the leg label for each row's platform and time."""
        df['leg'] = self._fill_daily(df, 'leg', OsisService.find_leg_label).astype('string')
        return df

    def generate_leg_id_column(self, df):
        """Add an ``osis_leg_id`` column (string dtype) holding each row's leg id, ``'NA'`` if unknown."""
        filled = self._fill_daily(df, 'osis_leg_id', OsisService.find_leg_id)
        # ints are rendered as decimal strings; the 'NA' marker is kept instead of crashing astype('int')
        df['osis_leg_id'] = filled.map(OsisService._as_int_string).astype('string')
        return df

    def generate_platform_id_column(self, df):
        """Add an ``osis_platform_id`` column (string dtype) holding each row's platform id.

        The platform id does not depend on time, so the DB is queried once per distinct value
        of ``df['platform']``. Unknown platforms get ``'NA'``.
        """
        tokens = df['platform'].unique()
        platform_ids = {}
        if len(tokens):
            from dship2postgis.db_service import engine
            with engine.connect() as connection:
                platform_ids = {token: OsisService.find_platform_id(connection, token)
                                for token in tokens}
        df['osis_platform_id'] = (df['platform'].map(platform_ids)
                                  .map(OsisService._as_int_string)
                                  .astype('string'))
        return df

    def get_days_first(self, df):
        """Return the index timestamps of the first entry of each calendar day in ``df``.

        Days without data are skipped, so every returned timestamp is unique.

        :param df: DataFrame with a datetime index sorted in ascending order
        :return: list of index timestamps (empty list for an empty frame)
        :raises ValueError: if the index is not sorted ascending
        """
        if len(df.index) == 0:
            return []
        if not df.index.is_monotonic_increasing:
            raise ValueError('dataframe index must be sorted in ascending order')
        # normalize() truncates to midnight, including sub-second parts; the first occurrence
        # of each normalized value is the first entry of that day in a sorted index
        days = df.index.normalize()
        return list(df.index[~days.duplicated()])


def add_leg_column(df):
    """Attach a leg-label column to ``df``.

    The frame is modified in place; it is also returned so the call can be chained.
    """
    OsisService().generate_leg_column(df)
    return df


def add_leg_id_column(df):
    """Attach a leg-id column to ``df``.

    The frame is modified in place; it is also returned so the call can be chained.
    """
    OsisService().generate_leg_id_column(df)
    return df


def add_platform_id_column(df):
    """Attach a platform-id column to ``df``.

    The frame is modified in place; it is also returned so the call can be chained.
    """
    OsisService().generate_platform_id_column(df)
    return df