Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
dship2postgis / db_service.py
Size: Mime:
# -*- coding: utf-8 -*-
import datetime
import uuid

from geoalchemy2 import Geometry

from .config import Config as cfg
import geopandas as gpd
import pandas as pd

from sqlalchemy import create_engine, text

# DDL template for a positions table together with its indexes.
# Placeholders filled in via str.format():
#   {schema}    - schema the table lives in
#   {tablename} - table name (also used as prefix of the index names)
#   {srid}      - SRID of the POINT geometry column; previously hard-coded to
#                 4326 although callers already pass ``srid=...``
# The unique (timestamp, platform) index is the conflict target used by the
# INSERT ... ON CONFLICT statement that merges temporary tables.
SQL_CREATE_POSITIONS_TABLE = """
CREATE TABLE IF NOT EXISTS {schema}.{tablename} (
    id UUID,
    timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    geom GEOMETRY(POINT, {srid}),
    platform VARCHAR NOT NULL,
    leg VARCHAR,
    osis_leg_id int8,
    osis_platform_id int8,
    import_label VARCHAR,
    quality_flag int4 NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS {tablename}_timestamp_idx ON {schema}.{tablename} (timestamp);
CREATE INDEX IF NOT EXISTS {tablename}_leg_idx ON {schema}.{tablename} (leg);
CREATE INDEX IF NOT EXISTS {tablename}_geom_idx ON {schema}.{tablename} USING GIST(GEOM);
CREATE UNIQUE INDEX IF NOT EXISTS {tablename}_timestamp_platform_idx ON {schema}.{tablename} (timestamp, platform);
"""

# Connection URL of the configured PostgreSQL database; the port is fixed to 5432.
# NOTE(review): user name and password are inserted verbatim - credentials that
# contain characters such as '@', ':' or '/' would need URL-escaping; confirm
# against the values used in the configuration.
db_string = 'postgresql://{}:{}@{}:5432/{}'.format(cfg.DB_USER, cfg.DB_PASSWD, cfg.DB_HOST, cfg.DB_NAME)

# Module-level engine; every DbService method obtains its connections from it.
engine = create_engine(db_string)


class DbService(object):
    """Handle database connections and requests for position data.

    All statements run on connections obtained from the module-level
    ``engine``. Names of temporary import tables created through this
    instance are collected in ``self.temp_tables`` so that
    :meth:`drop_temp_tables` can remove them again.
    """

    # Namespace for the deterministic (uuid5) row ids derived from platform and timestamp.
    _ID_NAMESPACE = uuid.UUID('adb291da-187c-11e8-a6f8-f45c899fb76f')

    # Columns of the passed dataframe that are written to the database
    # ('timestamp' is taken from the index, 'id' is added afterwards).
    _COLUMNS_TO_KEEP = ('geom', 'platform', 'leg', 'osis_leg_id', 'osis_platform_id', 'import_label', 'timestamp')

    def __init__(self):
        # names of temporary tables created by this instance and not yet dropped
        self.temp_tables = []

    def _positions_from_db(self, table_name):
        """Read a complete positions table from the database.

        :param table_name: name of the table inside schema cfg.DB_SCHEMA (lower-cased before use)
        :return: GeoDataFrame built from all rows, using column 'geom' as geometry
        """
        sql = f"SELECT * FROM {cfg.DB_SCHEMA}.{table_name.lower()}"
        with engine.connect() as connection:
            return gpd.GeoDataFrame.from_postgis(sql, connection, geom_col='geom')

    def drop_temp_tables(self):
        """Drop all temporary tables recorded in ``self.temp_tables``.

        Opens its own connection. Does nothing (and does not connect) when no
        temporary table is recorded. After a successful run the list of
        recorded tables is emptied, so a second call does not issue the same
        DROP statements again.
        """
        if not self.temp_tables:
            return
        with engine.connect() as connection:
            for tmp_table in self.temp_tables:
                connection.execute(text(f"DROP TABLE IF EXISTS {cfg.DB_SCHEMA}.{tmp_table}"))
                # commit per table so tables already dropped stay dropped if a later statement fails
                connection.commit()
        self.temp_tables.clear()

    def create_positions_table(self):
        """Create the positions table. If it already exists nothing is changed.

        The table is named as configured in cfg.DB_POSITIONS_TABLE.

        :return: the (lower-cased) table name
        """
        return self._create_positions_table(cfg.DB_POSITIONS_TABLE, temporary=False)

    def _create_positions_table(self, table_name, temporary=True):
        """Create a table (and its indexes) for holding positions if it does not exist yet.

        The layout is defined by SQL_CREATE_POSITIONS_TABLE, which is formatted
        with ``schema=cfg.DB_SCHEMA``, the lower-cased table name and
        ``srid=cfg.SRID``.

        :param table_name: requested table name; converted to lower case
        :param temporary: if True the table name is recorded in ``self.temp_tables``
        :return: the lower-cased table name that was actually used
        """
        table_name = table_name.lower()  # postgres doesn't do upper case table names
        the_query = SQL_CREATE_POSITIONS_TABLE.format(schema=cfg.DB_SCHEMA, tablename=table_name, srid=cfg.SRID)
        with engine.connect() as connection:
            connection.execute(text(the_query))
            connection.commit()
        if temporary:
            self.temp_tables.append(table_name)
        return table_name

    @staticmethod
    def _prepare_positions_frame(geo_df, import_label):
        """Bring ``geo_df`` into the column layout of a positions table, in place.

        Adds 'timestamp' (from the index), 'import_label' (unless provided) and
        'id', drops every other column not listed in ``_COLUMNS_TO_KEEP`` and
        checks that all '*_id' columns are numeric.

        :raises AssertionError: if import_label is 'PROVIDED' but the column is
            missing, or if an '*_id' column holds a non-numeric value
        """
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if import_label == 'PROVIDED' and 'import_label' not in geo_df.columns:
            raise AssertionError("Expected to find column 'import_label' in passed data!")

        # Hide SettingWithCopyWarning for the in-place edits below; the context
        # manager restores the previous setting even if an exception is raised.
        with pd.option_context('mode.chained_assignment', None):
            geo_df['timestamp'] = geo_df.index
            if import_label != 'PROVIDED':  # set import label for all rows to passed value
                geo_df['import_label'] = import_label

            to_drop = [col for col in geo_df.columns if col not in DbService._COLUMNS_TO_KEEP]
            geo_df.drop(to_drop, axis=1, inplace=True)
            geo_df['id'] = geo_df.apply(
                lambda row: uuid.uuid5(DbService._ID_NAMESPACE, f"{row['platform']}{row['timestamp']}"),
                axis=1)

            # make sure all '_id' columns can be cast to numeric (remove string 'NA')
            for id_col in [col for col in geo_df.columns if col.endswith('_id')]:
                geo_df.loc[geo_df[id_col] == 'NA', id_col] = None
                try:
                    pd.to_numeric(geo_df[id_col], errors='raise')  # result unused, validation only
                except (ValueError, TypeError) as err:
                    raise AssertionError(f"Found non-numeric value in column {id_col}!") from err

    def create_temp_table(self, geo_df, import_label, table_name=None):
        """Create a temporary DB table from the passed geodataframe.

        The created table can be merged into the real positions table later
        using merge_temp_table. Temporary tables should be removed again using
        drop_temp_tables.

        The passed dataframe is modified in place: columns are added/dropped
        and the string 'NA' in '*_id' columns is replaced by None. The data is
        validated before anything is created in the database.

        :param geo_df: dataframe with positions; its index is used as 'timestamp'
        :param import_label: import_label to attach to all data rows. If string 'PROVIDED' is passed, expects to
            find a column 'import_label' in passed data
        :param table_name: optional table name; if omitted a name is generated from the platform of the
            first row ('PROVIDED') or from import_label, plus the current epoch seconds
        :return: the lower-cased name of the table the data was written to
        :raises AssertionError: if the passed data fails validation
        """
        self._prepare_positions_frame(geo_df, import_label)

        if not table_name:
            # whole seconds since the Unix epoch, computed timezone-aware
            now = round(datetime.datetime.now(datetime.timezone.utc).timestamp())
            if import_label == 'PROVIDED':
                table_name = f'TEMP_POSITIONS_{geo_df["platform"].iloc[0]}_{now}'
            else:
                table_name = f'TEMP_POSITIONS_{import_label}_{now}'

        # Use the name the table was really created with (lower-cased), so the
        # insert below and the returned name refer to the same, tracked table.
        table_name = self._create_positions_table(table_name, temporary=True)

        with engine.connect() as connection:
            geo_df.to_sql(table_name, connection, schema=cfg.DB_SCHEMA, if_exists='append', index=False,
                          dtype={'geom': Geometry('POINT', srid=cfg.SRID)})
            # commit explicitly like every other write in this class; a no-op
            # if the insert has already been committed
            connection.commit()
        return table_name

    def merge_temp_table(self, temp_table_name):
        """Merge a temporary table into the real positions table.

        :param temp_table_name: name of the temporary table holding the new data
        :return: None
        """
        self.create_positions_table()  # does nothing if table already exists
        self._merge_positions_tables(cfg.DB_POSITIONS_TABLE, temp_table_name)

    def _merge_positions_tables(self, table_a, table_b):
        """Merge table b into table a by insert, or update where (timestamp, platform) already exists.

        :param table_a: target table inside cfg.DB_SCHEMA
        :param table_b: source table inside cfg.DB_SCHEMA
        """
        # take note that quality_flag is overwritten on import to address issue 4
        # default of quality_flag = 0 (not checked)
        # NOTE(review): 'leg' is inserted but not updated on conflict, while
        # osis_leg_id is - confirm whether that is intended.
        the_query = f"""
        INSERT INTO {cfg.DB_SCHEMA}.{table_a}
            (id, timestamp, geom, platform, leg, osis_leg_id, osis_platform_id, import_label)

            SELECT new_data.id, new_data.timestamp, new_data.geom, new_data.platform, new_data.leg, new_data.osis_leg_id, new_data.osis_platform_id, new_data.import_label
            FROM {cfg.DB_SCHEMA}.{table_b} AS new_data

            ON CONFLICT (timestamp, platform)
                DO
                UPDATE SET
                    geom = EXCLUDED.geom,
                    import_label = EXCLUDED.import_label,
                    quality_flag=EXCLUDED.quality_flag,
                    osis_leg_id=EXCLUDED.osis_leg_id,
                    osis_platform_id=EXCLUDED.osis_platform_id
        ;
        """
        with engine.connect() as connection:
            connection.execute(text(the_query))
            connection.commit()

    def get_grant_statemens(self):
        """Return the GRANT statements needed to give the configured user its permissions.

        Convenience method that makes setting up the db easier; the statements
        are only returned as text, they are not executed here.

        :return: string with one GRANT statement per line
        """
        # TODO auto-config for databases
        # - configure db admin user
        # - create postgis_import db user
        # - grant permissions

        ret = f"""
            GRANT USAGE ON SCHEMA metadata TO {cfg.DB_USER};
            GRANT SELECT ON TABLE metadata.leg TO {cfg.DB_USER};
            GRANT SELECT ON TABLE metadata.platform TO {cfg.DB_USER};
            GRANT SELECT ON TABLE metadata.port_call TO {cfg.DB_USER};
            GRANT SELECT ON TABLE metadata.web_info_expedition TO {cfg.DB_USER};
            GRANT SELECT ON TABLE metadata.event TO {cfg.DB_USER};
            GRANT SELECT ON TABLE metadata.location TO {cfg.DB_USER};
            GRANT SELECT ON TABLE metadata.gear TO {cfg.DB_USER};
            GRANT USAGE ON SCHEMA {cfg.DB_SCHEMA} TO {cfg.DB_USER};
            GRANT ALL PRIVILEGES ON SCHEMA {cfg.DB_SCHEMA} TO {cfg.DB_USER};
            GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA {cfg.DB_SCHEMA} TO {cfg.DB_USER};
            """

        return ret