Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";const shimmer=require("shimmer"),cls=require("../../../cls"),{traceIdHeaderNameLowerCase,spanIdHeaderNameLowerCase,traceLevelHeaderNameLowerCase,ENTRY,EXIT}=require("../../../constants"),requireHook=require("../../../../util/requireHook"),tracingUtil=require("../../../tracingUtil");let logger;logger=require("../../../../logger").getLogger("tracing/pubsub",newLogger=>{logger=newLogger});const subscriptionRegex=/^projects\/([^/]+)\/subscriptions\/(.+)$/;let isActive=!1;function instrumentPublisher(publisher){process.nextTick(()=>{publisher&&publisher.Publisher&&instrumentConstructor(publisher,"Publisher","publishMessage",shimPublishMessage)})}function instrumentSubscriber(subscriber){process.nextTick(()=>{subscriber&&subscriber.Subscriber&&instrumentConstructor(subscriber,"Subscriber","emit",shimSubscriberEmit)})}function instrumentConstructor(module,constructorAttribute,methodAttribue,shimmedMethod){const OriginalConstructor=module[constructorAttribute];module[constructorAttribute]=function(){var newInstance=new OriginalConstructor(...arguments);return shimmer.wrap(newInstance,methodAttribue,shimmedMethod),newInstance}}function shimPublishMessage(originalFunction){return function(){var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];var message=originalArgs[0];let attributes=message.attributes;attributes=attributes||(message.attributes={});message=cls.skipExitTracing({isActive:isActive,extendedResponse:!0});return message.skip?(message.suppressed&&propagateSuppression(attributes),originalFunction.apply(this,arguments)):instrumentedPublishMessage(this,originalFunction,originalArgs)}}function instrumentedPublishMessage(ctx,originalPublishMessage,originalArgs){const message=originalArgs[0];let attributes=message.attributes;return attributes=attributes||(message.attributes={}),cls.ns.runAndReturn(()=>{const span=cls.startSpan("gcps",EXIT),originalCallback=(span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedPublishMessage),span.data.gcps={op:"publish",projid:ctx.topic&&(ctx.topic.parent||ctx.topic.pubsub||{}).projectId,top:unqualifyName(ctx.topic&&ctx.topic.name),messageId:message.id},propagateTraceContext(attributes,span),originalArgs[1]);"function"==typeof originalCallback&&(originalArgs[1]=cls.ns.bind(function(err,messageId){finishSpan(err,messageId,span),originalCallback.apply(this,arguments)}));var thenable=originalPublishMessage.apply(ctx,originalArgs);return thenable&&"function"==typeof thenable.then?thenable.then(messageId=>(finishSpan(null,messageId,span),messageId),err=>{throw finishSpan(err,null,span),err}):thenable})}function propagateSuppression(attributes){attributes&&"object"==typeof attributes&&(attributes[traceLevelHeaderNameLowerCase]="0")}function propagateTraceContext(attributes,span){attributes&&"object"==typeof attributes&&(attributes[traceIdHeaderNameLowerCase]=span.t,attributes[spanIdHeaderNameLowerCase]=span.s,attributes[traceLevelHeaderNameLowerCase]="1")}function shimSubscriberEmit(originalEmit){return function(type){if("message"!==type||!isActive)return originalEmit.apply(this,arguments);type=cls.getCurrentSpan();if(type)return logger.warn("Cannot start a Google Cloud PubSub entry span when another span is already active. Currently, the following span is active: "+JSON.stringify(type)),originalEmit.apply(this,arguments);var originalArgs=new Array(arguments.length);for(let i=0;i<arguments.length;i++)originalArgs[i]=arguments[i];return instrumentedEmitMessage(this,originalEmit,originalArgs)}}function instrumentedEmitMessage(ctx,originalEmit,originalArgs){const message=originalArgs[1];if(!message||"object"!=typeof message)return originalEmit.apply(ctx,originalArgs);const attribtes=message.attributes||{};return cls.ns.runAndReturn(()=>{if("0"===tracingUtil.readAttribCaseInsensitive(attribtes,traceLevelHeaderNameLowerCase))return cls.setTracingLevel("0"),originalEmit.apply(ctx,originalArgs);var{projid,sub}=parseSubscription(message._subscriber&&message._subscriber._subscription);const span=cls.startSpan("gcps",ENTRY,tracingUtil.readAttribCaseInsensitive(attribtes,traceIdHeaderNameLowerCase),tracingUtil.readAttribCaseInsensitive(attribtes,spanIdHeaderNameLowerCase));span.ts=Date.now(),span.stack=tracingUtil.getStackTrace(instrumentedEmitMessage),span.data.gcps={op:"consume",projid:projid,sub:sub,messageId:message.id};try{return originalEmit.apply(ctx,originalArgs)}finally{setImmediate(()=>{span.d=Date.now()-span.ts,span.transmit()})}})}function unqualifyName(name){var idxSlash;if(name&&"string"==typeof name)return idxSlash=name.lastIndexOf("/"),name.substring(idxSlash+1)}function parseSubscription(subscription){return subscription&&subscription.name&&(subscription=subscriptionRegex.exec(subscription.name))?{projid:subscription[1],sub:subscription[2]}:{}}function finishSpan(err,messageId,span){err&&addErrorToSpan(err,span),"string"==typeof messageId&&(span.data.gcps.messageId=messageId),span.d=Date.now()-span.ts,span.transmit()}function addErrorToSpan(err,span){err&&(span.ec=1,err.message?span.data.gcps.error=err.message:"string"==typeof err&&(span.data.gcps.error=err))}exports.init=function(){requireHook.onFileLoad(/\/@google-cloud\/pubsub\/build\/src\/publisher\/index.js/,instrumentPublisher),requireHook.onFileLoad(/\/@google-cloud\/pubsub\/build\/src\/subscriber.js/,instrumentSubscriber)},exports.activate=function(){isActive=!0},exports.deactivate=function(){isActive=!1};