Repository URL to install this package:
|
Version:
1.9.2 ▾
|
dship2postgis
/
osis_service.py
|
|---|
# -*- coding: utf-8 -*-
from dship2postgis.config import Config as cfg
from dship2postgis.db_service import DbService
from datetime import date, timedelta
from sqlalchemy import text
"""
Collection of service methods which query the metadata/ OSIS DB
"""
class OsisService():
    """Service methods which query the metadata / OSIS DB.

    The ``find_*`` static methods run a single lookup on an already opened
    connection and return the marker string ``'NA'`` when no row matches.
    The ``generate_*`` methods add a column to a dataframe by running the
    matching lookup once per day present in the dataframe's index and
    forward filling the result.
    """

    # Query templates. Only the schema name is substituted with str.format
    # (identifiers cannot be bound); the platform token and the timestamp are
    # bound parameters (:pf_token / :row_ts) so that the driver quotes them.
    sql_query_find_leg_label = """
        SELECT
            leg_departure_return.leg
        FROM
            {db_schema}.leg_departure_return
        WHERE
            leg_departure_return.platform = :pf_token AND
            :row_ts BETWEEN leg_departure_return.date_departure AND leg_departure_return.date_return
        ORDER BY leg DESC
        LIMIT 1;
    """

    sql_query_find_leg_id = """
        SELECT
            leg_departure_return.osis_leg_id
        FROM
            {db_schema}.leg_departure_return
        WHERE
            leg_departure_return.platform = :pf_token AND
            :row_ts BETWEEN leg_departure_return.date_departure AND leg_departure_return.date_return
        ORDER BY leg DESC
        LIMIT 1;
    """

    sql_query_find_platform_id = """
        SELECT
            metadata_platform.id
        FROM
            {db_schema}.metadata_platform
        WHERE
            metadata_platform.shortname = :pf_token
        ORDER BY id
        LIMIT 1;
    """

    def __init__(self):
        pass

    @staticmethod
    def _fetch_first_row(db_connection, query_template, params):
        """Execute a query template with bound parameters and return the first row (or None)."""
        statement = text(query_template.format(db_schema=cfg.DB_SCHEMA))
        return db_connection.execute(statement, params).fetchone()

    @staticmethod
    def _format_id(value):
        """Render an id as the string of its integer value; the 'NA' marker is passed through."""
        if isinstance(value, str) and value == 'NA':
            return value
        return str(int(value))

    @staticmethod
    def find_leg_label(db_connection, platform_token, timestamp):
        """Searches db for leg label for a given platform at a given time

        MAKE SURE TO OPEN DB CONNECTION FIRST

        :param db_connection: open connection the query is executed on
        :param platform_token: platform value compared against leg_departure_return.platform
        :param timestamp: point in time which has to lie between date_departure and date_return
        :return: first column of the matching row with the highest leg, or 'NA' if there is none
        """
        # the values are sent as text, i.e. the same text the query used to
        # contain as quoted literals
        row = OsisService._fetch_first_row(
            db_connection,
            OsisService.sql_query_find_leg_label,
            {'pf_token': str(platform_token), 'row_ts': str(timestamp)},
        )
        if row:
            return row[0]
        return 'NA'  # nothing found in db...

    @staticmethod
    def find_leg_id(db_connection, platform_token, timestamp):
        """Searches db for leg id for a given platform at a given time

        MAKE SURE TO OPEN DB CONNECTION FIRST

        :param db_connection: open connection the query is executed on
        :param platform_token: platform value compared against leg_departure_return.platform
        :param timestamp: point in time which has to lie between date_departure and date_return
        :return: osis_leg_id of the matching row as int, or 'NA' if there is none
        """
        row = OsisService._fetch_first_row(
            db_connection,
            OsisService.sql_query_find_leg_id,
            {'pf_token': str(platform_token), 'row_ts': str(timestamp)},
        )
        if row:
            return int(row[0])
        return 'NA'  # nothing found in db...

    @staticmethod
    def find_platform_id(db_connection, platform_token):
        """Searches db for platform id for a given platform, returns lowest id if multiple results are in db

        MAKE SURE TO OPEN DB CONNECTION FIRST

        :param db_connection: open connection the query is executed on
        :param platform_token: value compared against metadata_platform.shortname
        :return: id of the matching row as str, or 'NA' if there is none
        """
        row = OsisService._fetch_first_row(
            db_connection,
            OsisService.sql_query_find_platform_id,
            {'pf_token': str(platform_token)},
        )
        if row:
            return str(row[0])
        return 'NA'  # nothing found in db...

    def _fill_daily_column(self, df, column, lookup):
        """Create ``column`` in ``df``, look its value up once per day and forward fill it.

        Here is some performance tweaking going on (notes of the original author):
        - date_departure and date_return for each leg is taken from a view which
          gets the information from port_call table
        - timestamp in port_call is always at 00:00 of a given day
        - i.e. since legs change only once per day, there is no need to query DB for each row
        - this reduces db calls from 60*24=1440 calls per day in data to 1 for minute resolution

        :param df: dataframe with a 'platform' column; it is modified in place
        :param column: name of the column to create
        :param lookup: callable(connection, platform, index_label) returning the value for a row
        :return: the forward filled column as a Series
        """
        df[column] = None  # create empty column
        # index labels of the first entry of each day in the df
        first_of_day = self.get_days_first(df)
        if first_of_day:
            # query DB only once for each day in data; the engine is imported
            # here (not at module level), as before
            from dship2postgis.db_service import engine
            with engine.connect() as connection:
                df.loc[first_of_day, column] = [
                    lookup(connection, df.loc[idx, 'platform'], idx) for idx in first_of_day
                ]
        # forward fill: the value of the first row of a day is used until the
        # first row of the next day is encountered
        return df[column].ffill()

    def generate_leg_column(self, df):
        """generates column fitting passed dataframe, containing leg label for platform and timestamp of row

        :param df: dataframe with a 'platform' column; column 'leg' is added in place
        :return: the passed dataframe
        """
        filled = self._fill_daily_column(df, 'leg', OsisService.find_leg_label)
        df['leg'] = filled.astype('string')
        return df

    def generate_leg_id_column(self, df):
        """generates column fitting passed dataframe, containing leg id for platform and timestamp of row

        Ids are stored as strings of their integer value. Days without a leg
        in the db keep the string 'NA' (a plain ``astype('int')`` would raise
        on that marker).

        :param df: dataframe with a 'platform' column; column 'osis_leg_id' is added in place
        :return: the passed dataframe
        """
        filled = self._fill_daily_column(df, 'osis_leg_id', OsisService.find_leg_id)
        df['osis_leg_id'] = filled.map(OsisService._format_id).astype('string')
        return df

    def generate_platform_id_column(self, df):
        """generates column fitting passed dataframe, containing platform id for platform of row

        Ids are stored as strings of their integer value. Platforms which are
        not found in the db keep the string 'NA'.

        :param df: dataframe with a 'platform' column; column 'osis_platform_id' is added in place
        :return: the passed dataframe
        """
        filled = self._fill_daily_column(
            df,
            'osis_platform_id',
            # the platform lookup does not depend on the timestamp
            lambda connection, platform, _idx: OsisService.find_platform_id(connection, platform),
        )
        df['osis_platform_id'] = filled.map(OsisService._format_id).astype('string')
        return df

    def get_days_first(self, df):
        """
        Returns timestamps with the first entry for each day found in the dfs index.
        --> removes all but the first entry for each day found in the df

        Days without any entry do not contribute a (duplicate) label and an
        empty df yields an empty list. The lookup uses
        ``Index.get_indexer(..., method='bfill')`` on the index of the df.

        :param df: dataframe indexed by timestamps
        :return: list of index labels, one per day that has data
        """
        index = df.index
        if len(index) == 0:
            return []

        earliest = index.min()
        # midnight of the first day; sub-second parts have to be cleared as
        # well, otherwise an entry at e.g. 00:00:00.2 of a later day is missed
        if hasattr(earliest, 'normalize'):
            day_zero = earliest.normalize()
        else:
            day_zero = earliest.replace(hour=0, minute=0, second=0, microsecond=0)
        span = index.max() - day_zero  # timedelta

        first_of_day = []
        previous_position = None
        for offset in range(span.days + 1):
            day_start = day_zero + timedelta(days=offset)
            # position of the first entry at or after the start of the day
            position = index.get_indexer([day_start], method='bfill')[0]
            # a day without data resolves to the first entry of a later day,
            # which is (or will be) in the list anyway
            if position < 0 or position == previous_position:
                continue
            first_of_day.append(index[position])
            previous_position = position
        return first_of_day
def add_leg_column(df):
    """Add the leg label column to ``df`` via ``OsisService.generate_leg_column``.

    The dataframe is modified in place; it is returned as well for convenience.

    :param df: dataframe to extend
    :return: the passed dataframe
    """
    OsisService().generate_leg_column(df)
    return df
def add_leg_id_column(df):
    """Add the leg id column to ``df`` via ``OsisService.generate_leg_id_column``.

    The dataframe is modified in place; it is returned as well for convenience.

    :param df: dataframe to extend
    :return: the passed dataframe
    """
    OsisService().generate_leg_id_column(df)
    return df
def add_platform_id_column(df):
    """Add the platform id column to ``df`` via ``OsisService.generate_platform_id_column``.

    The dataframe is modified in place; it is returned as well for convenience.

    :param df: dataframe to extend
    :return: the passed dataframe
    """
    OsisService().generate_platform_id_column(df)
    return df