Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2020 TrilioData Inc.
# All Rights Reserved.

from oslo_log import log as logging
from oslo_utils import excutils
from oslo_versionedobjects import exception as ovo_exception

from contego import exception
from contego.objects import base
from contego.objects import fields
from contego import rpc

LOG = logging.getLogger(__name__)


@base.ContegoObjectRegistry.register_if(False)
class NotificationObject(base.ContegoObject):
    """Base class for every notification related versioned object."""
    # Version 1.0: Initial version
    VERSION = '1.0'

    def __init__(self, **kwargs):
        super(NotificationObject, self).__init__(**kwargs)
        # The notification objects are created on the fly when nova emits the
        # notification. This causes that every object shows every field as
        # changed. We don't want to send this meaningless information so we
        # reset the object after creation.
        self.obj_reset_changes(recursive=False)


@base.ContegoObjectRegistry.register_notification
class EventType(NotificationObject):
    # Version 1.0: Initial version
    VERSION = '1.0'

    fields = {
        'object': fields.StringField(nullable=False),
        'action': fields.NotificationActionField(nullable=False),
        'phase': fields.NotificationPhaseField(nullable=True),
    }

    def __init__(self, object, action, phase=None):
        super(EventType, self).__init__()
        self.object = object
        self.action = action
        self.phase = phase

    def to_notification_event_type_field(self):
        """Serialize the object to the wire format."""
        s = '%s.%s' % (self.object, self.action)
        if self.phase:
            s += '.%s' % self.phase
        return s


@base.ContegoObjectRegistry.register_if(False)
class NotificationPayloadBase(NotificationObject):
    """Base class for the payload of versioned notifications."""
    # SCHEMA defines how to populate the payload fields. It is a dictionary
    # where every key value pair has the following format:
    # <payload_field_name>: (<data_source_name>,
    #                        <field_of_the_data_source>)
    # The <payload_field_name> is the name where the data will be stored in the
    # payload object, this field has to be defined as a field of the payload.
    # The <data_source_name> shall refer to name of the parameter passed as
    # kwarg to the payload's populate_schema() call and this object will be
    # used as the source of the data. The <field_of_the_data_source> shall be
    # a valid field of the passed argument.
    # The SCHEMA needs to be applied with the populate_schema() call before the
    # notification can be emitted.
    # The value of the payload.<payload_field_name> field will be set by the
    # <data_source_name>.<field_of_the_data_source> field. The
    # <data_source_name> will not be part of the payload object internal or
    # external representation.
    # Payload fields that are not set by the SCHEMA can be filled in the same
    # way as in any versioned object.
    SCHEMA = {}
    # Version 1.0: Initial version
    VERSION = '1.0'

    def __init__(self):
        super(NotificationPayloadBase, self).__init__()
        self.populated = not self.SCHEMA

    @rpc.if_notifications_enabled
    def populate_schema(self, set_none=True, **kwargs):
        """Populate the object based on the SCHEMA and the source objects

        :param kwargs: A dict contains the source object at the key defined in
                       the SCHEMA
        """
        for key, (obj, field) in self.SCHEMA.items():
            source = kwargs[obj]
            # trigger lazy-load if possible
            try:
                setattr(self, key, getattr(source, field))
            # ObjectActionError - not lazy loadable field
            # NotImplementedError - obj_load_attr() is not even defined
            # OrphanedObjectError - lazy loadable field but context is None
            except (exception.ObjectActionError,
                    NotImplementedError,
                    exception.OrphanedObjectError,
                    ovo_exception.OrphanedObjectError):
                if set_none:
                    # If it is unset or non lazy loadable in the source object
                    # then we cannot do anything else but try to default it
                    # in the payload object we are generating here.
                    # NOTE(gibi): This will fail if the payload field is not
                    # nullable, but that means that either the source object
                    # is not properly initialized or the payload field needs
                    # to be defined as nullable
                    setattr(self, key, None)
            except Exception:
                with excutils.save_and_reraise_exception():
                    LOG.error('Failed trying to populate attribute "%s" '
                              'using field: %s', key, field)

        self.populated = True

        # the schema population will create changed fields but we don't need
        # this information in the notification
        self.obj_reset_changes(recursive=True)


@base.ContegoObjectRegistry.register_notification
class NotificationPublisher(NotificationObject):
    # Version 1.0: Initial version
    VERSION = '1.0'

    fields = {
        'host': fields.StringField(nullable=False),
        'source': fields.NotificationSourceField(nullable=False),
    }

    def __init__(self, host, source):
        super(NotificationPublisher, self).__init__()
        self.host = host
        self.source = source

    @classmethod
    def from_service_obj(cls, service):
        source = fields.NotificationSource.get_source_by_binary(service.binary)
        return cls(host=service.host, source=source)


@base.ContegoObjectRegistry.register_if(False)
class NotificationBase(NotificationObject):
    """Base class for versioned notifications.

    Every subclass shall define a 'payload' field.
    """
    # Version 1.0: Initial version
    VERSION = '1.0'

    fields = {
        'priority': fields.NotificationPriorityField(),
        'event_type': fields.ObjectField('EventType'),
        'publisher': fields.ObjectField('NotificationPublisher'),
    }

    def _emit(self, context, event_type, publisher_id, payload):
        notifier = rpc.get_versioned_notifier(publisher_id)
        notify = getattr(notifier, self.priority)
        notify(context, event_type=event_type, payload=payload)

    @rpc.if_notifications_enabled
    def emit(self, context):
        """Send the notification."""
        assert self.payload.populated

        self.payload.obj_reset_changes(recursive=True)

        self._emit(context,
                   event_type=
                   self.event_type.to_notification_event_type_field(),
                   publisher_id='%s:%s' %
                                (self.publisher.source,
                                 self.publisher.host),
                   payload=self.payload.obj_to_primitive())


def notification_sample(sample):
    """Class decorator to attach the notification sample information
    to the notification object for documentation generation purposes.

    :param sample: the path of the sample json file relative to the
                   doc/notification_samples/ directory in the nova repository
                   root.
    """
    def wrap(cls):
        if not getattr(cls, 'samples', None):
            cls.samples = [sample]
        else:
            cls.samples.append(sample)
        return cls
    return wrap