Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

sentry / @nestjs/microservices   js

Repository URL to install this package:

Version: 7.0.10 

/ server / server-rmq.js

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const ctx_host_1 = require("../ctx-host");
const server_1 = require("./server");
const enums_1 = require("../enums");
// Module-level holder for the 'amqp-connection-manager' package. It starts as
// an empty object, is assigned by the ServerRMQ constructor via loadPackage(),
// and is then used by ServerRMQ#createClient() to open the connection.
let rqmPackage = {};
/**
 * Microservice transport server backed by RabbitMQ.
 *
 * Connects through 'amqp-connection-manager', consumes messages from a single
 * queue, dispatches them to the registered pattern handlers, and publishes
 * replies to the queue named in each message's `replyTo` property.
 */
class ServerRMQ extends server_1.Server {
    /**
     * @param {object} options Transport options. The properties read here are
     *   `urls`, `queue`, `prefetchCount`, `isGlobalPrefetchCount` and
     *   `queueOptions`; `socketOptions` and `noAck` are read later. Because the
     *   fallbacks use `||`, any falsy option value is replaced by the
     *   corresponding default constant.
     */
    constructor(options) {
        super();
        this.options = options;
        // Connection manager instance; created in start().
        this.server = null;
        // Channel wrapper; created on the first connect event.
        this.channel = null;
        this.urls = this.getOptionsProp(this.options, 'urls') || [constants_1.RQM_DEFAULT_URL];
        this.queue =
            this.getOptionsProp(this.options, 'queue') || constants_1.RQM_DEFAULT_QUEUE;
        this.prefetchCount =
            this.getOptionsProp(this.options, 'prefetchCount') ||
                constants_1.RQM_DEFAULT_PREFETCH_COUNT;
        this.isGlobalPrefetchCount =
            this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
                constants_1.RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
        this.queueOptions =
            this.getOptionsProp(this.options, 'queueOptions') ||
                constants_1.RQM_DEFAULT_QUEUE_OPTIONS;
        // The return value is discarded here; presumably this call only verifies
        // that 'amqplib' is installed (loadPackage is defined outside this file).
        this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
        rqmPackage = this.loadPackage('amqp-connection-manager', ServerRMQ.name, () => require('amqp-connection-manager'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    /**
     * @returns The transport identifier for RabbitMQ (`Transport.RMQ`).
     */
    getTransport() {
        return enums_1.Transport.RMQ;
    }
    /**
     * Starts the server.
     *
     * @param {Function} callback Invoked by setupChannel() once the channel has
     *   been configured.
     * @returns {Promise<void>}
     */
    async listen(callback) {
        await this.start(callback);
    }
    /**
     * Closes the channel and the underlying connection, if they exist.
     */
    close() {
        this.channel && this.channel.close();
        this.server && this.server.close();
    }
    /**
     * Creates the connection and registers the connect/disconnect listeners.
     * Nothing is awaited here, so the returned promise resolves as soon as the
     * listeners are registered; the channel is created later, from the connect
     * event.
     *
     * @param {Function} callback Forwarded to setupChannel().
     * @returns {Promise<void>}
     */
    async start(callback) {
        this.server = this.createClient();
        this.server.on(constants_1.CONNECT_EVENT, () => {
            // Only create the channel once, even if the connect event fires again.
            if (this.channel) {
                return;
            }
            this.channel = this.server.createChannel({
                json: false,
                setup: (channel) => this.setupChannel(channel, callback),
            });
        });
        this.server.on(constants_1.DISCONNECT_EVENT, () => {
            this.logger.error(constants_1.DISCONNECTED_RMQ_MESSAGE);
        });
    }
    /**
     * Opens a managed connection to the configured broker URLs.
     *
     * @returns The object returned by amqp-connection-manager's `connect()`.
     */
    createClient() {
        const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
        return rqmPackage.connect(this.urls, { connectionOptions: socketOptions });
    }
    /**
     * Channel setup routine: asserts the queue, applies the prefetch settings,
     * starts consuming, and finally invokes `callback`.
     *
     * @param channel Channel handed to the `setup` function given to createChannel().
     * @param {Function} callback Called with no arguments after the consumer
     *   has been registered.
     * @returns {Promise<void>}
     */
    async setupChannel(channel, callback) {
        const noAck = this.getOptionsProp(this.options, 'noAck', constants_1.RQM_DEFAULT_NOACK);
        await channel.assertQueue(this.queue, this.queueOptions);
        await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
        channel.consume(this.queue, (msg) => this.handleMessage(msg, channel), {
            noAck,
        });
        callback();
    }
    /**
     * Handles one delivery from the queue.
     *
     * Packets without an `id` are treated as events and passed to
     * handleEvent(). Packets with an `id` are request/response: the matching
     * handler is invoked and its result is published to `properties.replyTo`
     * with the original `correlationId`. When no handler matches, an error
     * packet carrying NO_MESSAGE_HANDLER is published instead.
     *
     * @param message Delivered message (`content`, `properties`), or `null`.
     * @param channel Channel the message was received on.
     * @returns {Promise<*>}
     */
    async handleMessage(message, channel) {
        // amqplib invokes the consume callback with `null` when the broker
        // cancels the consumer. Destructuring it would throw inside this async
        // method and surface as an unhandled rejection, so ignore it.
        if (message === null || message === undefined) {
            return;
        }
        const { content, properties } = message;
        const rawMessage = JSON.parse(content.toString());
        const packet = this.deserializer.deserialize(rawMessage);
        // Non-string patterns are stringified before the handler lookup.
        const pattern = shared_utils_1.isString(packet.pattern)
            ? packet.pattern
            : JSON.stringify(packet.pattern);
        const rmqContext = new ctx_host_1.RmqContext([message, channel, pattern]);
        if (shared_utils_1.isUndefined(packet.id)) {
            return this.handleEvent(pattern, packet, rmqContext);
        }
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = {
                id: packet.id,
                err: constants_1.NO_MESSAGE_HANDLER,
                status,
            };
            return this.sendMessage(noHandlerPacket, properties.replyTo, properties.correlationId);
        }
        const response$ = this.transformToObservable(await handler(packet.data, rmqContext));
        const publish = (data) => this.sendMessage(data, properties.replyTo, properties.correlationId);
        response$ && this.send(response$, publish);
    }
    /**
     * Serializes `message` and publishes it as a JSON buffer.
     *
     * @param message Outgoing packet, passed through the configured serializer.
     * @param {string} replyTo Name of the queue to publish to.
     * @param {string} correlationId Correlation id attached to the published message.
     */
    sendMessage(message, replyTo, correlationId) {
        const outgoingResponse = this.serializer.serialize(message);
        const buffer = Buffer.from(JSON.stringify(outgoingResponse));
        this.channel.sendToQueue(replyTo, buffer, { correlationId });
    }
}
// Expose the class as a named CommonJS export.
exports.ServerRMQ = ServerRMQ;