Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cassandra-driver / lib / request-handler.js
Size: Mime:
'use strict';
var util = require('util');

var errors = require('./errors');
var types = require('./types');
var utils = require('./utils');
var requests = require('./requests');
var retry = require('./policies/retry');

var maxSyncHostIterations = 20;
var retryOnNextHostDecision = {
  decision: retry.RetryPolicy.retryDecision.retry,
  useCurrentHost: false,
  consistency: undefined
};

/**
 * Handles a request to Cassandra, dealing with host fail-over and retries on error
 * @param {Client} client Client instance used to retrieve and set the keyspace.
 * @param {LoadBalancingPolicy} loadBalancingPolicy The load-balancing policy to use for the executions.
 * @param {RetryPolicy} retryPolicy The retry policy to use for the executions.
 * @constructor
 */
function RequestHandler(client, loadBalancingPolicy, retryPolicy) {
  this.client = client;
  this.loadBalancingPolicy = loadBalancingPolicy;
  this.retryPolicy = retryPolicy;
  this.profileManager = client.profileManager;
  this.logEmitter = client.options.logEmitter;
  this.retryCount = 0;
  //current request being executed.
  this.request = null;
  //the options for the request.
  this.requestOptions = utils.emptyObject;
  //The host selected by the load balancing policy to execute the request
  /** @type {Host} */
  this.host = null;
  /** @type {Connection} */
  this.connection = null;
  /** @type {Function} */
  this.retryHandler = null;
  this.stackContainer = null;
  this.triedHosts = {};
  this.hostIterations = 0;
}

/**
 * Gets a connection from the next host according to the load balancing policy
 * @param {QueryOptions} queryOptions
 * @param {function} callback
 * @private
 */
RequestHandler.prototype._getNextConnection = function (queryOptions, callback) {
  var self = this;
  var keyspace = this.client.keyspace;
  if (this.hostIterator) {
    return self._iterateThroughHosts(this.hostIterator, callback);
  }
  this.loadBalancingPolicy.newQueryPlan(keyspace, queryOptions, function (err, iterator) {
    if (err) {
      return callback(err);
    }
    self.hostIterator = iterator;
    self._iterateThroughHosts(self.hostIterator, callback);
  });
};

/**
 * Uses the iterator to try to acquire a connection from a host
 * @param {{next: function}} iterator
 * @param {Function} callback callback(err, connection, host) to use
 * @private
 */
RequestHandler.prototype._iterateThroughHosts = function (iterator, callback) {
  /** @type {Host} */
  var host;
  // get a host that is UP in a sync loop
  while (true) {
    var item = iterator.next();
    if (item.done) {
      return callback(new errors.NoHostAvailableError(this.triedHosts));
    }
    host = item.value;
    // set the distance relative to the client first
    var distance = this.profileManager.getDistance(host);
    if (distance === types.distance.ignored) {
      //If its marked as ignore by the load balancing policy, move on.
      continue;
    }
    if (host.isUp()) {
      break;
    }
    this.triedHosts[host.address] = 'Host considered as DOWN';
  }
  var self = this;
  this._getPooledConnection(host, function iterateSingleCallback(err, connection) {
    if (connection) {
      // Connection acquired
      self.host = host;
      return callback(null, connection);
    }
    if (err) {
      self.triedHosts[host.address] = err;
    }
    if (++self.hostIterations > maxSyncHostIterations) {
      //avoid a large number sync recursive calls
      self.hostIterations = 0;
      return process.nextTick(function iterateOnNextTick() {
        self._iterateThroughHosts(iterator, callback);
      });
    }
    //move to next host
    return self._iterateThroughHosts(iterator, callback);
  });
};

/**
 * Gets connection from the host connection pool
 * @param {Host} host
 * @param {Function} callback callback(err, connection, host) to use
 * @private
 */
RequestHandler.prototype._getPooledConnection = function (host, callback) {
  // Implementation detail: avoid the overhead of async flow control functions
  var self = this;
  host.borrowConnection(function (err, connection) {
    if (err) {
      return callback(err);
    }
    if (!self.client.keyspace || self.client.keyspace === connection.keyspace) {
      // connection is ready
      return callback(null, connection);
    }
    // switch keyspace
    connection.changeKeyspace(self.client.keyspace, function (err) {
      if (err) {
        host.removeFromPool(connection);
        return callback(err);
      }
      callback(null, connection);
    });
  });
};

RequestHandler.prototype.log = utils.log;

/**
 * Gets an available connection and sends the request
 * @param request
 * @param {QueryOptions|null} options
 * @param {Function} callback
 */
RequestHandler.prototype.send = function (request, options, callback) {
  if (this.request === null) {
    options = options || utils.emptyObject;
    if (options.captureStackTrace) {
      Error.captureStackTrace(this.stackContainer = {});
    }
  }
  this.request = request;
  this.requestOptions = options;
  var self = this;
  this._getNextConnection(options, function (err, c) {
    if (err) {
      //No connection available
      return callback(err);
    }
    self.connection = c;
    self._sendOnConnection(request, options, callback);
  });
};

/**
 * Sends a request to the current connection, adapting the result and retrying, if necessary.
 * @param request
 * @param {QueryOptions} options
 * @param {Function} callback
 * @private
 */
RequestHandler.prototype._sendOnConnection = function (request, options, callback) {
  var self = this;
  this.connection.sendStream(request, options, function readCallback(err, response) {
    if (err) {
      //Something bad happened, maybe retry or do something about it
      return self._handleError(err, callback);
    }
    response = response || utils.emptyObject;
    var result = new types.ResultSet(response, self.host.address, self.triedHosts, self.request.consistency);
    if (result.info.warnings) {
      //log the warnings
      result.info.warnings.forEach(function (message, i, warnings) {
        self.log('warning', util.format(
          'Received warning (%d of %d) "%s" for query "%s"',
          i + 1,
          warnings.length,
          message,
          self.request.query));
      });
    }
    if (response.schemaChange) {
      return self.client.handleSchemaAgreementAndRefresh(self.connection, response.schemaChange, function schemaCb() {
        callback(null, result);
      });
    }
    if (response.keyspaceSet) {
      self.client.keyspace = response.keyspaceSet;
    }
    callback(null, result);
  });
};

/**
 * Sets the keyspace in any connection that is already opened.
 * @param {Function} callback
 */
RequestHandler.prototype.setKeyspace = function (callback) {
  var connection;
  var hosts = this.client.hosts.values();
  for (var i = 0; i < hosts.length; i++) {
    this.host = hosts[i];
    connection = this.host.getActiveConnection();
    if (connection) {
      break;
    }
  }
  if (!connection) {
    return callback(new errors.DriverInternalError('No active connection found'));
  }
  connection.changeKeyspace(this.client.keyspace, callback);
};

/**
 * Sends multiple prepare requests on the next connection available
 * @param {Array.<string>} queries
 * @param {Array.<function>} callbacksArray
 * @param {QueryOptions} options
 * @param {function} callback
 */
RequestHandler.prototype.prepareMultiple = function (queries, callbacksArray, options, callback) {
  Error.captureStackTrace(this.stackContainer = {});
  var self = this;
  this.retryHandler = function () {
    // Use a custom retryHandler
    self.prepareMultiple(queries, callbacksArray, options, callback);
  };
  this._getNextConnection(options, function (err, c) {
    if (err) {
      //No connection available, no point on retrying
      return callback(err);
    }
    self.connection = c;
    self._prepareOnConnection(queries, callbacksArray, callback);
  });
};

/**
 * Serially prepares multiple queries on the current connection and handles retry if necessary
 * @param {Array.<string>} queries
 * @param {Array.<function>} callbacksArray
 * @param {function} callback
 * @private
 */
RequestHandler.prototype._prepareOnConnection = function (queries, callbacksArray, callback) {
  var self = this;
  var index = 0;
  utils.eachSeries(queries, function (query, next) {
    self.connection.prepareOnce(query, function (err, response) {
      if (callbacksArray) {
        callbacksArray[index++](err, response);
      }
      next(err);
    });
  }, function (err) {
    if (err) {
      return self._handleError(err, callback);
    }
    callback();
  });
};

/**
 * Checks if the error and determines if it should be ignored, retried or rethrown.
 * @param {Object} err
 * @param {Function} callback
 * @private
 */
RequestHandler.prototype._handleError = function (err, callback) {
  // add the error to the tried hosts
  this.triedHosts[this.host.address] = err;
  err['coordinator'] = this.host.address;
  if ((err instanceof errors.ResponseError) && err.code === types.responseErrorCodes.unprepared) {
    //noinspection JSUnresolvedVariable
    return this._prepareAndRetry(err.queryId, callback);
  }
  var decisionInfo = this._getDecision(err);
  if (err.isSocketError) {
    this.host.removeFromPool(this.connection);
  }
  if (!decisionInfo || decisionInfo.decision === retry.RetryPolicy.retryDecision.rethrow) {
    // callback in error
    if (this.requestOptions.captureStackTrace) {
      utils.fixStack(this.stackContainer.stack, err);
    }
    if (this.request instanceof requests.QueryRequest || this.request instanceof requests.ExecuteRequest) {
      err['query'] = this.request.query;
    }
    return callback(err);
  }
  if (decisionInfo.decision === retry.RetryPolicy.retryDecision.ignore) {
    //Return an empty response
    return callback(
      null,
      new types.ResultSet(utils.emptyObject, this.host.address, this.triedHosts, this.request.consistency));
  }
  return this._retry(decisionInfo.consistency, decisionInfo.useCurrentHost, callback);
};

/**
 * @returns {{decision, useCurrentHost, consistency}}
 */
RequestHandler.prototype._getDecision = function (err) {
  var operationInfo = {
    query: this.request && this.request.query,
    options: this.requestOptions,
    nbRetry: this.retryCount,
    // handler, request and retryOnTimeout properties are deprecated and should be removed in the next major version
    handler: this,
    request: this.request,
    retryOnTimeout: false
  };
  var self = this;
  function onRequestError() {
    if (self.retryHandler) {
      return retryOnNextHostDecision;
    }
    return self.retryPolicy.onRequestError(operationInfo, self.request.consistency, err);
  }
  if (err.isSocketError) {
    if (err.requestNotWritten) {
      // the request was definitely not applied, it's safe to retry
      return retryOnNextHostDecision;
    }
    return onRequestError();
  }
  if (err instanceof errors.OperationTimedOutError) {
    this.log('warning', err.message);
    this.host.checkHealth(this.connection);
    if (this.request instanceof requests.PrepareRequest) {
      // always retry on next host for PREPARE requests
      return retryOnNextHostDecision;
    }
    operationInfo.retryOnTimeout = this.requestOptions.retryOnTimeout !== false;
    return onRequestError();
  }
  if (err instanceof errors.ResponseError) {
    switch (err.code) {
      case types.responseErrorCodes.overloaded:
      case types.responseErrorCodes.isBootstrapping:
      case types.responseErrorCodes.truncateError:
        return onRequestError();
      case types.responseErrorCodes.unavailableException:
        //noinspection JSUnresolvedVariable
        return this.retryPolicy.onUnavailable(operationInfo, err.consistencies, err.required, err.alive);
      case types.responseErrorCodes.readTimeout:
        //noinspection JSUnresolvedVariable
        return this.retryPolicy.onReadTimeout(
          operationInfo, err.consistencies, err.received, err.blockFor, err.isDataPresent);
      case types.responseErrorCodes.writeTimeout:
        //noinspection JSUnresolvedVariable
        return this.retryPolicy.onWriteTimeout(
          operationInfo, err.consistencies, err.received, err.blockFor, err.writeType);
    }
  }
  return { decision: retry.RetryPolicy.retryDecision.rethrow };
};

/**
 * @param {Number} consistency
 * @param {Boolean} useCurrentHost
 * @param {Function} callback
 * @private
 */
RequestHandler.prototype._retry = function (consistency, useCurrentHost, callback) {
  this.log('info', 'Retrying request');
  if (this.retryHandler) {
    // custom retry handler (not a QueryRequest / ExecuteRequest / BatchRequest)
    return this.retryHandler(callback);
  }
  this.retryCount++;
  if (typeof consistency === 'number') {
    this.request.consistency = consistency;
  }
  if (useCurrentHost !== false) {
    // retry on the current host by default
    return this._sendOnConnection(this.request, this.requestOptions, callback);
  }
  // use the next host in the query plan to send the request
  this.send(this.request, this.requestOptions, callback);
};

/**
 * Prepares the query and retries on the SAME host
 * @param {Buffer} queryId
 * @param {Function} callback
 * @private
 */
RequestHandler.prototype._prepareAndRetry = function (queryId, callback) {
  this.log('info', util.format('Query 0x%s not prepared on host %s, preparing and retrying', queryId.toString('hex'), this.host.address));
  var self = this;
  function retryRequest (err) {
    if (err) {
      //Could not be prepared or retried, just send the error to caller
      return callback(err);
    }
    self._sendOnConnection(self.request, self.requestOptions, callback);
  }
  if (this.request instanceof requests.ExecuteRequest) {
    //Its a single Execute of a prepared statement
    this._prepareOnConnection([this.request.query], [function (err, response) {
      if (err) {
        // let the error be handle at general callback level
        return;
      }
      if (!Buffer.isBuffer(response.id) ||
        response.id.length !== self.request.queryId.length ||
        response.id.toString('hex') !== self.request.queryId.toString('hex')
      ) {
        self.log('warning', util.format('Unexpected difference between query ids for query "%s" (%s !== %s)',
          self.request.query,
          response.id.toString('hex'),
          self.request.queryId.toString('hex')));
        self.request.queryId = response.id;
      }
    }], retryRequest);
    return;
  }
  if (this.request instanceof requests.BatchRequest) {
    //Its a BATCH of prepared statements
    //We need to prepare all the different queries in the batch in the current host
    var queries = {};
    this.request.queries.forEach(function (item) {
      //Unique queries
      queries[item.query] = item.info;
    });
    var queryStrings = Object.keys(queries);
    var singleCallbacks = new Array(queryStrings.length);
    queryStrings.forEach(function (query, i) {
      var info = queries[query];
      singleCallbacks[i] = function singlePrepareCallback(err, response) {
        if (err) {
          // let the error be handle at general callback level
          return;
        }
        if (!Buffer.isBuffer(response.id) ||
          response.id.length !== info.queryId.length ||
          response.id.toString('hex') !== info.queryId.toString('hex')
        ) {
          self.log('warning', util.format('Unexpected difference between query ids for query "%s" (%s !== %s)',
            info.query,
            response.id.toString('hex'),
            info.queryId.toString('hex')));
          info.queryId = response.id;
        }
      };
    });
    this._prepareOnConnection(queryStrings, singleCallbacks, retryRequest);
    return;
  }
  return callback(new errors.DriverInternalError('Obtained unprepared from wrong request'));
};

module.exports = RequestHandler;