Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

sentry / @nestjs/microservices   js

Repository URL to install this package:

Version: 7.0.10 

/ server / server-mqtt.js

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const mqtt_context_1 = require("../ctx-host/mqtt.context");
const server_1 = require("./server");
const enums_1 = require("../enums");
let mqttPackage = {};
class ServerMqtt extends server_1.Server {
    constructor(options) {
        super();
        this.options = options;
        this.url = this.getOptionsProp(options, 'url') || constants_1.MQTT_DEFAULT_URL;
        mqttPackage = this.loadPackage('mqtt', ServerMqtt.name, () => require('mqtt'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    getTransport() {
        return enums_1.Transport.MQTT;
    }
    async listen(callback) {
        this.mqttClient = this.createMqttClient();
        this.start(callback);
    }
    start(callback) {
        this.handleError(this.mqttClient);
        this.bindEvents(this.mqttClient);
        this.mqttClient.on(constants_1.CONNECT_EVENT, callback);
    }
    bindEvents(mqttClient) {
        mqttClient.on(constants_1.MESSAGE_EVENT, this.getMessageHandler(mqttClient).bind(this));
        const registeredPatterns = [...this.messageHandlers.keys()];
        registeredPatterns.forEach(pattern => {
            const { isEventHandler } = this.messageHandlers.get(pattern);
            mqttClient.subscribe(isEventHandler ? pattern : this.getRequestPattern(pattern));
        });
    }
    close() {
        this.mqttClient && this.mqttClient.end();
    }
    createMqttClient() {
        return mqttPackage.connect(this.url, this.options);
    }
    getMessageHandler(pub) {
        return async (channel, buffer, originalPacket) => this.handleMessage(channel, buffer, pub, originalPacket);
    }
    async handleMessage(channel, buffer, pub, originalPacket) {
        const rawPacket = this.parseMessage(buffer.toString());
        const packet = this.deserializer.deserialize(rawPacket, { channel });
        const mqttContext = new mqtt_context_1.MqttContext([channel, originalPacket]);
        if (shared_utils_1.isUndefined(packet.id)) {
            return this.handleEvent(channel, packet, mqttContext);
        }
        const publish = this.getPublisher(pub, channel, packet.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = {
                id: packet.id,
                status,
                err: constants_1.NO_MESSAGE_HANDLER,
            };
            return publish(noHandlerPacket);
        }
        const response$ = this.transformToObservable(await handler(packet.data, mqttContext));
        response$ && this.send(response$, publish);
    }
    getPublisher(client, pattern, id) {
        return (response) => {
            Object.assign(response, { id });
            const outgoingResponse = this.serializer.serialize(response);
            return client.publish(this.getReplyPattern(pattern), JSON.stringify(outgoingResponse));
        };
    }
    parseMessage(content) {
        try {
            return JSON.parse(content);
        }
        catch (e) {
            return content;
        }
    }
    matchMqttPattern(pattern, topic) {
        const patternSegments = pattern.split(constants_1.MQTT_SEPARATOR);
        const topicSegments = topic.split(constants_1.MQTT_SEPARATOR);
        const patternSegmentsLength = patternSegments.length;
        const topicSegmentsLength = topicSegments.length;
        const lastIndex = patternSegmentsLength - 1;
        for (let i = 0; i < patternSegmentsLength; i++) {
            const currentPattern = patternSegments[i];
            const patternChar = currentPattern[0];
            const currentTopic = topicSegments[i];
            if (!currentTopic && !currentPattern) {
                continue;
            }
            if (!currentTopic && currentPattern !== constants_1.MQTT_WILDCARD_ALL) {
                return false;
            }
            if (patternChar === constants_1.MQTT_WILDCARD_ALL) {
                return i === lastIndex;
            }
            if (patternChar !== constants_1.MQTT_WILDCARD_SINGLE &&
                currentPattern !== currentTopic) {
                return false;
            }
        }
        return patternSegmentsLength === topicSegmentsLength;
    }
    getHandlerByPattern(pattern) {
        const route = this.getRouteFromPattern(pattern);
        if (this.messageHandlers.has(route)) {
            return this.messageHandlers.get(route) || null;
        }
        for (const [key, value] of this.messageHandlers) {
            if (key.indexOf(constants_1.MQTT_WILDCARD_SINGLE) === -1 &&
                key.indexOf(constants_1.MQTT_WILDCARD_ALL) === -1) {
                continue;
            }
            if (this.matchMqttPattern(key, route)) {
                return value;
            }
        }
        return null;
    }
    getRequestPattern(pattern) {
        return pattern;
    }
    getReplyPattern(pattern) {
        return `${pattern}/reply`;
    }
    handleError(stream) {
        stream.on(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
    }
}
exports.ServerMqtt = ServerMqtt;