Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2020 TrilioData Inc.
# All Rights Reserved.

"""Implementation of SQLAlchemy backend."""

import collections
import copy
import datetime
import functools
import inspect
import sys

from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from six.moves import range
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import Boolean
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy.orm import aliased
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import noload
from sqlalchemy.orm import undefer
from sqlalchemy.schema import Table
from sqlalchemy import sql
from sqlalchemy.sql.expression import asc
from sqlalchemy.sql.expression import cast
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import UpdateBase
from sqlalchemy.sql import false
from sqlalchemy.sql import func
from sqlalchemy.sql import null
from sqlalchemy.sql import true

#from Contegoserver import block_device
import contego.conf
import contego.context
from contego.db.sqlalchemy import models
from contego import exception
from contego.i18n import _
from contego import safe_utils

# Optional osprofiler integration. configure() only registers SQL tracing
# when this name is truthy (presumably None when osprofiler is not
# installed -- confirm against oslo_utils.importutils.try_import).
profiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')

# Configuration object read by configure(), create_context_manager() and
# service_create() in this module.
CONF = contego.conf.CONF


LOG = logging.getLogger(__name__)

# Module-level default transaction context. get_context_manager() falls back
# to it when the request context does not provide a ``db_connection``.
main_context_manager = enginefacade.transaction_context()


def get_backend():
    """Return the module object of this file, which acts as the backend."""
    this_module = sys.modules[__name__]
    return this_module

def _get_db_conf(conf_group, connection=None):
    """Build the keyword arguments used to configure a transaction context.

    :param conf_group: Mapping-like option group; every item it yields from
                       ``items()`` is copied into the result.
    :param connection: Optional connection string. When not None it is
                       stored under the 'connection' key, replacing any
                       value copied from ``conf_group``.
    :returns: A new dict; ``conf_group`` itself is left unmodified.
    """
    db_kwargs = dict(conf_group.items())
    if connection is None:
        return db_kwargs
    db_kwargs['connection'] = connection
    return db_kwargs

def configure(conf):
    """Configure the module-level context manager from ``conf``.

    :param conf: Configuration object whose ``dmapi_database`` group supplies
                 the keyword arguments for ``main_context_manager``.

    If the osprofiler SQLAlchemy module was importable and both
    ``CONF.profiler.enabled`` and ``CONF.profiler.trace_sqlalchemy`` are set,
    a hook adding tracing is registered via ``append_on_engine_create``.
    """
    db_settings = _get_db_conf(conf.dmapi_database)
    main_context_manager.configure(**db_settings)

    # NOTE: the profiler switches are read from the module-level CONF, not
    # from the ``conf`` argument.
    tracing_wanted = (profiler_sqlalchemy
                      and CONF.profiler.enabled
                      and CONF.profiler.trace_sqlalchemy)
    if not tracing_wanted:
        return

    main_context_manager.append_on_engine_create(
        lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))


def model_query(context, model,
                args=None,
                read_deleted=None,
                project_only=False):
    """Build a query on ``model`` honouring soft-delete and project scoping.

    :param context:     Request context of the query; its ``session``,
                        ``read_deleted`` and ``project_id`` are used.
    :param model:       Model to query. Must be a subclass of ModelBase.
    :param args:        Arguments to query. If None - model is used.
    :param read_deleted: If not None, overrides context's read_deleted field.
                        Permitted values are 'no', which does not return
                        deleted values; 'only', which only returns deleted
                        values; and 'yes', which does not filter deleted
                        values.
    :param project_only: If set and context is user-type, then restrict
                        query to match the context's project_id. If set to
                        'allow_none', restriction includes project_id = None.
    :raises ValueError: if the effective read_deleted value is not one of
                        'no', 'only' or 'yes'.
    """
    if read_deleted is None:
        read_deleted = context.read_deleted

    if read_deleted == 'no':
        deleted_filter = {'deleted': False}
    elif read_deleted == 'only':
        deleted_filter = {'deleted': True}
    elif read_deleted == 'yes':
        # No 'deleted' keyword at all: rows are returned regardless of state.
        deleted_filter = {}
    else:
        raise ValueError(_("Unrecognized read_deleted value '%s'")
                         % read_deleted)

    query = sqlalchemyutils.model_query(
        model, context.session, args, **deleted_filter)

    # We can't use oslo.db model_query's project_id here, as it doesn't allow
    # us to return both our projects and unowned projects.
    scope_to_project = (contego.context.is_user_context(context)
                        and project_only)
    if not scope_to_project:
        return query

    if project_only == 'allow_none':
        return query.filter(or_(model.project_id == context.project_id,
                                model.project_id == null()))
    return query.filter_by(project_id=context.project_id)


def _build_instance_get(context, columns_to_join=None):
    """Build the base Instance query with the requested eager loading.

    :param context: Request context handed to ``model_query``; the query is
                    built with ``project_only=True``.
    :param columns_to_join: Names to load eagerly. Defaults to
                    ['metadata', 'system_metadata']. Names containing
                    'extra.' are undeferred, every other name is joined.
    :returns: The query object; nothing is executed here.
    """
    always_joined = ('info_cache', 'security_groups')
    optional_relations = ('metadata', 'system_metadata')

    query = model_query(context, models.Instance, project_only=True)
    query = query.options(joinedload('security_groups.rules'))
    query = query.options(joinedload('info_cache'))

    if columns_to_join is None:
        columns_to_join = list(optional_relations)

    for column in columns_to_join:
        if column in always_joined:
            # Already always joined above
            continue
        loader = undefer if 'extra.' in column else joinedload
        query = query.options(loader(column))

    # NOTE(alaski) Stop lazy loading of columns not needed.
    for relation in optional_relations:
        if relation not in columns_to_join:
            query = query.options(noload(relation))
    return query

def pick_context_manager_reader_allow_async(f):
    """Run the decorated function inside a reader.allow_async context.

    The context manager is looked up with ``get_context_manager`` from the
    first positional argument of every call, so the wrapped function must
    take the request context as its first argument.

    NOTE(review): an identical definition of this decorator appears again
    later in this module.
    """
    @functools.wraps(f)
    def _in_async_reader(context, *args, **kwargs):
        reader = get_context_manager(context).reader.allow_async
        with reader.using(context):
            return f(context, *args, **kwargs)
    return _in_async_reader

def get_context_manager(context):
    """Return the database context manager to use for ``context``.

    :param context: The request context that can contain a context manager
    :returns: The manager found on the context when it is truthy, otherwise
              the module-level ``main_context_manager``.
    """
    ctxt_mgr = _context_manager_from_context(context)
    if ctxt_mgr:
        return ctxt_mgr
    return main_context_manager
def _context_manager_from_context(context):
    """Return ``context.db_connection`` when available, else None.

    None is returned both for a falsy context and for a context without a
    ``db_connection`` attribute.
    """
    if not context:
        return None
    return getattr(context, 'db_connection', None)

def require_context(f):
    """Decorator to require *any* user or admin context.

    The first positional argument of each call is passed to
    ``contego.context.require_context`` before the wrapped function runs.
    This does no authorization for user or project access matching.

    The first argument to the wrapped function must be the context.

    """

    @functools.wraps(f)
    def _checked(*args, **kwargs):
        context = args[0]
        contego.context.require_context(context)
        return f(*args, **kwargs)
    return _checked

@require_context
@pick_context_manager_reader_allow_async
def instance_get_by_uuid(context, uuid, columns_to_join=None):
    """Return the instance with the given UUID.

    Thin public wrapper around ``_instance_get_by_uuid`` that adds the
    context check and the reader.allow_async database context.

    :param context: Request context; must be the first argument.
    :param uuid: UUID of the instance to look up.
    :param columns_to_join: Forwarded unchanged to the private helper.
    :raises exception.InstanceNotFound: propagated from the helper when no
                                        row matches.
    """
    lookup_options = {'columns_to_join': columns_to_join}
    return _instance_get_by_uuid(context, uuid, **lookup_options)


def _instance_get_by_uuid(context, uuid, columns_to_join=None):
    """Fetch the first instance matching ``uuid`` or raise.

    :param context: Request context used to build the instance query.
    :param uuid: UUID to filter on.
    :param columns_to_join: Passed through to ``_build_instance_get``.
    :raises exception.InstanceNotFound: if the query yields no result.
    """
    query = _build_instance_get(context, columns_to_join=columns_to_join)
    instance = query.filter_by(uuid=uuid).first()
    if instance:
        return instance
    raise exception.InstanceNotFound(instance_id=uuid)


def create_context_manager(connection=None):
    """Create and configure a new database context manager object.

    :param connection: The database connection string. When None, the
                       settings of ``CONF.dmapi_database`` are used as-is.
    :returns: The newly configured transaction context.
    """
    new_manager = enginefacade.transaction_context()
    db_settings = _get_db_conf(CONF.dmapi_database, connection=connection)
    new_manager.configure(**db_settings)
    return new_manager


def select_db_reader_mode(f):
    """Decorator to select synchronous or asynchronous reader mode.

    The 'use_slave' argument of the wrapped call defines the reader mode:
    the context manager's ``async_`` mode is used when it is truthy and its
    ``reader`` mode otherwise. If 'use_slave' is not specified, False is
    assumed. The wrapped function must have a 'context' argument.
    """

    @functools.wraps(f)
    def _with_reader_mode(*args, **kwargs):
        # Resolve the call against the innermost function so that arguments
        # can be looked up by name however they were passed.
        innermost = safe_utils.get_wrapped_function(f)
        call_args = inspect.getcallargs(innermost, *args, **kwargs)
        context = call_args['context']

        manager = get_context_manager(context)
        if call_args.get('use_slave', False):
            reader_mode = manager.async_
        else:
            reader_mode = manager.reader

        with reader_mode.using(context):
            return f(*args, **kwargs)
    return _with_reader_mode


def pick_context_manager_writer(f):
    """Run the decorated function inside a writer db context.

    The context manager is looked up with ``get_context_manager`` from the
    first positional argument of every call, so the wrapped function must
    take the request context as its first argument.
    """
    @functools.wraps(f)
    def _in_writer(context, *args, **kwargs):
        writer = get_context_manager(context).writer
        with writer.using(context):
            return f(context, *args, **kwargs)
    return _in_writer


def pick_context_manager_reader(f):
    """Run the decorated function inside a reader db context.

    The context manager is looked up with ``get_context_manager`` from the
    first positional argument of every call, so the wrapped function must
    take the request context as its first argument.
    """
    @functools.wraps(f)
    def _in_reader(context, *args, **kwargs):
        reader = get_context_manager(context).reader
        with reader.using(context):
            return f(context, *args, **kwargs)
    return _in_reader


def pick_context_manager_reader_allow_async(f):
    """Run the decorated function inside a reader.allow_async db context.

    The context manager is looked up with ``get_context_manager`` from the
    first positional argument of every call, so the wrapped function must
    take the request context as its first argument.

    NOTE(review): this repeats a definition that appears earlier in this
    module; this later one rebinds the name for the functions below.
    """
    @functools.wraps(f)
    def _in_async_reader(context, *args, **kwargs):
        reader = get_context_manager(context).reader.allow_async
        with reader.using(context):
            return f(context, *args, **kwargs)
    return _in_async_reader


###################


@pick_context_manager_writer
def service_destroy(context, service_id):
    """Soft-delete a service and, for compute services, its compute nodes.

    :param context: Request context providing the writer session.
    :param service_id: Primary key of the service to remove.
    :raises exception.ServiceNotFound: propagated from ``service_get`` when
                                       the service does not exist.
    """
    service = service_get(context, service_id)

    service_rows = model_query(context, models.Service)
    service_rows = service_rows.filter_by(id=service_id)
    service_rows.soft_delete(synchronize_session=False)

    if service.binary != 'tvault-contego':
        return

    # TODO(sbauza): Remove the service_id filter in a later release
    # once we are sure that all compute nodes report the host field
    node_rows = model_query(context, models.ComputeNode)
    node_rows = node_rows.filter(
        or_(models.ComputeNode.service_id == service_id,
            models.ComputeNode.host == service['host']))
    node_rows.soft_delete(synchronize_session=False)


@pick_context_manager_reader
def service_get(context, service_id):
    """Return the service with the given id.

    :param context: Request context providing the reader session.
    :param service_id: Primary key to look up.
    :raises exception.ServiceNotFound: if no matching row is found.
    """
    matches = model_query(context, models.Service).filter_by(id=service_id)
    service = matches.first()
    if service:
        return service
    raise exception.ServiceNotFound(service_id=service_id)


@pick_context_manager_reader
def service_get_by_uuid(context, service_uuid):
    """Return the service with the given UUID.

    :param context: Request context providing the reader session.
    :param service_uuid: UUID to look up.
    :raises exception.ServiceNotFound: if no matching row is found; the UUID
                                       is reported as ``service_id``.
    """
    matches = model_query(context, models.Service).\
        filter_by(uuid=service_uuid)
    service = matches.first()
    if service:
        return service
    raise exception.ServiceNotFound(service_id=service_uuid)


@pick_context_manager_reader_allow_async
def service_get_minimum_version(context, binaries):
    """Return the minimum service version for each requested binary.

    Only rows with ``deleted == 0`` and ``forced_down`` false are
    considered.

    :param context: Request context providing the session.
    :param binaries: Binary names to include.
    :returns: dict built from the (binary, min(version)) result rows.
    """
    service = models.Service
    rows = context.session.query(service.binary, func.min(service.version))
    rows = rows.filter(service.binary.in_(binaries))
    rows = rows.filter(service.deleted == 0)
    rows = rows.filter(service.forced_down == false())
    rows = rows.group_by(service.binary)
    return dict(rows)


@pick_context_manager_reader
def service_get_all(context, disabled=None):
    """Return all services, optionally filtered on their disabled flag.

    :param context: Request context providing the reader session.
    :param disabled: When not None, only services whose ``disabled`` column
                     equals this value are returned.
    """
    services = model_query(context, models.Service)
    if disabled is None:
        return services.all()
    return services.filter_by(disabled=disabled).all()


@pick_context_manager_reader
def service_get_all_by_topic(context, topic):
    """Return all enabled, non-deleted services for ``topic``."""
    services = model_query(context, models.Service, read_deleted="no")
    services = services.filter_by(disabled=False)
    services = services.filter_by(topic=topic)
    return services.all()


@pick_context_manager_reader
def service_get_by_host_and_topic(context, host, topic):
    """Return the first enabled, non-deleted service for host and topic.

    :returns: Whatever ``first()`` yields for the query; no exception is
              raised here when nothing matches.
    """
    services = model_query(context, models.Service, read_deleted="no")
    services = services.filter_by(disabled=False)
    services = services.filter_by(host=host)
    services = services.filter_by(topic=topic)
    return services.first()


@pick_context_manager_reader
def service_get_all_by_binary(context, binary, include_disabled=False):
    """Return all services running ``binary``.

    :param context: Request context providing the reader session.
    :param binary: Binary name to filter on.
    :param include_disabled: When false (the default), disabled services
                             are filtered out.
    """
    services = model_query(context, models.Service).filter_by(binary=binary)
    if include_disabled:
        return services.all()
    return services.filter_by(disabled=False).all()


@pick_context_manager_reader
def service_get_all_computes_by_hv_type(context, hv_type,
                                        include_disabled=False):
    """Return 'tvault-contego' services whose compute node has ``hv_type``.

    Services are joined to compute nodes on the host column and the result
    is made distinct on 'host'.

    :param context: Request context providing the reader session.
    :param hv_type: Value compared with ``ComputeNode.hypervisor_type``.
    :param include_disabled: When false (the default), disabled services
                             are filtered out.
    """
    services = model_query(context, models.Service, read_deleted="no")
    services = services.filter_by(binary='tvault-contego')
    if not include_disabled:
        services = services.filter_by(disabled=False)

    services = services.join(
        models.ComputeNode,
        models.Service.host == models.ComputeNode.host)
    services = services.filter(
        models.ComputeNode.hypervisor_type == hv_type)
    return services.distinct('host').all()


@pick_context_manager_reader
def service_get_by_host_and_binary(context, host, binary):
    """Return the non-deleted service matching ``host`` and ``binary``.

    :raises exception.HostBinaryNotFound: if no matching row is found.
    """
    services = model_query(context, models.Service, read_deleted="no")
    services = services.filter_by(host=host).filter_by(binary=binary)
    service = services.first()
    if service:
        return service
    raise exception.HostBinaryNotFound(host=host, binary=binary)


@pick_context_manager_reader
def service_get_all_by_host(context, host):
    """Return all non-deleted services registered for ``host``."""
    services = model_query(context, models.Service, read_deleted="no")
    return services.filter_by(host=host).all()


@pick_context_manager_reader_allow_async
def service_get_by_compute_host(context, host):
    """Return the non-deleted 'tvault-contego' service running on ``host``.

    :raises exception.ComputeHostNotFound: if no matching row is found.
    """
    services = model_query(context, models.Service, read_deleted="no")
    services = services.filter_by(host=host)
    services = services.filter_by(binary='tvault-contego')
    service = services.first()
    if service:
        return service
    raise exception.ComputeHostNotFound(host=host)


@pick_context_manager_writer
def service_create(context, values):
    """Create and persist a new service record.

    :param context: Request context providing the writer session.
    :param values: Mapping of column values applied to the new record.
    :returns: The saved service model.
    :raises exception.ServiceBinaryExists: on a duplicate entry whose
            conflicting columns include 'binary'.
    :raises exception.ServiceTopicExists: on any other duplicate entry.
    """
    service_ref = models.Service()
    service_ref.update(values)

    # We only auto-disable tvault-contego services since those are the only
    # ones that can be enabled using the os-services REST API and they are
    # the only ones where being disabled means anything. It does
    # not make sense to be able to disable non-compute services like
    # nova-scheduler or nova-osapi_compute since that does nothing.
    auto_disable = (not CONF.enable_new_services
                    and values.get('binary') == 'tvault-contego')
    if auto_disable:
        reason = _("New compute service disabled due to config option.")
        service_ref.disabled = True
        service_ref.disabled_reason = reason

    try:
        service_ref.save(context.session)
    except db_exc.DBDuplicateEntry as e:
        duplicate_binary = 'binary' in e.columns
        host = values.get('host')
        if duplicate_binary:
            raise exception.ServiceBinaryExists(
                host=host, binary=values.get('binary'))
        raise exception.ServiceTopicExists(
            host=host, topic=values.get('topic'))
    return service_ref


@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def service_update(context, service_id, values):
    """Apply ``values`` to an existing service and return it.

    :param context: Request context providing the writer session.
    :param service_id: Primary key of the service to update.
    :param values: Mapping of column values to apply.
    :raises exception.ServiceNotFound: propagated from ``service_get`` when
                                       the service does not exist.
    """
    service_ref = service_get(context, service_id)

    # Only servicegroup.drivers.db.DbDriver._report_state() updates
    # 'report_count', so if that value changes then store the timestamp
    # as the last time we got a state report.
    count_increased = ('report_count' in values and
                       values['report_count'] > service_ref.report_count)
    if count_increased:
        service_ref.last_seen_up = timeutils.utcnow()

    service_ref.update(values)
    return service_ref


###################