Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.

from oslo_log import log as logging
from oslo_utils import versionutils

from dmapi import exception
from dmapi.i18n import _LW
from dmapi.objects import base
from dmapi.objects import fields
from dmapi import db
from dmapi import objects

LOG = logging.getLogger(__name__)


# NOTE(danms): This is the global service version counter
SERVICE_VERSION = 15

SERVICE_VERSION_HISTORY = (

    {'compute_rpc': '4.13'},
)


# TODO(berrange): Remove ContegoObjectDictCompat
#@base.ContegoObjectRegistry.register
class Service():

    # Version 1.20: Added get_minimum_version_multi()
    VERSION = '1.20'


    _MIN_VERSION_CACHE = {}
    _SERVICE_VERSION_CACHING = False
    SERVICE_VERSION = 2

    def __init__(self, *args, **kwargs):
        # NOTE(danms): We're going against the rules here and overriding
        # init. The reason is that we want to *ensure* that we're always
        # setting the current service version on our objects, overriding
        # whatever else might be set in the database, or otherwise (which
        # is the normal reason not to override init).
        #
        # We also need to do this here so that it's set on the client side
        # all the time, such that create() and save() operations will
        # include the current service version.
        if 'version' in kwargs:
            raise exception.ObjectActionError(
                action='init',
                reason='Version field is immutable')

        #super(Service, self).__init__(*args, **kwargs)
        self.version = SERVICE_VERSION


@base.ContegoObjectRegistry.register
class Service(base.ContegoPersistentObject, base.ContegoObject,
               base.ContegoObjectDictCompat):
    VERSION = '2.0'

    fields = {
        'id': fields.IntegerField(read_only=True),
        'uuid': fields.UUIDField(),
        'host': fields.StringField(nullable=True),
        'binary': fields.StringField(nullable=True),
        'topic': fields.StringField(nullable=True),
        'report_count': fields.IntegerField(),
        'disabled': fields.BooleanField(),
        'disabled_reason': fields.StringField(nullable=True),
        'availability_zone': fields.StringField(nullable=True),
        'compute_node': fields.ObjectField('ComputeNode'),
        'last_seen_up': fields.DateTimeField(nullable=True),
        'forced_down': fields.BooleanField(),
        'version': fields.StringField(),
    }

    _MIN_VERSION_CACHE = {}
    _SERVICE_VERSION_CACHING = False

    def __init__(self, *args, **kwargs):
        if 'version' in kwargs:
            raise exception.ObjectActionError(
                action='init',
                reason='Version field is immutable')

        super(Service, self).__init__(*args, **kwargs)
        self.version = SERVICE_VERSION

    def obj_make_compatible_from_manifest(self, primitive, target_version,
                                          version_manifest):
        super(Service, self).obj_make_compatible_from_manifest(
            primitive, target_version, version_manifest)
        _target_version = versionutils.convert_version_to_tuple(target_version)
        if _target_version < (1, 21) and 'uuid' in primitive:
            del primitive['uuid']
        if _target_version < (1, 16) and 'version' in primitive:
            del primitive['version']
        if _target_version < (1, 14) and 'forced_down' in primitive:
            del primitive['forced_down']
        if _target_version < (1, 13) and 'last_seen_up' in primitive:
            del primitive['last_seen_up']
        if _target_version < (1, 10):
            self._do_compute_node(self._context, primitive,
                                  version_manifest)

    def _do_compute_node(self, context, primitive, version_manifest):
        try:
            target_version = version_manifest['ComputeNode']
            compute = objects.ComputeNodeList.get_all_by_host(
                context, primitive['host'])[0]
        except Exception:
            return
        primitive['compute_node'] = compute.obj_to_primitive(
            target_version=target_version,
            version_manifest=version_manifest)

    @staticmethod
    def _from_db_object(context, service, db_service):
        allow_missing = ('availability_zone',)
        for key in service.fields:
            if key in allow_missing and key not in db_service:
                continue
            if key == 'compute_node':
                continue
            elif key == 'version':
                setattr(service, base.get_attrname(key), db_service[key])
            elif key == 'uuid' and not db_service.get(key):
                continue
            else:
                service[key] = db_service[key]

        service._context = context
        service.obj_reset_changes()

        return service

    def obj_load_attr(self, attrname):
        if not self._context:
            raise exception.OrphanedObjectError(method='obj_load_attr',
                                                objtype=self.obj_name())

        LOG.debug("Lazy-loading '%(attr)s' on %(name)s id %(id)s",
                  {'attr': attrname,
                   'name': self.obj_name(),
                   'id': self.id,
                   })
        if attrname != 'compute_node':
            raise exception.ObjectActionError(
                action='obj_load_attr',
                reason='attribute %s not lazy-loadable' % attrname)
        if self.binary == 'tvault-contego':
            compute_nodes = objects.ComputeNodeList.get_all_by_host(
                self._context, self.host)
        else:
            raise exception.ServiceNotFound(service_id=self.id)
        self.compute_node = compute_nodes[0]

    @base.remotable_classmethod
    def get_by_id(cls, context, service_id):
        db_service = db.service_get(context, service_id)
        return cls._from_db_object(context, cls(), db_service)

    @base.remotable_classmethod
    def get_by_uuid(cls, context, service_uuid):
        db_service = db.service_get_by_uuid(context, service_uuid)
        return cls._from_db_object(context, cls(), db_service)

    @base.remotable_classmethod
    def get_by_host_and_topic(cls, context, host, topic):
        db_service = db.service_get_by_host_and_topic(context, host, topic)
        return cls._from_db_object(context, cls(), db_service)

    @base.remotable_classmethod
    def get_by_host_and_binary(cls, context, host, binary):
        try:
            db_service = db.service_get_by_host_and_binary(context,
                                                           host, binary)
        except exception.HostBinaryNotFound:
            return
        return cls._from_db_object(context, cls(), db_service)

    @staticmethod
    @db.select_db_reader_mode
    def _db_service_get_by_compute_host(context, host, use_slave=False):
        return db.service_get_by_compute_host(context, host)

    @base.remotable_classmethod
    def get_by_compute_host(cls, context, host, use_slave=False):
        db_service = cls._db_service_get_by_compute_host(context, host,
                                                         use_slave=use_slave)
        return cls._from_db_object(context, cls(), db_service)

    @base.remotable_classmethod
    def get_by_args(cls, context, host, binary):
        db_service = db.service_get_by_host_and_binary(context, host, binary)
        return cls._from_db_object(context, cls(), db_service)

    def _check_minimum_version(self):
        """Enforce that we are not older that the minimum version.
        This is a loose check to avoid creating or updating our service
        record if we would do so with a version that is older that the current
        minimum of all services. This could happen if we were started with
        older code by accident, either due to a rollback or an old and
        un-updated node suddenly coming back onto the network.
        There is technically a race here between the check and the update,
        but since the minimum version should always roll forward and never
        backwards, we don't need to worry about doing it atomically. Further,
        the consequence for getting this wrong is minor, in that we'll just
        fail to send messages that other services understand.
        """
        if not self.obj_attr_is_set('version'):
            return
        if not self.obj_attr_is_set('binary'):
            return
        minver = self.get_minimum_version(self._context, self.binary)
        if minver > self.version:
            raise exception.ServiceTooOld(thisver=self.version,
                                          minver=minver)

    @base.remotable
    def create(self):
        if self.obj_attr_is_set('id'):
            raise exception.ObjectActionError(action='create',
                                              reason='already created')
        self._check_minimum_version()
        updates = self.obj_get_changes()

        if 'uuid' not in updates:
            updates['uuid'] = uuidutils.generate_uuid()
            self.uuid = updates['uuid']

        db_service = db.service_create(self._context, updates)
        self._from_db_object(self._context, self, db_service)
        self._send_notification(fields.NotificationAction.CREATE)

    @base.remotable
    def save(self):
        updates = self.obj_get_changes()
        updates.pop('id', None)
        self._check_minimum_version()
        db_service = db.service_update(self._context, self.id, updates)
        self._from_db_object(self._context, self, db_service)

        self._send_status_update_notification(updates)

    def _send_status_update_notification(self, updates):
        if set(updates.keys()).intersection(
                {'disabled', 'disabled_reason', 'forced_down'}):
            self._send_notification(fields.NotificationAction.UPDATE)

    def _send_notification(self, action):
        payload = service_notification.ServiceStatusPayload(self)
        service_notification.ServiceStatusNotification(
            publisher=notification.NotificationPublisher.from_service_obj(
                self),
            event_type=notification.EventType(
                object='service',
                action=action),
            priority=fields.NotificationPriority.INFO,
            payload=payload).emit(self._context)

    @base.remotable
    def destroy(self):
        db.service_destroy(self._context, self.id)
        self._send_notification(fields.NotificationAction.DELETE)

    @classmethod
    def enable_min_version_cache(cls):
        cls.clear_min_version_cache()
        cls._SERVICE_VERSION_CACHING = True

    @classmethod
    def clear_min_version_cache(cls):
        cls._MIN_VERSION_CACHE = {}

    @staticmethod
    @db.select_db_reader_mode
    def _db_service_get_minimum_version(context, binaries, use_slave=False):
        return db.service_get_minimum_version(context, binaries)

    @base.remotable_classmethod
    def get_minimum_version_multi(cls, context, binaries, use_slave=False):
        if not all(binary.startswith('nova-') for binary in binaries):
            LOG.warning('get_minimum_version called with likely-incorrect '
                        'binaries `%s\'', ','.join(binaries))
            raise exception.ObjectActionError(action='get_minimum_version',
                                              reason='Invalid binary prefix')

        if (not cls._SERVICE_VERSION_CACHING or
              any(binary not in cls._MIN_VERSION_CACHE
                  for binary in binaries)):
            min_versions = cls._db_service_get_minimum_version(
                context, binaries, use_slave=use_slave)
            if min_versions:
                min_versions = {binary: version or 0
                                for binary, version in
                                min_versions.items()}
                cls._MIN_VERSION_CACHE.update(min_versions)
        else:
            min_versions = {binary: cls._MIN_VERSION_CACHE[binary]
                            for binary in binaries}

        if min_versions:
            version = min(min_versions.values())
        else:
            version = 0
        version = int(version)

        return version

    @base.remotable_classmethod
    def get_minimum_version(cls, context, binary, use_slave=False):
        return cls.get_minimum_version_multi(context, [binary],
                                             use_slave=use_slave)



@base.ContegoObjectRegistry.register
class ServiceList(base.ObjectListBase, base.ContegoObject):
    VERSION = '1.0'

    fields = {
        'objects': fields.ListOfObjectsField('Service'),
        }

    @base.remotable_classmethod
    def get_by_topic(cls, context, topic):
        db_services = db.service_get_all_by_topic(context, topic)
        return base.obj_make_list(context, cls(context), objects.Service,
                                  db_services)

    @base.remotable_classmethod
    def get_by_binary(cls, context, binary, include_disabled=False):
        db_services = db.service_get_all_by_binary(
            context, binary, include_disabled=include_disabled)
        return base.obj_make_list(context, cls(context), objects.Service,
                                  db_services)

    @base.remotable_classmethod
    def get_by_host(cls, context, host):
        db_services = db.service_get_all_by_host(context, host)
        return base.obj_make_list(context, cls(context), objects.Service,
                                  db_services)

    @base.remotable_classmethod
    def get_all(cls, context, disabled=None, set_zones=False):
        db_services = db.service_get_all(context, disabled=disabled)
        if set_zones:
            db_services = availability_zones.set_availability_zones(
                context, db_services)
        return base.obj_make_list(context, cls(context), objects.Service,
                                  db_services)

    @base.remotable_classmethod
    def get_all_computes_by_hv_type(cls, context, hv_type):
        db_services = db.service_get_all_computes_by_hv_type(
            context, hv_type, include_disabled=False)
        return base.obj_make_list(context, cls(context), objects.Service,
                                  db_services)