Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# Copyright 2020 TrilioData Inc.
# All Rights Reserved.

__all__ = [
    'init',
    'cleanup',
    'set_defaults',
    'add_extra_exmods',
    'clear_extra_exmods',
    'get_allowed_exmods',
    'RequestContextSerializer',
    'get_client',
    'get_server',
    'get_notifier',
    'TRANSPORT_ALIASES',
]

import functools

from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_service import periodic_task
from oslo_utils import timeutils

import contego.conf
import contego.exception
from contego.i18n import _


CONF = contego.conf.CONF

LOG = logging.getLogger(__name__)

TRANSPORT = None
LEGACY_NOTIFIER = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None

ALLOWED_EXMODS = [contego.exception.__name__,]
EXTRA_EXMODS = []

TRANSPORT_ALIASES = {
    'contego.rpc.impl_kombu': 'rabbit',
    'contego.rpc.impl_qpid': 'qpid',
    'contego.rpc.impl_zmq': 'zmq',
}


def set_defaults(control_exchange):
    messaging.set_transport_defaults(control_exchange)

from oslo_config import cfg
def register_opts(conf):
    notification_opts = [
        cfg.StrOpt(
                'notification_format',
                choices=['unversioned', 'versioned', 'both'],
                default='both',
                deprecated_group='DEFAULT',
                help=""""""
            ),

    ]
    ALL_OPTS = (notification_opts)
    conf.register_opts(ALL_OPTS)


def init(conf):
    register_opts(conf)
    global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
    exmods = get_allowed_exmods()
    TRANSPORT = messaging.get_transport(conf, allowed_remote_exmods=exmods)

    NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
        conf, allowed_remote_exmods=exmods)
    serializer = RequestContextSerializer(JsonPayloadSerializer())
    if conf.notification_format == 'unversioned':
        LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
                                             serializer=serializer)
        NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
                                      serializer=serializer, driver='noop')
    elif conf.notification_format == 'both':
        LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
                                             serializer=serializer)
        NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
                                      serializer=serializer,
                                      topics=['versioned_notifications'])
    else:
        LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
                                             serializer=serializer,
                                             driver='noop')
        NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
                                      serializer=serializer,
                                      topics=['versioned_notifications'])


def cleanup():
    global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
    assert TRANSPORT is not None
    assert NOTIFICATION_TRANSPORT is not None
    assert LEGACY_NOTIFIER is not None
    assert NOTIFIER is not None
    TRANSPORT.cleanup()
    NOTIFICATION_TRANSPORT.cleanup()
    TRANSPORT = NOTIFICATION_TRANSPORT = LEGACY_NOTIFIER = NOTIFIER = None





def add_extra_exmods(*args):
    EXTRA_EXMODS.extend(args)


def clear_extra_exmods():
    del EXTRA_EXMODS[:]


def get_allowed_exmods():
    return ALLOWED_EXMODS + EXTRA_EXMODS


class JsonPayloadSerializer(messaging.NoOpSerializer):
    @staticmethod
    def serialize_entity(context, entity):
        return jsonutils.to_primitive(entity, convert_instances=True)


class RequestContextSerializer(messaging.Serializer):

    def __init__(self, base):
        self._base = base

    def serialize_entity(self, context, entity):
        if not self._base:
            return entity
        return self._base.serialize_entity(context, entity)

    def deserialize_entity(self, context, entity):
        if not self._base:
            return entity
        return self._base.deserialize_entity(context, entity)

    def serialize_context(self, context):
        return context.to_dict()

    def deserialize_context(self, context):
        return contego.context.RequestContext.from_dict(context)


def get_transport_url(url_str=None):
    return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES)


def get_client(target, version_cap=None, serializer=None):
    conf=contego.conf.CONF
    exmods = [contego.exception.__name__,] + []
    TRANSPORT = messaging.get_transport(conf, allowed_remote_exmods=exmods)
    #global TRANSPORT
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.RPCClient(TRANSPORT,
                               target,
                               version_cap=version_cap,
                               serializer=serializer)


def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)


def get_notifier(service, host=None, publisher_id=None):
    assert LEGACY_NOTIFIER is not None
    if not publisher_id:
        publisher_id = "%s.%s" % (service, host or CONF.host)
    return LegacyValidatingNotifier(
            LEGACY_NOTIFIER.prepare(publisher_id=publisher_id))


def get_versioned_notifier(publisher_id):
    assert NOTIFIER is not None
    return NOTIFIER.prepare(publisher_id=publisher_id)


def create_transport(url):
    exmods = get_allowed_exmods()
    return messaging.get_transport(CONF,
                                   url=url,
                                   allowed_remote_exmods=exmods)


class LegacyValidatingNotifier(object):
    """Wraps an oslo.messaging Notifier and checks for allowed event_types."""

    # If true an exception is thrown if the event_type is not allowed, if false
    # then only a WARNING is logged
    fatal = False

    # This list contains the already existing therefore allowed legacy
    # notification event_types. New items shall not be added to the list as
    # Contego does not allow new legacy notifications any more. This list will be
    # removed when all the notification is transformed to versioned
    # notifications.
    allowed_legacy_notification_event_types = [
        'aggregate.addhost.end',

    ]

    message = _('%(event_type)s is not a versioned notification and not '
                'whitelisted. See ./doc/source/notification.rst')

    def __init__(self, notifier):
        self.notifier = notifier
        for priority in ['debug', 'info', 'warn', 'error', 'critical']:
            setattr(self, priority,
                    functools.partial(self._notify, priority))

    def _is_wrap_exception_notification(self, payload):
        # contegoserver.exception_wrapper.wrap_exception decorator emits notification
        # where the event_type is the name of the decorated function. This
        # is used in many places but it will be converted to versioned
        # notification in one run by updating the decorator so it is pointless
        # to white list all the function names here we white list the
        # notification itself detected by the special payload keys.
        return {'exception', 'args'} == set(payload.keys())

    def _notify(self, priority, ctxt, event_type, payload):
        if (event_type not in self.allowed_legacy_notification_event_types and
                not self._is_wrap_exception_notification(payload)):
            if self.fatal:
                raise AssertionError(self.message % {'event_type': event_type})
            else:
                LOG.warning(self.message, {'event_type': event_type})

        getattr(self.notifier, priority)(ctxt, event_type, payload)


class ClientWrapper(object):
    def __init__(self, client):
        self._client = client
        self.last_access_time = timeutils.utcnow()

    @property
    def client(self):
        self.last_access_time = timeutils.utcnow()
        return self._client


class ClientRouter(periodic_task.PeriodicTasks):
    """Creates and caches RPC clients that route to cells or the default.

    The default client connects to the API cell message queue. The rest of the
    clients connect to compute cell message queues.
    """
    def __init__(self, default_client):
        super(ClientRouter, self).__init__(CONF)
        self.clients = {}
        self.clients['default'] = ClientWrapper(default_client)
        self.target = default_client.target
        self.version_cap = default_client.version_cap
        self.serializer = getattr(default_client, 'serializer', None)
        # Prevent this empty context from overwriting the thread local copy
        #
        self.run_periodic_tasks(contego.context.RequestContext(overwrite=False))

    def _client(self, context, cell_mapping=None):
        if cell_mapping:
            client_id = cell_mapping.uuid
        else:
            client_id = 'default'

        try:
            client = self.clients[client_id].client
        except KeyError:
            transport = create_transport(cell_mapping.transport_url)
            client = messaging.RPCClient(transport, self.target,
                                         version_cap=self.version_cap,
                                         serializer=self.serializer)
            self.clients[client_id] = ClientWrapper(client)

        return client

    @periodic_task.periodic_task
    def _remove_stale_clients(self, context):
        timeout = 60

        def stale(client_id, last_access_time):
            if timeutils.is_older_than(last_access_time, timeout):
                LOG.debug('Removing stale RPC client: %s as it was last '
                          'accessed at %s', client_id, last_access_time)
                return True
            return False

        # Never expire the default client
        items_copy = list(self.clients.items())
        for client_id, client_wrapper in items_copy:
            if (client_id != 'default' and
                    stale(client_id, client_wrapper.last_access_time)):
                del self.clients[client_id]


def if_notifications_enabled(f):
    """Calls decorated method only if versioned notifications are enabled."""
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        if (NOTIFIER.is_enabled() and
                CONF.notifications.notification_format in ('both',
                                                           'versioned')):
            return f(*args, **kwargs)
        else:
            return None
    return wrapped