Repository URL to install this package:
|
Version:
0.7.10 ▾
|
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const tracingMetrics=require("./metrics");let logger;logger=require("../logger").getLogger("tracing/spanBuffer",newLogger=>{logger=newLogger});const batchableSpanNames=[];let downstreamConnection=null,isActive=!1,activatedAt=null,minDelayBeforeSendingSpans=1e3;null!=process.env.INSTANA_DEV_MIN_DELAY_BEFORE_SENDING_SPANS&&(minDelayBeforeSendingSpans=parseInt(process.env.INSTANA_DEV_MIN_DELAY_BEFORE_SENDING_SPANS,10),isNaN(minDelayBeforeSendingSpans))&&(minDelayBeforeSendingSpans=1e3);let initialDelayBeforeSendingSpans,transmissionDelay,maxBufferedSpans,forceTransmissionStartingAt,transmissionTimeoutHandle,preActivationCleanupIntervalHandle,isFaaS,transmitImmediate,spans=[],batchThreshold=10,batchingEnabled=!1;null!=process.env.INSTANA_DEV_BATCH_THRESHOLD&&(batchThreshold=parseInt(process.env.INSTANA_DEV_BATCH_THRESHOLD,10),isNaN(batchThreshold))&&(batchThreshold=10);const batchBucketWidth=18,batchingBuckets=new Map;function addToBatch(span){var bucketsForTrace,key;return!!batchingBuckets.has(span.t)&&(!!findBatchPartnerAndMerge(span,bucketsForTrace=batchingBuckets.get(span.t),key=batchingBucketKey(span))||findBatchPartnerAndMerge(span,bucketsForTrace,key-batchBucketWidth))}function findBatchPartnerAndMerge(newSpan,bucketsForTrace,bucketKey){var bucket=bucketsForTrace.get(bucketKey);if(bucket)for(let i=0;i<bucket.length;i++){var bufferedSpan=bucket[i];if(bufferedSpan.t===newSpan.t&&bufferedSpan.p===newSpan.p&&bufferedSpan.n===newSpan.n&&(newSpan.ts>=bufferedSpan.ts?newSpan.ts<bufferedSpan.ts+bufferedSpan.d+batchThreshold:bufferedSpan.ts<newSpan.ts+newSpan.d+batchThreshold))return mergeSpansAsBatch(bufferedSpan,newSpan,bucket,bucketKey,i),!0}return!1}function mergeSpansAsBatch(oldSpan,newSpan,bucket,bucketKey,indexInBucket){let mustSwap;var indexInSpanBuffer;(mustSwap=newSpan.ec>oldSpan.ec||newSpan.ec===oldSpan.ec&&newSpan.d>oldSpan.d||newSpan.ec===oldSpan.ec&&newSpan.d===oldSpan.d&&newSpan.ts<oldSpan.ts?!0:mustSwap)?(0<=(indexInSpanBuffer=spans.indexOf(oldSpan))&&(spans[indexInSpanBuffer]=newSpan),mergeIntoTargetSpan(bucket[indexInBucket]=newSpan,oldSpan,bucketKey)):mergeIntoTargetSpan(oldSpan,newSpan,bucketKey)}function mergeIntoTargetSpan(target,source,originalBucketKey){target.b=target.b||{},null!=target.b.d&&source.b&&null!=source.b.d?target.b.d+=source.b.d:null!=target.b.d?target.b.d+=source.d:source.b&&null!=source.b.d?target.b.d=target.d+source.b.d:target.b.d=target.d+source.d;var latestEnd=Math.max(target.ts+target.d,source.ts+source.d),latestEnd=(target.ts=Math.min(target.ts,source.ts),target.d=latestEnd-target.ts,target.ec+=source.ec,setBatchSize(target,source),batchingBucketKey(target));originalBucketKey!==latestEnd&&addToBucket(target,latestEnd)}function setBatchSize(target,source){target.b&&target.b.s&&source.b&&source.b.s?target.b.s+=source.b.s:target.b&&target.b.s?target.b.s+=1:source.b&&source.b.s?target.b.s=source.b.s+1:target.b.s=2}function addToBucket(span,preComputedBucketKey){preComputedBucketKey=preComputedBucketKey||batchingBucketKey(span);batchingBuckets.has(span.t)||batchingBuckets.set(span.t,new Map),batchingBuckets.get(span.t).has(preComputedBucketKey)||batchingBuckets.get(span.t).set(preComputedBucketKey,[]),batchingBuckets.get(span.t).get(preComputedBucketKey).push(span)}function batchingBucketKey(span){span=span.ts+span.d;return span-span%batchBucketWidth}function isBatchable(span){return span.d<batchThreshold&&span.p&&batchableSpanNames.includes(span.n)}function transmitSpans(){if(clearTimeout(transmissionTimeoutHandle),0===spans.length)isFaaS||(transmissionTimeoutHandle=setTimeout(transmitSpans,transmissionDelay)).unref();else{const spansToSend=spans;spans=[],batchingBuckets.clear(),downstreamConnection.sendSpans(spansToSend,function(error){error&&(logger.warn(`Failed to transmit spans, will retry in ${transmissionDelay} ms.`,error.message),spans=spans.concat(spansToSend),removeSpansIfNecessary()),isFaaS||(transmissionTimeoutHandle=setTimeout(transmitSpans,transmissionDelay)).unref()})}}function removeSpansIfNecessary(){var droppedCount;spans.length>maxBufferedSpans&&(droppedCount=spans.length-maxBufferedSpans,logger.warn(`Span buffer is over capacity, dropping ${droppedCount} spans.`),tracingMetrics.incrementDropped(spans.length-maxBufferedSpans),spans=spans.slice(-maxBufferedSpans))}exports.init=function(config,_downstreamConnection){downstreamConnection=_downstreamConnection,maxBufferedSpans=config.tracing.maxBufferedSpans,forceTransmissionStartingAt=config.tracing.forceTransmissionStartingAt,transmissionDelay=config.tracing.transmissionDelay,batchingEnabled=config.tracing.spanBatchingEnabled,initialDelayBeforeSendingSpans=Math.max(transmissionDelay,minDelayBeforeSendingSpans),isFaaS=!1,transmitImmediate=!1,config.tracing.activateImmediately&&(preActivationCleanupIntervalHandle=setInterval(()=>{removeSpansIfNecessary()},transmissionDelay)).unref()},exports.activate=function(extraConfig){downstreamConnection?downstreamConnection.sendSpans?"function"!=typeof downstreamConnection.sendSpans?logger.error("downstreamConnection.sendSpans is not a function."):(extraConfig&&extraConfig.tracing&&extraConfig.tracing.spanBatchingEnabled&&(batchingEnabled=!0),isActive=!0,null==activatedAt&&(activatedAt=Date.now()),spans=[],batchingBuckets.clear(),isFaaS||(transmissionTimeoutHandle=setTimeout(transmitSpans,initialDelayBeforeSendingSpans)).unref(),preActivationCleanupIntervalHandle&&clearInterval(preActivationCleanupIntervalHandle)):logger.error('Configured downstreamConnection has no attribute "sendSpans".'):logger.error("No downstreamConnection has been set.")},exports.deactivate=function(){isActive=!1,spans=[],batchingBuckets.clear(),clearTimeout(transmissionTimeoutHandle)},exports.addBatchableSpanName=function(spanName){batchableSpanNames.includes(spanName)||batchableSpanNames.push(spanName)},exports.addSpan=function(span){var spanIsBatchable;isActive&&(null==span.t?logger.warn("Span of type %s has no trace ID. Not transmitting this span",span.n):transmitImmediate?(spans.push(span),transmitSpans()):(spanIsBatchable=batchingEnabled&&isBatchable(span))&&addToBatch(span)||(spans.push(span),spanIsBatchable&&addToBucket(span),spans.length>=forceTransmissionStartingAt&&Date.now()-minDelayBeforeSendingSpans>activatedAt&&transmitSpans()))},exports.getAndResetSpans=function(){var spansToSend=spans;return spans=[],batchingBuckets.clear(),spansToSend},exports.isEmpty=function(){return 0===spans.length},exports.setTransmitImmediate=function(val){transmitImmediate=val},exports.setIsFaaS=function(val){isFaaS=val};