Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
launchdarkly-server-sdk / event_processor.py
Size: Mime:
"""
Implementation details of the analytics event delivery component.
"""
# currently excluded from documentation - see docs/README.md

from collections import namedtuple
from email.utils import parsedate
import errno
import json
from threading import Event, Lock, Thread
import six
import time
import urllib3

# noinspection PyBroadException
try:
    import queue
except:
    # noinspection PyUnresolvedReferences,PyPep8Naming
    import Queue as queue

from ldclient.event_summarizer import EventSummarizer
from ldclient.fixed_thread_pool import FixedThreadPool
from ldclient.lru_cache import SimpleLRUCache
from ldclient.user_filter import UserFilter
from ldclient.interfaces import EventProcessor
from ldclient.repeating_timer import RepeatingTimer
from ldclient.util import UnsuccessfulResponseException
from ldclient.util import _headers
from ldclient.util import create_http_pool_manager
from ldclient.util import log
from ldclient.util import http_error_message, is_http_error_recoverable, stringify_attrs, throw_if_unsuccessful_response


__MAX_FLUSH_THREADS__ = 5
__CURRENT_EVENT_SCHEMA__ = 3
__USER_ATTRS_TO_STRINGIFY_FOR_EVENTS__ = [ "key", "secondary", "ip", "country", "email", "firstName", "lastName", "avatar", "name" ]


EventProcessorMessage = namedtuple('EventProcessorMessage', ['type', 'param'])


class EventOutputFormatter(object):
    def __init__(self, config):
        self._inline_users = config.inline_users_in_events
        self._user_filter = UserFilter(config)

    def make_output_events(self, events, summary):
        events_out = [ self.make_output_event(e) for e in events ]
        if len(summary.counters) > 0:
            events_out.append(self.make_summary_event(summary))
        return events_out
    
    def make_output_event(self, e):
        kind = e['kind']
        if kind == 'feature':
            is_debug = e.get('debug')
            out = {
                'kind': 'debug' if is_debug else 'feature',
                'creationDate': e['creationDate'],
                'key': e['key'],
                'version': e.get('version'),
                'variation': e.get('variation'),
                'value': e.get('value'),
                'default': e.get('default'),
                'prereqOf': e.get('prereqOf')
            }
            if self._inline_users or is_debug:
                out['user'] = self._process_user(e)
            else:
                out['userKey'] = self._get_userkey(e)
            if e.get('reason'):
                out['reason'] = e.get('reason')
            return out
        elif kind == 'identify':
            return {
                'kind': 'identify',
                'creationDate': e['creationDate'],
                'key': self._get_userkey(e),
                'user': self._process_user(e)
            }
        elif kind == 'custom':
            out = {
                'kind': 'custom',
                'creationDate': e['creationDate'],
                'key': e['key'],
                'data': e.get('data')
            }
            if self._inline_users:
                out['user'] = self._process_user(e)
            else:
                out['userKey'] = self._get_userkey(e)
            return out
        elif kind == 'index':
            return {
                'kind': 'index',
                'creationDate': e['creationDate'],
                'user': self._process_user(e)
            }
        else:
            return e

    """
    Transform summarizer data into the format used for the event payload.
    """
    def make_summary_event(self, summary):
        flags_out = dict()
        for ckey, cval in summary.counters.items():
            flag_key, variation, version = ckey
            flag_data = flags_out.get(flag_key)
            if flag_data is None:
                flag_data = { 'default': cval['default'], 'counters': [] }
                flags_out[flag_key] = flag_data
            counter = {
                'count': cval['count'],
                'value': cval['value']
            }
            if variation is not None:
                counter['variation'] = variation
            if version is None:
                counter['unknown'] = True
            else:
                counter['version'] = version
            flag_data['counters'].append(counter)
        return {
            'kind': 'summary',
            'startDate': summary.start_date,
            'endDate': summary.end_date,
            'features': flags_out
        }
    
    def _process_user(self, event):
        filtered = self._user_filter.filter_user_props(event['user'])
        return stringify_attrs(filtered, __USER_ATTRS_TO_STRINGIFY_FOR_EVENTS__)
    
    def _get_userkey(self, event):
        return str(event['user'].get('key'))


class EventPayloadSendTask(object):
    def __init__(self, http, config, formatter, payload, response_fn):
        self._http = http
        self._config = config
        self._formatter = formatter
        self._payload = payload
        self._response_fn = response_fn

    def run(self):
        try:
            output_events = self._formatter.make_output_events(self._payload.events, self._payload.summary)
            resp = self._do_send(output_events)
        except Exception:
            log.warning(
                'Unhandled exception in event processor. Analytics events were not processed.',
                exc_info=True)

    def _do_send(self, output_events):
        # noinspection PyBroadException
        try:
            json_body = json.dumps(output_events)
            log.debug('Sending events payload: ' + json_body)
            hdrs = _headers(self._config.sdk_key)
            hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__)
            uri = self._config.events_uri
            r = self._http.request('POST', uri,
                                   headers=hdrs,
                                   timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
                                   body=json_body,
                                   retries=1)
            self._response_fn(r)
            return r
        except Exception as e:
            log.warning(
                'Unhandled exception in event processor. Analytics events were not processed. [%s]', e)


FlushPayload = namedtuple('FlushPayload', ['events', 'summary'])


class EventBuffer(object):
    def __init__(self, capacity):
        self._capacity = capacity
        self._events = []
        self._summarizer = EventSummarizer()
        self._exceeded_capacity = False
    
    def add_event(self, event):
        if len(self._events) >= self._capacity:
            if not self._exceeded_capacity:
                log.warning("Event queue is full-- dropped an event")
                self._exceeded_capacity = True
        else:
            self._events.append(event)
            self._exceeded_capacity = False
    
    def add_to_summary(self, event):
        self._summarizer.summarize_event(event)
    
    def get_payload(self):
        return FlushPayload(self._events, self._summarizer.snapshot())
    
    def clear(self):
        self._events = []
        self._summarizer.clear()


class EventDispatcher(object):
    def __init__(self, queue, config, http_client):
        self._queue = queue
        self._config = config
        self._http = create_http_pool_manager(num_pools=1, verify_ssl=config.verify_ssl) if http_client is None else http_client
        self._close_http = (http_client is None)  # so we know whether to close it later
        self._disabled = False
        self._buffer = EventBuffer(config.events_max_pending)
        self._user_keys = SimpleLRUCache(config.user_keys_capacity)
        self._formatter = EventOutputFormatter(config)
        self._last_known_past_time = 0

        self._flush_workers = FixedThreadPool(__MAX_FLUSH_THREADS__, "ldclient.flush")

        self._main_thread = Thread(target=self._run_main_loop)
        self._main_thread.daemon = True
        self._main_thread.start()

    def _run_main_loop(self):
        log.info("Starting event processor")
        while True:
            try:
                message = self._queue.get(block=True)
                if message.type == 'event':
                    self._process_event(message.param)
                elif message.type == 'flush':
                    self._trigger_flush()
                elif message.type == 'flush_users':
                    self._user_keys.clear()
                elif message.type == 'test_sync':
                    self._flush_workers.wait()
                    message.param.set()
                elif message.type == 'stop':
                    self._do_shutdown()
                    message.param.set()
                    return
            except Exception:
                log.error('Unhandled exception in event processor', exc_info=True)
    
    def _process_event(self, event):
        if self._disabled:
            return

        # Always record the event in the summarizer.
        self._buffer.add_to_summary(event)

        # Decide whether to add the event to the payload. Feature events may be added twice, once for
        # the event (if tracked) and once for debugging.
        add_full_event = False
        add_debug_event = False
        add_index_event = False
        if event['kind'] == "feature":
            add_full_event = event['trackEvents']
            add_debug_event = self._should_debug_event(event)
        else:
            add_full_event = True

        # For each user we haven't seen before, we add an index event - unless this is already
        # an identify event for that user.
        if not (add_full_event and self._config.inline_users_in_events):
            user = event.get('user')
            if user and not self.notice_user(user):
                if event['kind'] != 'identify':
                    add_index_event = True

        if add_index_event:
            ie = { 'kind': 'index', 'creationDate': event['creationDate'], 'user': user }
            self._buffer.add_event(ie)
        if add_full_event:
            self._buffer.add_event(event)
        if add_debug_event:
            debug_event = event.copy()
            debug_event['debug'] = True
            self._buffer.add_event(debug_event)

    # Add to the set of users we've noticed, and return true if the user was already known to us.
    def notice_user(self, user):
        if user is None or 'key' not in user:
            return False
        key = user['key']
        return self._user_keys.put(key, True)

    def _should_debug_event(self, event):
        debug_until = event.get('debugEventsUntilDate')
        if debug_until is not None:
            last_past = self._last_known_past_time
            now = int(time.time() * 1000)
            if debug_until > last_past and debug_until > now:
                return True
        return False

    def _trigger_flush(self):
        if self._disabled:
            return
        payload = self._buffer.get_payload()
        if len(payload.events) > 0 or len(payload.summary.counters) > 0:
            task = EventPayloadSendTask(self._http, self._config, self._formatter, payload,
                self._handle_response)
            if self._flush_workers.execute(task.run):
                # The events have been handed off to a flush worker; clear them from our buffer.
                self._buffer.clear()
            else:
                # We're already at our limit of concurrent flushes; leave the events in the buffer.
                pass

    def _handle_response(self, r):
        server_date_str = r.getheader('Date')
        if server_date_str is not None:
            server_date = parsedate(server_date_str)
            if server_date is not None:
                timestamp = int(time.mktime(server_date) * 1000)
                self._last_known_past_time = timestamp
        if r.status > 299:
            log.error(http_error_message(r.status, "event delivery", "some events were dropped"))
            if not is_http_error_recoverable(r.status):
                self._disabled = True
                return

    def _do_shutdown(self):
        self._flush_workers.stop()
        self._flush_workers.wait()
        if self._close_http:
            self._http.clear()


class DefaultEventProcessor(EventProcessor):
    def __init__(self, config, http=None):
        self._queue = queue.Queue(config.events_max_pending)
        self._flush_timer = RepeatingTimer(config.flush_interval, self.flush)
        self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users)
        self._flush_timer.start()
        self._users_flush_timer.start()
        self._close_lock = Lock()
        self._closed = False
        EventDispatcher(self._queue, config, http)

    def send_event(self, event):
        event['creationDate'] = int(time.time() * 1000)
        self._queue.put(EventProcessorMessage('event', event))

    def flush(self):
        self._queue.put(EventProcessorMessage('flush', None))

    def stop(self):
        with self._close_lock:
            if self._closed:
                return
            self._closed = True
        self._flush_timer.stop()
        self._users_flush_timer.stop()
        self.flush()
        self._post_message_and_wait('stop')

    def _flush_users(self):
        self._queue.put(EventProcessorMessage('flush_users', None))

    # Used only in tests
    def _wait_until_inactive(self):
        self._post_message_and_wait('test_sync')

    def _post_message_and_wait(self, type):
        reply = Event()
        self._queue.put(EventProcessorMessage(type, reply))
        reply.wait()