Repository URL to install this package:
|
Version:
0.6.13 ▾
|
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const shimmer=require("shimmer"),requireHook=require("../../../util/requireHook"),tracingUtil=require("../../tracingUtil"),constants=require("../../constants"),cls=require("../../cls");let logger,isActive=(logger=require("../../../logger").getLogger("tracing/kafka-node",newLogger=>{logger=newLogger}),!1);function instrument(kafka){shimmer.wrap(Object.getPrototypeOf(kafka.Producer.prototype),"send",shimSend),shimmer.wrap(kafka.Consumer.prototype,"emit",shimEmit),kafka.HighLevelConsumer?shimmer.wrap(kafka.HighLevelConsumer.prototype,"emit",shimEmit):shimmer.wrap(kafka.ConsumerGroup.prototype,"emit",shimEmit)}function shimSend(original){return function(){return instrumentedSend(this,original,arguments[0],arguments[1])}}function instrumentedSend(ctx,originalSend,produceRequests,cb){const args=[produceRequests];return cls.skipExitTracing({isActive:isActive})||!produceRequests||0===produceRequests.length?(cb&&args.push(cb),originalSend.apply(ctx,args)):cls.ns.runAndReturn(()=>{const span=cls.startSpan("kafka",constants.EXIT);var produceRequest=produceRequests[0];return span.b={s:produceRequests.length},span.stack=tracingUtil.getStackTrace(instrumentedSend),span.data.kafka={service:produceRequest.topic,access:"send"},args.push(cls.ns.bind(function(err){if(err&&(span.ec=1,span.data.kafka.error=err.message),span.d=Date.now()-span.ts,span.transmit(),cb)return cb.apply(this,arguments)})),originalSend.apply(ctx,args)})}function shimEmit(original){return function(eventType,message){if(!isActive||"message"!==eventType)return original.apply(this,arguments);const originalThis=this,originalArgs=arguments;eventType=cls.getCurrentSpan();return eventType?(logger.warn("Cannot start a Kafka entry span when another span is already active. Currently, the following span is active: "+JSON.stringify(eventType)),original.apply(originalThis,originalArgs)):cls.ns.runAndReturn(()=>{const span=cls.startSpan("kafka",constants.ENTRY);span.stack=[],span.data.kafka={access:"consume",service:message.topic};try{return original.apply(originalThis,originalArgs)}finally{setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()})}})}}exports.init=function(){requireHook.onModuleLoad("kafka-node",instrument)},exports.activate=function(){isActive=!0},exports.deactivate=function(){isActive=!1};