"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const logger_service_1 = require("@nestjs/common/services/logger.service");
const load_package_util_1 = require("@nestjs/common/utils/load-package.util");
const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const incoming_request_deserializer_1 = require("../deserializers/incoming-request.deserializer");
const identity_serializer_1 = require("../serializers/identity.serializer");
const utils_1 = require("../utils");
const constants_1 = require("../constants");
/**
 * Microservice server that keeps a registry of handlers keyed by route and
 * provides helpers for dispatching events, streaming responses to a
 * transport callback and resolving (de)serializers from options.
 */
class Server {
    constructor() {
        // Route (as produced by `transformPatternToRoute`) -> handler callback.
        this.messageHandlers = new Map();
        this.logger = new logger_service_1.Logger(Server.name);
    }
    /**
     * Returns the transport identifier of this server.
     *
     * @returns {number} always `-1` in this class
     */
    getTransport() {
        return -1;
    }
    /**
     * Registers `callback` under the route derived from `pattern`.
     * Registering the same route again replaces the previous handler.
     *
     * @param {*} pattern - pattern handed to `transformPatternToRoute`
     * @param {Function} callback - handler; note that the function object
     *   itself is tagged with an `isEventHandler` property
     * @param {boolean} [isEventHandler=false] - value stored on the handler
     */
    addHandler(pattern, callback, isEventHandler = false) {
        const route = utils_1.transformPatternToRoute(pattern);
        callback.isEventHandler = isEventHandler;
        this.messageHandlers.set(route, callback);
    }
    /**
     * @returns {Map} the live handler registry (not a copy)
     */
    getHandlers() {
        return this.messageHandlers;
    }
    /**
     * Looks up the handler registered for `pattern`.
     *
     * @param {string} pattern - pattern, resolved through `getRouteFromPattern`
     * @returns {Function|null} the handler, or `null` when none is registered
     */
    getHandlerByPattern(pattern) {
        const route = this.getRouteFromPattern(pattern);
        return this.messageHandlers.has(route)
            ? this.messageHandlers.get(route)
            : null;
    }
    /**
     * Subscribes to `stream$` and forwards what it produces to `respond`.
     *
     * Packets are batched and delivered on the next tick:
     *  - each emitted value becomes `{ response }`,
     *  - an error becomes `{ err }` and the stream is replaced with `EMPTY`,
     *  - finalization sets `isDisposed` on the last pending packet, or is
     *    delivered as a standalone `{ isDisposed: true }` packet when nothing
     *    is pending.
     *
     * @param {Object} stream$ - source exposing `pipe` and `subscribe`
     * @param {Function} respond - called once per packet, in order
     * @returns {*} whatever `subscribe` returns (the subscription)
     */
    send(stream$, respond) {
        let dataBuffer = null;
        const flush = () => {
            // Detach the batch *before* delivering it. If `respond` throws or
            // synchronously produces more packets, the buffer is already in a
            // clean state: new packets start a fresh batch (and schedule their
            // own flush) instead of being dropped or leaving the buffer stuck.
            const batch = dataBuffer;
            dataBuffer = null;
            batch.forEach(packet => respond(packet));
        };
        const scheduleOnNextTick = (data) => {
            if (!dataBuffer) {
                dataBuffer = [data];
                process.nextTick(flush);
            }
            else if (!data.isDisposed) {
                dataBuffer.push(data);
            }
            else {
                // Fold the disposal marker into the last pending packet
                // rather than sending a separate one.
                dataBuffer[dataBuffer.length - 1].isDisposed = data.isDisposed;
            }
        };
        return stream$
            .pipe(operators_1.catchError((err) => {
            scheduleOnNextTick({ err });
            return rxjs_1.EMPTY;
        }), operators_1.finalize(() => scheduleOnNextTick({ isDisposed: true })))
            .subscribe((response) => scheduleOnNextTick({ response }));
    }
    /**
     * Invokes the handler registered for `pattern` with the packet payload.
     * When no handler exists, `NO_EVENT_HANDLER` is logged as an error.
     * When the handler yields an observable-like result, it is connected via
     * `publish().connect()` so that it executes without any consumer.
     *
     * @param {string} pattern - pattern used to look up the handler
     * @param {Object} packet - incoming packet; only `packet.data` is read
     * @param {*} context - passed through to the handler as second argument
     * @returns {Promise<*>}
     */
    async handleEvent(pattern, packet, context) {
        const handler = this.getHandlerByPattern(pattern);
        if (!handler) {
            return this.logger.error(constants_1.NO_EVENT_HANDLER);
        }
        const resultOrStream = await handler(packet.data, context);
        if (this.isObservable(resultOrStream)) {
            resultOrStream.pipe(operators_1.publish()).connect();
        }
    }
    /**
     * Normalizes a handler result to an observable.
     *
     * @param {*} resultOrDeffered - a `Promise` (wrapped with `from`), an
     *   observable-like object (returned unchanged) or any other value
     *   (wrapped with `of`)
     * @returns {*} an observable
     */
    transformToObservable(resultOrDeffered) {
        if (resultOrDeffered instanceof Promise) {
            return rxjs_1.from(resultOrDeffered);
        }
        else if (!this.isObservable(resultOrDeffered)) {
            return rxjs_1.of(resultOrDeffered);
        }
        return resultOrDeffered;
    }
    /**
     * Reads `prop` from `obj`, falling back to `defaultValue` only when `obj`
     * is falsy or does not contain the key. A present key whose value is
     * falsy (`0`, `''`, `false`, `undefined`) is returned as is.
     *
     * @param {Object} obj - options object (may be null/undefined)
     * @param {string} prop - key to read
     * @param {*} [defaultValue=undefined] - fallback value
     * @returns {*}
     */
    getOptionsProp(obj, prop, defaultValue = undefined) {
        return obj && prop in obj ? obj[prop] : defaultValue;
    }
    /**
     * Logs `error` through the server logger.
     *
     * @param {*} error - value passed to `logger.error`
     */
    handleError(error) {
        this.logger.error(error);
    }
    /**
     * Delegates to the `loadPackage` utility with the same arguments.
     *
     * @param {string} name
     * @param {string} ctx
     * @param {Function} loader
     * @returns {*} whatever the utility returns
     */
    loadPackage(name, ctx, loader) {
        return load_package_util_1.loadPackage(name, ctx, loader);
    }
    /**
     * Sets `this.serializer` to `options.serializer` when provided (truthy),
     * otherwise to a new `IdentitySerializer`.
     *
     * @param {Object} [options]
     */
    initializeSerializer(options) {
        this.serializer =
            (options &&
                options.serializer) ||
                new identity_serializer_1.IdentitySerializer();
    }
    /**
     * Sets `this.deserializer` to `options.deserializer` when provided
     * (truthy), otherwise to a new `IncomingRequestDeserializer`.
     *
     * @param {Object} [options]
     */
    initializeDeserializer(options) {
        this.deserializer =
            (options &&
                options.deserializer) ||
                new incoming_request_deserializer_1.IncomingRequestDeserializer();
    }
    /**
     * Duck-type check for observables: truthy when `input` has a `subscribe`
     * function. For falsy input the input itself is returned, so treat the
     * result as truthy/falsy rather than as a strict boolean.
     *
     * @param {*} input
     * @returns {*} truthy when `input` looks like an observable
     */
    isObservable(input) {
        return input && shared_utils_1.isFunction(input.subscribe);
    }
    /**
     * Converts a pattern received as a string into its route.
     *
     * The pattern is parsed as JSON when possible; if parsing fails, the
     * original value is used unchanged. The result is then passed to
     * `transformPatternToRoute`.
     *
     * @param {string} pattern - server pattern
     * @returns {string} the route for the pattern
     */
    getRouteFromPattern(pattern) {
        let validPattern;
        try {
            validPattern = JSON.parse(pattern);
        }
        catch (error) {
            // Not valid JSON: use the `pattern` value as is, without conversion.
            validPattern = pattern;
        }
        return utils_1.transformPatternToRoute(validPattern);
    }
}
// Expose the Server class as a named CommonJS export.
exports.Server = Server;