Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

sentry / @nestjs/microservices   js

Repository URL to install this package:

Version: 7.0.10 

/ server / server-kafka.js

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const logger_service_1 = require("@nestjs/common/services/logger.service");
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const ctx_host_1 = require("../ctx-host");
const enums_1 = require("../enums");
const helpers_1 = require("../helpers");
const kafka_request_serializer_1 = require("../serializers/kafka-request.serializer");
const server_1 = require("./server");
let kafkaPackage = {};
class ServerKafka extends server_1.Server {
    constructor(options) {
        super();
        this.options = options;
        this.logger = new logger_service_1.Logger(ServerKafka.name);
        this.client = null;
        this.consumer = null;
        this.producer = null;
        const clientOptions = this.getOptionsProp(this.options, 'client') || {};
        const consumerOptions = this.getOptionsProp(this.options, 'consumer') || {};
        this.brokers = clientOptions.brokers || [constants_1.KAFKA_DEFAULT_BROKER];
        // append a unique id to the clientId and groupId
        // so they don't collide with a microservices client
        this.clientId =
            (clientOptions.clientId || constants_1.KAFKA_DEFAULT_CLIENT) + '-server';
        this.groupId = (consumerOptions.groupId || constants_1.KAFKA_DEFAULT_GROUP) + '-server';
        kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => require('kafkajs'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    getTransport() {
        return enums_1.Transport.KAFKA;
    }
    async listen(callback) {
        this.client = this.createClient();
        await this.start(callback);
    }
    close() {
        this.consumer && this.consumer.disconnect();
        this.producer && this.producer.disconnect();
        this.consumer = null;
        this.producer = null;
        this.client = null;
    }
    async start(callback) {
        const consumerOptions = Object.assign(this.options.consumer || {}, {
            groupId: this.groupId,
        });
        this.consumer = this.client.consumer(consumerOptions);
        this.producer = this.client.producer(this.options.producer);
        await this.consumer.connect();
        await this.producer.connect();
        await this.bindEvents(this.consumer);
        callback();
    }
    createClient() {
        return new kafkaPackage.Kafka(Object.assign(this.options.client || {}, {
            clientId: this.clientId,
            brokers: this.brokers,
            logCreator: helpers_1.KafkaLogger.bind(null, this.logger),
        }));
    }
    async bindEvents(consumer) {
        const registeredPatterns = [...this.messageHandlers.keys()];
        const consumerSubscribeOptions = this.options.subscribe || {};
        const subscribeToPattern = async (pattern) => consumer.subscribe(Object.assign({ topic: pattern }, consumerSubscribeOptions));
        await Promise.all(registeredPatterns.map(subscribeToPattern));
        const consumerRunOptions = Object.assign(this.options.run || {}, {
            eachMessage: this.getMessageHandler(),
        });
        await consumer.run(consumerRunOptions);
    }
    getMessageHandler() {
        return async (payload) => this.handleMessage(payload);
    }
    getPublisher(replyTopic, replyPartition, correlationId) {
        return (data) => this.sendMessage(data, replyTopic, replyPartition, correlationId);
    }
    async handleMessage(payload) {
        const channel = payload.topic;
        const rawMessage = helpers_1.KafkaParser.parse(Object.assign(payload.message, {
            topic: payload.topic,
            partition: payload.partition,
        }));
        const headers = rawMessage.headers;
        const correlationId = headers[enums_1.KafkaHeaders.CORRELATION_ID];
        const replyTopic = headers[enums_1.KafkaHeaders.REPLY_TOPIC];
        const replyPartition = headers[enums_1.KafkaHeaders.REPLY_PARTITION];
        const packet = this.deserializer.deserialize(rawMessage, { channel });
        const kafkaContext = new ctx_host_1.KafkaContext([
            rawMessage,
            payload.partition,
            payload.topic,
        ]);
        // if the correlation id or reply topic is not set
        // then this is an event (events could still have correlation id)
        if (!correlationId || !replyTopic) {
            return this.handleEvent(packet.pattern, packet, kafkaContext);
        }
        const publish = this.getPublisher(replyTopic, replyPartition, correlationId);
        const handler = this.getHandlerByPattern(packet.pattern);
        if (!handler) {
            return publish({
                id: correlationId,
                err: constants_1.NO_MESSAGE_HANDLER,
            });
        }
        const response$ = this.transformToObservable(await handler(packet.data, kafkaContext));
        response$ && this.send(response$, publish);
    }
    sendMessage(message, replyTopic, replyPartition, correlationId) {
        const outgoingMessage = this.serializer.serialize(message.response);
        this.assignReplyPartition(replyPartition, outgoingMessage);
        this.assignCorrelationIdHeader(correlationId, outgoingMessage);
        this.assignErrorHeader(message, outgoingMessage);
        this.assignIsDisposedHeader(message, outgoingMessage);
        const replyMessage = Object.assign({
            topic: replyTopic,
            messages: [outgoingMessage],
        }, this.options.send || {});
        this.producer.send(replyMessage);
    }
    assignIsDisposedHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.isDisposed) {
            return;
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_IS_DISPOSED] = Buffer.alloc(1);
    }
    assignErrorHeader(outgoingResponse, outgoingMessage) {
        if (!outgoingResponse.err) {
            return;
        }
        outgoingMessage.headers[enums_1.KafkaHeaders.NEST_ERR] = Buffer.from(outgoingResponse.err);
    }
    assignCorrelationIdHeader(correlationId, outgoingMessage) {
        outgoingMessage.headers[enums_1.KafkaHeaders.CORRELATION_ID] = Buffer.from(correlationId);
    }
    assignReplyPartition(replyPartition, outgoingMessage) {
        if (shared_utils_1.isNil(replyPartition)) {
            return;
        }
        outgoingMessage.partition = parseFloat(replyPartition);
    }
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) || new kafka_request_serializer_1.KafkaRequestSerializer();
    }
}
exports.ServerKafka = ServerKafka;