Repository URL to install this package:
|
Version:
0.7.0 ▾
|
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const requireHook=require("../../../util/requireHook"),tracingUtil=require("../../tracingUtil"),limitTraceId=require("../../tracingHeaders")["limitTraceId"],leftPad=require("../../leftPad"),constants=require("../../constants"),cls=require("../../cls"),shimmer=require("shimmer"),getFunctionArguments=require("../../../util/function_arguments")["getFunctionArguments"];let traceCorrelationEnabled=constants.kafkaTraceCorrelationDefault,logger,isActive=(logger=require("../../../logger").getLogger("tracing/rdkafka",newLogger=>{logger=newLogger}),!1);function logWarningForKafkaHeaderFormat(headerFormat){"binary"===headerFormat?logger.warn("Ignoring configuration value 'binary' for Kafka header format in node-rdkafka instrumentation, using header format 'string' instead. Binary headers do not work with node-rdkafka, see https://github.com/Blizzard/node-rdkafka/pull/968."):"both"===headerFormat&&logger.debug("Ignoring configuration or default value 'both' for Kafka header format in node-rdkafka instrumentation, using header format 'string' instead. Binary headers do not work with node-rdkafka, see https://github.com/Blizzard/node-rdkafka/pull/968.")}function instrumentProducer(ProducerClass){shimmer.wrap(ProducerClass.prototype,"produce",shimProduce)}function instrumentConsumer(module){const originalKafkaConsumer=module.KafkaConsumer;module.KafkaConsumer=function(){var that=new originalKafkaConsumer(...arguments);return shimmer.wrap(that,"emit",shimConsumerStreamEmit),that},Object.keys(originalKafkaConsumer).forEach(key=>{module.KafkaConsumer[key]=originalKafkaConsumer[key]})}function instrumentConsumerAsStream(KafkaConsumerStream){shimmer.wrap(KafkaConsumerStream.prototype,"emit",shimConsumerStreamEmit)}function shimProduce(originalProduce){return function(){var originalArgs=getFunctionArguments(arguments);return instrumentedProduce(this,originalProduce,originalArgs)}}function shimConsumerStreamEmit(originalEmit){return function(){var originalArgs=getFunctionArguments(arguments);return isActive?instrumentedConsumerEmit(this,originalEmit,originalArgs):originalEmit.apply(this,originalArgs)}}function instrumentedProduce(ctx,originalProduce,originalArgs){if(!originalArgs[2])return originalProduce.apply(this,originalArgs);var skipTracingResult=cls.skipExitTracing({isActive:isActive,extendedResponse:!0});if(skipTracingResult.skip)return skipTracingResult.suppressed&&(skipTracingResult=addTraceLevelSuppression(originalArgs[6]),originalArgs[6]=skipTracingResult),originalProduce.apply(ctx,originalArgs);const deliveryCb=ctx._cb_configs&&ctx._cb_configs.event&&ctx._cb_configs.event.delivery_cb&&"function"==typeof ctx._cb_configs.event.delivery_cb;return cls.ns.runAndReturn(()=>{const span=cls.startSpan("kafka",constants.EXIT);var topic=originalArgs[0],topic=(span.stack=tracingUtil.getStackTrace(instrumentedProduce,1),span.data.kafka={service:topic,access:"send"},addTraceContextHeader(originalArgs[6],span));originalArgs[6]=topic,deliveryCb&&ctx.once("delivery-report",function(err){span.d=Date.now()-span.ts,err&&(span.ec=1,span.data.kafka.error=err.message),span.transmit()});try{var result=originalProduce.apply(ctx,originalArgs);return deliveryCb||(span.d=Date.now()-span.ts,span.transmit()),result}catch(error){throw span.ec=1,span.data.kafka.error=error.message,deliveryCb||(span.d=Date.now()-span.ts,span.transmit()),error}})}function removeInstanaHeadersFromMessage(messageData){if(messageData.headers&&messageData.headers.length)for(let i=messageData.headers.length-1;0<=i;i--){var headerObject=messageData.headers[i],headerObject=Object.keys(headerObject)[0].toUpperCase();constants.allInstanaKafkaHeaders.includes(headerObject)&&messageData.headers.splice(i,1)}}function instrumentedConsumerEmit(ctx,originalEmit,originalArgs){let[event,eventData]=originalArgs;return Array.isArray(eventData)||(eventData=[eventData]),!isActive||cls.getCurrentSpan()||"data"!==event&&"error"!==event?originalEmit.apply(ctx,originalArgs):void eventData.forEach(messageData=>{var _traceId,_longTraceId,_parentSpanId,instanaHeaders=(messageData.headers||[]).filter(headerObject=>{headerObject=Object.keys(headerObject)[0].toUpperCase();return constants.allInstanaKafkaHeaders.includes(headerObject)});const instanaHeadersAsObject={};instanaHeaders.forEach(instanaHeader=>{var key=Object.keys(instanaHeader)[0].toUpperCase();instanaHeadersAsObject[key]=instanaHeader[key]});let traceId,longTraceId,parentSpanId,level;instanaHeaders.length&&(({level:instanaHeaders,traceId:_traceId,longTraceId:_longTraceId,parentSpanId:_parentSpanId}=findInstanaHeaderValues(instanaHeadersAsObject)),traceId=_traceId,longTraceId=_longTraceId,parentSpanId=_parentSpanId,level=instanaHeaders,removeInstanaHeadersFromMessage(messageData)),cls.ns.runAndReturn(function(){if(level&&"0"===level)cls.setTracingLevel("0");else{const span=cls.startSpan("kafka",constants.ENTRY,traceId,parentSpanId);longTraceId&&(span.lt=longTraceId),span.stack=tracingUtil.getStackTrace(instrumentedConsumerEmit,1),span.data.kafka={access:"consume",service:messageData.topic||"empty"},"error"===event&&(delete messageData.headers,span.ec=1,span.data.kafka.error=messageData.message),setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()})}return originalEmit.apply(ctx,originalArgs)})})}function readTraceLevelBinary(instanaHeadersAsObject){if(instanaHeadersAsObject[constants.kafkaLegacyTraceLevelHeaderName]){instanaHeadersAsObject=instanaHeadersAsObject[constants.kafkaLegacyTraceLevelHeaderName];if(Buffer.isBuffer(instanaHeadersAsObject)&&1<=instanaHeadersAsObject.length)return String(instanaHeadersAsObject.readInt8())}return"1"}function addTraceContextHeader(headers,span){return traceCorrelationEnabled&&(null==headers?headers=[{[constants.kafkaTraceIdHeaderName]:leftPad(span.t,32)},{[constants.kafkaSpanIdHeaderName]:span.s},{[constants.kafkaTraceLevelHeaderName]:"1"}]:headers&&Array.isArray(headers)&&(headers.push({[constants.kafkaTraceIdHeaderName]:leftPad(span.t,32)}),headers.push({[constants.kafkaSpanIdHeaderName]:span.s}),headers.push({[constants.kafkaTraceLevelHeaderName]:"1"}))),headers}function addTraceLevelSuppression(headers){return traceCorrelationEnabled&&(null==headers?headers=[{[constants.kafkaTraceLevelHeaderName]:"0"}]:headers&&Array.isArray(headers)&&headers.push({[constants.kafkaTraceLevelHeaderName]:"0"})),headers}function findInstanaHeaderValues(instanaHeadersAsObject){let traceId,longTraceId,parentSpanId,level;var limited;return instanaHeadersAsObject[constants.kafkaTraceIdHeaderName]&&(traceId=String(instanaHeadersAsObject[constants.kafkaTraceIdHeaderName]))&&(limited=limitTraceId({traceId:traceId}),traceId=limited.traceId,longTraceId=limited.longTraceId),instanaHeadersAsObject[constants.kafkaSpanIdHeaderName]&&(parentSpanId=String(instanaHeadersAsObject[constants.kafkaSpanIdHeaderName])),instanaHeadersAsObject[constants.kafkaTraceLevelHeaderName]&&(level=String(instanaHeadersAsObject[constants.kafkaTraceLevelHeaderName])),null==traceId&&null==parentSpanId&&null==level&&(instanaHeadersAsObject[constants.kafkaLegacyTraceContextHeaderName]&&(limited=instanaHeadersAsObject[constants.kafkaLegacyTraceContextHeaderName],Buffer.isBuffer(limited))&&24===limited.length&&(limited=tracingUtil.readTraceContextFromBuffer(limited),traceId=limited.t,parentSpanId=limited.s),level=readTraceLevelBinary(instanaHeadersAsObject)),{level:level,traceId:traceId,longTraceId:longTraceId,parentSpanId:parentSpanId}}exports.init=function(config){requireHook.onFileLoad(/\/node-rdkafka\/lib\/producer\.js/,instrumentProducer),requireHook.onFileLoad(/\/node-rdkafka\/lib\/kafka-consumer-stream\.js/,instrumentConsumerAsStream),requireHook.onModuleLoad("node-rdkafka",instrumentConsumer),traceCorrelationEnabled=config.tracing.kafka.traceCorrelation,logWarningForKafkaHeaderFormat(config.tracing.kafka.headerFormat)},exports.updateConfig=function(config){traceCorrelationEnabled=config.tracing.kafka.traceCorrelation,logWarningForKafkaHeaderFormat(config.tracing.kafka.headerFormat)},exports.activate=function(extraConfig){extraConfig&&extraConfig.tracing&&extraConfig.tracing.kafka&&(null!=extraConfig.tracing.kafka.traceCorrelation&&(traceCorrelationEnabled=extraConfig.tracing.kafka.traceCorrelation),logWarningForKafkaHeaderFormat(extraConfig.tracing.kafka.headerFormat)),isActive=!0},exports.deactivate=function(){isActive=!1};