Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

sentry / @nestjs/microservices   js

Repository URL to install this package:

Version: 7.0.10 

/ server / server-redis.js

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const constants_1 = require("../constants");
const ctx_host_1 = require("../ctx-host");
const server_1 = require("./server");
const enums_1 = require("../enums");
let redisPackage = {};
class ServerRedis extends server_1.Server {
    constructor(options) {
        super();
        this.options = options;
        this.isExplicitlyTerminated = false;
        this.url = this.getOptionsProp(this.options, 'url') || constants_1.REDIS_DEFAULT_URL;
        redisPackage = this.loadPackage('redis', ServerRedis.name, () => require('redis'));
        this.initializeSerializer(options);
        this.initializeDeserializer(options);
    }
    getTransport() {
        return enums_1.Transport.REDIS;
    }
    listen(callback) {
        this.subClient = this.createRedisClient();
        this.pubClient = this.createRedisClient();
        this.handleError(this.pubClient);
        this.handleError(this.subClient);
        this.start(callback);
    }
    start(callback) {
        this.bindEvents(this.subClient, this.pubClient);
        this.subClient.on(constants_1.CONNECT_EVENT, callback);
    }
    bindEvents(subClient, pubClient) {
        subClient.on(constants_1.MESSAGE_EVENT, this.getMessageHandler(pubClient).bind(this));
        const subscribePatterns = [...this.messageHandlers.keys()];
        subscribePatterns.forEach(pattern => {
            const { isEventHandler } = this.messageHandlers.get(pattern);
            subClient.subscribe(isEventHandler ? pattern : this.getRequestPattern(pattern));
        });
    }
    close() {
        this.isExplicitlyTerminated = true;
        this.pubClient && this.pubClient.quit();
        this.subClient && this.subClient.quit();
    }
    createRedisClient() {
        return redisPackage.createClient(Object.assign(Object.assign({}, this.getClientOptions()), { url: this.url }));
    }
    getMessageHandler(pub) {
        return async (channel, buffer) => this.handleMessage(channel, buffer, pub);
    }
    async handleMessage(channel, buffer, pub) {
        const rawMessage = this.parseMessage(buffer);
        const packet = this.deserializer.deserialize(rawMessage, { channel });
        const redisCtx = new ctx_host_1.RedisContext([channel]);
        if (shared_utils_1.isUndefined(packet.id)) {
            return this.handleEvent(channel, packet, redisCtx);
        }
        const publish = this.getPublisher(pub, channel, packet.id);
        const handler = this.getHandlerByPattern(channel);
        if (!handler) {
            const status = 'error';
            const noHandlerPacket = {
                id: packet.id,
                status,
                err: constants_1.NO_MESSAGE_HANDLER,
            };
            return publish(noHandlerPacket);
        }
        const response$ = this.transformToObservable(await handler(packet.data, redisCtx));
        response$ && this.send(response$, publish);
    }
    getPublisher(pub, pattern, id) {
        return (response) => {
            Object.assign(response, { id });
            const outgoingResponse = this.serializer.serialize(response);
            return pub.publish(this.getReplyPattern(pattern), JSON.stringify(outgoingResponse));
        };
    }
    parseMessage(content) {
        try {
            return JSON.parse(content);
        }
        catch (e) {
            return content;
        }
    }
    getRequestPattern(pattern) {
        return pattern;
    }
    getReplyPattern(pattern) {
        return `${pattern}.reply`;
    }
    handleError(stream) {
        stream.on(constants_1.ERROR_EVENT, (err) => this.logger.error(err));
    }
    getClientOptions() {
        // eslint-disable-next-line @typescript-eslint/camelcase
        const retry_strategy = (options) => this.createRetryStrategy(options);
        return {
            // eslint-disable-next-line @typescript-eslint/camelcase
            retry_strategy,
        };
    }
    createRetryStrategy(options) {
        if (options.error && options.error.code === 'ECONNREFUSED') {
            this.logger.error(`Error ECONNREFUSED: ${this.url}`);
        }
        if (this.isExplicitlyTerminated) {
            return undefined;
        }
        if (!this.getOptionsProp(this.options, 'retryAttempts') ||
            options.attempt > this.getOptionsProp(this.options, 'retryAttempts')) {
            this.logger.error(`Retry time exhausted: ${this.url}`);
            throw new Error('Retry time exhausted');
        }
        return this.getOptionsProp(this.options, 'retryDelay') || 0;
    }
}
exports.ServerRedis = ServerRedis;