Repository URL to install this package:
Version:
4.2.61 ▾
|
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.
"""SQLAlchemy storage backend."""
from __future__ import absolute_import
import copy
import datetime
import os.path
from alembic import (
command,
config,
migration
)
from oslo_db.sqlalchemy import (
session as db_session,
utils as oslo_sql_utils
)
from oslo_log import log
from oslo_utils import (
importutils,
timeutils
)
import sqlalchemy
from sqlalchemy import (
asc,
desc,
func
)
from sqlalchemy.engine import url as sqlalchemy_url
from sqlalchemy.orm import exc
import dmapi
from dmapi import storage
from dmapi.storage import (
base,
models as alarm_api_models,
)
from dmapi.storage.sqlalchemy import (
models,
utils as sql_utils
)
osprofiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')
LOG = log.getLogger(__name__)
# Operators recognised inside a dict-valued filter, listed in the order in
# which they are probed. Only the first operator found in a dict is applied.
_FILTER_OPERATORS = (
    ('in', lambda column, operand: column.in_(operand)),
    ('nin', lambda column, operand: ~column.in_(operand)),
    ('ne', lambda column, operand: column != operand),
    ('gt', lambda column, operand: column > operand),
    ('ge', lambda column, operand: column >= operand),
    ('lt', lambda column, operand: column < operand),
    ('le', lambda column, operand: column <= operand),
    ('eq', lambda column, operand: column == operand),
    ('has', lambda column, operand: column.like('%{0}%'.format(operand))),
)


def apply_filters(query, model, **filters):
    """Narrow *query* according to keyword *filters* on *model* attributes.

    Each keyword names an attribute of *model*. A plain value requests an
    equality match; all plain values are collected and applied together
    through ``filter_by`` after the operator filters. A dict value selects
    a comparison through one of the keys ``in``, ``nin``, ``ne``, ``gt``,
    ``ge``, ``lt``, ``le``, ``eq`` or ``has`` (substring match via LIKE).
    When a dict carries several of these keys only the first one, in the
    order just listed, is used; a dict with none of them adds no filter.

    :param query: query object exposing ``filter`` and ``filter_by``.
    :param model: object whose attributes are the filterable columns.
    :param filters: attribute name -> plain value or operator dict.
    :returns: the filtered query.
    :raises AttributeError: if a keyword is not an attribute of *model*.
    """
    exact_matches = {}
    for field, condition in filters.items():
        # Looked up for every keyword, so an unknown name fails early.
        column = getattr(model, field)
        if not isinstance(condition, dict):
            exact_matches[field] = condition
            continue
        for op_name, build_clause in _FILTER_OPERATORS:
            if op_name in condition:
                query = query.filter(build_clause(column, condition[op_name]))
                break
    if exact_matches:
        query = query.filter_by(**exact_matches)
    return query
class Connection(base.Connection):
"""Put the data into a SQLAlchemy database. """
def __init__(self, conf, url):
    """Create the oslo.db engine facade used by this connection.

    :param conf: configuration object; the items of its ``database``
                 group are passed to the engine facade as options.
    :param url: database connection URL (run through ``dress_url``).
    """
    engine_options = {
        name: value for name, value in conf.database.items()
    }
    # Retrying is disabled here: oslo.db may, in some failure cases, end
    # up retrying the connection max_retries ^ 2 times, and reconnection
    # is already handled by storage.__init__.get_connection_from_config.
    engine_options['max_retries'] = 0
    # Options declared by this project (storage.OPTS) are not understood
    # by oslo.db, so they are removed before building the facade.
    for project_opt in storage.OPTS:
        engine_options.pop(project_opt.name, None)

    facade = db_session.EngineFacade(self.dress_url(url), **engine_options)
    self._engine_facade = facade

    # osprofiler is optional; tracing is attached only when it imported.
    if osprofiler_sqlalchemy:
        osprofiler_sqlalchemy.add_tracing(
            sqlalchemy, facade.get_engine(), 'db')
    self.conf = conf
@staticmethod
def dress_url(url):
    """Return *url* with an explicit MySQL driver filled in.

    A URL that starts with ``mysql://`` names no DBAPI driver, so it is
    rewritten to use ``mysql+pymysql``. Any other URL, including one that
    already names a MySQL driver, is returned unchanged.

    :param url: database connection URL as a string.
    :returns: the connection URL string to hand to the engine facade.
    """
    bare_scheme = "mysql://"
    if url.startswith(bare_scheme):
        # Only the scheme prefix is swapped; everything after it is passed
        # through verbatim. A parsed SQLAlchemy URL object is deliberately
        # not used: since SQLAlchemy 1.4 those objects are immutable, so
        # assigning ``drivername`` raises, and on SQLAlchemy 2.0
        # ``str(url)`` masks the password, which would corrupt the
        # connection string.
        return "mysql+pymysql://" + url[len(bare_scheme):]
    return url
def disconnect(self):
    """Dispose of the engine held by the engine facade."""
    engine = self._engine_facade.get_engine()
    engine.dispose()
def _get_alembic_config(self):
    """Build the alembic configuration for this connection.

    The configuration is loaded from the ``alembic.ini`` located under
    ``sqlalchemy/alembic`` next to this module, and its ``sqlalchemy.url``
    main option is set from ``conf.database.connection``.

    :returns: the alembic ``Config`` object.
    """
    module_dir = os.path.dirname(__file__)
    alembic_cfg = config.Config(
        "%s/sqlalchemy/alembic/alembic.ini" % module_dir)
    # NOTE(review): '%' is doubled presumably because alembic option
    # values go through ConfigParser-style interpolation - confirm.
    escaped_url = self.conf.database.connection.replace("%", "%%")
    alembic_cfg.set_main_option('sqlalchemy.url', escaped_url)
    return alembic_cfg
def upgrade(self, nocreate=False):
    """Bring the database schema up to the alembic ``head`` revision.

    :param nocreate: when true, always run the alembic migrations. When
                     false, a database with no recorded alembic revision
                     gets its tables created directly from the models and
                     is then stamped as ``head``; a database that already
                     has a revision is migrated.
    """
    cfg = self._get_alembic_config()
    cfg.conf = self.conf
    if nocreate:
        command.upgrade(cfg, "head")
        return

    engine = self._engine_facade.get_engine()
    # The connection is needed only to read the current revision; the
    # ``with`` block closes it afterwards so it is not left checked out.
    with engine.connect() as connection:
        ctxt = migration.MigrationContext.configure(connection)
        current_version = ctxt.get_current_revision()

    if current_version is None:
        # No revision recorded: create all tables from the models in one
        # go, then mark the schema as being at the latest revision.
        models.Base.metadata.create_all(engine, checkfirst=False)
        command.stamp(cfg, "head")
    else:
        command.upgrade(cfg, "head")
def clear(self):
    """Delete every row from every model table, then dispose the engine."""
    db_engine = self._engine_facade.get_engine()
    # SQLAlchemy documents ``sorted_tables`` as ordered by foreign-key
    # dependency, so walking it in reverse empties dependent tables first.
    tables_in_reverse = reversed(models.Base.metadata.sorted_tables)
    for tbl in tables_in_reverse:
        delete_all_rows = tbl.delete()
        db_engine.execute(delete_all_rows)
    db_engine.dispose()
def _retrieve_data(self, filter_expr, orderby, limit, table):
if limit == 0:
return []
session = self._engine_facade.get_session()
engine = self._engine_facade.get_engine()
query = session.query(table)
transformer = sql_utils.QueryTransformer(table, query,
dialect=engine.dialect.name)
if filter_expr is not None:
transformer.apply_filter(filter_expr)
transformer.apply_options(orderby,
limit)
retrieve = {models.Alarm: self._retrieve_alarms,
models.AlarmChange: self._retrieve_alarm_history}
return retrieve[table](transformer.get_query())
def conditional_update(self, model, values, expected_values, filters=None):
    """Compare-and-swap conditional update SQLAlchemy implementation.

    Rows of *model* are updated with *values* only where they match both
    *filters* and *expected_values*.

    :param model: model class to update.
    :param values: column name -> new value mapping passed to ``update``.
    :param expected_values: column name -> value the row must currently
                            hold for the update to apply.
    :param filters: optional extra equality criteria; the caller's dict
                    is left unmodified.
    :returns: True if at least one row was updated, False otherwise.
    """
    # Work on a copy: merging expected_values into the caller's dict would
    # leak those criteria into any later reuse of ``filters``.
    criteria = dict(filters) if filters else {}
    criteria.update(expected_values)

    session = self._engine_facade.get_session()
    query = session.query(model)
    if criteria:
        query = query.filter_by(**criteria)
    # No session synchronisation is requested for this bulk update.
    updated_rows = query.update(values, synchronize_session=False)
    return updated_rows != 0