// Repository URL to install this package:
// Version: 0.8.2
// (c) Copyright 2023 Supertenant Ltd. - all rights reserved.
// See LICENSE file in project root for license terms.
"use strict";

const EventEmitter = require("events");
const { compression: applyCompression } = require("@supertenant/core")["util"];

/**
 * Sentinel object. `getUncompressedData(true)` returns it (compared by
 * identity) when recompilation was skipped because no data source reported a
 * refresh newer than the last compilation.
 */
const SKIPPED = {};

/** Module-wide counter used to give every processor instance a unique id suffix. */
let processorIdx = 0;

/**
 * Base class that collects raw data from registered data sources, hands it to
 * a subclass-provided `processData` implementation, and decides whether the
 * result is transmitted as-is or passed through `applyCompression`.
 *
 * Subclasses must override `getEntityId` and `processData`.
 *
 * Emits `"ready"` (with a full payload) whenever an essential source fires
 * `"firstRefresh"` and all essential sources have refreshed at least once.
 */
class DataProcessor extends EventEmitter {
  /**
   * @param {string} pluginName - Used as the payload `name` and as the id prefix.
   * @param {Array} [compressionExcludeList] - Forwarded to `applyCompression`;
   *   when non-empty it also disables the skip-recompilation shortcut.
   */
  constructor(pluginName, compressionExcludeList) {
    super();
    this.dataSources = {};
    this.essentialDataSources = {};
    this.compressionExcludeList = compressionExcludeList;
    this.pluginName = pluginName;
    this.id = this.pluginName + "--" + processorIdx++;
    // `previous` / `next` are scratch objects handed to processData so that a
    // subclass can carry state from one processing run to the following one.
    this.previous = {};
    this.next = {};
    this._resetCompressionState();
    // After this many transmissions the next update is sent uncompressed.
    this.sendUncompressedEveryXTransmissions = 300;
  }

  /** @returns {string} The unique id of this processor (`<pluginName>--<n>`). */
  getId() {
    return this.id;
  }

  /**
   * Registers a data source under the given id.
   *
   * @param {string} id - Key under which the source is stored.
   * @param {object} source - Data source; must expose `on` when essential.
   * @param {boolean} [essential=true] - Essential sources gate readiness
   *   (see `isReady`) and trigger the ready check on their `"firstRefresh"`.
   */
  addSource(id, source, essential = true) {
    this.dataSources[id] = source;
    if (essential) {
      this.essentialDataSources[id] = source;
      source.on("firstRefresh", () => {
        this._potentiallyEmitReadyEvent();
      });
    }
  }

  /** Calls `activate()` on every registered source. */
  activate() {
    this._forEachSource((source) => source.activate());
  }

  /** Calls `deactivate()` on every registered source. */
  deactivate() {
    this._forEachSource((source) => source.deactivate());
  }

  /** Calls `reset()` on every registered source and clears the compression bookkeeping. */
  resetSources() {
    this._forEachSource((source) => source.reset());
    this._resetCompressionState();
  }

  /** Forgets per-source fetch timestamps, the transmission counter and the last payload. */
  _resetCompressionState() {
    this.lastCompressionFetchPerSource = {};
    this.compressedTransmissionsSinceLastUncompressed = 0;
    this.lastTransmittedPayload = null;
  }

  /**
   * @returns {boolean} True when at least one essential source is registered
   *   and every essential source reports `hasRefreshedAtLeastOnce()`.
   */
  isReady() {
    const essentialDataSourceIds = Object.keys(this.essentialDataSources);
    if (essentialDataSourceIds.length === 0) {
      return false;
    }
    return essentialDataSourceIds.every((id) =>
      this.dataSources[id].hasRefreshedAtLeastOnce()
    );
  }

  /** Emits `"ready"` with the full payload if the processor is ready. */
  _potentiallyEmitReadyEvent() {
    if (this.isReady()) {
      this.emit("ready", this._getFullPayload());
    }
  }

  /**
   * Must be overridden by subclasses.
   *
   * @throws {Error} Always, in this base implementation.
   */
  getEntityId() {
    throw new Error("DataProcessor needs to override getEntityId.");
  }

  /** @returns {object} The current uncompressed data wrapped as a payload. */
  _getFullPayload() {
    return this.wrapAsPayload(this.getUncompressedData());
  }

  /**
   * @returns {boolean} True when no compression exclude list is configured
   *   (missing or empty), which is the precondition for skipping recompilation.
   */
  canSkipRecompilation() {
    return !this.compressionExcludeList || this.compressionExcludeList.length === 0;
  }

  /**
   * Compiles and processes the current data of all sources.
   *
   * @param {boolean} [withSkip] - When truthy, the method may return the
   *   `SKIPPED` sentinel instead of recompiling, and it records per-source
   *   fetch timestamps used for that decision.
   * @returns {*} `null` when not ready, `SKIPPED` when nothing changed and a
   *   compressed update would be sent anyway, otherwise the processed data.
   */
  getUncompressedData(withSkip) {
    // NOTE: every `return` below keeps its expression on the same line. A line
    // break directly after `return` makes automatic semicolon insertion end
    // the statement there, so the method would always yield `undefined`.
    if (!this.isReady()) {
      return null;
    }

    const willSendUncompressedUpdate = this._shouldSendUncompressedUpdate();
    if (withSkip && this.canSkipRecompilation() && !willSendUncompressedUpdate) {
      // A source is stale when it was never fetched for compression or has
      // refreshed after the last such fetch.
      const hasStaleSource = Object.keys(this.dataSources).some((id) => {
        const lastFetch = this.lastCompressionFetchPerSource[id];
        return lastFetch == null || this.dataSources[id].getLastRefreshTimestamp() > lastFetch;
      });
      if (!hasStaleSource) {
        return SKIPPED;
      }
    }

    return this._getProcessedData(withSkip);
  }

  /**
   * @returns {boolean} True when nothing has been transmitted yet or the
   *   transmission counter has reached `sendUncompressedEveryXTransmissions`.
   */
  _shouldSendUncompressedUpdate() {
    return (
      this.lastTransmittedPayload == null ||
      this.compressedTransmissionsSinceLastUncompressed >= this.sendUncompressedEveryXTransmissions
    );
  }

  /**
   * Decides what is actually sent for the given data.
   *
   * @param {*} uncompressedData - Result of `getUncompressedData`.
   * @returns {*} The input unchanged when it is `SKIPPED` or an uncompressed
   *   update is due; otherwise the result of `applyCompression` against the
   *   last transmitted payload.
   *   NOTE(review): presumably a delta/diff - confirm in @supertenant/core.
   */
  compress(uncompressedData) {
    const shouldSendUncompressedUpdate =
      uncompressedData === SKIPPED || this._shouldSendUncompressedUpdate();

    const dataToBeSent = shouldSendUncompressedUpdate
      ? uncompressedData
      : applyCompression(this.lastTransmittedPayload, uncompressedData, this.compressionExcludeList);

    // Restart the cycle once the threshold that forces an uncompressed update
    // has been reached.
    if (this.compressedTransmissionsSinceLastUncompressed >= this.sendUncompressedEveryXTransmissions) {
      this.compressedTransmissionsSinceLastUncompressed = 0;
    }
    return dataToBeSent;
  }

  /**
   * @param {*} data - Data to embed.
   * @returns {{name: string, entityId: *, data: *}} Payload envelope.
   */
  wrapAsPayload(data) {
    return {
      name: this.pluginName,
      entityId: this.getEntityId(),
      data: data
    };
  }

  /**
   * Runs `processData` on freshly compiled raw data and rotates the
   * `previous` / `next` scratch objects afterwards.
   *
   * @param {boolean} [withSkip] - Forwarded to `_compileRawData`.
   * @returns {*} Whatever `processData` returns.
   */
  _getProcessedData(withSkip) {
    this.next = {};
    const processed = this.processData(this._compileRawData(withSkip), this.previous, this.next);
    this.previous = this.next;
    return processed;
  }

  /**
   * Must be overridden by subclasses. `_getProcessedData` invokes it with
   * `(rawDataPerSource, previous, next)`.
   *
   * @throws {Error} Always, in this base implementation.
   */
  processData() {
    throw new Error("DataProcessor needs to override processData.");
  }

  /**
   * Collects `getRawData()` of every source, keyed by source id.
   *
   * @param {boolean} [withSkip] - When truthy, stores `Date.now()` as the
   *   last compression fetch time of each source.
   * @returns {Object<string, *>} Raw data per source id.
   */
  _compileRawData(withSkip) {
    const rawDataPerSource = {};
    this._forEachSource((source, id) => {
      rawDataPerSource[id] = source.getRawData();
      if (withSkip) {
        this.lastCompressionFetchPerSource[id] = Date.now();
      }
    });
    return rawDataPerSource;
  }

  /**
   * Records a completed transmission. The counter is incremented on every
   * call; the stored payload is left untouched when `payload` is `SKIPPED`.
   *
   * @param {*} payload - The uncompressed payload that was just transmitted.
   */
  setLastTransmittedPayload(payload) {
    if (payload !== SKIPPED) {
      this.lastTransmittedPayload = payload;
    }
    this.compressedTransmissionsSinceLastUncompressed++;
  }

  /**
   * Invokes `fn(source, id)` for every registered data source.
   *
   * @param {function(object, string): void} fn - Callback per source.
   */
  _forEachSource(fn) {
    Object.keys(this.dataSources).forEach((id) => fn(this.dataSources[id], id));
  }
}

module.exports = exports = DataProcessor;