Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
python3-dmapi / usr / lib / python3 / dist-packages / dmapi / storage / impl_sqlalchemy.py
Size: Mime:
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.

"""SQLAlchemy storage backend."""
from __future__ import absolute_import
import copy
import datetime
import os.path

from alembic import (
    command,
    config,
    migration
)
from oslo_db.sqlalchemy import (
    session as db_session,
    utils as oslo_sql_utils
)
from oslo_log import log
from oslo_utils import (
    importutils,
    timeutils
)
import sqlalchemy
from sqlalchemy import (
    asc,
    desc,
    func
)
from sqlalchemy.engine import url as sqlalchemy_url
from sqlalchemy.orm import exc

import dmapi
from dmapi import storage
from dmapi.storage import (
    base,
    models as alarm_api_models,
)
from dmapi.storage.sqlalchemy import (
    models,
    utils as sql_utils
)


osprofiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')
LOG = log.getLogger(__name__)


def apply_filters(query, model, **filters):
    filter_dict = {}

    for key, value in filters.items():
        column_attr = getattr(model, key)

        if isinstance(value, dict):
            if 'in' in value:
                query = query.filter(column_attr.in_(value['in']))
            elif 'nin' in value:
                query = query.filter(~column_attr.in_(value['nin']))
            elif 'ne' in value:
                query = query.filter(column_attr != value['ne'])
            elif 'gt' in value:
                query = query.filter(column_attr > value['gt'])
            elif 'ge' in value:
                query = query.filter(column_attr >= value['ge'])
            elif 'lt' in value:
                query = query.filter(column_attr < value['lt'])
            elif 'le' in value:
                query = query.filter(column_attr <= value['le'])
            elif 'eq' in value:
                query = query.filter(column_attr == value['eq'])
            elif 'has' in value:
                like_pattern = '%{0}%'.format(value['has'])
                query = query.filter(column_attr.like(like_pattern))
        else:
            filter_dict[key] = value

    if filter_dict:
        query = query.filter_by(**filter_dict)

    return query


class Connection(base.Connection):
    """Put the data into a SQLAlchemy database. """

    def __init__(self, conf, url):
        # Set max_retries to 0, since oslo.db in certain cases may attempt
        # to retry making the db connection retried max_retries ^ 2 times
        # in failure case and db reconnection has already been implemented
        # in storage.__init__.get_connection_from_config function
        options = dict(conf.database.items())
        options['max_retries'] = 0
        # oslo.db doesn't support options defined by Aodh
        for opt in storage.OPTS:
            options.pop(opt.name, None)
        self._engine_facade = db_session.EngineFacade(self.dress_url(url),
                                                      **options)

        if osprofiler_sqlalchemy:
            osprofiler_sqlalchemy.add_tracing(sqlalchemy,
                                              self._engine_facade.get_engine(),
                                              'db')
        self.conf = conf

    @staticmethod
    def dress_url(url):
        # If no explicit driver has been set, we default to pymysql
        if url.startswith("mysql://"):
            url = sqlalchemy_url.make_url(url)
            url.drivername = "mysql+pymysql"
            return str(url)
        return url

    def disconnect(self):
        self._engine_facade.get_engine().dispose()

    def _get_alembic_config(self):
        cfg = config.Config(
            "%s/sqlalchemy/alembic/alembic.ini" % os.path.dirname(__file__))
        cfg.set_main_option('sqlalchemy.url',
                            self.conf.database.connection.replace("%", "%%"))
        return cfg

    def upgrade(self, nocreate=False):
        cfg = self._get_alembic_config()
        cfg.conf = self.conf
        if nocreate:
            command.upgrade(cfg, "head")
        else:
            engine = self._engine_facade.get_engine()
            ctxt = migration.MigrationContext.configure(engine.connect())
            current_version = ctxt.get_current_revision()
            if current_version is None:
                models.Base.metadata.create_all(engine, checkfirst=False)
                command.stamp(cfg, "head")
            else:
                command.upgrade(cfg, "head")

    def clear(self):
        engine = self._engine_facade.get_engine()
        for table in reversed(models.Base.metadata.sorted_tables):
            engine.execute(table.delete())
        engine.dispose()

    def _retrieve_data(self, filter_expr, orderby, limit, table):
        if limit == 0:
            return []

        session = self._engine_facade.get_session()
        engine = self._engine_facade.get_engine()
        query = session.query(table)
        transformer = sql_utils.QueryTransformer(table, query,
                                                 dialect=engine.dialect.name)
        if filter_expr is not None:
            transformer.apply_filter(filter_expr)

        transformer.apply_options(orderby,
                                  limit)

        retrieve = {models.Alarm: self._retrieve_alarms,
                    models.AlarmChange: self._retrieve_alarm_history}
        return retrieve[table](transformer.get_query())

    def conditional_update(self, model, values, expected_values, filters=None):
        """Compare-and-swap conditional update SQLAlchemy implementation."""
        filters = filters or {}
        filters.update(expected_values)

        session = self._engine_facade.get_session()
        query = session.query(model)
        if filters:
            query = query.filter_by(**filters)

        update_args = {'synchronize_session': False}

        result = query.update(values, **update_args)
        return 0 != result