Repository URL to install this package:
Version:
4.2.61 ▾
|
# Copyright 2018 TrilioData Inc.
# All Rights Reserved.
from oslo_log import log as logging
from oslo_utils import versionutils
from dmapi import exception
from dmapi.i18n import _LW
from dmapi.objects import base
from dmapi.objects import fields
from dmapi import db
from dmapi import objects
LOG = logging.getLogger(__name__)
# NOTE(danms): This is the global service version counter
SERVICE_VERSION = 15
SERVICE_VERSION_HISTORY = (
{'compute_rpc': '4.13'},
)
# TODO(berrange): Remove ContegoObjectDictCompat
#@base.ContegoObjectRegistry.register
class Service():
# Version 1.20: Added get_minimum_version_multi()
VERSION = '1.20'
_MIN_VERSION_CACHE = {}
_SERVICE_VERSION_CACHING = False
SERVICE_VERSION = 2
def __init__(self, *args, **kwargs):
# NOTE(danms): We're going against the rules here and overriding
# init. The reason is that we want to *ensure* that we're always
# setting the current service version on our objects, overriding
# whatever else might be set in the database, or otherwise (which
# is the normal reason not to override init).
#
# We also need to do this here so that it's set on the client side
# all the time, such that create() and save() operations will
# include the current service version.
if 'version' in kwargs:
raise exception.ObjectActionError(
action='init',
reason='Version field is immutable')
#super(Service, self).__init__(*args, **kwargs)
self.version = SERVICE_VERSION
@base.ContegoObjectRegistry.register
class Service(base.ContegoPersistentObject, base.ContegoObject,
base.ContegoObjectDictCompat):
VERSION = '2.0'
fields = {
'id': fields.IntegerField(read_only=True),
'uuid': fields.UUIDField(),
'host': fields.StringField(nullable=True),
'binary': fields.StringField(nullable=True),
'topic': fields.StringField(nullable=True),
'report_count': fields.IntegerField(),
'disabled': fields.BooleanField(),
'disabled_reason': fields.StringField(nullable=True),
'availability_zone': fields.StringField(nullable=True),
'compute_node': fields.ObjectField('ComputeNode'),
'last_seen_up': fields.DateTimeField(nullable=True),
'forced_down': fields.BooleanField(),
'version': fields.StringField(),
}
_MIN_VERSION_CACHE = {}
_SERVICE_VERSION_CACHING = False
def __init__(self, *args, **kwargs):
if 'version' in kwargs:
raise exception.ObjectActionError(
action='init',
reason='Version field is immutable')
super(Service, self).__init__(*args, **kwargs)
self.version = SERVICE_VERSION
def obj_make_compatible_from_manifest(self, primitive, target_version,
version_manifest):
super(Service, self).obj_make_compatible_from_manifest(
primitive, target_version, version_manifest)
_target_version = versionutils.convert_version_to_tuple(target_version)
if _target_version < (1, 21) and 'uuid' in primitive:
del primitive['uuid']
if _target_version < (1, 16) and 'version' in primitive:
del primitive['version']
if _target_version < (1, 14) and 'forced_down' in primitive:
del primitive['forced_down']
if _target_version < (1, 13) and 'last_seen_up' in primitive:
del primitive['last_seen_up']
if _target_version < (1, 10):
self._do_compute_node(self._context, primitive,
version_manifest)
def _do_compute_node(self, context, primitive, version_manifest):
try:
target_version = version_manifest['ComputeNode']
compute = objects.ComputeNodeList.get_all_by_host(
context, primitive['host'])[0]
except Exception:
return
primitive['compute_node'] = compute.obj_to_primitive(
target_version=target_version,
version_manifest=version_manifest)
@staticmethod
def _from_db_object(context, service, db_service):
allow_missing = ('availability_zone',)
for key in service.fields:
if key in allow_missing and key not in db_service:
continue
if key == 'compute_node':
continue
elif key == 'version':
setattr(service, base.get_attrname(key), db_service[key])
elif key == 'uuid' and not db_service.get(key):
continue
else:
service[key] = db_service[key]
service._context = context
service.obj_reset_changes()
return service
def obj_load_attr(self, attrname):
if not self._context:
raise exception.OrphanedObjectError(method='obj_load_attr',
objtype=self.obj_name())
LOG.debug("Lazy-loading '%(attr)s' on %(name)s id %(id)s",
{'attr': attrname,
'name': self.obj_name(),
'id': self.id,
})
if attrname != 'compute_node':
raise exception.ObjectActionError(
action='obj_load_attr',
reason='attribute %s not lazy-loadable' % attrname)
if self.binary == 'tvault-contego':
compute_nodes = objects.ComputeNodeList.get_all_by_host(
self._context, self.host)
else:
raise exception.ServiceNotFound(service_id=self.id)
self.compute_node = compute_nodes[0]
@base.remotable_classmethod
def get_by_id(cls, context, service_id):
db_service = db.service_get(context, service_id)
return cls._from_db_object(context, cls(), db_service)
@base.remotable_classmethod
def get_by_uuid(cls, context, service_uuid):
db_service = db.service_get_by_uuid(context, service_uuid)
return cls._from_db_object(context, cls(), db_service)
@base.remotable_classmethod
def get_by_host_and_topic(cls, context, host, topic):
db_service = db.service_get_by_host_and_topic(context, host, topic)
return cls._from_db_object(context, cls(), db_service)
@base.remotable_classmethod
def get_by_host_and_binary(cls, context, host, binary):
try:
db_service = db.service_get_by_host_and_binary(context,
host, binary)
except exception.HostBinaryNotFound:
return
return cls._from_db_object(context, cls(), db_service)
@staticmethod
@db.select_db_reader_mode
def _db_service_get_by_compute_host(context, host, use_slave=False):
return db.service_get_by_compute_host(context, host)
@base.remotable_classmethod
def get_by_compute_host(cls, context, host, use_slave=False):
db_service = cls._db_service_get_by_compute_host(context, host,
use_slave=use_slave)
return cls._from_db_object(context, cls(), db_service)
@base.remotable_classmethod
def get_by_args(cls, context, host, binary):
db_service = db.service_get_by_host_and_binary(context, host, binary)
return cls._from_db_object(context, cls(), db_service)
def _check_minimum_version(self):
"""Enforce that we are not older that the minimum version.
This is a loose check to avoid creating or updating our service
record if we would do so with a version that is older that the current
minimum of all services. This could happen if we were started with
older code by accident, either due to a rollback or an old and
un-updated node suddenly coming back onto the network.
There is technically a race here between the check and the update,
but since the minimum version should always roll forward and never
backwards, we don't need to worry about doing it atomically. Further,
the consequence for getting this wrong is minor, in that we'll just
fail to send messages that other services understand.
"""
if not self.obj_attr_is_set('version'):
return
if not self.obj_attr_is_set('binary'):
return
minver = self.get_minimum_version(self._context, self.binary)
if minver > self.version:
raise exception.ServiceTooOld(thisver=self.version,
minver=minver)
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
raise exception.ObjectActionError(action='create',
reason='already created')
self._check_minimum_version()
updates = self.obj_get_changes()
if 'uuid' not in updates:
updates['uuid'] = uuidutils.generate_uuid()
self.uuid = updates['uuid']
db_service = db.service_create(self._context, updates)
self._from_db_object(self._context, self, db_service)
self._send_notification(fields.NotificationAction.CREATE)
@base.remotable
def save(self):
updates = self.obj_get_changes()
updates.pop('id', None)
self._check_minimum_version()
db_service = db.service_update(self._context, self.id, updates)
self._from_db_object(self._context, self, db_service)
self._send_status_update_notification(updates)
def _send_status_update_notification(self, updates):
if set(updates.keys()).intersection(
{'disabled', 'disabled_reason', 'forced_down'}):
self._send_notification(fields.NotificationAction.UPDATE)
def _send_notification(self, action):
payload = service_notification.ServiceStatusPayload(self)
service_notification.ServiceStatusNotification(
publisher=notification.NotificationPublisher.from_service_obj(
self),
event_type=notification.EventType(
object='service',
action=action),
priority=fields.NotificationPriority.INFO,
payload=payload).emit(self._context)
@base.remotable
def destroy(self):
db.service_destroy(self._context, self.id)
self._send_notification(fields.NotificationAction.DELETE)
@classmethod
def enable_min_version_cache(cls):
cls.clear_min_version_cache()
cls._SERVICE_VERSION_CACHING = True
@classmethod
def clear_min_version_cache(cls):
cls._MIN_VERSION_CACHE = {}
@staticmethod
@db.select_db_reader_mode
def _db_service_get_minimum_version(context, binaries, use_slave=False):
return db.service_get_minimum_version(context, binaries)
@base.remotable_classmethod
def get_minimum_version_multi(cls, context, binaries, use_slave=False):
if not all(binary.startswith('nova-') for binary in binaries):
LOG.warning('get_minimum_version called with likely-incorrect '
'binaries `%s\'', ','.join(binaries))
raise exception.ObjectActionError(action='get_minimum_version',
reason='Invalid binary prefix')
if (not cls._SERVICE_VERSION_CACHING or
any(binary not in cls._MIN_VERSION_CACHE
for binary in binaries)):
min_versions = cls._db_service_get_minimum_version(
context, binaries, use_slave=use_slave)
if min_versions:
min_versions = {binary: version or 0
for binary, version in
min_versions.items()}
cls._MIN_VERSION_CACHE.update(min_versions)
else:
min_versions = {binary: cls._MIN_VERSION_CACHE[binary]
for binary in binaries}
if min_versions:
version = min(min_versions.values())
else:
version = 0
version = int(version)
return version
@base.remotable_classmethod
def get_minimum_version(cls, context, binary, use_slave=False):
return cls.get_minimum_version_multi(context, [binary],
use_slave=use_slave)
@base.ContegoObjectRegistry.register
class ServiceList(base.ObjectListBase, base.ContegoObject):
VERSION = '1.0'
fields = {
'objects': fields.ListOfObjectsField('Service'),
}
@base.remotable_classmethod
def get_by_topic(cls, context, topic):
db_services = db.service_get_all_by_topic(context, topic)
return base.obj_make_list(context, cls(context), objects.Service,
db_services)
@base.remotable_classmethod
def get_by_binary(cls, context, binary, include_disabled=False):
db_services = db.service_get_all_by_binary(
context, binary, include_disabled=include_disabled)
return base.obj_make_list(context, cls(context), objects.Service,
db_services)
@base.remotable_classmethod
def get_by_host(cls, context, host):
db_services = db.service_get_all_by_host(context, host)
return base.obj_make_list(context, cls(context), objects.Service,
db_services)
@base.remotable_classmethod
def get_all(cls, context, disabled=None, set_zones=False):
db_services = db.service_get_all(context, disabled=disabled)
if set_zones:
db_services = availability_zones.set_availability_zones(
context, db_services)
return base.obj_make_list(context, cls(context), objects.Service,
db_services)
@base.remotable_classmethod
def get_all_computes_by_hv_type(cls, context, hv_type):
db_services = db.service_get_all_computes_by_hv_type(
context, hv_type, include_disabled=False)
return base.obj_make_list(context, cls(context), objects.Service,
db_services)