sentry / @nestjs/microservices (js)

Version: 7.0.10 

/ client / client-kafka.js

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const logger_service_1 = require("@nestjs/common/services/logger.service");
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const kafka_response_deserializer_1 = require("../deserializers/kafka-response.deserializer");
const enums_1 = require("../enums");
const invalid_kafka_client_topic_partition_exception_1 = require("../errors/invalid-kafka-client-topic-partition.exception");
const invalid_kafka_client_topic_exception_1 = require("../errors/invalid-kafka-client-topic.exception");
const helpers_1 = require("../helpers");
const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
const client_proxy_1 = require("./client-proxy");
let kafkaPackage = {};
class ClientKafka extends client_proxy_1.ClientProxy {
    constructor(options) {
        super();
        this.options = options;
        this.client = null;
        this.consumer = null;
        this.producer = null;
        this.logger = new logger_service_1.Logger(ClientKafka.name);
        this.responsePatterns = [];
        this.consumerAssignments = {};
        const clientOptions = this.getOptionsProp(this.options, 'client') || {};
        const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // Append '-client' to the clientId and groupId so they don't
        // collide with a microservice server using the same base ids
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + '-client';
        this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + '-client';
        kafkaPackage = load_package_util_1.loadPackage('kafkajs', ClientKafka.name, () => require('kafkajs'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
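    // Register the reply topic (`<pattern>.reply`) for a request pattern.
    // Must be called before connect() so bindTopics() can subscribe to it.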
    subscribeToResponseOf(pattern) {
        const request = this.normalizePattern(pattern);
        this.responsePatterns.push(this.getResponsePatternName(request));
    }
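    // Disconnect the producer and consumer (the disconnect promises are
    // not awaited) and drop all client references.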
    close() {
        this.producer && this.producer.disconnect();
        this.consumer && this.consumer.disconnect();
        this.producer = null;
        this.consumer = null;
        this.client = null;
    }
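    // Lazily create the kafkajs client, producer, and consumer. The consumer
    // is pinned to the round-robin partition assigner so replies can be
    // routed back to a partition this member owns, and the GROUP_JOIN event
    // keeps consumerAssignments current across rebalances.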
    async connect() {
        if (this.client) {
            return this.producer;
        }
        this.client = this.createClient();
        const consumerOptions = Object.assign({
            partitionAssigners: [
                (config) => new helpers_1.KafkaRoundRobinPartitionAssigner(config),
            ],
        }, this.options.consumer || {}, {
            groupId: this.groupId,
        });
        this.producer = this.client.producer(this.options.producer || {});
        this.consumer = this.client.consumer(consumerOptions);
        // set member assignments on join and rebalance
        this.consumer.on(this.consumer.events.GROUP_JOIN, this.setConsumerAssignments.bind(this));
        await this.producer.connect();
        await this.consumer.connect();
        await this.bindTopics();
        return this.producer;
    }
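    // Subscribe the consumer to every registered reply topic, then start the
    // consumer loop. Note that eachMessage is assigned last, so it overrides
    // any eachMessage supplied via options.run (which Object.assign mutates
    // in place).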
    async bindTopics() {
        const consumerSubscribeOptions = this.options.subscribe || {};
        const subscribeTo = async (responsePattern) => this.consumer.subscribe(Object.assign({ topic: responsePattern }, consumerSubscribeOptions));
        await Promise.all(this.responsePatterns.map(subscribeTo));
        await this.consumer.run(Object.assign(this.options.run || {}, {
            eachMessage: this.createResponseCallback(),
        }));
    }
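    // Build the kafkajs client. clientId, brokers, and logCreator are
    // assigned last, so they override (and are written into) any
    // user-supplied options.client object.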
    createClient() {
        return new kafkaPackage.Kafka(Object.assign(this.options.client || {}, {
            clientId: this.clientId,
            brokers: this.brokers,
            logCreator: helpers_1.KafkaLogger.bind(null, this.logger),
        }));
    }
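    // Handle each message arriving on a reply topic: parse it, ignore
    // messages without a correlation-id header, look up the in-flight
    // request in routingMap, and invoke its callback with the outcome.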
    createResponseCallback() {
        return (payload) => {
            const rawMessage = helpers_1.KafkaParser.parse(Object.assign(payload.message, {
                topic: payload.topic,
                partition: payload.partition,
            }));
            if (shared_utils_1.isUndefined(rawMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID])) {
                return;
            }
            const { err, response, isDisposed, id } = this.deserializer.deserialize(rawMessage);
            const callback = this.routingMap.get(id);
            if (!callback) {
                return;
            }
            if (err || isDisposed) {
                return callback({
                    err,
                    response,
                    isDisposed,
                });
            }
            callback({
                err,
                response,
            });
        };
    }
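    // Fire-and-forget event path: serialize the payload and produce it to
    // the pattern's topic without correlation or reply headers, so no
    // response is expected.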
    dispatchEvent(packet) {
        const pattern = this.normalizePattern(packet.pattern);
        const outgoingEvent = this.serializer.serialize(packet.data);
        const message = Object.assign({
            topic: pattern,
            messages: [outgoingEvent],
        }, this.options.send || {});
        return this.producer.send(message);
    }
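    // Return the first partition of the reply topic assigned to this
    // consumer; throws if the topic is unknown or this member holds no
    // partitions on it.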
    getReplyTopicPartition(topic) {
        const topicAssignments = this.consumerAssignments[topic];
        if (shared_utils_1.isUndefined(topicAssignments)) {
            throw new invalid_kafka_client_topic_exception_1.InvalidKafkaClientTopicException(topic);
        }
        // If the current member isn't assigned any partitions
        // on the topic, throw an error.
        if (shared_utils_1.isUndefined(topicAssignments[0])) {
            throw new invalid_kafka_client_topic_partition_exception_1.InvalidKafkaClientTopicPartitionException(topic);
        }
        return topicAssignments[0].toString();
    }
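    // Request-response path: assign a packet id, stamp the correlation-id,
    // reply-topic, and reply-partition headers, register the callback in
    // routingMap, and produce the message. Returns a teardown function
    // that removes the routing entry.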
    publish(partialPacket, callback) {
        try {
            const packet = this.assignPacketId(partialPacket);
            const pattern = this.normalizePattern(partialPacket.pattern);
            const replyTopic = this.getResponsePatternName(pattern);
            const replyPartition = this.getReplyTopicPartition(replyTopic);
            const serializedPacket = this.serializer.serialize(packet.data);
            serializedPacket.headers[enums_1.KafkaHeaders.CORRELATION_ID] = packet.id;
            serializedPacket.headers[enums_1.KafkaHeaders.REPLY_TOPIC] = replyTopic;
            serializedPacket.headers[enums_1.KafkaHeaders.REPLY_PARTITION] = replyPartition;
            this.routingMap.set(packet.id, callback);
            const message = Object.assign({
                topic: pattern,
                messages: [serializedPacket],
            }, this.options.send || {});
            this.producer.send(message);
            return () => this.routingMap.delete(packet.id);
        }
        catch (err) {
            callback({ err });
        }
    }
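    // Reply topics are derived by appending '.reply' to the request pattern.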
    getResponsePatternName(pattern) {
        return `${pattern}.reply`;
    }
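    // Invoked on GROUP_JOIN: cache this member's topic-to-partitions map
    // so publish() can pick a reply partition.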
    setConsumerAssignments(data) {
        this.consumerAssignments = data.payload.memberAssignment;
    }
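    // Fall back to the built-in Kafka request serializer and response
    // deserializer when none are provided in the options.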
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
    initializeDeserializer(options) {
        this.deserializer =
            (options && options.deserializer) || new kafka_response_deserializer_1.KafkaResponseDeserializer();
    }
}
exports.ClientKafka = ClientKafka;
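// Usage sketch (illustrative, not part of this file): reply topics must be
// registered before connecting. The broker address, group id, and pattern
// name below are assumptions for the example.
//
//   const { ClientKafka } = require('./client-kafka');
//   const client = new ClientKafka({
//     client: { brokers: ['localhost:9092'] }, // assumed local broker
//     consumer: { groupId: 'my-group' },       // suffixed to 'my-group-client'
//   });
//   client.subscribeToResponseOf('math.sum');  // registers 'math.sum.reply'
//   await client.connect();                    // subscribes and starts consuming
//   client.send('math.sum', [1, 2, 3]).subscribe(console.log);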