Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

sentry / @nestjs/microservices   js

Repository URL to install this package:

Version: 7.0.10 

/ server / server.js

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const logger_service_1 = require("@nestjs/common/services/logger.service");
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const incoming_request_deserializer_1 = require("../deserializers/incoming-request.deserializer");
const identity_serializer_1 = require("../serializers/identity.serializer");
const utils_1 = require("../utils");
const constants_1 = require("../constants");
/**
 * Keeps a registry of handlers keyed by route and offers helpers for
 * resolving handlers, streaming handler results to a `respond` callback,
 * and initializing the (de)serializers.
 */
class Server {
    constructor() {
        // route -> handler callback, filled by addHandler()
        this.messageHandlers = new Map();
        this.logger = new logger_service_1.Logger(Server.name);
    }
    /**
     * Returns the transport identifier of this server.
     *
     * NOTE(review): this implementation always returns -1; presumably a
     * placeholder that concrete servers override — confirm against subclasses.
     *
     * @returns {number} always -1 here
     */
    getTransport() {
        return -1;
    }
    /**
     * Registers `callback` under the route derived from `pattern`.
     * The callback object itself is tagged with `isEventHandler`.
     *
     * @param pattern - pattern passed to `transformPatternToRoute`
     * @param {Function} callback - handler to store (mutated: gets `isEventHandler`)
     * @param {boolean} [isEventHandler=false] - flag stored on the callback
     */
    addHandler(pattern, callback, isEventHandler = false) {
        const route = utils_1.transformPatternToRoute(pattern);
        callback.isEventHandler = isEventHandler;
        this.messageHandlers.set(route, callback);
    }
    /**
     * @returns {Map} the live handler registry (not a copy)
     */
    getHandlers() {
        return this.messageHandlers;
    }
    /**
     * Looks up the handler registered for `pattern`.
     *
     * @param pattern - pattern resolved through `getRouteFromPattern`
     * @returns the registered handler, or `null` when the route is unknown
     */
    getHandlerByPattern(pattern) {
        const route = this.getRouteFromPattern(pattern);
        if (!this.messageHandlers.has(route)) {
            return null;
        }
        return this.messageHandlers.get(route);
    }
    /**
     * Subscribes to `stream$` and forwards what it produces to `respond`,
     * batching packets and delivering them via `process.nextTick`.
     *
     * Emitted values are wrapped as `{ response }`, a caught error as
     * `{ err }`, and finalization is signalled with `isDisposed: true`. When
     * finalization arrives while packets are still queued, the flag is folded
     * into the last queued packet instead of being sent as a separate one.
     *
     * @param stream$ - source exposing `pipe(...)` whose result exposes `subscribe(...)`
     * @param {Function} respond - invoked once per queued packet
     * @returns whatever `subscribe` returns
     */
    send(stream$, respond) {
        let dataBuffer = null;
        const flush = () => {
            // Detach the queue BEFORE delivering: a packet scheduled while
            // `respond` is running must start a new batch (instead of being
            // appended to a buffer that is about to be discarded), and a
            // throwing `respond` must not leave a stale buffer behind.
            const pending = dataBuffer;
            dataBuffer = null;
            pending.forEach(packet => respond(packet));
        };
        const scheduleOnNextTick = (data) => {
            if (!dataBuffer) {
                // First packet of a batch: open the queue and schedule a flush.
                dataBuffer = [data];
                process.nextTick(flush);
            }
            else if (!data.isDisposed) {
                dataBuffer = dataBuffer.concat(data);
            }
            else {
                // Piggyback the disposal flag on the last queued packet.
                dataBuffer[dataBuffer.length - 1].isDisposed = data.isDisposed;
            }
        };
        return stream$
            .pipe(operators_1.catchError((err) => {
            scheduleOnNextTick({ err });
            return rxjs_1.EMPTY;
        }), operators_1.finalize(() => scheduleOnNextTick({ isDisposed: true })))
            .subscribe((response) => scheduleOnNextTick({ response }));
    }
    /**
     * Invokes the handler registered for `pattern` with `packet.data` and
     * `context`. When the handler's awaited result is observable-like it is
     * piped through `publish()` and connected so that it starts running.
     * When no handler is registered, `NO_EVENT_HANDLER` is logged as an error.
     *
     * @param pattern - pattern used to look up the handler
     * @param packet - object whose `data` property is passed to the handler
     * @param context - passed through to the handler as second argument
     */
    async handleEvent(pattern, packet, context) {
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            return this.logger.error(constants_1.NO_EVENT_HANDLER);
        }
        const resultOrStream = await handler(packet.data, context);
        if (this.isObservable(resultOrStream)) {
            resultOrStream.pipe(operators_1.publish()).connect();
        }
    }
    /**
     * Normalizes a handler result: a `Promise` goes through `from`, an
     * observable-like value is returned as is, anything else goes through `of`.
     *
     * @param resultOrDeffered - promise, observable-like object or plain value
     * @returns the normalized result
     */
    transformToObservable(resultOrDeffered) {
        if (resultOrDeffered instanceof Promise) {
            return rxjs_1.from(resultOrDeffered);
        }
        if (this.isObservable(resultOrDeffered)) {
            return resultOrDeffered;
        }
        return rxjs_1.of(resultOrDeffered);
    }
    /**
     * Reads `prop` from `obj`, falling back to `defaultValue` when `obj` is
     * falsy or does not have the property (checked with `in`, so a property
     * explicitly set to `undefined` is returned as `undefined`).
     *
     * @param obj - options object, may be null/undefined
     * @param prop - property key to read
     * @param [defaultValue=undefined] - fallback value
     */
    getOptionsProp(obj, prop, defaultValue = undefined) {
        return obj && prop in obj ? obj[prop] : defaultValue;
    }
    /**
     * Logs `error` through the server logger.
     *
     * @param error - value forwarded to `logger.error`
     */
    handleError(error) {
        this.logger.error(error);
    }
    /**
     * Thin delegate to the shared `loadPackage` utility; arguments and
     * return value are passed through unchanged.
     */
    loadPackage(name, ctx, loader) {
        return load_package_util_1.loadPackage(name, ctx, loader);
    }
    /**
     * Sets `this.serializer` to `options.serializer` when provided (truthy),
     * otherwise to a new `IdentitySerializer`.
     *
     * @param [options] - object that may carry a `serializer`
     */
    initializeSerializer(options) {
        this.serializer =
            (options && options.serializer) ||
                new identity_serializer_1.IdentitySerializer();
    }
    /**
     * Sets `this.deserializer` to `options.deserializer` when provided
     * (truthy), otherwise to a new `IncomingRequestDeserializer`.
     *
     * @param [options] - object that may carry a `deserializer`
     */
    initializeDeserializer(options) {
        this.deserializer =
            (options && options.deserializer) ||
                new incoming_request_deserializer_1.IncomingRequestDeserializer();
    }
    /**
     * Duck-type check: `input` is truthy and `isFunction(input.subscribe)` holds.
     * Always yields a strict boolean (never the falsy input itself).
     *
     * @param input - value to test
     * @returns {boolean}
     */
    isObservable(input) {
        return Boolean(input && shared_utils_1.isFunction(input.subscribe));
    }
    /**
     * Converts a pattern into a route. The pattern is first run through
     * `JSON.parse`; when parsing throws, the pattern is used unchanged.
     *
     * @param  {string} pattern - server pattern
     * @returns the result of `transformPatternToRoute`
     */
    getRouteFromPattern(pattern) {
        let validPattern;
        try {
            validPattern = JSON.parse(pattern);
        }
        catch (error) {
            // Not valid JSON: use the `pattern` value without any conversion.
            validPattern = pattern;
        }
        return utils_1.transformPatternToRoute(validPattern);
    }
}
// Expose the Server class on the CommonJS `exports` object.
exports.Server = Server;