Repository URL to install this package:
Version:
5.2.8.4 ▾
|
# Copyright 2020 TrilioData Inc.
# All Rights Reserved.
"""Implementation of SQLAlchemy backend."""
import collections
import copy
import datetime
import functools
import inspect
import sys
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from six.moves import range
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import Boolean
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import noload
from sqlalchemy.orm import undefer
from sqlalchemy.schema import Table
from sqlalchemy import sql
from sqlalchemy.sql.expression import asc
from sqlalchemy.sql.expression import cast
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import false
from sqlalchemy.sql import func
from sqlalchemy.sql import null
from sqlalchemy.sql import true
#from Contegoserver import block_device
import contego.conf
import contego.context
from contego.db.sqlalchemy import models
from contego import exception
from contego.i18n import _
from contego import safe_utils
profiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')
CONF = contego.conf.CONF
LOG = logging.getLogger(__name__)
main_context_manager = enginefacade.transaction_context()
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def _get_db_conf(conf_group, connection=None):
kw = dict(list(conf_group.items()))
if connection is not None:
kw['connection'] = connection
return kw
def configure(conf):
main_context_manager.configure(**_get_db_conf(conf.dmapi_database))
if profiler_sqlalchemy and CONF.profiler.enabled \
and CONF.profiler.trace_sqlalchemy:
main_context_manager.append_on_engine_create(
lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))
def model_query(context, model,
args=None,
read_deleted=None,
project_only=False):
"""Query helper that accounts for context's `read_deleted` field.
:param context: ContegoserverContext of the query.
:param model: Model to query. Must be a subclass of ModelBase.
:param args: Arguments to query. If None - model is used.
:param read_deleted: If not None, overrides context's read_deleted field.
Permitted values are 'no', which does not return
deleted values; 'only', which only returns deleted
values; and 'yes', which does not filter deleted
values.
:param project_only: If set and context is user-type, then restrict
query to match the context's project_id. If set to
'allow_none', restriction includes project_id = None.
"""
if read_deleted is None:
read_deleted = context.read_deleted
query_kwargs = {}
if 'no' == read_deleted:
query_kwargs['deleted'] = False
elif 'only' == read_deleted:
query_kwargs['deleted'] = True
elif 'yes' == read_deleted:
pass
else:
raise ValueError(_("Unrecognized read_deleted value '%s'")
% read_deleted)
query = sqlalchemyutils.model_query(
model, context.session, args, **query_kwargs)
# We can't use oslo.db model_query's project_id here, as it doesn't allow
# us to return both our projects and unowned projects.
if contego.context.is_user_context(context) and project_only:
if project_only == 'allow_none':
query = query.\
filter(or_(model.project_id == context.project_id,
model.project_id == null()))
else:
query = query.filter_by(project_id=context.project_id)
return query
def _build_instance_get(context, columns_to_join=None):
#
query = model_query(context, models.Instance, project_only=True).\
options(joinedload('security_groups.rules')).\
options(joinedload('info_cache'))
if columns_to_join is None:
columns_to_join = ['metadata', 'system_metadata']
for column in columns_to_join:
if column in ['info_cache', 'security_groups']:
# Already always joined above
continue
if 'extra.' in column:
query = query.options(undefer(column))
else:
query = query.options(joinedload(column))
# NOTE(alaski) Stop lazy loading of columns not needed.
for col in ['metadata', 'system_metadata']:
if col not in columns_to_join:
query = query.options(noload(col))
return query
def pick_context_manager_reader_allow_async(f):
"""Decorator to use a reader.allow_async db context manager.
The db context manager will be picked from the RequestContext.
Wrapped function must have a RequestContext in the arguments.
"""
@functools.wraps(f)
def wrapped(context, *args, **kwargs):
ctxt_mgr = get_context_manager(context)
with ctxt_mgr.reader.allow_async.using(context):
return f(context, *args, **kwargs)
return wrapped
def get_context_manager(context):
"""Get a database context manager object.
:param context: The request context that can contain a context manager
"""
return _context_manager_from_context(context) or main_context_manager
def _context_manager_from_context(context):
if context:
try:
return context.db_connection
except AttributeError:
pass
def require_context(f):
"""Decorator to require *any* user or admin context.
This does no authorization for user or project access matching, see
:py:func:`Contegoserver.context.authorize_project_context` and
:py:func:`Contegoserver.context.authorize_user_context`.
The first argument to the wrapped function must be the context.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
contego.context.require_context(args[0])
return f(*args, **kwargs)
return wrapper
@require_context
@pick_context_manager_reader_allow_async
def instance_get_by_uuid(context, uuid, columns_to_join=None):
return _instance_get_by_uuid(context, uuid,
columns_to_join=columns_to_join)
def _instance_get_by_uuid(context, uuid, columns_to_join=None):
result = _build_instance_get(context, columns_to_join=columns_to_join).\
filter_by(uuid=uuid).\
first()
if not result:
raise exception.InstanceNotFound(instance_id=uuid)
return result
def create_context_manager(connection=None):
"""Create a database context manager object.
: param connection: The database connection string
"""
ctxt_mgr = enginefacade.transaction_context()
ctxt_mgr.configure(**_get_db_conf(CONF.dmapi_database, connection=connection))
return ctxt_mgr
def select_db_reader_mode(f):
"""Decorator to select synchronous or asynchronous reader mode.
The kwarg argument 'use_slave' defines reader mode. Asynchronous reader
will be used if 'use_slave' is True and synchronous reader otherwise.
If 'use_slave' is not specified default value 'False' will be used.
Wrapped function must have a context in the arguments.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
wrapped_func = safe_utils.get_wrapped_function(f)
keyed_args = inspect.getcallargs(wrapped_func, *args, **kwargs)
context = keyed_args['context']
use_slave = keyed_args.get('use_slave', False)
if use_slave:
reader_mode = get_context_manager(context).async_
else:
reader_mode = get_context_manager(context).reader
with reader_mode.using(context):
return f(*args, **kwargs)
return wrapper
def pick_context_manager_writer(f):
"""Decorator to use a writer db context manager.
The db context manager will be picked from the RequestContext.
Wrapped function must have a RequestContext in the arguments.
"""
@functools.wraps(f)
def wrapped(context, *args, **kwargs):
ctxt_mgr = get_context_manager(context)
with ctxt_mgr.writer.using(context):
return f(context, *args, **kwargs)
return wrapped
def pick_context_manager_reader(f):
"""Decorator to use a reader db context manager.
The db context manager will be picked from the RequestContext.
Wrapped function must have a RequestContext in the arguments.
"""
@functools.wraps(f)
def wrapped(context, *args, **kwargs):
ctxt_mgr = get_context_manager(context)
with ctxt_mgr.reader.using(context):
return f(context, *args, **kwargs)
return wrapped
def pick_context_manager_reader_allow_async(f):
"""Decorator to use a reader.allow_async db context manager.
The db context manager will be picked from the RequestContext.
Wrapped function must have a RequestContext in the arguments.
"""
@functools.wraps(f)
def wrapped(context, *args, **kwargs):
ctxt_mgr = get_context_manager(context)
with ctxt_mgr.reader.allow_async.using(context):
return f(context, *args, **kwargs)
return wrapped
###################
@pick_context_manager_writer
def service_destroy(context, service_id):
service = service_get(context, service_id)
model_query(context, models.Service).\
filter_by(id=service_id).\
soft_delete(synchronize_session=False)
if service.binary == 'tvault-contego':
# TODO(sbauza): Remove the service_id filter in a later release
# once we are sure that all compute nodes report the host field
model_query(context, models.ComputeNode).\
filter(or_(models.ComputeNode.service_id == service_id,
models.ComputeNode.host == service['host'])).\
soft_delete(synchronize_session=False)
@pick_context_manager_reader
def service_get(context, service_id):
query = model_query(context, models.Service).filter_by(id=service_id)
result = query.first()
if not result:
raise exception.ServiceNotFound(service_id=service_id)
return result
@pick_context_manager_reader
def service_get_by_uuid(context, service_uuid):
query = model_query(context, models.Service).filter_by(uuid=service_uuid)
result = query.first()
if not result:
raise exception.ServiceNotFound(service_id=service_uuid)
return result
@pick_context_manager_reader_allow_async
def service_get_minimum_version(context, binaries):
min_versions = context.session.query(
models.Service.binary,
func.min(models.Service.version)).\
filter(models.Service.binary.in_(binaries)).\
filter(models.Service.deleted == 0).\
filter(models.Service.forced_down == false()).\
group_by(models.Service.binary)
return dict(min_versions)
@pick_context_manager_reader
def service_get_all(context, disabled=None):
query = model_query(context, models.Service)
if disabled is not None:
query = query.filter_by(disabled=disabled)
return query.all()
@pick_context_manager_reader
def service_get_all_by_topic(context, topic):
return model_query(context, models.Service, read_deleted="no").\
filter_by(disabled=False).\
filter_by(topic=topic).\
all()
@pick_context_manager_reader
def service_get_by_host_and_topic(context, host, topic):
return model_query(context, models.Service, read_deleted="no").\
filter_by(disabled=False).\
filter_by(host=host).\
filter_by(topic=topic).\
first()
@pick_context_manager_reader
def service_get_all_by_binary(context, binary, include_disabled=False):
query = model_query(context, models.Service).filter_by(binary=binary)
if not include_disabled:
query = query.filter_by(disabled=False)
return query.all()
@pick_context_manager_reader
def service_get_all_computes_by_hv_type(context, hv_type,
include_disabled=False):
query = model_query(context, models.Service, read_deleted="no").\
filter_by(binary='tvault-contego')
if not include_disabled:
query = query.filter_by(disabled=False)
query = query.join(models.ComputeNode,
models.Service.host == models.ComputeNode.host).\
filter(models.ComputeNode.hypervisor_type == hv_type).\
distinct('host')
return query.all()
@pick_context_manager_reader
def service_get_by_host_and_binary(context, host, binary):
result = model_query(context, models.Service, read_deleted="no").\
filter_by(host=host).\
filter_by(binary=binary).\
first()
if not result:
raise exception.HostBinaryNotFound(host=host, binary=binary)
return result
@pick_context_manager_reader
def service_get_all_by_host(context, host):
return model_query(context, models.Service, read_deleted="no").\
filter_by(host=host).\
all()
@pick_context_manager_reader_allow_async
def service_get_by_compute_host(context, host):
result = model_query(context, models.Service, read_deleted="no").\
filter_by(host=host).\
filter_by(binary='tvault-contego').\
first()
if not result:
raise exception.ComputeHostNotFound(host=host)
return result
@pick_context_manager_writer
def service_create(context, values):
service_ref = models.Service()
service_ref.update(values)
# We only auto-disable tvault-contego services since those are the only
# ones that can be enabled using the os-services REST API and they are
# the only ones where being disabled means anything. It does
# not make sense to be able to disable non-compute services like
# nova-scheduler or nova-osapi_compute since that does nothing.
if not CONF.enable_new_services and values.get('binary') == 'tvault-contego':
msg = _("New compute service disabled due to config option.")
service_ref.disabled = True
service_ref.disabled_reason = msg
try:
service_ref.save(context.session)
except db_exc.DBDuplicateEntry as e:
if 'binary' in e.columns:
raise exception.ServiceBinaryExists(host=values.get('host'),
binary=values.get('binary'))
raise exception.ServiceTopicExists(host=values.get('host'),
topic=values.get('topic'))
return service_ref
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def service_update(context, service_id, values):
service_ref = service_get(context, service_id)
# Only servicegroup.drivers.db.DbDriver._report_state() updates
# 'report_count', so if that value changes then store the timestamp
# as the last time we got a state report.
if 'report_count' in values:
if values['report_count'] > service_ref.report_count:
service_ref.last_seen_up = timeutils.utcnow()
service_ref.update(values)
return service_ref
###################