Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
hydra / kafka_python / hydra / kafka_producers / producer.py
Size: Mime:
import atexit
import json
import logging
import time
import hydra.common.utils

from datetime import datetime

from pykafka import KafkaClient
from pykafka.exceptions import InvalidMessageError
from pykafka.partitioners import random_partitioner, hashing_partitioner

from hydra.common.base_event_pb2 import BaseEvent

from hydra.settings.config import KAFKA_HOST_LIST

log = logging.getLogger(__name__)

MAX_WAIT = 10  # seconds


class KafkaProducer(object):
    """
    Simple wrapper around pykafka's producer. It allows messages to be sent
    to Kafka either synchronously or asynchronously (synchronous mode is
    currently disabled -- see __init__).

    Arguments:
        topic: topic name
        kafka_host_list: comma separated hosts list (eg. "host1:9092,host2:9092")
        synchronous: If False, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these
        partitioner: The partitioner to use during message production
        async_queue_maxsize: The maximum number of messages the producer
            can have waiting to be sent to the broker. If messages are sent
            faster than they can be delivered to the broker, the producer will
            either block or throw an exception based on the preference specified
            with block_on_queue_full.
    Methods:
        send(): send messages synchronously or asynchronously depending on the
            `synchronous` setting
        ** the current asynchronous implementation does not guarantee message delivery on failure!
    """

    def __init__(self, topic, kafka_host_list=KAFKA_HOST_LIST, synchronous=False, partitioner=random_partitioner, async_queue_maxsize=100000):
        if synchronous:
            # The synchronous implementation can end up in an infinite loop,
            # so it is disabled for now. Fail fast BEFORE allocating the
            # KafkaClient/producer so we don't leak a live producer when
            # construction is rejected.
            raise NotImplementedError

        self._topic = topic
        self._synchronous = synchronous
        self._async_queue_maxsize = async_queue_maxsize
        self._partitioner = partitioner
        self._kafka = KafkaClient(kafka_host_list)
        self._producer = self._get_producer()

    def enable_hashing_partitioner(self):
        """Switch the producer to pykafka's hashing_partitioner.

        Rebuilds the underlying producer: mutating ``self._partitioner``
        alone has no effect on a producer that was already created with the
        previous partitioner. The old producer is stopped (pykafka flushes
        queued messages on stop) after the new one is in place.
        """
        self._partitioner = hashing_partitioner
        old_producer = self._producer
        self._producer = self._get_producer()
        if old_producer._running:
            old_producer.stop()

    def _get_producer(self):
        """
        :return: Producer for ``self._topic`` configured with the current
            partitioner / queue-size / sync settings.
        """
        producer = self._kafka.topics[self._topic].get_producer(partitioner=self._partitioner,
                                                                max_queued_messages=self._async_queue_maxsize,
                                                                sync=self._synchronous)
        # Flush/stop the producer on interpreter exit so queued async
        # messages are not silently dropped.
        # NOTE(review): relies on pykafka's private `_running` attribute --
        # re-verify on pykafka upgrades.
        atexit.register(lambda p: p.stop() if p._running else None, producer)
        return producer

    def send(self, message, partition_key=None):
        """
        Send a message asynchronously (synchronous mode raises in __init__).

        :param message: a compiled ``BaseEvent`` protobuf, or a python
            dictionary that contains keys: sender, event and payload
        :param partition_key: The key to use when deciding which partition to send this
            message to
        :return: output of produce(), or None if delivery failed
        :raises Exception: re-raised when the message cannot be serialized
        """
        try:
            # Checks if the message is a compiled protobuf event
            if isinstance(message, BaseEvent):
                msg = message.SerializeToString()
            # The default expected message format is JSON
            else:
                msg = json.dumps(message, cls=hydra.common.utils.DatetimeEncoder)
        except Exception:
            log.exception('Error serializing the object to JSON `str`')
            raise

        if not self._synchronous:
            try:
                return self._producer.produce(msg, partition_key=partition_key)
            except Exception:
                # Deliberate best-effort delivery: log and swallow so callers
                # are not interrupted by transient broker failures.
                log.exception('Failed to send message %s', msg)

    def create_kafka_msg(self, sender, event_type, payload):
        """
        Generates the python dictionary that will be used to send the kafka message
        :param sender: Describes the application name
        :param event_type: Describes the event action
        :param payload: python dictionary that contains the meta data related to the event
        :return: python dictionary with the kafka message containing its required fields
        :raises InvalidMessageError: if any required argument is missing/falsy
        """
        # log.error, not log.exception: there is no active exception here, so
        # log.exception would emit a spurious "NoneType: None" traceback.
        if not sender:
            log.error('Missing the sender parameter.')
            raise InvalidMessageError

        if not event_type:
            log.error('Missing the event_type parameter.')
            raise InvalidMessageError

        if not payload:
            log.error('Missing the payload parameter.')
            raise InvalidMessageError

        # event_time is a naive UTC datetime; it is JSON-serialized later by
        # DatetimeEncoder in send(). NOTE(review): consumers presumably expect
        # naive UTC -- confirm before switching to timezone-aware datetimes.
        return dict(sender=sender, event_type=event_type, event_time=datetime.utcnow(), payload=payload)