Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const shimmer=require("shimmer"),requireHook=require("../../../util/requireHook"),tracingUtil=require("../../tracingUtil"),constants=require("../../constants"),cls=require("../../cls");let logger,isActive=(logger=require("../../../logger").getLogger("tracing/amqp",newLogger=>{logger=newLogger}),!1);function instrumentChannel(channelModule){shimmer.wrap(channelModule.Channel.prototype,"sendMessage",shimSendMessage),shimmer.wrap(channelModule.BaseChannel.prototype,"dispatchMessage",shimDispatchMessage)}function instrumentChannelModel(channelModelModule){shimmer.wrap(channelModelModule.ConfirmChannel.prototype,"publish",shimChannelModelPublish),shimmer.wrap(channelModelModule.Channel.prototype,"get",shimChannelModelGet)}function instrumentCallbackModel(callbackModelModule){shimmer.wrap(callbackModelModule.ConfirmChannel.prototype,"publish",shimCallbackModelPublish),shimmer.wrap(callbackModelModule.Channel.prototype,"get",shimCallbackModelGet)}function shimSendMessage(originalFunction){return function(){var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedSendMessage(this,originalFunction,originalArgs)}}function instrumentedSendMessage(ctx,originalSendMessage,originalArgs){var skipTracingResult=cls.skipExitTracing({isActive:isActive,extendedResponse:!0,skipParentSpanCheck:!0}),parentSpan=cls.getCurrentSpan(),isExitSpan=skipTracingResult.isExitSpan;return skipTracingResult.skip?(skipTracingResult.suppressed&&(propagateSuppression(originalArgs[0]),propagateSuppression(originalArgs[1])),originalSendMessage.apply(ctx,originalArgs)):!parentSpan||isExitSpan&&"rabbitmq"!==parentSpan.n?originalSendMessage.apply(ctx,originalArgs):isExitSpan&&"rabbitmq"===parentSpan.n?(processExitSpan(ctx,parentSpan,originalArgs),originalSendMessage.apply(ctx,originalArgs)):cls.ns.runAndReturn(()=>{var span=cls.startSpan("rabbitmq",constants.EXIT);processExitSpan(ctx,span,originalArgs);try{return originalSendMessage.apply(ctx,originalArgs)}finally{span.d=Date.now()-span.ts,span.transmit()}})}function processExitSpan(ctx,span,originalArgs){span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedSendMessage),span.data.rabbitmq={sort:"publish"},ctx.connection.stream&&(span.data.rabbitmq.address=("function"==typeof ctx.connection.stream.getProtocol?"amqps://":"amqp://")+ctx.connection.stream.remoteAddress+":"+ctx.connection.stream.remotePort);ctx=originalArgs[0]||{};ctx.exchange&&(span.data.rabbitmq.exchange=ctx.exchange),ctx.routingKey&&(span.data.rabbitmq.key=ctx.routingKey),propagateTraceContext(originalArgs[0],span),propagateTraceContext(originalArgs[1],span)}function propagateSuppression(map){map&&map.headers&&(map.headers[constants.traceLevelHeaderName]="0")}function propagateTraceContext(map,span){map&&map.headers&&(map.headers[constants.traceIdHeaderName]=span.t,map.headers[constants.spanIdHeaderName]=span.s,map.headers[constants.traceLevelHeaderName]="1")}function shimDispatchMessage(originalFunction){return function(){if(isActive){var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedDispatchMessage(this,originalFunction,originalArgs)}return originalFunction.apply(this,arguments)}}function instrumentedDispatchMessage(ctx,originalDispatchMessage,originalArgs){const fields=originalArgs[0]||{};var consumerTag=fields.consumerTag;if(!ctx.consumers[consumerTag])return originalDispatchMessage.apply(ctx,originalArgs);consumerTag=cls.getCurrentSpan();if(consumerTag)return logger.warn("Cannot start an AMQP entry span when another span is already active. Currently, the following span is active: "+JSON.stringify(consumerTag)),originalDispatchMessage.apply(ctx,originalArgs);const headers=originalArgs[1]&&originalArgs[1].properties&&originalArgs[1].properties.headers?originalArgs[1].properties.headers:{};return cls.ns.runAndReturn(()=>{if("0"===tracingUtil.readAttribCaseInsensitive(headers,constants.traceLevelHeaderName))return cls.setTracingLevel("0"),originalDispatchMessage.apply(ctx,originalArgs);const span=cls.startSpan("rabbitmq",constants.ENTRY,tracingUtil.readAttribCaseInsensitive(headers,constants.traceIdHeaderName),tracingUtil.readAttribCaseInsensitive(headers,constants.spanIdHeaderName));span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedDispatchMessage),span.data.rabbitmq={sort:"consume"},ctx.connection.stream&&(span.data.rabbitmq.address=("function"==typeof ctx.connection.stream.getProtocol?"amqps://":"amqp://")+ctx.connection.stream.remoteAddress+":"+ctx.connection.stream.remotePort),fields.exchange&&(span.data.rabbitmq.exchange=fields.exchange),fields.routingKey&&(span.data.rabbitmq.key=fields.routingKey);try{return originalDispatchMessage.apply(ctx,originalArgs)}finally{setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()})}})}function shimChannelModelGet(originalFunction){return function(){if(isActive){var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedChannelModelGet(this,originalFunction,originalArgs)}return originalFunction.apply(this,arguments)}}function instrumentedChannelModelGet(ctx,originalGet,originalArgs){return cls.ns.runPromise(()=>{const span=cls.startSpan("rabbitmq",constants.ENTRY);return originalGet.apply(ctx,originalArgs).then(result=>{var fields,traceId,headers;return result?(fields=result.fields||{},headers=result.properties&&result.properties.headers?result.properties.headers:{},"0"===tracingUtil.readAttribCaseInsensitive(headers,constants.traceLevelHeaderName)?(cls.setTracingLevel("0"),span.cancel()):(traceId=tracingUtil.readAttribCaseInsensitive(headers,constants.traceIdHeaderName),headers=tracingUtil.readAttribCaseInsensitive(headers,constants.spanIdHeaderName),traceId&&headers&&(span.t=traceId,span.p=headers),span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedChannelModelGet),span.data.rabbitmq={sort:"consume"},ctx.connection.stream&&(span.data.rabbitmq.address="function"==typeof ctx.connection.stream.getProtocol?"amqps://":`amqp://${ctx.connection.stream.remoteAddress}:`+ctx.connection.stream.remotePort),fields.exchange&&(span.data.rabbitmq.exchange=fields.exchange),fields.routingKey&&(span.data.rabbitmq.key=fields.routingKey),setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()}))):span.cancel(),result})})}function shimCallbackModelGet(originalFunction){return function(){if(isActive){var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedCallbackModelGet(this,originalFunction,originalArgs)}return originalFunction.apply(this,arguments)}}function instrumentedCallbackModelGet(ctx,originalGet,originalArgs){let originalCallback=null;return 3<=originalArgs.length&&"function"==typeof originalArgs[2]&&(originalCallback=originalArgs[2]),originalArgs[2]=(err,result)=>{var parentSpan;return err||!result?originalCallback?originalCallback(err,result):void 0:(parentSpan=cls.getCurrentSpan())?(logger.warn("Cannot start an AMQP entry span when another span is already active. Currently, the following span is active: "+JSON.stringify(parentSpan)),originalCallback(err,result)):cls.ns.runAndReturn(()=>{var fields=result.fields||{},headers=result.properties&&result.properties.headers?result.properties.headers:{};if("0"===tracingUtil.readAttribCaseInsensitive(headers,constants.traceLevelHeaderName))cls.setTracingLevel("0");else{const span=cls.startSpan("rabbitmq",constants.ENTRY,tracingUtil.readAttribCaseInsensitive(headers,constants.traceIdHeaderName),tracingUtil.readAttribCaseInsensitive(headers,constants.spanIdHeaderName));span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedChannelModelGet),span.data.rabbitmq={sort:"consume"},ctx.connection.stream&&(span.data.rabbitmq.address="function"==typeof ctx.connection.stream.getProtocol?"amqps://":`amqp://${ctx.connection.stream.remoteAddress}:`+ctx.connection.stream.remotePort),fields.exchange&&(span.data.rabbitmq.exchange=fields.exchange),fields.routingKey&&(span.data.rabbitmq.key=fields.routingKey),setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()})}return originalCallback?originalCallback(err,result):void 0})},originalGet.apply(ctx,originalArgs)}function shimChannelModelPublish(originalFunction){return function(){var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedChannelModelPublish(this,originalFunction,originalArgs)}}function instrumentedChannelModelPublish(ctx,originalFunction,originalArgs){return cls.skipExitTracing({isActive:isActive})?originalFunction.apply(ctx,originalArgs):cls.ns.runAndReturn(()=>{const span=cls.startSpan("rabbitmq",constants.EXIT);if(5<=originalArgs.length&&"function"==typeof originalArgs[4]){const originalCb=originalArgs[4];originalArgs[4]=cls.ns.bind(function(){span.d=Date.now()-span.ts,span.transmit(),originalCb.apply(this,arguments)})}return originalFunction.apply(ctx,originalArgs)})}function shimCallbackModelPublish(originalFunction){return function(){var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedCallbackModelPublish(this,originalFunction,originalArgs)}}function instrumentedCallbackModelPublish(ctx,originalFunction,originalArgs){return cls.skipExitTracing({isActive:isActive})?originalFunction.apply(ctx,originalArgs):cls.ns.runAndReturn(()=>{const span=cls.startSpan("rabbitmq",constants.EXIT);if(5<=originalArgs.length&&"function"==typeof originalArgs[4]){const originalCb=originalArgs[4];originalArgs[4]=cls.ns.bind(function(){span.d=Date.now()-span.ts,span.transmit(),originalCb.apply(this,arguments)})}return originalFunction.apply(ctx,originalArgs)})}exports.init=function(){requireHook.onFileLoad(/\/amqplib\/lib\/channel\.js/,instrumentChannel),requireHook.onFileLoad(/\/amqplib\/lib\/channel_model\.js/,instrumentChannelModel),requireHook.onFileLoad(/\/amqplib\/lib\/callback_model\.js/,instrumentCallbackModel)},exports.activate=function(){isActive=!0},exports.deactivate=function(){isActive=!1};