Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lib-py-b2b / fedex_ship_mgr_db.py
Size: Mime:
from py_aws_util import get_config
from os import environ
import logging
import pymysql
from typing import List, Dict
from aws_xray_sdk.core import xray_recorder

logger = logging.getLogger(__name__)


class FSMDB:
    @staticmethod
    def __get_config(config_path, environment: str=None):
        config = get_config(path=config_path,
                            environment=environment,
                            recursive=True,
                            with_decryption=True,
                            asDict=True)
        return config

    @staticmethod
    @xray_recorder.capture()
    def get_connection(config_path: str, environment: str=None):
        """
        Returns a configured cx_Oracle connection object

        :param config_path: the parameter store hierarchical path for the database
        :param environment: optionally accepts an execution environment for the configuration lookup
        :return: a configured cx_Oracle connection object
        """
        config = FSMDB.__get_config(config_path=config_path, environment=environment)
        logger.debug(config)
        conn = pymysql.connect(config['host'], user=config['username'], passwd=config['password'],
                               db=config['db'], connect_timeout=5, cursorclass=pymysql.cursors.DictCursor)
        info = {'mysql': {'protocolversion': conn.protocol_version, 'serverversion': conn.get_server_info()}}
        logger.debug(info)
        return conn


    # MARK: -- Convenience methods
    @staticmethod
    @xray_recorder.capture()
    def fetch(sql: str, parameters: Dict=None, connection=None, config_path=None, limit=None, environment: str=None, one: bool = False):
        """
        Convenience method for selecting data from the database

        :param sql: the select statement to be executed
        :param parameters: optional - any named parameters required for the statement as a dictionary
        :param connection: optional - an existing connection
        :param config_path: optional - the config path to use (either the config_path or connection is required)
        :param limit: optional - limit the number of rows retrieved
        :param environment: optional - provides mechanism to specify the deployment environment for config params
        :param one: options - if true fetch_by_container_id only the first row
        :return: List[Dict] - rows of data
        """
        if connection:
            conn = connection
        elif config_path:
            conn = FSMDB.get_connection(config_path, environment=environment)
        else:
            raise Exception("Either a connection or a config_path is required")
        try:
            with conn.cursor() as cursor:
                _params = {} if parameters is None else parameters
                cursor.execute(sql, _params)
                if one:
                    result = cursor.fetchone()
                else:
                    result = cursor.fetchall()
            return result
        except Exception as e:
            raise e
        finally:
            conn.close()

    @staticmethod
    @xray_recorder.capture()
    def execute(sql: str, parameters: Dict=None, connection=None, config_path=None, environment: str=None):
        """
        Convenience method for updating or inserting data to the database

        :param sql: the statement to be executed
        :param parameters: optional - any named parameters required for the statement as a dictionary
        :param connection: optional - an existing connection
        :param config_path: optional - the config path to use (either the config_path or connection is required)
        :param environment: optional - the environment we are executing within (development, production, etc). By
                                        default this will be pulled from env variables.or ultimately default to
                                        'development'. This decides where in the Systems Manager Parameter tree
                                        to pull the configuration values.
        :return: number of rows effected
        """
        if connection:
            conn = connection
        elif config_path:
            conn = FSMDB.get_connection(config_path=config_path, environment=environment)
        else:
            raise Exception("Either a connection or a config_path is required")
        try:
            with conn.cursor() as cursor:
                _params = {} if parameters is None else parameters
                cursor.execute(sql, _params)
                conn.commit()
                return {'row_count': cursor.rowcount}
        except Exception as e:
            raise e
        finally:
            conn.close()

    @staticmethod
    @xray_recorder.capture()
    def execute_many(sql: str, parameters: List[Dict], connection=None, config_path=None, environment: str=None):
        """
        Convenience method for updating or inserting a list of data dictionaries to the database using the same statement

        :param sql: the statement to be executed
        :param parameters: a list of dictionaries to be executed using the provided query
        :param connection: optional - an existing connection
        :param config_path: optional - the config path to use (either the config_path or connection is required)
        :return: number of rows effected
        """
        if connection:
            conn = connection
        elif config_path:
            conn = FSMDB.get_connection(config_path, environment=environment)
        else:
            raise Exception("Either a connection or a config_path is required")
        try:
            with conn.cursor() as cursor:
                _params = {} if parameters is None else parameters
                cursor.executemany(sql, _params)
                conn.commit()
                return {'row_count': cursor.rowcount}
        except Exception as e:
            raise e
        finally:
            conn.close()