# Package: dship2postgis (version 1.9.2)
# Module: dship2postgis/db_service.py
# -*- coding: utf-8 -*-
import datetime
import re
import uuid
from urllib.parse import quote

import geopandas as gpd
import pandas as pd
from geoalchemy2 import Geometry
from sqlalchemy import create_engine, text

from .config import Config as cfg
# DDL for a positions table and its indexes; every statement uses IF NOT EXISTS,
# so running it against an existing table does nothing.
# Placeholders: {schema}, {tablename} and {srid}. {srid} is the SRID of the point
# geometries and must match the SRID used when geometries are written to the table
# (a hard-coded value would clash with any other configured SRID).
# The unique (timestamp, platform) index is the conflict target used when merging
# imported positions into the main table.
SQL_CREATE_POSITIONS_TABLE = """
CREATE TABLE IF NOT EXISTS {schema}.{tablename} (
id UUID,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
geom GEOMETRY(POINT, {srid}),
platform VARCHAR NOT NULL,
leg VARCHAR,
osis_leg_id int8,
osis_platform_id int8,
import_label VARCHAR,
quality_flag int4 NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS {tablename}_timestamp_idx ON {schema}.{tablename} (timestamp);
CREATE INDEX IF NOT EXISTS {tablename}_leg_idx ON {schema}.{tablename} (leg);
CREATE INDEX IF NOT EXISTS {tablename}_geom_idx ON {schema}.{tablename} USING GIST(GEOM);
CREATE UNIQUE INDEX IF NOT EXISTS {tablename}_timestamp_platform_idx ON {schema}.{tablename} (timestamp, platform);
"""
# Connection URL for the PostgreSQL/PostGIS database. User name and password are
# percent-encoded so that characters such as '@', ':' or '/' cannot break the URL.
# NOTE: a password that was already percent-encoded in the configuration must now
# be configured in plain text. The port is 5432 unless cfg.DB_PORT is configured.
db_string = (f'postgresql://{quote(str(cfg.DB_USER), safe="")}:{quote(str(cfg.DB_PASSWD), safe="")}'
             f'@{cfg.DB_HOST}:{getattr(cfg, "DB_PORT", 5432)}/{cfg.DB_NAME}')
# single module-level engine used by all DbService methods that access the database
engine = create_engine(db_string)
class DbService(object):
    """Handles database connections and requests to the positions database.

    Methods that talk to the database open their own short-lived connection
    from the module-level ``engine``. Temporary tables created through this
    service are remembered in ``temp_tables`` so that :meth:`drop_temp_tables`
    can remove them again.
    """

    def __init__(self):
        # lower-cased names of the temporary tables created by this instance
        self.temp_tables = []

    def _positions_from_db(self, table_name):
        """Read all rows of a positions table into a GeoDataFrame.

        :param table_name: table name inside ``cfg.DB_SCHEMA``; it is lower-cased
            before use, matching how this service creates tables
        :return: GeoDataFrame of all rows, geometry read from column ``geom``
        """
        # NOTE(review): table_name is interpolated into the SQL text - only pass trusted names
        with engine.connect() as connection:
            sql = f"SELECT * FROM {cfg.DB_SCHEMA}.{table_name.lower()}"
            return gpd.GeoDataFrame.from_postgis(sql, connection, geom_col='geom')

    def drop_temp_tables(self):
        """Drop all temporary tables created by this instance during import.

        Opens its own connection and drops the tables in one transaction.
        Once the transaction is committed the list of remembered temporary
        tables is cleared, so repeated calls do not re-issue the same drops.
        """
        if not self.temp_tables:
            return
        with engine.begin() as connection:  # commits on success, rolls back on error
            for tmp_table in self.temp_tables:
                connection.execute(text(f"DROP TABLE IF EXISTS {cfg.DB_SCHEMA}.{tmp_table}"))
        self.temp_tables.clear()

    def create_positions_table(self):
        """Create the main positions table; does nothing if it already exists.

        The table is named as configured in ``cfg.DB_POSITIONS_TABLE`` and is
        not registered as a temporary table.

        :return: the (lower-cased) table name
        """
        return self._create_positions_table(cfg.DB_POSITIONS_TABLE, temporary=False)

    def _create_positions_table(self, table_name, temporary=True):
        """Create a table for holding positions unless it already exists.

        Executes ``SQL_CREATE_POSITIONS_TABLE`` formatted with ``cfg.DB_SCHEMA``,
        the table name and ``cfg.SRID``. That creates the columns id, timestamp,
        geom, platform, leg, osis_leg_id, osis_platform_id, import_label and
        quality_flag, plus the table's indexes.

        :param table_name: table name inside ``cfg.DB_SCHEMA``; it is lower-cased
        :param temporary: if True, remember the table for :meth:`drop_temp_tables`
        :return: the lower-cased table name that was actually used
        """
        # PostgreSQL folds unquoted identifiers to lower case; using the lower-cased
        # name everywhere keeps the raw SQL and pandas/SQLAlchemy (which quote
        # mixed-case names) pointing at the same table
        table_name = table_name.lower()
        the_query = SQL_CREATE_POSITIONS_TABLE.format(schema=cfg.DB_SCHEMA, tablename=table_name, srid=cfg.SRID)
        with engine.begin() as connection:
            connection.execute(text(the_query))
        if temporary and table_name not in self.temp_tables:
            self.temp_tables.append(table_name)
        return table_name

    def create_temp_table(self, geo_df, import_label, table_name=None):
        """Create a temporary DB table and fill it with the passed positions.

        The created table can be merged into the real positions table later using
        :meth:`merge_temp_table` and should be removed afterwards with
        :meth:`drop_temp_tables`.

        ``geo_df`` is modified in place. The method:

        - adds a ``timestamp`` column from the index;
        - may set ``import_label``;
        - drops every column that is not written to the DB;
        - adds an ``id`` column (UUID5 of platform and timestamp).

        :param geo_df: GeoDataFrame whose index holds the timestamps; it must
            contain at least the columns ``geom`` and ``platform``
        :param import_label: import label to attach to all rows. If the string
            'PROVIDED' is passed, a column 'import_label' is expected in ``geo_df``
        :param table_name: optional table name; if omitted, it is generated from
            the import label (or the first row's platform) and the current time
        :return: the (lower-cased) name of the created table
        :raises AssertionError: if 'PROVIDED' is passed but the column
            'import_label' is missing, or if an '*_id' column holds non-numeric values
        """
        # validate before anything is created in the database; explicit raise
        # (instead of assert) so the check also runs under ``python -O``
        if import_label == 'PROVIDED' and 'import_label' not in geo_df.columns:
            raise AssertionError("Expected to find column 'import_label' in passed data!")
        if not table_name:
            now = round(datetime.datetime.now(datetime.timezone.utc).timestamp())
            label = geo_df["platform"].iloc[0] if import_label == 'PROVIDED' else import_label
            # platform names / labels may contain spaces, dots, dashes etc., which are
            # not valid in the unquoted identifiers used by the SQL statements
            label = re.sub(r'[^0-9A-Za-z_]', '_', str(label))
            table_name = f'TEMP_POSITIONS_{label}_{now}'
        # continue with the name as actually created (lower-cased): pandas quotes
        # mixed-case names, so any other spelling would make to_sql create a new table
        table_name = self._create_positions_table(table_name, temporary=True)
        nms = uuid.UUID('adb291da-187c-11e8-a6f8-f45c899fb76f')
        to_keep = ['geom', 'platform', 'leg', 'osis_leg_id', 'osis_platform_id', 'import_label', 'timestamp']
        # geo_df may be a slice of a larger frame; silence SettingWithCopyWarning only
        # here and restore the caller's previous setting afterwards, even on errors
        with pd.option_context('mode.chained_assignment', None):
            geo_df['timestamp'] = geo_df.index
            if import_label != 'PROVIDED':  # set import label for all rows to passed value
                geo_df['import_label'] = import_label
            geo_df.drop([col for col in geo_df.columns if col not in to_keep], axis=1, inplace=True)
            # deterministic id: the same platform and timestamp always yield the same UUID
            # (a comprehension also works for an empty frame, unlike apply(axis=1))
            geo_df['id'] = [uuid.uuid5(nms, f"{platform}{timestamp}")
                            for platform, timestamp in zip(geo_df['platform'], geo_df['timestamp'])]
            # make sure all '_id' columns can be cast to numeric (replace string 'NA' by NULL)
            for id_col in [col for col in geo_df.columns if col.endswith('_id')]:
                geo_df.loc[geo_df[id_col] == 'NA', id_col] = None
                try:
                    pd.to_numeric(geo_df[id_col], errors='raise')
                except (ValueError, TypeError) as err:
                    raise AssertionError(f"Found non-numeric value in column {id_col}!") from err
            with engine.begin() as connection:  # commits the inserted rows on success
                geo_df.to_sql(table_name, connection, schema=cfg.DB_SCHEMA, if_exists='append', index=False,
                              dtype={'geom': Geometry('POINT', srid=cfg.SRID)})
        return table_name

    def merge_temp_table(self, temp_table_name):
        """Merge a temporary table into the real positions table.

        The positions table (``cfg.DB_POSITIONS_TABLE``) is created first if it
        does not exist yet.

        :param temp_table_name: name of the temporary table inside ``cfg.DB_SCHEMA``
        """
        self.create_positions_table()  # does nothing if table already exists
        self._merge_positions_tables(cfg.DB_POSITIONS_TABLE, temp_table_name)

    def _merge_positions_tables(self, table_a, table_b):
        """Upsert all rows of table b into table a.

        Rows of b whose (timestamp, platform) pair is not yet in a are inserted.
        For rows that already exist, geom, import_label, osis_leg_id and
        osis_platform_id are overwritten and quality_flag is reset.

        :param table_a: target table inside ``cfg.DB_SCHEMA``
        :param table_b: source table inside ``cfg.DB_SCHEMA``
        """
        # quality_flag is deliberately overwritten on import (issue 4). It is not in
        # the INSERT column list, so EXCLUDED.quality_flag is the column default
        # 0 (not checked).
        # NOTE(review): leg is not updated on conflict although osis_leg_id is - confirm intended
        # NOTE(review): ON CONFLICT DO UPDATE errors out if table_b itself contains
        # duplicate (timestamp, platform) pairs
        the_query = f"""
INSERT INTO {cfg.DB_SCHEMA}.{table_a}
(id, timestamp, geom, platform, leg, osis_leg_id, osis_platform_id, import_label)
SELECT new_data.id, new_data.timestamp, new_data.geom, new_data.platform, new_data.leg, new_data.osis_leg_id, new_data.osis_platform_id, new_data.import_label
FROM {cfg.DB_SCHEMA}.{table_b} AS new_data
ON CONFLICT (timestamp, platform)
DO
UPDATE SET
geom = EXCLUDED.geom,
import_label = EXCLUDED.import_label,
quality_flag=EXCLUDED.quality_flag,
osis_leg_id=EXCLUDED.osis_leg_id,
osis_platform_id=EXCLUDED.osis_platform_id
;
"""
        with engine.begin() as connection:
            connection.execute(text(the_query))

    def get_grant_statemens(self):
        """Return the GRANT statements that give the configured user the permissions it needs.

        Convenience method to make setting up the DB easier. The statements
        are only returned as text, not executed.
        """
        # TODO auto-config for databases
        # - configure db admin user
        # - create postgis_import db user
        # - grant permissions
        ret = f"""
GRANT USAGE ON SCHEMA metadata TO {cfg.DB_USER};
GRANT SELECT ON TABLE metadata.leg TO {cfg.DB_USER};
GRANT SELECT ON TABLE metadata.platform TO {cfg.DB_USER};
GRANT SELECT ON TABLE metadata.port_call TO {cfg.DB_USER};
GRANT SELECT ON TABLE metadata.web_info_expedition TO {cfg.DB_USER};
GRANT SELECT ON TABLE metadata.event TO {cfg.DB_USER};
GRANT SELECT ON TABLE metadata.location TO {cfg.DB_USER};
GRANT SELECT ON TABLE metadata.gear TO {cfg.DB_USER};
GRANT USAGE ON SCHEMA {cfg.DB_SCHEMA} TO {cfg.DB_USER};
GRANT ALL PRIVILEGES ON SCHEMA {cfg.DB_SCHEMA} TO {cfg.DB_USER};
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA {cfg.DB_SCHEMA} TO {cfg.DB_USER};
"""
        return ret

    # correctly spelled alias; the original (misspelled) name is kept for compatibility
    get_grant_statements = get_grant_statemens