Repository URL to install this package:
|
Version:
5.0.0-rc.11 ▾
|
'use strict';
Object.defineProperty(exports, '__esModule', { value: true });
function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; }
var stream = _interopDefault(require('stream'));
/*
* merge2
* https://github.com/teambition/merge2
*
* Copyright (c) 2014-2020 Teambition
* Licensed under the MIT license.
*/
const PassThrough = stream.PassThrough;
const slice = Array.prototype.slice;
var merge2_1 = merge2;
function merge2 () {
const streamsQueue = [];
const args = slice.call(arguments);
let merging = false;
let options = args[args.length - 1];
if (options && !Array.isArray(options) && options.pipe == null) {
args.pop();
} else {
options = {};
}
const doEnd = options.end !== false;
const doPipeError = options.pipeError === true;
if (options.objectMode == null) {
options.objectMode = true;
}
if (options.highWaterMark == null) {
options.highWaterMark = 64 * 1024;
}
const mergedStream = PassThrough(options);
function addStream () {
for (let i = 0, len = arguments.length; i < len; i++) {
streamsQueue.push(pauseStreams(arguments[i], options));
}
mergeStream();
return this
}
function mergeStream () {
if (merging) {
return
}
merging = true;
let streams = streamsQueue.shift();
if (!streams) {
process.nextTick(endStream);
return
}
if (!Array.isArray(streams)) {
streams = [streams];
}
let pipesCount = streams.length + 1;
function next () {
if (--pipesCount > 0) {
return
}
merging = false;
mergeStream();
}
function pipe (stream) {
function onend () {
stream.removeListener('merge2UnpipeEnd', onend);
stream.removeListener('end', onend);
if (doPipeError) {
stream.removeListener('error', onerror);
}
next();
}
function onerror (err) {
mergedStream.emit('error', err);
}
// skip ended stream
if (stream._readableState.endEmitted) {
return next()
}
stream.on('merge2UnpipeEnd', onend);
stream.on('end', onend);
if (doPipeError) {
stream.on('error', onerror);
}
stream.pipe(mergedStream, { end: false });
// compatible for old stream
stream.resume();
}
for (let i = 0; i < streams.length; i++) {
pipe(streams[i]);
}
next();
}
function endStream () {
merging = false;
// emit 'queueDrain' when all streams merged.
mergedStream.emit('queueDrain');
if (doEnd) {
mergedStream.end();
}
}
mergedStream.setMaxListeners(0);
mergedStream.add = addStream;
mergedStream.on('unpipe', function (stream) {
stream.emit('merge2UnpipeEnd');
});
if (args.length) {
addStream.apply(null, args);
}
return mergedStream
}
// check and pause streams for pipe.
function pauseStreams (streams, options) {
if (!Array.isArray(streams)) {
// Backwards-compat with old-style streams
if (!streams._readableState && streams.pipe) {
streams = streams.pipe(PassThrough(options));
}
if (!streams._readableState || !streams.pause || !streams.pipe) {
throw new Error('Only readable stream can be merged.')
}
streams.pause();
} else {
for (let i = 0, len = streams.length; i < len; i++) {
streams[i] = pauseStreams(streams[i], options);
}
}
return streams
}
exports.__moduleExports = merge2_1;
exports.default = merge2_1;
//# sourceMappingURL=index.js.map