Repository URL to install this package:
|
Version:
3.2.0 ▾
|
"use strict";
var events = require('events');
var util = require('util');
var utils = require('./utils.js');
var errors = require('./errors.js');
var types = require('./types');
var ControlConnection = require('./control-connection');
var ProfileManager = require('./execution-profile').ProfileManager;
var RequestHandler = require('./request-handler');
var requests = require('./requests');
var clientOptions = require('./client-options');
/**
* Max amount of pools being warmup in parallel, when warmup is enabled
* @const {Number}
* @private
*/
var warmupLimit = 32;
/**
* Client options
* @typedef {Object} ClientOptions
* @property {Array.<string>} contactPoints
* Array of addresses or host names of the nodes to add as contact points.
* <p>
* Contact points are addresses of Cassandra nodes that the driver uses to discover the cluster topology.
* </p>
* <p>
* Only one contact point is required (the driver will retrieve the address of the other nodes automatically),
* but it is usually a good idea to provide more than one contact point, because if that single contact point is
* unavailable, the driver will not be able to initialize correctly.
* </p>
* @property {String} keyspace The logged keyspace for all the connections created within the {@link Client} instance.
* @property {Number} refreshSchemaDelay The default window size in milliseconds used to debounce node list and schema
* refresh metadata requests. Default: 1000.
* @property {Boolean} isMetadataSyncEnabled Determines whether client-side schema metadata retrieval and update is
* enabled.
* <p>Setting this value to <code>false</code> will cause keyspace information not to be automatically loaded, affecting
* replica calculation per token in the different keyspaces. When disabling metadata synchronization, use
* [Metadata.refreshKeyspaces()]{@link module:metadata~Metadata#refreshKeyspaces} to keep keyspace information up to
* date or token-awareness will not work correctly.</p>
* Default: <code>true</code>.
* @property {Object} policies
* @property {LoadBalancingPolicy} policies.loadBalancing The load balancing policy instance to be used to determine
* the coordinator per query.
* @property {RetryPolicy} policies.retry The retry policy.
* @property {ReconnectionPolicy} policies.reconnection The reconnection policy to be used.
* @property {AddressTranslator} policies.addressResolution The address resolution policy.
* @property {TimestampGenerator} policies.timestampGeneration The client-side
* [query timestamp generator]{@link module:policies/timestampGeneration~TimestampGenerator}.
* <p>
* Default: <code>[MonotonicTimestampGenerator]{@link module:policies/timestampGeneration~MonotonicTimestampGenerator}
* </code>
* </p>
* <p>Use <code>null</code> to disable client-side timestamp generation.</p>
* @property {QueryOptions} queryOptions Default options for all queries.
* @property {Object} pooling Pooling options.
* @property {Number} pooling.heartBeatInterval The amount of idle time in milliseconds that has to pass before the
* driver issues a request on an active connection to avoid idle time disconnections. Default: 30000.
* @property {Object} pooling.coreConnectionsPerHost Associative array containing amount of connections per host
* distance.
* @property {Boolean} pooling.warmup Determines if all connections to hosts in the local datacenter must be opened on
* connect. Default: false.
* @property {Object} protocolOptions
* @property {Number} protocolOptions.port The port to use to connect to the Cassandra host. If not set through this
* method, the default port (9042) will be used instead.
* @property {Number} protocolOptions.maxSchemaAgreementWaitSeconds The maximum time in seconds to wait for schema
* agreement between nodes before returning from a DDL query. Default: 10.
* @property {Number} protocolOptions.maxVersion When set, it limits the maximum protocol version used to connect to
* the nodes.
* Useful for using the driver against a cluster that contains nodes with different major/minor versions of Cassandra.
* @property {Object} socketOptions
* @property {Number} socketOptions.connectTimeout Connection timeout in milliseconds. Default: 5000.
* @property {Number} socketOptions.defunctReadTimeoutThreshold Determines the amount of requests that simultaneously
* have to timeout before closing the connection. Default: 64.
* @property {Boolean} socketOptions.keepAlive Whether to enable TCP keep-alive on the socket. Default: true.
* @property {Number} socketOptions.keepAliveDelay TCP keep-alive delay in milliseconds. Default: 0.
* @property {Number} socketOptions.readTimeout Per-host read timeout in milliseconds.
* <p>
* Please note that this is not the maximum time a call to {@link Client#execute} may have to wait;
* this is the maximum time that call will wait for one particular Cassandra host, but other hosts will be tried if
* one of them timeout. In other words, a {@link Client#execute} call may theoretically wait up to
* <code>readTimeout * number_of_cassandra_hosts</code> (though the total number of hosts tried for a given query also
* depends on the LoadBalancingPolicy in use).
* <p>When setting this value, keep in mind the following:</p>
* <ul>
* <li>the timeout settings used on the Cassandra side (*_request_timeout_in_ms in cassandra.yaml) should be taken
* into account when picking a value for this read timeout. You should pick a value a couple of seconds greater than
* the Cassandra timeout settings.
* </li>
* <li>
* the read timeout is only approximate and only control the timeout to one Cassandra host, not the full query.
* </li>
* </ul>
* Setting a value of 0 disables read timeouts. Default: 0.
* @property {Boolean} socketOptions.tcpNoDelay When set to true, it disables the Nagle algorithm. Default: true.
* @property {Number} socketOptions.coalescingThreshold Buffer length in bytes use by the write queue before flushing
* the frames. Default: 8000.
* @property {AuthProvider} authProvider Provider to be used to authenticate to an auth-enabled cluster.
* @property {Object} sslOptions Client-to-node ssl options, when set the driver will use the secure layer.
* You can specify cert, ca, ... options named after the Node.js tls.connect options.
* @property {Object} encoding
* @property {Function} encoding.map Map constructor to use for Cassandra map<k,v> type encoding and decoding.
* If not set, it will default to Javascript Object with map keys as property names.
* @property {Function} encoding.set Set constructor to use for Cassandra set<k> type encoding and decoding.
* If not set, it will default to Javascript Array.
* @property {Boolean} encoding.copyBuffer Determines if the network buffer should be copied for buffer based data
* types (blob, uuid, timeuuid and inet).
* <p>
* Setting it to true will cause that the network buffer is copied for each row value of those types,
* causing additional allocations but freeing the network buffer to be reused.
* Setting it to true is a good choice for cases where the Row and ResultSet returned by the queries are long-lived
* objects.
* </p>
* <p>
* Setting it to false will cause less overhead and the reference of the network buffer to be maintained until the row
* / result set are de-referenced.
* Default: true.
* </p>
* @property {Boolean} encoding.useUndefinedAsUnset Valid for Cassandra 2.2 and above. Determines that, if a parameter
* is set to
* <code>undefined</code> it should be encoded as <code>unset</code>.
* <p>
* By default, ECMAScript <code>undefined</code> is encoded as <code>null</code> in the driver. Cassandra 2.2
* introduced the concept of unset.
* At driver level, you can set a parameter to unset using the field <code>types.unset</code>. Setting this flag to
* true allows you to use ECMAScript undefined as Cassandra <code>unset</code>.
* </p>
* <p>
* Default: true.
* </p>
* @property {Array.<ExecutionProfile>} profiles The array of [execution profiles]{@link ExecutionProfile}.
* @property {Function} promiseFactory Function to be used to create a <code>Promise</code> from a
* callback-style function.
* <p>
* Promise libraries often provide different methods to create a promise. For example, you can use Bluebird's
* <code>Promise.fromCallback()</code> method.
* </p>
* <p>
* By default, the driver will use the
* [Promise constructor]{@link https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise}.
* </p>
*/
/**
* Query options
* @typedef {Object} QueryOptions
* @property {Boolean} [autoPage] Determines if the driver must retrieve the following result pages automatically.
* <p>
* This setting is only considered by the [Client#eachRow()]{@link Client#eachRow} method. For more information,
* check the
* [paging results documentation]{@link http://docs.datastax.com/en/developer/nodejs-driver/latest/features/paging/}.
* </p>
* @property {Boolean} [captureStackTrace] Determines if the stack trace before the query execution should be
* maintained.
* <p>
* Useful for debugging purposes, it should be set to <code>false</code> under production environment as it adds an
* unnecessary overhead to each execution.
* </p>
* Default: false.
* @property {Number} [consistency] [Consistency level]{@link module:types~consistencies}. Default: localOne.
* @property {Object} [customPayload] Key-value payload to be passed to the server. On the Cassandra side,
* implementations of QueryHandler can use this data.
* @property {String|ExecutionProfile} [executionProfile] Name or instance of the [profile]{@link ExecutionProfile} to
* be used for this execution. If not set, it will the use "default" execution profile.
* @property {Number} [fetchSize] Amount of rows to retrieve per page.
* @property {Array|Array<Array>} [hints] Type hints for parameters given in the query, ordered as for the parameters.
* <p>For batch queries, an array of such arrays, ordered as with the queries in the batch.</p>
* @property {Boolean} [isIdempotent] Defines whether the query can be applied multiple times without changing the result
* beyond the initial application.
* <p>
* The query execution idempotence can be used at [RetryPolicy]{@link module:policies/retry~RetryPolicy} level to
* determine if an statement can be retried in case of request error or write timeout.
* </p>
* <p>Default: <code>false</code>.</p>
* @property {Boolean} [logged] Determines if the batch should be written to the batchlog. Only valid for
* [Client#batch()]{@link Client#batch}, it will be ignored by other methods. Default: true.
* @property {Buffer|String} [pageState] Buffer or string token representing the paging state.
* <p>Useful for manual paging, if provided, the query will be executed starting from a given paging state.</p>
* @property {Boolean} [prepare] Determines if the query must be executed as a prepared statement.
* @property {Number} [readTimeout] When defined, it overrides the default read timeout
* (<code>socketOptions.readTimeout</code>) in milliseconds for this execution per coordinator.
* <p>
* Suitable for statements for which the coordinator may allow a longer server-side timeout, for example aggregation
* queries.
* </p>
* <p>
* A value of <code>0</code> disables client side read timeout for the execution. Default: <code>undefined</code>.
* </p>
* @property {RetryPolicy} [retry] Retry policy for the query.
* <p>
* This property can be used to specify a different [retry policy]{@link module:policies/retry} to the one specified
* in the {@link ClientOptions}.policies.
* </p>
* @property {Boolean} [retryOnTimeout] Determines if the client should retry when it didn't hear back from a host
* within <code>socketOptions.readTimeout</code>. Default: true.
* @property {Array} [routingIndexes] Index of the parameters that are part of the partition key to determine
* the routing.
* @property {Buffer|Array} [routingKey] Partition key(s) to determine which coordinator should be used for the query.
* @property {Array} [routingNames] Array of the parameters names that are part of the partition key to determine the
* routing.
* @property {Number} [serialConsistency] Serial consistency is the consistency level for the serial phase of
* conditional updates.
* This option will be ignored for anything else that a conditional update/insert.
* @property {Number|Long} [timestamp] The default timestamp for the query in microseconds from the unix epoch
* (00:00:00, January 1st, 1970).
* <p>If provided, this will replace the server side assigned timestamp as default timestamp.</p>
* <p>Use [generateTimestamp()]{@link module:types~generateTimestamp} utility method to generate a valid timestamp
* based on a Date and microseconds parts.</p>
* @property {Boolean} [traceQuery] Enable query tracing for the execution. Use query tracing to diagnose performance
* problems related to query executions. Default: false.
* <p>To retrieve trace, you can call [Metadata.getTrace()]{@link module:metadata~Metadata#getTrace} method.</p>
*/
/**
* Creates a new instance of {@link Client}.
* @classdesc
* A Client holds connections to a Cassandra cluster, allowing it to be queried.
* Each Client instance maintains multiple connections to the cluster nodes,
* provides [policies]{@link module:policies} to choose which node to use for each query,
* and handles [retries]{@link module:policies/retry} for failed query (when it makes sense), etc...
* <p>
* Client instances are designed to be long-lived and usually a single instance is enough
* per application. As a given Client can only be "logged" into one keyspace at
* a time (where the "logged" keyspace is the one used by query if the query doesn't
* explicitly use a fully qualified table name), it can make sense to create one
* client per keyspace used. This is however not necessary to query multiple keyspaces
* since it is always possible to use a single session with fully qualified table name
* in queries.
* </p>
* @extends EventEmitter
* @param {ClientOptions} options The options for this instance.
* @example <caption>Creating a new client instance</caption>
* const client = new Client({ contactPoints: ['192.168.1.100'] });
* client.connect(function (err) {
* if (err) return console.error(err);
* console.log('Connected to cluster with %d host(s): %j', client.hosts.length, client.hosts.keys());
* });
* @example <caption>Executing a query</caption>
* // calling #execute() can be made without previously calling #connect(), as internally
* // it will ensure it's connected before attempting to execute the query
* client.execute('SELECT key FROM system.local', function (err, result) {
* if (err) return console.error(err);
* const row = result.first();
* console.log(row['key']);
* });
* @example <caption>Executing a query with promise-based API</caption>
* const result = await client.execute('SELECT key FROM system.local');
* const row = result.first();
* console.log(row['key']);
* @constructor
*/
function Client(options) {
events.EventEmitter.call(this);
this.options = clientOptions.extend({ logEmitter: this.emit.bind(this) }, options);
Object.defineProperty(this, 'profileManager', { value: new ProfileManager(this.options) });
Object.defineProperty(this, 'controlConnection', {
value: new ControlConnection(this.options, this.profileManager), writable: true }
);
//Unlimited amount of listeners for internal event queues by default
this.setMaxListeners(0);
this.connected = false;
this.isShuttingDown = false;
/**
* Gets the name of the active keyspace.
* @type {String}
*/
this.keyspace = options.keyspace;
/**
* Gets the schema and cluster metadata information.
* @type {Metadata}
*/
this.metadata = this.controlConnection.metadata;
/**
* Gets an associative array of cluster hosts.
* @type {HostMap}
*/
this.hosts = null;
}
util.inherits(Client, events.EventEmitter);
/**
* Emitted when a new host is added to the cluster.
* <ul>
* <li>{@link Host} The host being added.</li>
* </ul>
* @event Client#hostAdd
*/
/**
* Emitted when a host is removed from the cluster
* <ul>
* <li>{@link Host} The host being removed.</li>
* </ul>
* @event Client#hostRemove
*/
/**
* Emitted when a host in the cluster changed status from down to up.
* <ul>
* <li>{@link Host host} The host that changed the status.</li>
* </ul>
* @event Client#hostUp
*/
/**
* Emitted when a host in the cluster changed status from up to down.
* <ul>
* <li>{@link Host host} The host that changed the status.</li>
* </ul>
* @event Client#hostDown
*/
/**
* Tries to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the cluster.
* <p>
* If a <code>callback</code> is provided, it will invoke the callback when the client is connected. Otherwise,
* it will return a <code>Promise</code>.
* </p>
* <p>
* If the {@link Client} is already connected, it invokes callback immediately (when provided) or the promise is
* fulfilled .
* </p>
* @example <caption>Callback-based execution</caption>
* client.connect(function (err) {
* if (err) return console.error(err);
* console.log('Connected to cluster with %d host(s): %j', client.hosts.length, client.hosts.keys());
* });
* @example <caption>Promise-based execution</caption>
* await client.connect();
* @param {function} [callback] The callback is invoked when the pool is connected it failed to connect.
*/
Client.prototype.connect = function (callback) {
return utils.promiseWrapper.call(this, this.options, callback, false, this._connectCb);
};
/**
* @param {Function} callback
* @private
*/
Client.prototype._connectCb = function (callback) {
if (this.connected) {
return callback();
}
if (this.isShuttingDown) {
//it is being shutdown, don't allow further calls to connect()
return callback(new errors.NoHostAvailableError(null, 'Connecting after shutdown is not supported'));
}
this.once('connected', callback);
if (this.connecting) {
//the listener to connect was added, move on
return;
}
this.connecting = true;
var self = this;
utils.series([
function initControlConnection(next) {
self.controlConnection.init(next);
},
function initLoadBalancingPolicy(next) {
self.hosts = self.controlConnection.hosts;
self._setHostListeners();
self.profileManager.init(self, self.hosts, next);
},
function setKeyspace(next) {
if (!self.keyspace) {
return next();
}
self._setKeyspace(next);
},
function setPoolOptionsAndWarmup(next) {
//Set the pooling options depending on the protocol version
var coreConnectionsPerHost = clientOptions.coreConnectionsPerHostV3;
if (self.controlConnection.protocolVersion < 3) {
coreConnectionsPerHost = clientOptions.coreConnectionsPerHostV2;
}
self.options.pooling = utils.deepExtend({}, { coreConnectionsPerHost: coreConnectionsPerHost }, self.options.pooling);
if (!self.options.pooling.warmup) {
return next();
}
self._warmup(next);
}
], function connectFinished(err) {
self.connected = !err;
self.connecting = false;
self.emit('connected', err);
if (self.connected) {
// Set the distance of the control connection host relatively to this instance
self.profileManager.getDistance(self.controlConnection.host);
}
});
};
/**
* Executes a query on an available connection.
* <p>
* If a <code>callback</code> is provided, it will invoke the callback when the execution completes. Otherwise,
* it will return a <code>Promise</code>.
* </p>
* <p>The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag.</p>
* <p>
* Some executions failures can be handled transparently by the driver, according to the
* [RetryPolicy]{@link module:policies/retry~RetryPolicy} defined at {@link ClientOptions} or {@link QueryOptions}
* level.
* </p>
* @param {String} query The query to execute.
* @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
* as keys and its value.
* @param {QueryOptions} [options] The query options for the execution.
* @param {ResultCallback} [callback] Executes callback(err, result) when execution completed. When not defined, the
* method will return a promise.
* @example <caption>Callback-based API</caption>
* const query = 'SELECT name, email FROM users WHERE id = ?';
* client.execute(query, [ id ], { prepare: true }, function (err, result) {
* assert.ifError(err);
* const row = result.first();
* console.log('%s: %s', row.name, row.email);
* });
* @example <caption>Promise-based API, using async/await</caption>
* const query = 'SELECT name, email FROM users WHERE id = ?';
* const result = await client.execute(query, [ id ], { prepare: true });
* const row = result.first();
* console.log('%s: %s', row.name, row.email);
* @see {@link ExecutionProfile} to reuse a set of options across different query executions.
*/
Client.prototype.execute = function (query, params, options, callback) {
// set default argument values for optional parameters
callback = callback || (options ? options : params);
if (typeof callback === 'function') {
params = typeof params !== 'function' ? params : null;
}
return utils.promiseWrapper.call(this, this.options, callback, false, function handler(cb) {
options = clientOptions.createQueryOptions(this, options);
this._innerExecute(query, params, options, cb);
});
};
/**
* Executes the query and calls rowCallback for each row as soon as they are received. Calls final callback after all
* rows have been sent, or when there is an error.
* <p>
* The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple
* hosts if needed.
* </p>
* @param {String} query The query to execute
* @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
* as keys and its value.
* @param {QueryOptions} [options]
* @param {function} rowCallback Executes <code>rowCallback(n, row)</code> per each row received, where n is the row
* index and row is the current Row.
* @param {function} [callback] Executes <code>callback(err, result)</code> after all rows have been received.
* <p>
* When dealing with paged results, [ResultSet#nextPage()]{@link module:types~ResultSet#nextPage} method can be used
* to retrieve the following page. In that case, <code>rowCallback()</code> will be again called for each row and
* the final callback will be invoked when all rows in the following page has been retrieved.
* </p>
* @example <caption>Using per-row callback and arrow functions</caption>
* client.eachRow(query, params, { prepare: true }, (n, row) => console.log(n, row), err => console.error(err));
* @example <caption>Overloads</caption>
* client.eachRow(query, rowCallback);
* client.eachRow(query, params, rowCallback);
* client.eachRow(query, params, options, rowCallback);
* client.eachRow(query, params, rowCallback, callback);
* client.eachRow(query, params, options, rowCallback, callback);
*/
Client.prototype.eachRow = function (query, params, options, rowCallback, callback) {
if (!callback && rowCallback && typeof options === 'function') {
callback = utils.bindDomain(rowCallback);
rowCallback = utils.bindDomain(options);
}
else {
callback = utils.bindDomain(callback || utils.noop);
rowCallback = utils.bindDomain(rowCallback || options || params);
}
params = typeof params !== 'function' ? params : null;
options = clientOptions.createQueryOptions(this, options, rowCallback);
var self = this;
var rowLength = 0;
function nextPage() {
self._innerExecute(query, params, options, pageCallback);
}
function pageCallback (err, result) {
if (err) {
return callback(err);
}
// Next requests in case paging (auto or explicit) is used
rowLength += result.rowLength;
if (result.meta && result.meta.pageState) {
// Use new page state as next request page state
options.pageState = result.meta.pageState;
if (options.autoPage) {
// Issue next request for the next page
return nextPage();
}
// Allows for explicit (manual) paging, in case the caller needs it
result.nextPage = nextPage;
}
// Finished auto-paging
result.rowLength = rowLength;
callback(null, result);
}
this._innerExecute(query, params, options, pageCallback);
};
/**
* Executes the query and pushes the rows to the result stream
* as soon as they received.
* Calls callback after all rows have been sent, or when there is an error.
* <p>
* The stream is a [Readable Streams2]{@link http://nodejs.org/api/stream.html#stream_class_stream_readable} object
* that contains the raw bytes of the field value.
* It can be piped downstream and provides automatic pause/resume logic (it buffers when not read).
* </p>
* <p>
* The query can be prepared (recommended) or not depending on {@link QueryOptions}.prepare flag. Retries on multiple
* hosts if needed.
* </p>
* @param {String} query The query to prepare and execute
* @param {Array|Object} [params] Array of parameter values or an associative array (object) containing parameter names
* as keys and its value
* @param {QueryOptions} [options]
* @param {function} [callback], executes callback(err) after all rows have been received or if there is an error
* @returns {types.ResultStream}
*/
Client.prototype.stream = function (query, params, options, callback) {
callback = utils.bindDomain(callback || utils.noop);
// NOTE: the nodejs stream maintains yet another internal buffer
// we rely on the default stream implementation to keep memory
// usage reasonable.
var resultStream = new types.ResultStream({ objectMode: 1 });
function onFinish(err, result) {
if (err) {
resultStream.emit('error', err);
}
if (result && result.nextPage ) {
// allows for throttling as per the
// default nodejs stream implementation
resultStream._valve(function pageValve() {
try {
result.nextPage();
}
catch( ex ) {
resultStream.emit('error', ex );
}
});
return;
}
// Explicitly dropping the valve (closure)
resultStream._valve(null);
resultStream.add(null);
callback(err);
}
var sync = true;
this.eachRow(query, params, options, function rowCallback(n, row) {
resultStream.add(row);
}, function eachRowFinished(err, result) {
if (sync) {
// Prevent sync callback
return setImmediate(function eachRowFinishedImmediate() {
onFinish(err, result);
});
}
onFinish(err, result);
});
sync = false;
return resultStream;
};
/**
* Executes batch of queries on an available connection to a host.
* <p>
* If a <code>callback</code> is provided, it will invoke the callback when the execution completes. Otherwise,
* it will return a <code>Promise</code>.
* </p>
* @param {Array.<string>|Array.<{query, params}>} queries The queries to execute as an Array of strings or as an array
* of object containing the query and params
* @param {QueryOptions} [options]
* @param {ResultCallback} [callback] Executes callback(err, result) when the batch was executed
*/
Client.prototype.batch = function (queries, options, callback) {
callback = callback || options;
return utils.promiseWrapper.call(this, this.options, callback, false, function handler(cb) {
this._batchCb(queries, options, cb);
});
};
/**
* @param {Array.<string>|Array.<{query, params}>}queries
* @param {QueryOptions} options
* @param {ResultCallback} callback
* @private
*/
Client.prototype._batchCb = function (queries, options, callback) {
var self = this;
queries = validateBatchQueries(queries);
options = clientOptions.createQueryOptions(this, options, null, true);
if (options.message && options instanceof Error) {
return callback(options);
}
this.connect(function afterConnect(err) {
if (err) {
return callback(err);
}
if (options.prepare) {
//Batch of prepared statements
return self._batchPrepared(queries, options, callback);
}
//Batch of simple statements
self._sendBatch(queries, options, callback);
});
};
/**
* Gets the host list representing the replicas that contain such partition.
* @param {String} keyspace
* @param {Buffer} token
* @returns {Array}
*/
Client.prototype.getReplicas = function (keyspace, token) {
return this.metadata.getReplicas(keyspace, token);
};
Client.prototype.log = utils.log;
/**
* Closes all connections to all hosts.
* <p>
* If a <code>callback</code> is provided, it will invoke the callback when the client is disconnected. Otherwise,
* it will return a <code>Promise</code>.
* </p>
* @param {Function} [callback] Optional callback to be invoked when finished closing all connections.
*/
Client.prototype.shutdown = function (callback) {
return utils.promiseWrapper.call(this, this.options, callback, true, this._shutdownCb);
};
/**
* @param {Function} callback
* @private
*/
Client.prototype._shutdownCb = function (callback) {
var self = this;
function doShutdown() {
self.connected = false;
self.isShuttingDown = true;
var hosts = self.hosts.values();
self.controlConnection.shutdown();
// go through all the host and shut down their pools
utils.each(hosts, function (h, next) {
h.shutdown(false, next);
}, callback);
}
this.log('info', 'Shutting down');
callback = callback || utils.noop;
if (!this.hosts || !this.connected) {
// not initialized
this.connected = false;
return callback();
}
if (this.connecting) {
this.log('warning', 'Shutting down while connecting');
// wait until finish connecting for easier troubleshooting
return this.once('connected', doShutdown);
}
doShutdown();
};
/**
* Waits until that the schema version in all nodes is the same or the waiting time passed.
* @param {Connection} connection
* @param {Function} callback
* @ignore
*/
Client.prototype._waitForSchemaAgreement = function (connection, callback) {
if (this.hosts.length === 1) {
return setImmediate(callback);
}
var self = this;
var start = new Date();
var maxWaitTime = this.options.protocolOptions.maxSchemaAgreementWaitSeconds * 1000;
this.log('info', 'Waiting for schema agreement');
var versionsMatch;
var peerVersions;
utils.whilst(function condition() {
return !versionsMatch && (new Date() - start) < maxWaitTime;
}, function fn(next) {
utils.series([
function (next) {
self.metadata.getPeersSchemaVersions(connection, function (err, result) {
peerVersions = result;
next(err);
});
},
function (next) {
self.metadata.getLocalSchemaVersion(connection, next);
}
], function seriesEnded(err, localVersion) {
if (err) {
return next(err);
}
//check the different versions
versionsMatch = true;
localVersion = localVersion.toString();
for (var i = 0; i < peerVersions.length; i++) {
if (peerVersions[i].toString() !== localVersion) {
versionsMatch = false;
break;
}
}
if (versionsMatch) {
self.log('info', 'Schema versions match');
}
//let some time pass before the next check
setTimeout(next, 500);
});
}, callback);
};
/**
* Waits for schema agreements and schedules schema metadata refresh.
* @param {Connection} connection
* @param event
* @param {Function} callback
* @ignore
* @internal
*/
Client.prototype.handleSchemaAgreementAndRefresh = function (connection, event, callback) {
var self = this;
this._waitForSchemaAgreement(connection, function agreementCb(err) {
if (err) {
//we issue a warning but we continue with the normal flow
self.log('warning', 'There was an error while waiting for the schema agreement between nodes', err);
}
if (!self.options.isMetadataSyncEnabled) {
return callback();
}
// schedule metadata refresh immediately and the callback will be invoked once it was refreshed
self.controlConnection.handleSchemaChange(event, true, callback);
});
};
/**
* Connects and handles the execution of prepared and simple statements. All parameters are mandatory.
* @param {string} query
* @param {Array} params
* @param {Object} options Options, contained already all the required QueryOptions.
* @param {Function} callback
* @private
*/
Client.prototype._innerExecute = function (query, params, options, callback) {
// Use Error#message property because is faster than checking prototypes
if (options.message && options instanceof Error) {
return callback(options);
}
if (options.prepare) {
return this._executeAsPrepared(query, params, options, callback);
}
var self = this;
utils.series([
function connecting(next) {
self.connect(next);
},
function settingOptions(next) {
self._setQueryOptions(options, params, null, function setOptionsCallback(err, p) {
params = p;
next(err);
});
},
function sendingQuery(next) {
var request = new requests.QueryRequest(
query,
params,
options);
var handler = new RequestHandler(self, options.executionProfile.loadBalancing, options.retry);
handler.send(request, options, next);
}
], callback);
};
/**
* Prepares (the first time) and executes the prepared query, retrying on multiple hosts if needed.
* @param {String} query The query to prepare and execute
* @param {Array|Object} params Array of params or params object with the name as keys
* @param {Object} options
* @param {ResultCallback} callback Executes callback(err, result) when finished
* @private
*/
Client.prototype._executeAsPrepared = function (query, params, options, callback) {
var queryId;
var meta;
var self = this;
utils.series([
function connecting(next) {
self.connect(next);
},
function preparing(next) {
self._getPrepared(query, options, function (err, id, m) {
queryId = id;
meta = m;
next(err);
});
},
function settingOptions(next) {
self._setQueryOptions(options, params, meta, function (err, p) {
params = p;
next(err);
});
},
function sendingExecute(next) {
var request = new requests.ExecuteRequest(
query,
queryId,
params,
options);
request.query = query;
var handler = new RequestHandler(self, options.executionProfile.loadBalancing, options.retry);
handler.send(request, options, next);
}
], callback);
};
/**
* Prepares the queries and then executes the batch.
* @param {Array.<{query, params}>} queries Array of object instances containing query and params properties.
* @param {Object} options
* @param {ResultCallback} callback Executes callback(err, result) when the batch was executed
* @private
*/
Client.prototype._batchPrepared = function (queries, options, callback) {
var self = this;
queries = queries.map(function batchQueryMap(item) {
return { info: self.metadata.getPreparedInfo(self.keyspace, item.query), query: item.query, params: item.params};
});
//Identify the query that are being prepared and wait for it
this._waitForPendingPrepares(queries, function afterWait(err, toPrepare) {
if (err) {
return callback(err);
}
var queriesToPrepare = Object.keys(toPrepare);
if (queriesToPrepare.length === 0) {
//The ones that were being prepared are now prepared
return self._sendBatch(queries, options, callback);
}
//Prepare the pending
var callbacksArray = new Array(queriesToPrepare.length);
queriesToPrepare.forEach(function (query, i) {
var info = toPrepare[query];
info.preparing = true;
callbacksArray[i] = function prepareCallback(err, response) {
info.preparing = false;
if (err) {
return info.emit('prepared', err);
}
info.queryId = response.id;
info.meta = response.meta;
info.emit('prepared', null, info.queryId, info.meta);
};
});
var handler = new RequestHandler(self, options.executionProfile.loadBalancing, options.retry);
//Prepare the queries that are not already prepared on a single host
handler.prepareMultiple(queriesToPrepare, callbacksArray, options, function (err) {
if (err) {
return callback(err);
}
return self._sendBatch(queries, options, callback);
});
});
};
/** @private */
Client.prototype._sendBatch = function (queries, options, callback) {
var request = new requests.BatchRequest(queries, options);
var handler = new RequestHandler(this, options.executionProfile.loadBalancing, options.retry);
handler.send(request, options, callback);
};
/**
* Waits for all pending prepared queries to be prepared and callbacks with the queries to prepare
* @param {Array} queries
* @param {Function} callback
* @private
*/
Client.prototype._waitForPendingPrepares = function (queries, callback) {
function doWait(queriesMap) {
var toPrepare = {};
var pendingIO = false;
utils.each(Object.keys(queriesMap), function waitIterator(query, next) {
var info = queriesMap[query];
if (info.queryId) {
//Its already prepared
return next();
}
if (info.preparing) {
//it is already being prepared
pendingIO = true;
return info.once('prepared', next);
}
toPrepare[query] = info;
next();
}, function waitFinished(err) {
if (err) {
//There was an error with the queries being prepared
return callback(err);
}
if (pendingIO) {
//There was IO between the last call
//it is possible that queries marked to prepare are being prepared
//iterate again until we have the filtered list of items to prepare
return setImmediate(function pendingIOCallback() {
doWait(toPrepare);
});
}
callback(null, toPrepare);
});
}
var queriesMap = {};
queries.forEach(function (item) {
queriesMap[item.query] = item.info;
});
doWait(queriesMap);
};
/**
* Parses and validates the arguments received by executeBatch
* @private
*/
function validateBatchQueries(queries) {
if (!util.isArray(queries)) {
throw new errors.ArgumentError('The first argument must be an Array of queries.');
}
if (queries.length === 0) {
throw new errors.ArgumentError('The Array of queries to batch execute can not be empty');
}
var parsedQueries = new Array(queries.length);
for (var i = 0; i < queries.length; i++) {
var item = queries[i];
if (!item) {
throw new errors.ArgumentError(util.format('Invalid query at index %d', i));
}
var query = item.query;
if (typeof item === 'string') {
query = item;
}
if (!query) {
throw new errors.ArgumentError(util.format('Invalid query at index %d', i));
}
parsedQueries[i] = { query: query, params: item.params};
}
return parsedQueries;
}
/**
* It returns the id of the prepared query.
* If its not prepared, it prepares the query.
* If its being prepared, it queues the callback
* @param {String} query Query to prepare with ? or :param_name as parameter placeholders
* @param {Object} options Execution query options
* @param {function} callback Executes callback(err, queryId) when there is a prepared statement on a connection or
* there is an error.
* @private
*/
Client.prototype._getPrepared = function (query, options, callback) {
var info = this.metadata.getPreparedInfo(this.keyspace, query);
if (info.queryId) {
return callback(null, info.queryId, info.meta);
}
info.once('prepared', callback);
if (info.preparing) {
//it is already being prepared
return;
}
info.preparing = true;
var request = new requests.PrepareRequest(query);
var handler = new RequestHandler(this, options.executionProfile.loadBalancing, options.retry);
handler.send(request, null, function (err, result) {
info.preparing = false;
if (err) {
err.query = query;
return info.emit('prepared', err);
}
info.queryId = result.id;
info.meta = result.meta;
info.emit('prepared', null, info.queryId, info.meta);
});
};
/**
* Sets the keyspace in a connection that is already opened.
* @param {Function} callback
* @private
*/
Client.prototype._setKeyspace = function (callback) {
var handler = new RequestHandler(this, this.options.policies.loadBalancing, this.options.policies.retry);
handler.setKeyspace(callback);
};
/**
* Sets the listeners for the nodes.
* @private
*/
Client.prototype._setHostListeners = function () {
var self = this;
function getHostUpListener(emitter, h) {
return (function hostUpListener() {
emitter.emit('hostUp', h);
});
}
function getHostDownListener(emitter, h) {
return (function hostDownListener() {
emitter.emit('hostDown', h);
});
}
//Add status listeners when new nodes are added and emit hostAdd
this.hosts.on('add', function hostAddedListener(h) {
h.on('up', getHostUpListener(self, h));
h.on('down', getHostDownListener(self, h));
self.emit('hostAdd', h);
});
//Remove all listeners and emit hostRemove
this.hosts.on('remove', function hostRemovedListener(h) {
h.removeAllListeners();
self.emit('hostRemove', h);
});
//Add status listeners for existing hosts
this.hosts.forEach(function (h) {
h.on('up', getHostUpListener(self, h));
h.on('down', getHostDownListener(self, h));
});
};
Client.prototype._warmup = function (callback) {
var self = this;
var hosts = this.hosts.values();
utils.timesLimit(hosts.length, warmupLimit, function warmupEachCallback(i, next) {
var h = hosts[i];
var distance = self.profileManager.getDistance(h);
if (distance !== types.distance.local) {
//do not warmup pool for remote or ignored hosts
return next();
}
h.warmupPool(function (err) {
if (err) {
//An error while trying to create a connection
//To 1 host is not an issue, warn the user and move on
self.log('warning', util.format('Connection pool to host %s could not be created: %s', h.address, err));
}
next();
});
}, callback);
};
/**
* @returns {Encoder}
* @private
*/
Client.prototype._getEncoder = function () {
var encoder;
encoder = this.controlConnection.getEncoder();
if (!encoder) {
throw new errors.DriverInternalError('Encoder is not defined');
}
return encoder;
};
/**
* Validates the values and sets the default values for the {@link QueryOptions} to be used in the query.
* @param {QueryOptions} options Options specified by the user
* @param params
* @param [meta] Prepared statement metadata
* @param {Function} callback
* @private
*/
Client.prototype._setQueryOptions = function (options, params, meta, callback) {
var protocolVersion = this.controlConnection.protocolVersion;
if (!options.prepare && params && !util.isArray(params) && protocolVersion < 3) {
//Only Cassandra 2.1 and above supports named parameters
return callback(new errors.ArgumentError('Named parameters for simple statements are not supported, use prepare flag'));
}
var paramsInfo;
var self = this;
utils.series([
function fillRoutingKeys(next) {
if (options.routingKey || options.routingIndexes || options.routingNames || !meta) {
//it is filled by the user
//or it is not prepared
return next();
}
if (util.isArray(meta.partitionKeys)) {
//the partition keys are provided as part of the metadata
options.routingIndexes = meta.partitionKeys;
return next();
}
self.metadata.getTable(meta.keyspace, meta.table, function (err, tableInfo) {
if (err) {
self.log('warning', util.format('Table %s.%s metadata could not be retrieved', meta.keyspace, meta.table));
//execute without a routing key
return next();
}
if (!tableInfo) {
//The data is not there, maybe it is being recreated
return next();
}
options.routingIndexes = tableInfo.partitionKeys.map(function (c) {
return meta.columnsByName[c.name];
});
//Skip parsing metadata next time
meta.partitionKeys = options.routingIndexes;
next();
});
},
function adaptParameterNames(next) {
try {
if (options.prepare) {
paramsInfo = utils.adaptNamedParamsPrepared(params, meta.columns);
//Add the type information provided by the prepared metadata
options.hints = meta.columns.map(function (c) {
return c.type;
});
}
else {
paramsInfo = utils.adaptNamedParamsWithHints(params, options);
}
}
catch (err) {
return next(err);
}
next();
},
function adaptParameterTypes(next) {
if (options.prepare || !util.isArray(options.hints)) {
return next();
}
//Only not prepared with hints
//Adapting user hints is an async op
self.metadata.adaptUserHints(self.keyspace, options.hints, next);
}
], function finishSettingOptions(err) {
if (err) {
//There was an error setting the query options
return callback(err);
}
try {
if (typeof options.pageState === 'string') {
//pageState can be a hex string
options.pageState = new Buffer(options.pageState, 'hex');
}
//noinspection JSAccessibilityCheck
self._getEncoder().setRoutingKey(paramsInfo.params, options, paramsInfo.keys);
}
catch (err) {
return callback(err);
}
callback(null, paramsInfo.params);
});
};
/**
* Callback used by execution methods.
* @callback ResultCallback
* @param {Error} err Error occurred in the execution of the query.
* @param {ResultSet} [result] Result of the execution of the query.
*/
module.exports = Client;