Repository URL to install this package:
|
Version:
3.2.0 ▾
|
"use strict";
var util = require('util');
var events = require('events');
var utils = require('./utils');
var types = require('./types');
var HostConnectionPool = require('./host-connection-pool');
/**
* Creates a new Host instance.
* @classdesc
* Represents a Cassandra node.
* @extends EventEmitter
* @constructor
*/
function Host(address, protocolVersion, options) {
events.EventEmitter.call(this);
/**
* Gets ip address and port number of the node separated by `:`.
* @type {String}
*/
this.address = address;
this.setDownAt = 0;
Object.defineProperty(this, 'options', { value: options, enumerable: false, writable: false});
/**
* @type {HostConnectionPool}
*/
Object.defineProperty(this, 'pool', { value: new HostConnectionPool(this, protocolVersion), enumerable: false});
var self = this;
this.pool.on('open', this._onNewConnectionOpen.bind(this));
this.pool.on('remove', function onConnectionRemovedFromPool() {
self._checkPoolState();
});
/**
* Gets string containing the Cassandra version.
* @type {String}
*/
this.cassandraVersion = null;
/**
* Gets data center name of the node.
* @type {String}
*/
this.datacenter = null;
/**
* Gets rack name of the node.
* @type {String}
*/
this.rack = null;
/**
* Gets the tokens assigned to the node.
* @type {Array}
*/
this.tokens = null;
// the distance as last set using the load balancing policy
this._distance = types.distance.ignored;
this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
}
util.inherits(Host, events.EventEmitter);
/**
* Marks this host as not available for query coordination.
* @internal
* @ignore
*/
Host.prototype.setDown = function() {
// multiple events signaling that a host is failing could cause multiple calls to this method
if (this.setDownAt !== 0) {
// the host is already marked as Down
return;
}
if (this.pool.isClosing()) {
// the pool is being closed/shutdown, don't mind
return;
}
this.setDownAt = Date.now();
if (this._distance !== types.distance.ignored) {
this.log('warning',
util.format('Host %s considered as DOWN. Reconnection delay %dms.', this.address, this.reconnectionDelay));
}
else {
this.log('info', util.format('Host %s considered as DOWN.', this.address));
}
this.emit('down');
this._checkPoolState();
};
/**
* Marks this host as available for querying.
* @param {Boolean} [clearReconnection]
* @internal
* @ignore
*/
Host.prototype.setUp = function (clearReconnection) {
if (!this.setDownAt) {
//The host is already marked as UP
return;
}
this.log('info', util.format('Setting host %s as UP', this.address));
this.setDownAt = 0;
//if it was unhealthy and now it is not, lets reset the reconnection schedule.
this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
if (clearReconnection) {
this.pool.clearNewConnectionAttempt();
}
this.emit('up');
};
/**
* Resets the reconnectionSchedule and tries to issue a reconnection immediately.
* @internal
* @ignore
*/
Host.prototype.checkIsUp = function () {
if (this.isUp()) {
return;
}
this.reconnectionSchedule = this.options.policies.reconnection.newSchedule();
this.reconnectionDelay = 0;
this.pool.attemptNewConnectionImmediate();
};
/**
* @param {Boolean} waitForPending When true, it waits for in-flight operations to be finish before closing the
* connections.
* @param {Function} [callback]
* @internal
* @ignore
*/
Host.prototype.shutdown = function (waitForPending, callback) {
callback = callback || utils.noop;
if (waitForPending) {
this.pool.drainAndShutdown();
// Gracefully draining and shutting down the pool is being done in the background, it's not required
// for the shutting down to be over to callback
return callback();
}
this.pool.shutdown(callback);
};
/**
* Determines if the node is UP now (seen as UP by the driver).
* @returns {boolean}
*/
Host.prototype.isUp = function () {
return !this.setDownAt;
};
/**
* Determines if the host can be considered as UP
* @returns {boolean}
*/
Host.prototype.canBeConsideredAsUp = function () {
var self = this;
function hasTimePassed() {
return new Date().getTime() - self.setDownAt >= self.reconnectionDelay;
}
return !this.setDownAt || hasTimePassed();
};
/**
* Sets the distance of the host relative to the client using the load balancing policy.
* @param {Number} distance
* @internal
* @ignore
*/
Host.prototype.setDistance = function (distance) {
var previousDistance = this._distance;
this._distance = distance || types.distance.local;
if (this.options.pooling.coreConnectionsPerHost) {
this.pool.coreConnectionsLength = this.options.pooling.coreConnectionsPerHost[this._distance] || 0;
}
else {
this.pool.coreConnectionsLength = 1;
}
if (this._distance === previousDistance) {
return this._distance;
}
if (this._distance === types.distance.ignored) {
// this host was local/remote and now must be ignored
this.emit('ignore');
this.pool.drainAndShutdown();
}
else if (!this.isUp()) {
this.checkIsUp();
}
return this._distance;
};
/**
* Changes the protocol version of a given host
* @param {Number} value
* @internal
* @ignore
*/
Host.prototype.setProtocolVersion = function (value) {
this.pool.protocolVersion = value;
};
/**
* It gets an open connection to the host.
* If there isn't an available connections, it will open a new one according to the pooling options.
* @param {Function} callback
* @internal
* @ignore
*/
Host.prototype.borrowConnection = function (callback) {
this.pool.borrowConnection(callback);
};
/**
* Creates all the connection in the pool.
* @param {Function} callback
* @internal
* @ignore
*/
Host.prototype.warmupPool = function (callback) {
this.pool.create(true, callback);
};
/**
* Gets any connection that is already opened or null if not found.
* @returns {Connection}
* @internal
* @ignore
*/
Host.prototype.getActiveConnection = function () {
if (!this.isUp() || !this.pool.connections.length) {
return null;
}
return this.pool.connections[0];
};
/**
* Checks the health of a connection in the pool
* @param {Connection} connection
* @internal
* @ignore
*/
Host.prototype.checkHealth = function (connection) {
if (connection.timedOutHandlers <= this.options.socketOptions.defunctReadTimeoutThreshold) {
return;
}
this.removeFromPool(connection);
};
/**
* @param {Connection} connection
* @internal
* @ignore
*/
Host.prototype.removeFromPool = function (connection) {
this.pool.remove(connection);
this._checkPoolState();
};
/**
* Validates that the internal state of the connection pool.
* If the pool size is smaller than expected, schedule a new connection attempt.
* If the amount of connections is 0 for not ignored hosts, the host must be down.
* @private
*/
Host.prototype._checkPoolState = function () {
if (this.pool.isClosing()) {
return;
}
if (this.pool.connections.length < this.pool.coreConnectionsLength) {
// the pool still needs to grow
if (!this.pool.hasScheduledNewConnection()) {
this.reconnectionDelay = this.reconnectionSchedule.next().value;
this.pool.scheduleNewConnectionAttempt(this.reconnectionDelay);
}
}
if (this._distance !== types.distance.ignored &&
this.pool.connections.length === 0 &&
this.pool.coreConnectionsLength > 0) {
this.setDown();
}
};
/**
* Executed after an scheduled new connection attempt finished
* @private
*/
Host.prototype._onNewConnectionOpen = function (err) {
if (err) {
this._checkPoolState();
return;
}
this.setUp();
this.pool.increaseSize();
};
/**
* Returns an array containing the Cassandra Version as an Array of Numbers having the major version in the first
* position.
* @returns {Array.<Number>}
*/
Host.prototype.getCassandraVersion = function () {
if (!this.cassandraVersion) {
return utils.emptyArray;
}
return this.cassandraVersion.split('-')[0].split('.').map(function eachMap(x) {
return parseInt(x, 10);
});
};
Host.prototype.log = utils.log;
/**
* Represents an associative-array of {@link Host hosts} that can be iterated.
* It creates an internal copy when adding or removing, making it safe to iterate using the values() method within async operations.
* @extends events.EventEmitter
* @constructor
*/
function HostMap() {
events.EventEmitter.call(this);
this._items = {};
this._values = null;
Object.defineProperty(this, 'length', { get: function () { return this.values().length; }, enumerable: true });
}
util.inherits(HostMap, events.EventEmitter);
/**
* Emitted when a host is added to the map
* @event HostMap#add
*/
/**
* Emitted when a host is removed from the map
* @event HostMap#remove
*/
/**
* Executes a provided function once per map element.
* @param callback
*/
HostMap.prototype.forEach = function (callback) {
//Use a new reference, allowing the map to be modified.
var items = this._items;
for (var key in items) {
if (!items.hasOwnProperty(key)) {
continue;
}
callback(items[key], key);
}
};
/**
* Gets a {@link Host host} by key or undefined if not found.
* @param {String} key
* @returns {Host}
*/
HostMap.prototype.get = function (key) {
return this._items[key];
};
/**
* Returns an array of host addresses.
* @returns {Array.<String>}
*/
HostMap.prototype.keys = function () {
return Object.keys(this._items);
};
/**
* Removes an item from the map.
* @param {String} key The key of the host
* @fires HostMap#remove
*/
HostMap.prototype.remove = function (key) {
if (!this._items.hasOwnProperty(key)) {
//it's not part of it, do nothing
return;
}
//clear cache
this._values = null;
//copy the values
var copy = utils.extend({}, this._items);
var h = copy[key];
delete copy[key];
this._items = copy;
this.emit('remove', h);
};
/**
* Removes multiple hosts from the map.
* @param {Array.<String>} keys
* @fires HostMap#remove
*/
HostMap.prototype.removeMultiple = function (keys) {
//clear value cache
this._values = null;
//copy the values
var copy = utils.extend({}, this._items);
var removedHosts = [];
for (var i = 0; i < keys.length; i++) {
var h = copy[keys[i]];
if (!h) {
continue;
}
removedHosts.push(h);
delete copy[keys[i]];
}
this._items = copy;
removedHosts.forEach(function (h) {
this.emit('remove', h);
}, this);
};
/**
* Adds a new item to the map.
* @param {String} key The key of the host
* @param {Host} value The host to be added
* @fires HostMap#remove
* @fires HostMap#add
*/
HostMap.prototype.set = function (key, value) {
//clear values cache
this._values = null;
var originalValue = this._items[key];
if (originalValue) {
//The internal structure does not change
this._items[key] = value;
//emit a remove followed by a add
this.emit('remove', originalValue);
this.emit('add', value);
return;
}
//copy the values
var copy = utils.extend({}, this._items);
copy[key] = value;
this._items = copy;
this.emit('add', value);
return value;
};
/**
* Returns a shallow copy of a portion of the items into a new array object.
* Backward-compatibility.
* @param {Number} [begin]
* @param {Number} [end]
* @returns {Array}
* @ignore
*/
HostMap.prototype.slice = function (begin, end) {
if (!begin && !end) {
//avoid making a copy of the copy
return this.values();
}
begin = begin || 0;
return this.values().slice(begin, end);
};
//Backward-compatibility
HostMap.prototype.push = HostMap.prototype.set;
/**
* Returns a shallow copy of the values of the map.
* @returns {Array.<Host>}
*/
HostMap.prototype.values = function () {
if (!this._values) {
//cache the values
var values = [];
for (var key in this._items) {
if (!this._items.hasOwnProperty(key)) {
continue;
}
values.push(this._items[key]);
}
this._values = Object.freeze(values);
}
return this._values;
};
HostMap.prototype.inspect = function() {
return this._items;
};
HostMap.prototype.toJSON = function() {
return this._items;
};
exports.Host = Host;
exports.HostMap = HostMap;