Repository URL to install this package:
|
Version:
0.7.0 ▾
|
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const shimmer=require("shimmer"),cls=require("../../cls"),{ENTRY,EXIT}=require("../../constants"),requireHook=require("../../../util/requireHook"),tracingUtil=require("../../tracingUtil"),getFunctionArguments=require("../../../util/function_arguments")["getFunctionArguments"];let logger=require("../../../logger").getLogger("tracing/bull",newLogger=>{logger=newLogger}),isActive=!1;function instrumentBull(Bull){shimmer.wrap(Bull.Job,"create",shimJobCreate),shimmer.wrap(Bull.Job,"createBulk",shimJobCreateBulk),shimmer.wrap(Bull.prototype,"processJob",shimProcessJob)}function shimJobCreate(originalJobCreate){return function(){var originalArgs=getFunctionArguments(arguments),options=originalArgs[3],repeatableJob=options&&"string"==typeof options.jobId&&0===options.jobId.indexOf("repeat"),repeatableJobIsSuppressed=repeatableJob&&"0"===options.X_SUPERTENANT_L,skipIsTracing=!!repeatableJob,parentSpan=cls.getCurrentSpan(),skipIsTracing=cls.skipExitTracing({isActive:isActive,extendedResponse:!0,skipParentSpanCheck:!0,skipIsTracing:skipIsTracing});return skipIsTracing.skip||skipIsTracing.isExitSpan||!parentSpan&&!repeatableJob||repeatableJobIsSuppressed?((skipIsTracing.suppressed||repeatableJobIsSuppressed)&&propagateSuppression(options),originalJobCreate.apply(this,arguments)):instrumentedJobCreate(this,originalJobCreate,originalArgs,options)}}function instrumentedJobCreate(ctx,originalJobCreate,originalArgs,options){const queueName=originalArgs[0]&&originalArgs[0].name||"name not found";return cls.ns.runAndReturn(()=>{const span=cls.startSpan(exports.spanName,EXIT);return span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedJobCreate,1),span.data.bull={sort:"exit",queue:queueName},propagateTraceContext(options,span),originalJobCreate.apply(ctx,originalArgs).then(job=>(finishSpan(null,job,span),job)).catch(err=>(finishSpan(err,null,span),err))})}function shimJobCreateBulk(originalJobCreateBulk){return function(){var originalArgs=getFunctionArguments(arguments),skipTracingResult=cls.skipExitTracing({isActive:isActive,extendedResponse:!0});return skipTracingResult.skip?(skipTracingResult.suppressed&&(originalArgs[1]||[]).forEach(job=>{propagateSuppression(job.opts)}),originalJobCreateBulk.apply(this,originalArgs)):instrumentedJobCreateBulk(this,originalJobCreateBulk,originalArgs)}}function instrumentedJobCreateBulk(ctx,originalJobCreateBulk,originalArgs){const queueName=originalArgs[0]&&originalArgs[0].name||"name not found";return(originalArgs[1]||[]).forEach(job=>{cls.ns.run(()=>{var span=cls.startSpan(exports.spanName,EXIT),options=(span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedJobCreateBulk,2),span.data.bull={sort:"exit",queue:queueName},job.opts);propagateTraceContext(options,span),finishSpan(null,job.data,span)})}),originalJobCreateBulk.apply(ctx,originalArgs)}function shimProcessJob(originalProcessJob){return function(){var originalArgs;return isActive?(originalArgs=getFunctionArguments(arguments),instrumentedProcessJob(this,originalProcessJob,originalArgs)):originalProcessJob.apply(this,arguments)}}function instrumentedProcessJob(ctx,originalProcessJob,originalArgs){const job=originalArgs[0];if(!job)return originalProcessJob.apply(ctx,originalArgs);const options=job.opts||{},jobId=options.jobId,queueName=job.queue&&job.queue.name;return cls.ns.runPromise(()=>{let attributes={};if(options[jobId]?(attributes=options[jobId],delete options[jobId],options.repeat&&removeOriginalJobInstanaData(options)):options.instanaTracingContext&&(attributes=options.instanaTracingContext,delete options.instanaTracingContext),"0"===options.X_SUPERTENANT_L)return cls.setTracingLevel("0"),delete options.X_SUPERTENANT_L,originalProcessJob.apply(ctx,originalArgs);delete options.X_SUPERTENANT_L;var spanT=attributes.X_SUPERTENANT_T,spanP=attributes.X_SUPERTENANT_S,parentSpan=cls.getCurrentSpan();if(parentSpan&&parentSpan.p===spanP&&parentSpan.t===spanT||parentSpan&&"bull"!==parentSpan.n)return logger.warn("Cannot start a Bull entry span when another span is already active: "+JSON.stringify(parentSpan)),originalProcessJob.apply(ctx,originalArgs);const span=cls.startSpan(exports.spanName,ENTRY,spanT,spanP);return span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedProcessJob,1),span.data.bull={sort:"entry",queue:queueName},originalProcessJob.apply(ctx,originalArgs).then(data=>(finishSpan(job.failedReason,data,span),data)).catch(err=>{throw addErrorToSpan(err,span),finishSpan(null,null,span),err}).finally(()=>{delete options.X_SUPERTENANT_L})})}function removeOriginalJobInstanaData(options){Object.keys(options).filter(key=>0===key.indexOf("repeat")).forEach(k=>{delete options[k]})}function propagateSuppression(options){options.X_SUPERTENANT_L="0"}function propagateTraceContext(options,span){options.X_SUPERTENANT_L="1",options.jobId?options[options.jobId]={X_SUPERTENANT_T:span.t,X_SUPERTENANT_S:span.s}:options.instanaTracingContext={X_SUPERTENANT_T:span.t,X_SUPERTENANT_S:span.s}}function finishSpan(err,data,span){err&&addErrorToSpan(err,span),"string"==typeof data&&(span.data.bull.messageId=data),span.d=Date.now()-span.ts,span.transmit()}function addErrorToSpan(err,span){err&&(span.ec=1,err.code?span.data.bull.error=err.code:"string"==typeof err&&(span.data.bull.error=err))}exports.spanName="bull",exports.init=function(){requireHook.onModuleLoad("bull",instrumentBull)},exports.activate=function(){isActive=!0},exports.deactivate=function(){isActive=!1};