<p>var EventEmitter = require('events').EventEmitter
, Stream = require('net').Stream
, Buffer = require('buffer').Buffer;</p>
<p>var HashRing = require('hashring')
, Connection = require('./connection')
, Utils = require('./utils')
, Manager = Connection.Manager
, IssueLog = Connection.IssueLog;</p>
// The constructor
//
// Creates a memcached client for the supplied server(s).
//
// @param {String|Array|Object} args     a single server string, an array of server strings,
//                                       or an object whose keys are the server strings
//                                       (values are presumably weights for the hash ring -- TODO confirm)
// @param {Object}              options  per-client settings, merged over Client.config
// @throws {Error}                       when no servers can be derived from `args`
function Client(args, options){
  // Called without `new`: `this` can not be reassigned, so hand back a real instance instead
  if (!(this instanceof Client)) return new Client(args, options);

  var servers = [];

  // Parse down the connection arguments
  switch (Object.prototype.toString.call(args)){
    case '[object String]':
      servers.push(args);
      break;

    case '[object Object]':
      servers = Object.keys(args);
      break;

    case '[object Array]':
      servers = args;
      break;
  }

  if (!servers.length) throw new Error('No servers where supplied in the arguments');

  // merge with global and user config
  Utils.merge(this, Client.config);
  Utils.merge(this, options);

  this.servers = servers;
  this.HashRing = new HashRing(args, this.algorithm);
  this.connections = {};   // server string -> connection Manager
  this.issues = [];        // server string -> IssueLog
}
// Allows users to configure the memcached client globally; the Client constructor merges
// these defaults into every instance before applying the per-client options.
Client.config = {
    maxKeySize: 251         // max key size allowed by Memcached
  , maxExpiration: 2592000  // max expiration duration allowed by Memcached
  , maxValue: 1048576       // max length of value allowed by Memcached

  , algorithm: 'crc32'      // hashing algorithm that is used for key mapping

  , poolSize: 10            // maximal parallel connections
  , reconnect: 18000000     // if dead, attempt reconnect each xx ms
  , timeout: 5000           // after x ms the server should send a timeout if we can't connect
  , retries: 5              // amount of retries before server is dead
  , retry: 30000            // timeout between retries, all calls will be marked as cache miss
  , remove: false           // remove server if dead; if false, we will attempt to reconnect
  , redundancy: false       // allows you to re-distribute the keys over x amount of servers
  , keyCompression: true    // compress keys if they are too large (md5)
  , debug: false            // output the commands and responses
};
// There are some values we don't want users to touch, so we scope them.
//
// FLUSH, BUFFER and CONTINUE are the status codes the response parsers return as the first
// element of their result; FLAG_JSON and FLAG_BINARY are the flags stored alongside a value
// so it can be decoded again when it is read back.
const LINEBREAK = '\r\n'
  , NOREPLY = ' noreply'
  , FLUSH = 1E3
  , BUFFER = 1E2
  , CONTINUE = 1E1
  , FLAG_JSON = 1<<1
  , FLAG_BINARY = 2<<1;
<p> var memcached = nMemcached.prototype = new EventEmitter
, private = {}
, undefined;</p>
<p> // Creates or generates a new connection for the give server, the callback will receive the connection
// if the operation was successful
memcached.connect = function connect(server, callback){
// server is dead, bail out
if (server in this.issues && this.issues[server].failed) return callback(false, false);</p>
<pre><code>// fetch from connection pool
if (server in this.connections) return this.connections[server].allocate(callback);
// No connection factory created yet, so we must build one
var serverTokens = /(.*):(\d+){1,}$/.exec(server).reverse()
, memcached = this;
this.connections[server] = new Manager(server, this.poolSize, function(callback){
var S = new Stream
, Manager = this;
// config the Stream
S.metaData = [];
S.responseBuffer = "";
S.bufferArray = [];
S.server = server;
S.tokens = serverTokens;
// Add the event listeners
Utils.fuse(S, {
connect: function streamConnect(){ callback(false, this) }
, close: function streamClose(){ Manager.remove(this) }
, error: function streamError(err){ memcached.connectionIssue(err, S, callback) }
, data: Utils.curry(memcached, private.buffer, S)
, timeout: function streamTimeout(){ Manager.remove(this) }
, end: S.end
// connect the net.Stream [port, hostname]
S.connect.apply(S, serverTokens);
return S;
// now that we have setup our connection factory we can allocate a new connection
// Creates a multi stream, so it's easier to query against multiple memcached servers.
//
// @param {Array|false} keys      keys to group per server; a falsy value selects every server
// @param {Function}    callback  called once per server (last server first) with
//                                (server, keysForServer, index, totalServers); `this` is the client.
//                                keysForServer is undefined when no keys were supplied
memcached.multi = function memcachedMulti(keys, callback){
  var map = {}
    , memcached = this
    , servers
    , i;

  // gets all servers based on the supplied keys,
  // or just gives all servers if we don't have keys
  if (keys){
    keys.forEach(function fetchMultipleServers(key){
      var server = memcached.HashRing.getNode(key);

      if (map[server]){
        map[server].push(key);
      } else {
        map[server] = [key];
      }
    });

    // store the servers
    servers = Object.keys(map);
  } else {
    servers = this.servers;
  }

  i = servers.length;
  while (i--){
    callback.call(this, servers[i], map[servers[i]], i, servers.length);
  }
};
// Executes the command on the net.Stream, if no server is supplied it will use the query.key to get
// the server from the HashRing.
//
// @param {Function} queryCompiler  returns the query object; called with a truthy argument
//                                  when the "noreply" variant of the command is needed
// @param {String}   server         optional server to send the command to
memcached.command = function memcachedCommand(queryCompiler, server){
  // generate a regular query,
  var query = queryCompiler()
    , redundancy = this.redundancy && this.redundancy < this.servers.length
    , queryRedundancy = query.redundancyEnabled
    , memcached = this;

  // validate the arguments; queries describe their arguments in `validate`
  if (query.validate && !Utils.validateArg(query, this)) return;

  // fetch servers
  server = server ? server : redundancy && queryRedundancy ? (redundancy = this.HashRing.createRange(query.key, (this.redundancy + 1), true)).shift() : this.HashRing.getNode(query.key);

  // check if the server is still alive
  if (server in this.issues && this.issues[server].failed) return query.callback && query.callback(false, false);

  Client.config.debug && console.log(query.command);

  this.connect(server, function allocateMemcachedConnection(error, S){
    // check for issues
    if (!S) return query.callback && query.callback(false, false);
    if (error) return query.callback && query.callback(error);
    if (S.readyState !== 'open') return query.callback && query.callback('Connection readyState is set to ' + S.readyState);

    // used for request timing
    query.start = Date.now();

    // the response parser takes the pending queries from this queue, in order
    S.metaData.push(query);
    S.write(query.command + LINEBREAK);
  });

  // if we have redundancy enabled and the query is used for redundancy, than we are going loop over
  // the servers, check if we can reach them, and connect to the correct net connection.
  // because all redundancy queries are executed with "no reply" we do not need to store the callback
  // as there will be no value to parse.
  if (redundancy && queryRedundancy){
    queryRedundancy = queryCompiler(queryRedundancy);

    redundancy.forEach(function(server){
      if (server in memcached.issues && memcached.issues[server].failed) return;

      memcached.connect(server, function allocateMemcachedConnection(error, S){
        if (!S || error || S.readyState !== 'open') return;
        S.write(queryRedundancy.command + LINEBREAK);
      });
    });
  }
};
// Logs all connection issues, and handles them off. Marking all requests as cache misses.
//
// @param {Mixed}    error     the error that should be logged for the server
// @param {Stream}   S         the stream that had the issue, it will be ended
// @param {Function} callback  optional, answered with (false, false) -- a cache miss
memcached.connectionIssue = function connectionIssue(error, S, callback){
  // end connection and mark callback as cache miss
  if (S && S.end) S.end();
  if (callback) callback(false, false);

  // without a stream we don't know which server the issue belongs to
  if (!S) return;

  var issues
    , server = S.server
    , memcached = this;

  // check for existing issue logs, or create a new log
  if (server in this.issues){
    issues = this.issues[server];
  } else {
    issues = this.issues[server] = new IssueLog({
        server: server
      , tokens: S.tokens
      , reconnect: this.reconnect
      , retries: this.retries
      , retry: this.retry
      , remove: this.remove
    });

    // proxy the events
    Utils.fuse(issues, {
        issue: function(details){ memcached.emit('issue', details) }
      , failure: function(details){ memcached.emit('failure', details) }
      , reconnecting: function(details){ memcached.emit('reconnecting', details) }
      , reconnected: function(details){ memcached.emit('reconnect', details) }
      , remove: function(details){
          // emit event and remove servers
          memcached.emit('remove', details);

          // fail over servers are configured on the client; `this` is the issue log in here
          if (memcached.failOverServers && memcached.failOverServers.length){
            memcached.HashRing.replaceServer(server, memcached.failOverServers.shift());
          } else {
            // NOTE(review): assumes the hash ring exposes removeServer() next to
            // replaceServer() -- confirm against the hashring version in use
            memcached.HashRing.removeServer(server);
          }
        }
    });
  }

  // log the issue
  issues.log(error);
};
// Kills all active connections by asking every connection manager to free its
// connections while keeping none of them.
memcached.end = function endMemcached(){
  var memcached = this;

  Object.keys(this.connections).forEach(function closeConnection(key){
    memcached.connections[key].free(0);
  });
};
<p> // These do not need to be publicly available as it's one of the most important
// parts of the whole client, the parser commands:
private.parsers = {
// handle error responses
'NOT<em>FOUND': function(tokens, dataSet, err){ return [CONTINUE, false] }
, 'NOT</em>STORED': function(tokens, dataSet, err){ return [CONTINUE, false] }
, 'ERROR': function(tokens, dataSet, err){ err.push('Received an ERROR response'); return [FLUSH, false] }
, 'CLIENT<em>ERROR': function(tokens, dataSet, err){ err.push(tokens.splice(1).join(' ')); return [CONTINUE, false] }
, 'SERVER</em>ERROR': function(tokens, dataSet, err, queue, S, memcached){ memcached.connectionIssue(tokens.splice(1).join(' '), S); return [CONTINUE, false] }</p>
// keyword based responses
<p> , 'STORED': function(tokens, dataSet){ return [CONTINUE, true] }
, 'DELETED': function(tokens, dataSet){ return [CONTINUE, true] }
, 'OK': function(tokens, dataSet){ return [CONTINUE, true] }
, 'EXISTS': function(tokens, dataSet){ return [CONTINUE, false] }
, 'END': function(tokens, dataSet, err, queue){ if (!queue.length) queue.push(false); return [FLUSH, true] }</p>
// value parsing:
<p> , 'VALUE': function(tokens, dataSet, err, queue){
var key = tokens[1]
, flag = +tokens[2]
, expire = tokens[3]
, cas = tokens[4]
, multi = this.metaData[0] && this.metaData[0].multi || cas ? {} : false
, tmp;</p>
<pre><code> switch (flag){
dataSet = JSON.parse(dataSet);
tmp = new Buffer(dataSet.length);
tmp.write(dataSet, 0, 'binary');
dataSet = tmp;
// Add to queue as multiple get key key key key key returns multiple values
if (!multi){
} else {
multi[key] = dataSet;
if (cas) multi.cas = cas;
return [BUFFER, false]
<p> , 'INCRDECR': function(tokens){ return [CONTINUE, +tokens[1]] }
, 'STAT': function(tokens, dataSet, err, queue){
queue.push([tokens[1], /^\d+$/.test(tokens[2]) ? +tokens[2] : tokens[2]]);
return [BUFFER, true]
, 'VERSION': function(tokens, dataSet){
var versionTokens = /(\d+)(?:.)(\d+)(?:.)(\d+)$/.exec(tokens.pop());</p>
<pre><code> return [CONTINUE, {
server: this.server
, version: versionTokens[0]
, major: versionTokens[1] || 0
, minor: versionTokens[2] || 0
, bugfix: versionTokens[3] || 0
<p> , 'ITEM': function(tokens, dataSet, err, queue){
key: tokens[1]
, b: +tokens[2].substr(1)
, s: +tokens[4]
return [BUFFER, false]
<p> // Parses down result sets
private.resultParsers = {
// combines the stats array, in to an object
'stats': function(resultSet){
var response = {};</p>
<pre><code> // add references to the retrieved server
response.server = this.server;
// Fill the object
response[statSet[0]] = statSet[1];
return response;
// the settings uses the same parse format as the regular stats
<p> , 'stats settings': function(){ return private.resultParsers.stats.apply(this, arguments) }
// Group slabs by slab id
, 'stats slabs': function(resultSet){
var response = {};</p>
<pre><code> // add references to the retrieved server
response.server = this.server;
// Fill the object
var identifier = statSet[0].split(':');
if (!response[identifier[0]]) response[identifier[0]] = {};
response[identifier[0]][identifier[1]] = statSet[1];
return response;
<p> , 'stats items': function(resultSet){
var response = {};</p>
<pre><code> // add references to the retrieved server
response.server = this.server;
// Fill the object
var identifier = statSet[0].split(':');
if (!response[identifier[1]]) response[identifier[1]] = {};
response[identifier[1]][identifier[2]] = statSet[1];
return response;
<p> // Generates a RegExp that can be used to check if a chunk is memcached response identifier
private.allCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + '|\d' + ')');
private.bufferedCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + ')');</p>
<p> // When working with large chunks of responses, node chunks it in to pieces. So we might have
// half responses. So we are going to buffer up the buffer and user our buffered buffer to query
// against. Also when you execute allot of .writes to the same stream, node will combine the responses
// in to one response stream. With no indication where it had cut the data. So it can be it cuts inside the value response,
// or even right in the middle of a line-break, so we need to make sure, the last piece in the buffer is a LINEBREAK
// because that is all what is sure about the Memcached Protocol, all responds end with them.
private.buffer = function BufferBuffer(S, BufferStream){
S.responseBuffer += BufferStream;</p>
<pre><code>Client.config.debug && console.log(S.responseBuffer);
// only call transform the data once we are sure, 100% sure, that we valid response ending
if (S.responseBuffer.substr(S.responseBuffer.length - 2) === LINEBREAK){
var chunks = S.responseBuffer.split(LINEBREAK);
S.responseBuffer = ""; // clear!
this.rawDataReceived(S, S.bufferArray = S.bufferArray.concat(chunks));
} </code></pre>
<p> // The actual parsers function that scan over the responseBuffer in search of Memcached response
// identifiers. Once we have found one, we will send it to the dedicated parsers that will transform
// the data in a human readable format, deciding if we should queue it up, or send it to a callback fn.
memcached.rawDataReceived = function rawDataReceived(S){
var queue = []
, token
, tokenSet
, dataSet = ''
, resultSet
, metaData
, err = []
, tmp;</p>
<pre><code>while(S.bufferArray.length && private.allCommands.test(S.bufferArray[0])){
token = S.bufferArray.shift();
tokenSet = token.split(' ');
// special case for digit only's these are responses from INCR and DECR
if (/\d+/.test(tokenSet[0])) tokenSet.unshift('INCRDECR');
// special case for value, it's required that it has a second response!
// add the token back, and wait for the next response, we might be handling a big
// ass response here.
if (tokenSet[0] == 'VALUE' && S.bufferArray.indexOf('END') == -1){
return S.bufferArray.unshift(token);
// check for dedicated parser
if (private.parsers[tokenSet[0]]){
// fetch the response content
if (private.bufferedCommands.test(S.bufferArray[0])) break;
dataSet += S.bufferArray.shift();
resultSet = private.parsers[tokenSet[0]].call(S, tokenSet, dataSet || token, err, queue, this);
// check how we need to handle the resultSet response
case BUFFER:
case FLUSH:
metaData = S.metaData.shift();
resultSet = queue;
// if we have a callback, call it
if (metaData && metaData.callback){
metaData.execution = Date.now() - metaData.start;
metaData, err.length ? err : err[0],
// see if optional parsing needs to be applied to make the result set more readable
private.resultParsers[metaData.type] ? private.resultParsers[metaData.type].call(S, resultSet, err) :
!Array.isArray(queue) || queue.length > 1 ? queue : queue[0]
queue.length = err.length = 0;
metaData = S.metaData.shift();
if (metaData && metaData.callback){
metaData.execution = Date.now() - metaData.start;
metaData.callback.call(metaData, err.length > 1 ? err : err[0], resultSet[0]);
err.length = 0;
} else {
// handle unkown responses
metaData = S.metaData.shift();
if (metaData && metaData.callback){
metaData.execution = Date.now() - metaData.start;
metaData.callback.call(metaData, 'Unknown response from the memcached server: "' + token + '"', false);
// cleanup
dataSet = ''
tokenSet = metaData = undefined;
// check if we need to remove an empty item from the array, as splitting on /r/n might cause an empty
// item at the end..
if (S.bufferArray[0] === '') S.bufferArray.shift();
<p> // Small wrapper function that only executes errors when we have a callback
private.errorResponse = function errorResponse(error, callback){
if (typeof callback == 'function') callback(error, false);</p>
<pre><code>return false;</code></pre>
// This is where the actual Memcached API layer begins:
//
// Gets the value of a key; an array of keys is handed over to getMulti.
//
// @param {String|Array} key       the key, or an array of keys
// @param {Function}     callback  receives the response
memcached.get = function get(key, callback){
  if (Array.isArray(key)) return this.getMulti.apply(this, arguments);

  this.command(function getCommand(noreply){ return {
      key: key
    , callback: callback
    , validate: [['key', String], ['callback', Function]]
    , type: 'get'
    , command: 'get ' + key
  }});
};
// the difference between get and gets is that gets, also returns a cas value
// and gets doesn't support multi-gets at this moment.
//
// @param {String}   key       the key to fetch
// @param {Function} callback  receives the response
memcached.gets = function gets(key, callback){
  this.command(function getCommand(noreply){ return {
      key: key
    , callback: callback
    , validate: [['key', String], ['callback', Function]]
    , type: 'gets'
    , command: 'gets ' + key
  }});
};
// Handles get's with multiple keys: the keys are grouped per server, one get command is send
// to every server involved and the responses are merged in to a single object.
//
// @param {Array}    keys      the keys to fetch
// @param {Function} callback  called once with (errors|false, responses) when every server answered
memcached.getMulti = function getMulti(keys, callback){
  var memcached = this
    , responses = {}
    , errors = []
    , calls

    // handle multiple responses and cache them until we receive all.
    , handle = function(err, results){
        if (err) errors.push(err);

        // add all responses to the array
        (Array.isArray(results) ? results : [results]).forEach(function(value){ Utils.merge(responses, value) });

        if (!--calls) callback(errors.length ? errors : false, responses);
      };

  this.multi(keys, function(server, key, index, totals){
    // the first call tells us how many responses we have to wait for
    if (!calls) calls = totals;

    // `key` holds all keys that are stored on this server
    memcached.command(function getMultiCommand(noreply){ return {
        callback: handle
      , multi: true
      , type: 'get'
      , command: 'get ' + key.join(' ')
    }}, server);
  });
};
<p> // As all command nearly use the same syntax we are going to proxy them all to this
// function to ease maintenance. This is possible because most set commands will use the same
// syntax for the Memcached server. Some commands do not require a lifetime and a flag, but the
// memcached server is smart enough to ignore those.
private.setters = function setters(type, validate, key, value, lifetime, callback, cas){
var flag = 0
, memcached = this
, valuetype = typeof value
, length;</p>
<pre><code>if (Buffer.isBuffer(value)){
value = value.toString('binary');
} else if (valuetype !== 'string' && valuetype !== 'number'){
flag = FLAG_JSON;
value = JSON.stringify(value);
} else {
value = value.toString();
length = Buffer.byteLength(value);
if (length > memcached.maxValue) return private.errorResponse('The length of the value is greater than ' + memcached.maxValue, callback);
memcached.command(function settersCommand(noreply){ return {
key: key
, callback: callback
, lifetime: lifetime
, value: value
, cas: cas
, validate: validate
, type: type
, redundancyEnabled: true
, command: [type, key, flag, lifetime, length].join(' ') +
(cas ? ' ' + cas : '') +
(noreply ? NOREPLY : '') +
<p> // Curry the function and so we can tell the type our private set function
memcached.set = Utils.curry(false, private.setters, 'set', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
memcached.replace = Utils.curry(false, private.setters, 'replace', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
memcached.add = Utils.curry(false, private.setters, 'add', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);</p>
<p> memcached.cas = function checkandset(key, value, cas, lifetime, callback){
private.setters.call(this, 'cas', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, lifetime, callback, cas);
<p> memcached.append = function append(key, value, callback){
private.setters.call(this, 'append', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
<p> memcached.prepend = function prepend(key, value, callback){
private.setters.call(this, 'prepend', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
<p> // Small handler for incr and decr's
private.incrdecr = function incrdecr(type, key, value, callback){
this.command(function incredecrCommand(noreply){ return {
key: key
, callback: callback
, value: value
, validate: [['key', String], ['value', Number], ['callback', Function]]
, type: type
, redundancyEnabled: true
, command: [type, key, value].join(' ') +
(noreply ? NOREPLY : '')
<p> // Curry the function and so we can tell the type our private incrdecr
memcached.increment = memcached.incr = Utils.curry(false, private.incrdecr, 'incr');
memcached.decrement = memcached.decr = Utils.curry(false, private.incrdecr, 'decr');</p>
// Deletes the key from the servers, also available as memcached['delete'].
//
// @param {String}   key       the key to delete
// @param {Function} callback  receives the response
memcached.del = function del(key, callback){
  this.command(function deleteCommand(noreply){ return {
      key: key
    , callback: callback
    , validate: [['key', String], ['callback', Function]]
    , type: 'delete'
    , redundancyEnabled: true
    , command: 'delete ' + key +
        (noreply ? NOREPLY : '')
  }});
};

memcached['delete'] = memcached.del;
<p> // Small wrapper that handle single keyword commands such as FLUSH ALL, VERSION and STAT
private.singles = function singles(type, callback){
var memcached = this
, responses = []
, errors = []
, calls</p>
<pre><code> // handle multiple servers
, handle = function(err, results){
if (err) errors.push(err);
if (results) responses = responses.concat(results);
// multi calls should ALWAYS return an array!
if (!--calls) callback(errors, responses);
this.multi(false, function(server, keys, index, totals){
if (!calls) calls = totals;
memcached.command(function singlesCommand(noreply){ return {
callback: handle
, type: type
, command: type
<p> // Curry the function and so we can tell the type our private singles
memcached.version = Utils.curry(false, private.singles, 'version');
memcached.flush = Utils.curry(false, private.singles, 'flush_all');
memcached.stats = Utils.curry(false, private.singles, 'stats');
memcached.settings = Utils.curry(false, private.singles, 'stats settings');
memcached.slabs = Utils.curry(false, private.singles, 'stats slabs');
memcached.items = Utils.curry(false, private.singles, 'stats items');</p>
// You need to use the items dump to get the correct server and slab settings
// see simple_cachedump.js for an example
//
// @param {String}   server    the server to request the dump from
// @param {Number}   slabid    the id of the slab to dump
// @param {Number}   number    passed as last argument of the "stats cachedump" command
// @param {Function} callback  receives the response
memcached.cachedump = function cachedump(server, slabid, number, callback){
  this.command(function cachedumpCommand(noreply){ return {
      callback: callback
    , number: number
    , slabid: slabid
    , validate: [['number', Number], ['slabid', Number], ['callback', Function]]
    , type: 'stats cachedump'
    , command: 'stats cachedump ' + slabid + ' ' + number
  }}, server);
};
<p>var CreateHash = require('crypto').createHash;</p>
<p>exports.validateArg = function validateArg(args, config){
var toString = Object.prototype.toString
, err
, callback;</p>
<p> args.validate.forEach(function(tokens){
var key = tokens[0]
, value = args[key];</p>
case Number:
if (toString.call(value) !== '[object Number]') err = 'Argument "' + key + '" is not a valid Number.';
case Boolean:
if (toString.call(value) !== '[object Boolean]') err = 'Argument "' + key + '" is not a valid Boolean.';
case Array:
if (toString.call(value) !== '[object Array]') err = 'Argument "' + key + '" is not a valid Array.';
case Object:
if (toString.call(value) !== '[object Object]') err = 'Argument "' + key + '" is not a valid Object.';
case Function:
if (toString.call(value) !== '[object Function]') err = 'Argument "' + key + '" is not a valid Function.';
case String:
if (toString.call(value) !== '[object String]') err = 'Argument "' + key + '" is not a valid String.';
if (!err && key == 'key' && value.length > config.maxKeySize){
if (config.keyCompression){
args[key] = CreateHash('md5').update(value).digest('hex');
// also make sure you update the command
config.command.replace(new RegExp('^(' + value + ')'), args[key]);
} else {
err = 'Argument "' + key + '" is longer than the maximum allowed length of ' + config.maxKeySize;
if (toString.call(value) == '[object global]' && !tokens[2]) err = 'Argument "' + key + '" is not defined.';
<p> });</p>
<p> if (err){
if(callback) callback(err, false);
return false;
<p> return true;
<p>// currys a function
exports.curry = function curry(context, func){
var copy = Array.prototype.slice
, args = copy.call(arguments, 2);</p>
<p> return function(){
return func.apply(context || this, args.concat(copy.call(arguments)));
<p>// a small util to use an object for eventEmitter
exports.fuse = function fuse(target, handlers){
for(var i in handlers)
if (handlers.hasOwnProperty(i)){
target.on(i, handlers[i]);
<p>// merges a object's proppertys / values with a other object
exports.merge = function merge(target, obj){
for(var i in obj){
target[i] = obj[i];
<p> return target;
<p>// a small items iterator
exports.Iterator = function iterator(collection, callback){
var arr = Array.isArray(collection)
, keys = !arr ? Object.keys(collection) : false
, index = 0
, maximum = arr ? collection.length : keys.length
, self = this;</p>
<p> // returns next item
this.next = function(){
var obj = arr ? collection[index] : { key: keys[index], value: collection[keys[index]] };
callback(obj, index++, collection, self);
<p> // check if we have more items
this.hasNext = function(){
return index < maximum;
<p>var EventEmitter = require('events').EventEmitter
, Spawn = require('child_process').spawn
, Utils = require('./utils');</p>
exports.Manager = ConnectionManager; // connection pooling
exports.IssueLog = IssueLog;         // connection issue handling
exports.Available = ping;            // connection availability
// Checks if a host is reachable by spawning the `ping` command.
//
// The callback is called exactly once: with (false, output) for the first data on stdout, or
// with (message, false) for the first data on stderr or when the command could not be spawned.
// The child process is killed afterwards so it does not keep pinging in the background.
// NOTE(review): substr(14) presumably strips a fixed prefix from the first output line of
// ping -- confirm against the ping implementation in use.
//
// @param {String}   host      the host to ping
// @param {Function} callback  receives (error, result)
function ping(host, callback){
  var pong = Spawn('ping', [host])
    , done = false;

  function finish(err, result){
    if (done) return;
    done = true;

    pong.kill();
    callback(err, result);
  }

  pong.stdout.on('data', function(data){
    finish(false, data.toString().split('\n')[0].substr(14));
  });

  pong.stderr.on('data', function(data){
    finish(data.toString().split('\n')[0].substr(14), false);
  });

  // without a listener a failing spawn would be thrown as uncaught exception
  pong.on('error', function(err){
    finish(err && err.message || 'Unable to execute ping', false);
  });
}
// Keeps track of the connection issues of a single server.
//
// @param {Object} args  the settings of the log; they are kept in `config` and also copied
//                       on to the instance (the client supplies server, tokens, reconnect,
//                       retries, retry and remove)
function IssueLog(args){
  this.config = args;   // the untouched settings, used to reset the log after a reconnect
  this.messages = [];
  this.failed = false;

  this.totalRetries = 0;
  this.totalReconnectsAttempted = 0;
  this.totalReconnectsSuccess = 0;

  Utils.merge(this, args);
}

var issues = IssueLog.prototype = new EventEmitter;
// Logs an issue and marks the server as failed. As long as there are retries left a retry is
// scheduled and `issue` is emitted; otherwise the server is either removed (`remove` is
// emitted) or a reconnect attempt is scheduled.
//
// @param {Mixed} message  the issue to store, optional
issues.log = function(message){
  var issue = this;

  this.failed = true;
  this.messages.push(message || 'No message specified');

  if (this.retries){
    setTimeout(Utils.curry(issue, issue.attemptRetry), this.retry);
    return this.emit('issue', this.details);
  }

  if (this.remove) return this.emit('remove', this.details);

  setTimeout(Utils.curry(issue, issue.attemptReconnect), this.reconnect);
};
// A summary of the issue log, generated on every access. While there are retries left it
// reports the retry counters, afterwards it reports the reconnect statistics.
Object.defineProperty(issues, 'details', {
  get: function(){
    var res = {};

    res.server = this.server;
    res.tokens = this.tokens;
    res.messages = this.messages;

    if (this.retries){
      res.retries = this.retries;
      res.totalRetries = this.totalRetries;
    } else {
      res.totalReconnectsAttempted = this.totalReconnectsAttempted;
      res.totalReconnectsSuccess = this.totalReconnectsSuccess;
      res.totalReconnectsFailed = this.totalReconnectsAttempted - this.totalReconnectsSuccess;

      // time spent waiting between the failed reconnects and between the retries
      res.totalDownTime = (res.totalReconnectsFailed * this.reconnect) + (this.totalRetries * this.retry);
    }

    return res;
  }
});
// Uses up one retry and marks the server as available again, so the next command is send
// to it. Without the bookkeeping the retries would never run out and the log would never
// move on to removing or reconnecting the server.
issues.attemptRetry = function(){
  this.totalRetries++;
  this.retries--;
  this.failed = false;
};
// Pings the server to see if it is available again. When the ping fails the next attempt is
// scheduled, when it succeeds the log is reset so the whole cycle can start over.
issues.attemptReconnect = function(){
  var issue = this;

  this.totalReconnectsAttempted++;
  this.emit('reconnecting', this.details);

  // Ping the server, the hostname is the second server token
  ping(this.tokens[1], function(err){
    // still no access to the server
    if (err){
      issue.messages.push(err.message || err || 'No message specified');
      return setTimeout(Utils.curry(issue, issue.attemptReconnect), issue.reconnect);
    }

    issue.totalReconnectsSuccess++;
    issue.emit('reconnected', issue.details);

    issue.messages.length = 0;
    issue.failed = false;

    // we connected again, so we are going through the whole cycle again
    Utils.merge(issue, JSON.parse(JSON.stringify(issue.config)));
  });
};
// A pool of connections for a single server.
//
// @param {String}   name         the name of the pool
// @param {Number}   limit        the maximum amount of connections in the pool
// @param {Function} constructor  factory that creates a new connection; allocate() calls it
//                                with the pool as `this` and forwards its own arguments
function ConnectionManager(name, limit, constructor){
  this.name = name;
  this.total = limit;
  this.factory = constructor;
  this.connections = [];
}

var Manager = ConnectionManager.prototype;
// Hands a connection to the callback: an available connection from the pool if there is one,
// otherwise a new connection as long as the pool has not reached its limit. When the pool is
// full and every connection is busy the allocation is tried again later.
//
// @param {Function} callback  called with (false, connection) for a pooled connection; for a
//                             new connection it is forwarded to the factory, which decides
//                             when to call it
Manager.allocate = function(callback){
  var total
    , i = total = this.connections.length
    , self = this;

  // check for available
  while (i--){
    if (this.isAvailable(this.connections[i])){
      return callback(false, this.connections[i]);
    }
  }

  // create new
  if (total < this.total){
    return this.connections.push(this.factory.apply(this, arguments));
  }

  // wait until the next turn of the event loop, to try again. setImmediate is used because
  // it lets pending I/O run first; that I/O is what makes a busy connection available again
  setImmediate(function(){ self.allocate(callback) });
};
// Checks if a connection can be used: it has to be open (or write only) and must not have
// writes queued up.
//
// @param {Stream} connection  the connection to check, a missing connection is not available
// @returns {Boolean}
Manager.isAvailable = function(connection){
  if (!connection) return false;

  var readyState = connection.readyState;
  return (readyState == 'open' || readyState == 'writeOnly') && !(connection._writeQueue && connection._writeQueue.length);
};
// Removes a connection from the pool and ends it, unless it is already closed.
//
// @param {Stream} connection  the connection to remove
Manager.remove = function(connection){
  var position = this.connections.indexOf(connection);

  if (position !== -1) this.connections.splice(position, 1);

  if (connection.readyState && connection.readyState !== 'closed' && connection.end) connection.end();
};
<p>Manager.free = function(keep){
var save = 0
, connection;</p>
<p> while(this.connections.length){
connection = this.connections.shift();
if(save < keep && this.isAvailable(this.connection[0])){
