# Repository URL to install this package:
# Version: 116.0.0
import atexit
import json
import logging
import time
import hydra.common.utils
from datetime import datetime
from pykafka import KafkaClient
from pykafka.exceptions import InvalidMessageError
from pykafka.partitioners import random_partitioner, hashing_partitioner
from hydra.common.base_event_pb2 import BaseEvent
from hydra.settings.config import KAFKA_HOST_LIST
# Module-level logger, named after this module (standard logging convention).
log = logging.getLogger(__name__)
# NOTE(review): MAX_WAIT appears unused in the visible portion of this file --
# confirm against the rest of the module before removing.
MAX_WAIT = 10 # seconds
class KafkaProducer(object):
    """
    Thin wrapper around pykafka's producer. It allows messages to be sent to
    Kafka asynchronously (synchronous mode is currently disabled).

    Arguments:
        topic: topic name
        kafka_host_list: comma separated hosts list (eg. "host1:9092,host2:9092")
        synchronous: If False, the messages are sent asynchronously via another
            thread (process) and we will not wait for a broker response.
            Passing True raises NotImplementedError -- see __init__.
        partitioner: The partitioner to use during message production
        async_queue_maxsize: The maximum number of messages the producer
            can have waiting to be sent to the broker. If messages are sent
            faster than they can be delivered to the broker, the producer will
            either block or throw an exception based on the preference specified
            with block_on_queue_full.

    Methods:
        send(): send a message asynchronously
        ** the current asynchronous implementation does not guarantee message
        delivery on failure!
    """

    def __init__(self, topic, kafka_host_list=KAFKA_HOST_LIST, synchronous=False,
                 partitioner=random_partitioner, async_queue_maxsize=100000):
        if synchronous:
            # The current synchronous implementation can end up in an infinite
            # loop, so it is disabled for now. Fail fast *before* the Kafka
            # client and producer are created, so we do not leak a broker
            # connection or register a useless atexit handler.
            raise NotImplementedError
        self._topic = topic
        self._synchronous = synchronous
        self._async_queue_maxsize = async_queue_maxsize
        self._partitioner = partitioner
        self._kafka = KafkaClient(kafka_host_list)
        self._producer = self._get_producer()

    def enable_hashing_partitioner(self):
        """Sets the producer partitioner to pykafka's hashing_partitioner.

        NOTE(review): this only updates the stored partitioner; the producer
        created in __init__ keeps its original partitioner until
        _get_producer() is called again -- confirm this is intended.
        """
        self._partitioner = hashing_partitioner

    def _get_producer(self):
        """
        :return: Producer for this instance's topic, configured with the
            current partitioner, queue size and sync settings.
        """
        producer = self._kafka.topics[self._topic].get_producer(
            partitioner=self._partitioner,
            max_queued_messages=self._async_queue_maxsize,
            sync=self._synchronous)
        # Register a cleanup handler so queued messages are flushed on
        # interpreter exit. `_running` is a private pykafka attribute; the
        # guard avoids stopping a producer that was already stopped.
        atexit.register(lambda p: p.stop() if p._running else None, producer)
        return producer

    def send(self, message, partition_key=None):
        """
        Serialize `message` and produce it to the topic asynchronously.

        :param message: either a compiled BaseEvent protobuf, or a python
            dictionary that contains keys: sender, event and payload
        :param partition_key: The key to use when deciding which partition to
            send this message to
        :return: output of the producer's produce() method, or None if the
            produce call failed (the failure is logged, not re-raised)
        :raises Exception: re-raises any serialization error
        """
        try:
            # Protobuf events are sent in their binary wire format ...
            if isinstance(message, BaseEvent):
                msg = message.SerializeToString()
            # ... everything else must be JSON-serializable.
            else:
                msg = json.dumps(message, cls=hydra.common.utils.DatetimeEncoder)
        except Exception:
            log.exception('Error serializing the object to JSON `str`')
            raise
        if not self._synchronous:
            try:
                return self._producer.produce(msg, partition_key=partition_key)
            except Exception:
                # Best effort: delivery failures are logged and swallowed.
                log.exception('Failed to send message %s', msg)

    def create_kafka_msg(self, sender, event_type, payload):
        """
        Generates the python dictionary that will be used to send the kafka message.

        :param sender: Describes the application name
        :param event_type: Describes the event action
        :param payload: python dictionary that contains the meta data related to the event
        :return: python dictionary with the kafka message containing its required fields
        :raises InvalidMessageError: if any of the three parameters is falsy
        """
        # log.error (not log.exception): there is no active exception here, so
        # log.exception would emit a misleading "NoneType: None" traceback.
        if not sender:
            log.error('Missing the sender parameter.')
            raise InvalidMessageError('Missing the sender parameter.')
        if not event_type:
            log.error('Missing the event_type parameter.')
            raise InvalidMessageError('Missing the event_type parameter.')
        if not payload:
            log.error('Missing the payload parameter.')
            raise InvalidMessageError('Missing the payload parameter.')
        # NOTE(review): event_time is a naive UTC datetime (datetime.utcnow());
        # downstream consumers presumably expect naive UTC -- confirm before
        # switching to timezone-aware timestamps.
        return dict(sender=sender, event_type=event_type,
                    event_time=datetime.utcnow(), payload=payload)