Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const requireHook=require("../../../util/requireHook"),tracingUtil=require("../../tracingUtil"),limitTraceId=require("../../tracingHeaders")["limitTraceId"],leftPad=require("../../leftPad"),constants=require("../../constants"),cls=require("../../cls");let logger,traceCorrelationEnabled=(logger=require("../../../logger").getLogger("tracing/kafkajs",newLogger=>{logger=newLogger}),constants.kafkaTraceCorrelationDefault),headerFormat=constants.kafkaHeaderFormatDefault,isActive=!1;function instrumentProducer(createProducer){return"function"!=typeof createProducer?createProducer:function(){var producer=createProducer.apply(this,arguments);return producer.send=shimmedSend(producer.send),producer.sendBatch=shimmedSendBatch(producer.sendBatch),producer}}function shimmedSend(originalSend){return async function(config){var topic=config.topic,config=config.messages,skipTracingResult=cls.skipExitTracing({isActive:isActive,extendedResponse:!0});if(skipTracingResult.skip||!config||0===config.length)return skipTracingResult.suppressed&&addTraceLevelSuppressionToAllMessages(config),originalSend.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let argsIdx=0;argsIdx<arguments.length;argsIdx++)originalArgs[argsIdx]=arguments[argsIdx];return instrumentedSend(this,originalSend,originalArgs,topic,config)}}function instrumentedSend(ctx,originalSend,originalArgs,topic,messages){return cls.ns.runAndReturn(()=>{const span=cls.startSpan("kafka",constants.EXIT);return Array.isArray(messages)&&(span.b={s:messages.length},addTraceContextHeaderToAllMessages(messages,span)),span.data.kafka={service:topic,access:"send"},originalSend.apply(ctx,originalArgs).then(result=>(span.d=Date.now()-span.ts,span.transmit(),result)).catch(error=>{throw span.ec=1,span.data.kafka.error=error.message,span.d=Date.now()-span.ts,span.transmit(),error})})}function shimmedSendBatch(originalSendBatch){return async function(config){var config=config.topicMessages,skipTracingResult=cls.skipExitTracing({isActive:isActive,extendedResponse:!0});if(skipTracingResult.skip||!config||0===config.length)return skipTracingResult.suppressed&&config.forEach(topicMessage=>{addTraceLevelSuppressionToAllMessages(topicMessage.messages)}),originalSendBatch.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedSendBatch(this,originalSendBatch,originalArgs,config)}}function instrumentedSendBatch(ctx,originalSendBatch,originalArgs,topicMessages){const topics=[];let messageCount=0;return topicMessages.forEach(topicMessage=>{topicMessage.topic&&topics.indexOf(topicMessage.topic)<0&&topics.push(topicMessage.topic),topicMessage.messages&&Array.isArray(topicMessage.messages)&&(messageCount+=topicMessage.messages.length)}),cls.ns.runAndReturn(()=>{const span=cls.startSpan("kafka",constants.EXIT);return topicMessages.forEach(topicMessage=>{addTraceContextHeaderToAllMessages(topicMessage.messages,span)}),span.data.kafka={service:topics.join(","),access:"send"},0<messageCount&&(span.b={s:messageCount}),originalSendBatch.apply(ctx,originalArgs).then(result=>(span.d=Date.now()-span.ts,span.transmit(),result)).catch(error=>{throw span.ec=1,span.data.kafka.error=error.message,span.d=Date.now()-span.ts,span.transmit(),error})})}function instrumentConsumer(Runner){return"function"!=typeof Runner?Runner:function(){var originalEachMessage,argObject=Array.prototype.slice.call(arguments)[0];return argObject&&argObject.eachMessage?(originalEachMessage=argObject.eachMessage,argObject.eachMessage=instrumentedEachMessage(originalEachMessage)):argObject&&argObject.eachBatch&&(originalEachMessage=argObject.eachBatch,argObject.eachBatch=instrumentedEachBatch(originalEachMessage)),new Runner(...arguments)}}function instrumentedEachMessage(originalEachMessage){return function(config){const topic=config.topic;config=config.message;if(!isActive)return originalEachMessage.apply(this,arguments);var parentSpan=cls.getCurrentSpan();if(parentSpan)return logger.warn("Cannot start a Kafka entry span when another span is already active. Currently, the following span is active: "+JSON.stringify(parentSpan)),originalEachMessage.apply(this,arguments);const ctx=this,originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];let traceId,longTraceId,parentSpanId,level;return config&&config.headers&&(config.headers[constants.kafkaTraceIdHeaderName]&&(traceId=String(config.headers[constants.kafkaTraceIdHeaderName]))&&(parentSpan=limitTraceId({traceId:traceId}),traceId=parentSpan.traceId,longTraceId=parentSpan.longTraceId),config.headers[constants.kafkaSpanIdHeaderName]&&(parentSpanId=String(config.headers[constants.kafkaSpanIdHeaderName])),config.headers[constants.kafkaTraceLevelHeaderName]&&(level=String(config.headers[constants.kafkaTraceLevelHeaderName])),null==traceId)&&null==parentSpanId&&null==level&&(config.headers[constants.kafkaLegacyTraceContextHeaderName]&&(parentSpan=config.headers[constants.kafkaLegacyTraceContextHeaderName],Buffer.isBuffer(parentSpan))&&24===parentSpan.length&&(parentSpan=tracingUtil.readTraceContextFromBuffer(parentSpan),traceId=parentSpan.t,parentSpanId=parentSpan.s),level=readTraceLevelBinary(config)),removeInstanaHeadersFromMessage(config),cls.ns.runAndReturn(()=>{if(isSuppressed(level))return cls.setTracingLevel("0"),originalEachMessage.apply(ctx,originalArgs);const span=cls.startSpan("kafka",constants.ENTRY,traceId,parentSpanId);longTraceId&&(span.lt=longTraceId),span.stack=[],span.data.kafka={access:"consume",service:topic};try{return originalEachMessage.apply(ctx,originalArgs)}finally{setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()})}})}}function instrumentedEachBatch(originalEachBatch){return function(config){const batch=config.batch;if(!isActive)return originalEachBatch.apply(this,arguments);var limited,traceContextBuffer,config=cls.getCurrentSpan();if(config)return logger.warn("Cannot start a Kafka entry span when another span is already active. Currently, the following span is active: "+JSON.stringify(config)),originalEachBatch.apply(this,arguments);const ctx=this,originalArgs=new Array(arguments.length);for(let argsIdx=0;argsIdx<arguments.length;argsIdx++)originalArgs[argsIdx]=arguments[argsIdx];let traceId,longTraceId,parentSpanId,level;if(batch.messages){for(let msgIdx=0;msgIdx<batch.messages.length;msgIdx++)if(batch.messages[msgIdx].headers&&batch.messages[msgIdx].headers[constants.kafkaTraceIdHeaderName]&&(traceId=String(batch.messages[msgIdx].headers[constants.kafkaTraceIdHeaderName]))&&(limited=limitTraceId({traceId:traceId}),traceId=limited.traceId,longTraceId=limited.longTraceId),batch.messages[msgIdx].headers&&batch.messages[msgIdx].headers[constants.kafkaSpanIdHeaderName]&&(parentSpanId=String(batch.messages[msgIdx].headers[constants.kafkaSpanIdHeaderName])),batch.messages[msgIdx].headers&&batch.messages[msgIdx].headers[constants.kafkaTraceLevelHeaderName]&&(level=String(batch.messages[msgIdx].headers[constants.kafkaTraceLevelHeaderName])),null!=traceId||null!=parentSpanId||null!=level)break;if(null==traceId&&null==parentSpanId&&null==level)for(let msgIdx=0;msgIdx<batch.messages.length;msgIdx++)if(batch.messages[msgIdx].headers&&batch.messages[msgIdx].headers[constants.kafkaLegacyTraceContextHeaderName]&&(traceContextBuffer=batch.messages[msgIdx].headers[constants.kafkaLegacyTraceContextHeaderName],Buffer.isBuffer(traceContextBuffer))&&24===traceContextBuffer.length&&(traceContextBuffer=tracingUtil.readTraceContextFromBuffer(traceContextBuffer),traceId=traceContextBuffer.t,parentSpanId=traceContextBuffer.s),level=readTraceLevelBinary(batch.messages[msgIdx]),null!=traceId||null!=parentSpanId||null!=level)break;for(let msgIdx=0;msgIdx<batch.messages.length;msgIdx++)removeInstanaHeadersFromMessage(batch.messages[msgIdx])}return cls.ns.runAndReturn(()=>{if(batch.messages&&isSuppressed(level))return cls.setTracingLevel("0"),originalEachBatch.apply(ctx,originalArgs);const span=cls.startSpan("kafka",constants.ENTRY,traceId,parentSpanId);longTraceId&&(span.lt=longTraceId),span.stack=[],span.data.kafka={access:"consume",service:batch?batch.topic:void 0},batch&&batch.messages&&(span.b={s:batch.messages.length});try{return originalEachBatch.apply(ctx,originalArgs)}finally{setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()})}})}}function isSuppressed(level){return"0"===level}function readTraceLevelBinary(message){if(message.headers[constants.kafkaLegacyTraceLevelHeaderName]){message=message.headers[constants.kafkaLegacyTraceLevelHeaderName];if(Buffer.isBuffer(message)&&1<=message.length)return String(message.readInt8())}return"1"}function addTraceContextHeaderToAllMessages(messages,span){if(traceCorrelationEnabled)switch(headerFormat){case"binary":addLegacyTraceContextHeaderToAllMessages(messages,span);break;case"string":addTraceIdSpanIdToAllMessages(messages,span);break;default:addLegacyTraceContextHeaderToAllMessages(messages,span),addTraceIdSpanIdToAllMessages(messages,span)}}function addLegacyTraceContextHeaderToAllMessages(messages,span){if(Array.isArray(messages))for(let msgIdx=0;msgIdx<messages.length;msgIdx++)null==messages[msgIdx].headers?messages[msgIdx].headers={[constants.kafkaLegacyTraceContextHeaderName]:tracingUtil.renderTraceContextToBuffer(span),[constants.kafkaLegacyTraceLevelHeaderName]:constants.kafkaLegacyTraceLevelValueInherit}:messages[msgIdx].headers&&"object"==typeof messages[msgIdx].headers&&(messages[msgIdx].headers[constants.kafkaLegacyTraceContextHeaderName]=tracingUtil.renderTraceContextToBuffer(span),messages[msgIdx].headers[constants.kafkaLegacyTraceLevelHeaderName]=constants.kafkaLegacyTraceLevelValueInherit)}function addTraceIdSpanIdToAllMessages(messages,span){if(Array.isArray(messages))for(let msgIdx=0;msgIdx<messages.length;msgIdx++)null==messages[msgIdx].headers?messages[msgIdx].headers={[constants.kafkaTraceIdHeaderName]:leftPad(span.t,32),[constants.kafkaSpanIdHeaderName]:span.s}:messages[msgIdx].headers&&"object"==typeof messages[msgIdx].headers&&(messages[msgIdx].headers[constants.kafkaTraceIdHeaderName]=leftPad(span.t,32),messages[msgIdx].headers[constants.kafkaSpanIdHeaderName]=span.s)}function addTraceLevelSuppressionToAllMessages(messages){if(traceCorrelationEnabled)switch(headerFormat){case"binary":addTraceLevelSuppressionToAllMessagesBinary(messages);break;case"string":addTraceLevelSuppressionToAllMessagesString(messages);break;default:addTraceLevelSuppressionToAllMessagesBinary(messages),addTraceLevelSuppressionToAllMessagesString(messages)}}function addTraceLevelSuppressionToAllMessagesBinary(messages){if(Array.isArray(messages))for(let msgIdx=0;msgIdx<messages.length;msgIdx++)null==messages[msgIdx].headers?messages[msgIdx].headers={[constants.kafkaLegacyTraceLevelHeaderName]:constants.kafkaLegacyTraceLevelValueSuppressed}:messages[msgIdx].headers&&"object"==typeof messages[msgIdx].headers&&(messages[msgIdx].headers[constants.kafkaLegacyTraceLevelHeaderName]=constants.kafkaLegacyTraceLevelValueSuppressed)}function addTraceLevelSuppressionToAllMessagesString(messages){if(Array.isArray(messages))for(let msgIdx=0;msgIdx<messages.length;msgIdx++)null==messages[msgIdx].headers?messages[msgIdx].headers={[constants.kafkaTraceLevelHeaderName]:"0"}:messages[msgIdx].headers&&"object"==typeof messages[msgIdx].headers&&(messages[msgIdx].headers[constants.kafkaTraceLevelHeaderName]="0")}function removeInstanaHeadersFromMessage(message){message.headers&&"object"==typeof message.headers&&constants.allInstanaKafkaHeaders.forEach(name=>{delete message.headers[name]})}exports.init=function(config){requireHook.onFileLoad(/\/kafkajs\/src\/producer\/messageProducer\.js/,instrumentProducer),requireHook.onFileLoad(/\/kafkajs\/src\/consumer\/runner\.js/,instrumentConsumer),traceCorrelationEnabled=config.tracing.kafka.traceCorrelation,headerFormat=config.tracing.kafka.headerFormat},exports.updateConfig=function(config){traceCorrelationEnabled=config.tracing.kafka.traceCorrelation,headerFormat=config.tracing.kafka.headerFormat},exports.activate=function(extraConfig){extraConfig&&extraConfig.tracing&&extraConfig.tracing.kafka&&(null!=extraConfig.tracing.kafka.traceCorrelation&&(traceCorrelationEnabled=extraConfig.tracing.kafka.traceCorrelation),"string"==typeof extraConfig.tracing.kafka.headerFormat)&&(headerFormat=extraConfig.tracing.kafka.headerFormat),isActive=!0},exports.deactivate=function(){isActive=!1};