Repository URL to install this package:
|
Version:
0.7.10 ▾
|
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const shimmer=require("shimmer"),requireHook=require("../../../util/requireHook"),tracingUtil=require("../../tracingUtil"),constants=require("../../constants"),cls=require("../../cls"),superbrain=require("@supertenant/superbrain")["superbrain"],superconsts=require("@supertenant/superconsts"),getOrCreateTask=require("../../taskManager")["getOrCreateTask"],{getWaitPollInterval,getMongodbRejectActionResult,getDelayDuration}=require("../../actions"),dedupReportError=require("../../../logger")["dedupReportError"],DefaultErrorConstants={11601:"Interrupted",262:"ExceededTimeLimit"};let ErrorConstants={},isActive=(Object.assign(ErrorConstants,DefaultErrorConstants),!0);const commands=["aggregate","count","delete","distinct","find","findAndModify","findandmodify","getMore","getmore","insert","update"];function instrumentCmapConnection(connection){connection.Connection&&connection.Connection.prototype&&(shimmer.wrap(connection.Connection.prototype,"query",shimCmapQuery),shimmer.wrap(connection.Connection.prototype,"command",shimCmapCommand),["insert","update","remove"].forEach(fnName=>{connection.Connection.prototype[fnName]&&shimmer.wrap(connection.Connection.prototype,fnName,shimCmapMethod)}),shimmer.wrap(connection.Connection.prototype,"getMore",shimCmapGetMore))}function shimCmapQuery(original){return function(){if(cls.skipExitTracing({isActive:isActive}))return original.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedCmapQuery(this,original,originalArgs)}}function shimCmapCommand(original){return function(){if(cls.skipExitTracing({isActive:isActive}))return original.apply(this,arguments);var command=arguments[1]&&commands.find(c=>arguments[1][c]);if(!command)return original.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedCmapMethod(this,original,originalArgs,command)}}function shimCmapMethod(original){return function(){if(cls.skipExitTracing({isActive:isActive}))return original.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedCmapMethod(this,original,originalArgs,original.name)}}function shimCmapGetMore(original){return function(){if(cls.skipExitTracing({isActive:isActive}))return original.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedCmapGetMore(this,original,originalArgs)}}function instrumentedCmapQuery(ctx,originalQuery,originalArgs){const{originalCallback,callbackIndex}=findCallback(originalArgs);return callbackIndex<0?originalQuery.apply(ctx,originalArgs):cls.ns.runAndReturn(()=>{var span=cls.startSpan(exports.spanName,constants.EXIT),namespace=originalArgs[0],splitNamespace=dbAndCollectionFromNamespace(namespace),cmd=originalArgs[1];let command;cmd&&(command=findCommand(cmd));let service,hostname,port;ctx.address&&(service=ctx.address,hostnameAndPort=splitIntoHostAndPort(ctx.address),hostname=hostnameAndPort.hostname,port=hostnameAndPort.port,span.data.peer=hostnameAndPort);var hostnameAndPort={command:command,service:service,namespace:namespace},namespace=(span.data.mongo=hostnameAndPort,readJsonOrFilter(cmd,span)),hostnameAndPort=namespace.stringJson,cmd=namespace.filter;let openSpanResult=null;var stSpanData,namespace=getOrCreateTask(),splitNamespace=(0!==namespace&&((stSpanData={})[superconsts.Label.SupertenantResourceType]=superconsts.ResourceType.Mongodb,stSpanData[superconsts.Label.IntegrationModuleResourceId]=hostname,stSpanData[superconsts.Label.DbDatabase]=splitNamespace.db,stSpanData[superconsts.Label.DbMongodbCollection]=splitNamespace.collection,stSpanData[superconsts.Label.DbHost]=hostname,stSpanData[superconsts.Label.DbPort]=port.toString(),stSpanData[superconsts.Label.DbCommand]=command,stSpanData[superconsts.Label.DbMongodbCommandJson]=hostnameAndPort,stSpanData[superconsts.Label.DbMongodbFilter]=cmd,openSpanResult=superbrain.openSpan(namespace,superconsts.SpanType.ClientRequest,stSpanData)),createWrappedCallback(span,openSpanResult,originalCallback));if(originalArgs[callbackIndex]=splitNamespace,!openSpanResult)return originalQuery.apply(ctx,originalArgs);callbackHandleAction(openSpanResult,ctx,originalQuery,originalArgs,splitNamespace)})}function instrumentedCmapMethod(ctx,originalMethod,originalArgs,command){const{originalCallback,callbackIndex}=findCallback(originalArgs);return callbackIndex<0?originalMethod.apply(ctx,originalArgs):cls.ns.runAndReturn(()=>{var span=cls.startSpan(exports.spanName,constants.EXIT),namespace=originalArgs[0],splitNamespace=dbAndCollectionFromNamespace(namespace);let service,hostname,port;ctx.address&&(service=ctx.address,hostnameAndPort=splitIntoHostAndPort(ctx.address),hostname=hostnameAndPort.hostname,port=hostnameAndPort.port,span.data.peer=hostnameAndPort);var hostnameAndPort={command:command,service:service,namespace:namespace};span.data.mongo=hostnameAndPort;let json,filter,openSpanResult=(command&&command.indexOf("insert")<0&&(namespace=readJsonOrFilter(originalArgs[1],span),json=namespace.stringJson,filter=namespace.filter),null);hostnameAndPort=getOrCreateTask(),0!==hostnameAndPort&&((namespace={})[superconsts.Label.SupertenantResourceType]=superconsts.ResourceType.Mongodb,namespace[superconsts.Label.IntegrationModuleResourceId]=hostname,namespace[superconsts.Label.DbDatabase]=splitNamespace.db,namespace[superconsts.Label.DbMongodbCollection]=splitNamespace.collection,namespace[superconsts.Label.DbHost]=hostname,namespace[superconsts.Label.DbPort]=port.toString(),namespace[superconsts.Label.DbCommand]=command,namespace[superconsts.Label.DbMongodbCommandJson]=json,namespace[superconsts.Label.DbMongodbFilter]=filter,openSpanResult=superbrain.openSpan(hostnameAndPort,superconsts.SpanType.ClientRequest,namespace)),splitNamespace=createWrappedCallback(span,openSpanResult,originalCallback);if(originalArgs[callbackIndex]=splitNamespace,!openSpanResult)return originalMethod.apply(ctx,originalArgs);callbackHandleAction(openSpanResult,ctx,originalMethod,originalArgs,splitNamespace)})}function instrumentedCmapGetMore(ctx,originalMethod,originalArgs){const{originalCallback,callbackIndex}=findCallback(originalArgs);return callbackIndex<0?originalMethod.apply(ctx,originalArgs):cls.ns.runAndReturn(()=>{var span=cls.startSpan(exports.spanName,constants.EXIT),namespace=originalArgs[0],splitNamespace=dbAndCollectionFromNamespace(namespace);let service,hostname,port;ctx.address&&(service=ctx.address,hostnameAndPort=splitIntoHostAndPort(ctx.address),hostname=hostnameAndPort.hostname,port=hostnameAndPort.port,span.data.peer=hostnameAndPort);var hostnameAndPort={command:"getMore",service:service,namespace:namespace};span.data.mongo=hostnameAndPort;let openSpanResult=null;namespace=getOrCreateTask(),0!==namespace&&((hostnameAndPort={})[superconsts.Label.SupertenantResourceType]=superconsts.ResourceType.Mongodb,hostnameAndPort[superconsts.Label.IntegrationModuleResourceId]=hostname,hostnameAndPort[superconsts.Label.DbDatabase]=splitNamespace.db,hostnameAndPort[superconsts.Label.DbMongodbCollection]=splitNamespace.collection,hostnameAndPort[superconsts.Label.DbHost]=hostname,hostnameAndPort[superconsts.Label.DbPort]=port,hostnameAndPort[superconsts.Label.DbCommand]="getMore",openSpanResult=superbrain.openSpan(namespace,superconsts.SpanType.ClientRequest,hostnameAndPort)),splitNamespace=createWrappedCallback(span,openSpanResult,originalCallback);if(originalArgs[callbackIndex]=splitNamespace,!openSpanResult)return originalMethod.apply(ctx,originalArgs);callbackHandleAction(openSpanResult,ctx,originalMethod,originalArgs,splitNamespace)})}function instrumentLegacyTopologyPool(Pool){shimmer.wrap(Pool.prototype,"write",shimLegacyWrite)}function shimLegacyWrite(original){return function(){if(cls.skipExitTracing({isActive:isActive}))return original.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedLegacyWrite(this,original,originalArgs)}}function instrumentedLegacyWrite(ctx,originalWrite,originalArgs){const{originalCallback,callbackIndex}=findCallback(originalArgs);return callbackIndex<0?originalWrite.apply(ctx,originalArgs):cls.ns.runAndReturn(()=>{var span=cls.startSpan(exports.spanName,constants.EXIT);let hostname,port,service,command,database,collection,namespace;var message=originalArgs[0];if(message&&"object"==typeof message){message.options&&message.options.session&&message.options.session.topology&&message.options.session.topology.s&&message.options.session.topology.s&&(hostname=message.options.session.topology.s.host,port=message.options.session.topology.s.port),hostname&&port||!ctx.options||(hostname=hostname||ctx.options.host,port=port||ctx.options.port);let cmdObj=message.command;(cmdObj=cmdObj||message.query)&&(collection=(collection=cmdObj.collection?cmdObj.collection:collection)||findCollection(cmdObj),command=findCommand(cmdObj),database=cmdObj.$db),database||"string"!=typeof message.ns||(database=message.ns.split(".")[0])}database&&collection?namespace=database+"."+collection:database?namespace=database+".?":collection&&(namespace="?."+collection),(hostname||port)&&(span.data.peer={hostname:hostname,port:port}),hostname&&port?service=hostname+":"+port:hostname?service=hostname+":27017":port&&(service="?:27017");var spanData={command:command,service:service,namespace:namespace};span.data.mongo=spanData;let json,filter;spanData=readJsonOrFilterFromMessage(message,span);spanData&&(json=spanData.stringJson,filter=spanData.filter);let openSpanResult=null;message=getOrCreateTask(),0!==message&&((spanData={})[superconsts.Label.SupertenantResourceType]=superconsts.ResourceType.Mongodb,spanData[superconsts.Label.IntegrationModuleResourceId]=hostname,spanData[superconsts.Label.DbDatabase]=database,spanData[superconsts.Label.DbMongodbCollection]=collection,spanData[superconsts.Label.DbHost]=hostname,spanData[superconsts.Label.DbPort]=port,spanData[superconsts.Label.DbCommand]=command,spanData[superconsts.Label.DbMongodbCommandJson]=json,spanData[superconsts.Label.DbMongodbFilter]=filter,openSpanResult=superbrain.openSpan(message,superconsts.SpanType.ClientRequest,spanData)),message=createWrappedCallback(span,openSpanResult,originalCallback);if(originalArgs[callbackIndex]=message,!openSpanResult)return originalWrite.apply(ctx,originalArgs);callbackHandleAction(openSpanResult,ctx,originalWrite,originalArgs,message)})}function callbackHandleAction(openSpanResult,ctx,originalFunction,originalArgs,resultCallback,firstCall=!0){let actionRef;if((actionRef=firstCall?superbrain.getAction(openSpanResult.action):superbrain.pollSpanAction(openSpanResult.openSpanResult)).Action===superconsts.Action.Execute)openSpanResult.canceled||(openSpanResult.executed=!0,originalFunction.apply(ctx,originalArgs));else if(actionRef.Action===superconsts.Action.Wait)firstCall=getWaitPollInterval(actionRef),setTimeout(cls.ns.bind(function(){callbackHandleAction(openSpanResult,ctx,originalFunction,originalArgs,resultCallback,!1)}),firstCall).unref();else{if(actionRef.Action===superconsts.Action.Delay){const delayDuration=getDelayDuration(actionRef);return new Promise(resolve=>setTimeout(resolve,delayDuration).unref()).then(cls.ns.bind(()=>{openSpanResult.canceled||(openSpanResult.executed=!0,originalFunction.apply(ctx,originalArgs))}))}actionRef.Action===superconsts.Action.Reject?resultCallback(rejectToError(getMongodbRejectActionResult(actionRef))):(dedupReportError("httpServer:UNKNOWN_ACTION","received unknown action ID",actionRef),originalFunction.apply(ctx,originalArgs))}}function rejectToError(rejectActionResult){var err=new Error(rejectActionResult.error_message);return err.ok=0,err.code=rejectActionResult.code,err.codeName=ErrorConstants[rejectActionResult.code],err.message=rejectActionResult.error_message,err}function findCallback(originalArgs){let originalCallback,callbackIndex=-1;for(let i=1;i<originalArgs.length;i++)if("function"==typeof originalArgs[i]){originalCallback=originalArgs[i],callbackIndex=i;break}return{originalCallback:originalCallback,callbackIndex:callbackIndex}}function findCollection(cmdObj){for(let j=0;j<commands.length;j++)if(cmdObj[commands[j]]&&"string"==typeof cmdObj[commands[j]])return cmdObj[commands[j]]}function findCommand(cmdObj){for(let j=0;j<commands.length;j++)if(cmdObj[commands[j]])return commands[j]}function splitIntoHostAndPort(address){var idx;if("string"==typeof address)return 0<=address.indexOf(":")?(idx=address.indexOf(":"),{hostname:address.substring(0,idx),port:address.substring(idx+1)}):{hostname:address}}function readJsonOrFilterFromMessage(message,span){if(message){let cmdObj=message.command;if(cmdObj=cmdObj||message.query)return readJsonOrFilter(cmdObj,span)}}function readJsonOrFilter(cmdObj,span){let json,stringJson=(Array.isArray(cmdObj)&&1<=cmdObj.length?json=cmdObj:Array.isArray(cmdObj.updates)&&1<=cmdObj.updates.length?json=cmdObj.updates:Array.isArray(cmdObj.deletes)&&1<=cmdObj.deletes.length?json=cmdObj.deletes:Array.isArray(cmdObj.pipeline)&&1<=cmdObj.pipeline.length&&(json=cmdObj.pipeline),""),filter=(json?(stringJson=stringifyWhenNecessary(json),span.data.mongo.json=stringJson):(cmdObj.filter||cmdObj.query)&&(span.data.mongo.filter=stringifyWhenNecessary(cmdObj.filter||cmdObj.query)),"");return(cmdObj.filter||cmdObj.query)&&(filter=stringifyWhenNecessary(cmdObj.filter||cmdObj.query)),{stringJson:stringJson,filter:filter}}function stringifyWhenNecessary(obj){if(null!=obj)return"string"==typeof obj?tracingUtil.shortenDatabaseStatement(obj):tracingUtil.shortenDatabaseStatement(JSON.stringify(obj))}function createWrappedCallback(span,openSpanResult,originalCallback){return cls.ns.bind(function(error){var stSpanData={};return error&&(span.ec=1,span.data.mongo.error=tracingUtil.getErrorDetails(error),stSpanData[superconsts.Label.SupertenantError]="true"),span.d=Date.now()-span.ts,span.transmit(),null!=openSpanResult&&(openSpanResult.executed||(openSpanResult.canceled=!0,stSpanData[superconsts.Label.SupertenantCanceled]="true"),superbrain.closeSpan(openSpanResult.spanId,stSpanData)),originalCallback.apply(this,arguments)})}function dbAndCollectionFromNamespace(namespace){let db,collection;var idx;return null!=namespace&&("string"==typeof namespace?0<=(idx=namespace.indexOf("."))?(db=namespace.substring(0,idx),collection=namespace.substring(idx+1)):db=namespace:(db=namespace.db,collection=namespace.collection)),{db:db,collection:collection}}exports.spanName="mongo",exports.batchable=!0,exports.init=function(){requireHook.onFileLoad(/\/mongodb\/lib\/cmap\/connection.js/,instrumentCmapConnection),requireHook.onFileLoad(/\/mongodb\/lib\/core\/connection\/pool.js/,instrumentLegacyTopologyPool),requireHook.onFileLoad(/\/mongodb-core\/lib\/connection\/pool.js/,instrumentLegacyTopologyPool)},exports.activate=function(){isActive=!0},exports.deactivate=function(){isActive=!1};