Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cassandra-driver / lib / streams.js
Size: Mime:
var util = require('util');
var stream = require('stream');
var Transform = stream.Transform;
var Writable = stream.Writable;

var types = require('./types');
var utils = require('./utils');
var FrameHeader = types.FrameHeader;
var FrameReader = require('./readers').FrameReader;

/**
 * Transforms chunks, emits data objects {header, chunk}
 * @param options Stream options
 * @param {Number} initialVersion Initial protocol version to be used
 */
function Protocol (options, initialVersion) {
  Transform.call(this, options);
  this.header = null;
  this.headerChunks = [];
  this.bodyLength = 0;
  this.version = 0;
  //Use header size based on the initial protocol version
  this.headerSize = FrameHeader.size(initialVersion);
}

util.inherits(Protocol, Transform);

Protocol.prototype._transform = function (chunk, encoding, callback) {
  var error = null;
  try {
    this.transformChunk(chunk);
  }
  catch (err) {
    error = err;
  }
  callback(error);
};

Protocol.prototype.transformChunk = function (chunk) {
  var bodyChunk = chunk;

  if (this.header === null) {
    this.headerChunks.push(chunk);
    var length = utils.totalLength(this.headerChunks);
    if (length < this.headerSize) {
      return;
    }
    var chunksGrouped = Buffer.concat(this.headerChunks, length);
    this.header = FrameHeader.fromBuffer(chunksGrouped);
    if (this.version !== this.header.version) {
      this.version = this.header.version;
      //set the correct header size
      this.headerSize = FrameHeader.size(this.version);
    }
    if (length >= this.headerSize) {
      bodyChunk = chunksGrouped.slice(this.headerSize);
    }
  }

  this.bodyLength += bodyChunk.length;
  var frameEnded = this.bodyLength >= this.header.bodyLength;
  var header = this.header;

  var nextChunk = null;

  if (this.bodyLength > this.header.bodyLength) {
    //We received more than a complete frame
    var previousBodyLength = (this.bodyLength - bodyChunk.length);

    var nextStart = this.header.bodyLength - previousBodyLength;
    if (nextStart > bodyChunk.length) {
      throw new Error('Tried to slice a received chunk outside boundaries');
    }
    nextChunk = bodyChunk.slice(nextStart);
    bodyChunk = bodyChunk.slice(0, nextStart);
    this.clear();

    //close loop: parse next chunk before emitting
    this.transformChunk(nextChunk);
  }
  else if (this.bodyLength === this.header.bodyLength) {
    this.clear();
  }

  this.push({header: header, chunk: bodyChunk, frameEnded: frameEnded});
};

Protocol.prototype.clear = function () {
  this.header = null;
  this.bodyLength = 0;
  this.headerChunks = [];
};

/**
 * A stream that gets reads header + body chunks and transforms them into header + (row | error)
 * @param {Object} streamOptions Node.js Stream options
 * @param {Encoder} encoder Encoder instance for the parser to use
 */
function Parser (streamOptions, encoder) {
  Transform.call(this, streamOptions);
  //frames that are streaming, indexed by id
  this.frames = {};
  this.encoder = encoder;
}

util.inherits(Parser, Transform);

Parser.prototype._transform = function (item, encoding, callback) {
  var frameInfo = this.frameState(item);

  var error = null;
  try {
    this.parseBody(frameInfo, item);
  }
  catch (err) {
    error = err;
  }
  callback(error);

  if (item.frameEnded) {
    //all the parsing finished and it was streamed down
    //emit an item that signals it
    this.emitItem(frameInfo, {frameEnded: true});
  }
};

/**
 * Pushes the item with the header and the provided props to the consumer
 */
Parser.prototype.emitItem = function (frameInfo, props) {
  //push the header and props to be read by consumers
  this.push(utils.extend({ header: frameInfo.header}, props));
};

Parser.prototype.parseBody = function (frameInfo, item) {
  if (frameInfo.missingBytes) {
    // Avoid quadratic buffer copying by accumulating chunks for a large cell
    // value in an array until enough bytes are available to read past the
    // cell value.
    if (frameInfo.buffers) {
      frameInfo.buffers.push(item.chunk);
    } else if (frameInfo.buffer) {
        frameInfo.buffers = [frameInfo.buffer, item.chunk];
    } else {
        frameInfo.buffers = [item.chunk];
    }

    if (item.chunk.length < frameInfo.missingBytes) {
      frameInfo.missingBytes -= item.chunk.length;
      // Don't continue parsing until we have collected enough bytes in the
      // buffer.
      return;
    } else {
      // Now we have enough bytes in the buffers. Concat all chunks into a
      // single buffer & proceed with normal parsing.
      item.chunk = Buffer.concat(frameInfo.buffers);
      frameInfo.missingBytes = null;
      frameInfo.buffer = null;
      frameInfo.buffers = null;
    }
  }

  var reader = new FrameReader(item.header, item.chunk);
  if (frameInfo.buffer) {
    reader.unshift(frameInfo.buffer);
    frameInfo.buffer = null;
  }
  switch (item.header.opcode) {
    case types.opcodes.result:
      return this.parseResult(frameInfo, reader);
    case types.opcodes.ready:
    case types.opcodes.authSuccess:
      return this.emitItem(frameInfo, {ready: true});
    case types.opcodes.authChallenge:
      return this.parseAuthChallenge(frameInfo, reader);
    case types.opcodes.authenticate:
      return this.emitItem(frameInfo, {mustAuthenticate: true});
    case types.opcodes.error:
      return this.parseError(frameInfo, reader);
    case types.opcodes.supported:
      return this.emitItem(frameInfo);
    case types.opcodes.event:
      return this.parseEvent(frameInfo, reader);
    default:
      return this.emitItem(frameInfo, {error: new Error('Received invalid opcode: ' + item.header.opcode)});
  }
};

/**
 * Tries to read the error code and message.
 * If there is enough data to read, it pushes the header and error. If there isn't, it buffers it.
 * @param frameInfo information of the frame being parsed
 * @param {FrameReader} reader
 */
Parser.prototype.parseError = function (frameInfo, reader) {
  try {
    this.emitItem(frameInfo, {error: reader.readError()});
  }
  catch (e) {
    if (e instanceof RangeError) {
      frameInfo.buffer = reader.getBuffer();
      return;
    }
    throw e;
  }
};

/**
 * Tries to read the event from Cassandra
 * If there is enough data to read, it pushes the header and body. If there isn't, it buffers it.
 * @param frameInfo information of the frame being parsed
 * @param {FrameReader} reader
 */
Parser.prototype.parseEvent = function (frameInfo, reader) {
  try {
    this.emitItem(frameInfo, {event: reader.readEvent()});
  }
  catch (e) {
    if (e instanceof RangeError) {
      frameInfo.buffer = reader.getBuffer();
      return;
    }
    throw e;
  }
};

/**
 * Tries to read the result in the body of a message
 * @param frameInfo Frame information, header / metadata
 * @param {FrameReader} reader
 */
Parser.prototype.parseResult = function (frameInfo, reader) {
  var originalOffset = reader.offset;
  try {
    if (!frameInfo.meta) {
      frameInfo.flagsInfo = reader.readFlagsInfo();
      frameInfo.kind = reader.readInt();
      if (frameInfo.kind === types.resultKind.prepared) {
        frameInfo.preparedId = utils.copyBuffer(reader.readShortBytes());
      }
      else if (frameInfo.kind === types.resultKind.setKeyspace) {
        frameInfo.keyspace = reader.readString();
      }
      if (frameInfo.kind === types.resultKind.rows ||
          frameInfo.kind === types.resultKind.prepared) {
        frameInfo.meta = reader.readMetadata(frameInfo.kind);
      }
    }
  }
  catch (e) {
    if (e instanceof RangeError) {
      //A controlled error, the kind / metadata is not available to be read yet
      return this.bufferForLater(frameInfo, reader, originalOffset);
    }
    throw e;
  }
  switch (frameInfo.kind) {
    case types.resultKind.setKeyspace:
      return this.emitItem(frameInfo, { keyspaceSet: frameInfo.keyspace});
    case types.resultKind.voidResult:
      return this.emitItem(frameInfo, { id: frameInfo.preparedId, flags: frameInfo.flagsInfo});
    case types.resultKind.schemaChange:
      //it contains additional info that it is not parsed
      if (frameInfo.emitted) {
        return;
      }
      frameInfo.emitted = true;
      return this.emitItem(frameInfo, { schemaChange: true });
    case types.resultKind.prepared:
      //it contains result metadata that it is not parsed
      if (frameInfo.emitted) {
        return;
      }
      frameInfo.emitted = true;
      return this.emitItem(frameInfo, { id: frameInfo.preparedId, meta: frameInfo.meta, flags: frameInfo.flagsInfo});
  }
  //it contains rows
  if (reader.remainingLength() > 0) {
    this.parseRows(frameInfo, reader);
  }
};

/**
 * @param frameInfo
 * @param {FrameReader} reader
 */
Parser.prototype.parseRows = function (frameInfo, reader) {
  if (typeof frameInfo.rowLength === 'undefined') {
    try {
      frameInfo.rowLength = reader.readInt();
    }
    catch (e) {
      if (e instanceof RangeError) {
        //there is not enough data to read this row
        this.bufferForLater(frameInfo, reader);
        return;
      }
      throw e;
    }
  }
  if (frameInfo.rowLength === 0) {
    return this.emitItem(frameInfo, { result: { rows: utils.emptyArray, meta: frameInfo.meta, flags: frameInfo.flagsInfo}});
  }
  var meta = frameInfo.meta;
  frameInfo.rowIndex = frameInfo.rowIndex || 0;
  var stopReading = false;
  for (var i = frameInfo.rowIndex; i < frameInfo.rowLength; i++) {
    var rowOffset = reader.offset;
    var row = new types.Row(meta.columns);
    for (var j = 0; j < meta.columns.length; j++ ) {
      var c = meta.columns[j];
      try {
        row[c.name] = this.encoder.decode(reader.readBytes(), c.type);
      }
      catch (e) {
        if (e instanceof RangeError) {
          //there is not enough data to read this row
          this.bufferForLater(frameInfo, reader, rowOffset, i, e.missingBytes);
          stopReading = true;
          break;
        }
        throw e;
      }
    }
    if (stopReading) {
      break;
    }
    this.emitItem(frameInfo, {
      row: row,
      meta: frameInfo.meta,
      byRow: frameInfo.byRow,
      length: frameInfo.rowLength,
      flags: frameInfo.flagsInfo
    });
  }
};

/**
 * @param frameInfo
 * @param {FrameReader} reader
 */
Parser.prototype.parseAuthChallenge = function (frameInfo, reader) {
  try {
    //receives a token (can be empty or null)
    this.emitItem(frameInfo, {authChallenge: true, token: reader.readBytes()});
  }
  catch (e) {
    if (e instanceof RangeError) {
      frameInfo.buffer = reader.getBuffer();
      return;
    }
    throw e;
  }
};

/**
 * Sets parser options (ie: how to yield the results as they are parsed)
 * @param {Number} id Id of the stream
 * @param options
 */
Parser.prototype.setOptions = function (id, options) {
  if (this.frames[id.toString()]) {
    throw new types.DriverError('There was already state for this frame');
  }
  this.frames[id.toString()] = options;
};

/**
 * Gets the frame info from the internal state.
 * In case it is not there, it creates it.
 * In case the frame ended
 */
Parser.prototype.frameState = function (item) {
  var frameInfo = this.frames[item.header.streamId];
  if (!frameInfo) {
    frameInfo = this.frames[item.header.streamId] = {};
  }
  if (item.frameEnded) {
    delete this.frames[item.header.streamId];
  }
  frameInfo.header = item.header;
  return frameInfo;
};

/**
 * Buffers for later use as there isn't enough data to read
 * @param frameInfo
 * @param {FrameReader} reader
 * @param {Number} [originalOffset]
 * @param {Number} [rowIndex]
 * @param {Number} [missingBytes]
 */
Parser.prototype.bufferForLater = function (frameInfo, reader, originalOffset, rowIndex, missingBytes) {
  if (!originalOffset && originalOffset !== 0) {
    originalOffset = reader.offset;
  }
  frameInfo.rowIndex = rowIndex;
  frameInfo.buffer = reader.slice(originalOffset);
  // Keep track of missing bytes in a cell, so that we can efficiently
  // accumulate chunks for it without quadratic buffer copying.
  frameInfo.missingBytes = missingBytes;
  reader.toEnd();
};

/**
 * Represents a writable streams that emits results
 */
function ResultEmitter(options) {
  Writable.call(this, options);
  /**
   * Stores the rows for frames that needs to be yielded as one result with many rows
   */
  this.rowBuffer = {};
}

util.inherits(ResultEmitter, Writable);

ResultEmitter.prototype._write = function (item, encoding, callback) {
  var error = null;
  try {
    this.each(item);
  }
  catch (err) {
    error = err;
  }
  callback(error);
};


/**
 * Analyzes the item and emit the corresponding event
 */
ResultEmitter.prototype.each = function (item) {
  if (item.error || item.result) {
    //Its either an error or an empty array rows
    //no transformation needs to be made
    return this.emit('result', item.header, item.error, item.result);
  }
  if (item.frameEnded) {
    return this.emit('frameEnded', item.header);
  }
  if (item.byRow) {
    //it should be yielded by row
    return this.emit('row', item.header, item.row, item.meta, item.length, item.flags);
  }
  if (item.row) {
    //it should be yielded as a result
    //it needs to be buffered to an array of rows
    return this.bufferAndEmit(item);
  }
  if (item.event) {
    //its an event from Cassandra
    return this.emit('nodeEvent', item.header, item.event);
  }
  //its a raw response (object with flags)
  return this.emit('result', item.header, null, item);
};

/**
 * Buffers the rows until the result set is completed and emits the result event.
 */
ResultEmitter.prototype.bufferAndEmit = function (item) {
  var rows = this.rowBuffer[item.header.streamId];
  if (!rows) {
    rows = this.rowBuffer[item.header.streamId] = [];
  }
  rows.push(item.row);
  if (rows.length === item.length) {
    this.emit('result', item.header, null, { rows: rows, meta: item.meta, flags: item.flags});
    delete this.rowBuffer[item.header.streamId];
  }
};

function ParserError(err, rowIndex, colIndex) {
  types.DriverError.call(this, err.message, this.constructor);
  this.rowIndex = rowIndex;
  this.colIndex = colIndex;
  this.innerError = err;
  this.info = 'Represents an Error while parsing the result';
}

util.inherits(ParserError, types.DriverError);

exports.Protocol = Protocol;
exports.Parser = Parser;
exports.ResultEmitter = ResultEmitter;