<html>
<head>
<title>node-memcached</title>
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"></script>
<style>body {
margin: 0;
padding: 0;
font: 14px/1.5 'Palatino Linotype', 'Book Antiqua', Palatino, FreeSerif, serif;
color: #252519;
}
a {
color: #252519;
}
a:hover {
text-decoration: underline;
color: #19469D;
}
p {
margin: 12px 0;
}
h1, h2, h3 {
margin: 0;
padding: 0;
}
table#source {
width: 100%;
border-collapse: collapse;
}
table#source td:first-child {
padding: 30px 40px 30px 40px;
vertical-align: top;
}
table#source td:first-child,
table#source td:first-child pre {
width: 450px;
}
table#source td:last-child {
padding: 30px 0 30px 40px;
border-left: 1px solid #E5E5EE;
background: #F5F5FF;
}
table#source tr {
border-bottom: 1px solid #E5E5EE;
}
table#source tr.filename {
padding-top: 40px;
border-top: 1px solid #E5E5EE;
}
table#source tr.filename td:first-child {
text-transform: capitalize;
}
table#source tr.filename td:last-child {
font-size: 12px;
}
table#source tr.filename h2 {
margin: 0;
padding: 0;
cursor: pointer;
}
table#source tr.code h1,
table#source tr.code h2,
table#source tr.code h3 {
margin-top: 30px;
font-family: "Lucida Grande", "Helvetica Nueue", Arial, sans-serif;
font-size: 18px;
}
table#source tr.code h2 {
font-size: 16px;
}
table#source tr.code h3 {
font-size: 14px;
}
table#source tr.code ul {
margin: 15px 0 15px 35px;
padding: 0;
}
table#source tr.code ul li {
margin: 0;
padding: 1px 0;
}
table#source tr.code ul li p {
margin: 0;
padding: 0;
}
table#source tr.code td:first-child pre {
padding: 20px;
}
#ribbon {
position: fixed;
top: 0;
right: 0;
}
code .string { color: #219161; }
code .regexp { color: #219161; }
code .keyword { color: #954121; }
code .number { color: #19469D; }
code .comment { color: #bbb; }
code .this { color: #19469D; }</style>
<script>
$(function(){
$('tr.code').hide();
$('tr.filename').toggle(function(){
$(this).nextUntil('.filename').fadeIn();
}, function(){
$(this).nextUntil('.filename').fadeOut();
});
});
</script>
</head>
<body>
<table id="source"><tbody><tr><td><h1>node-memcached</h1></td><td></td></tr><tr class="filename"><td><h2 id="lib/memcached.js"><a href="#">memcached</a></h2></td><td>lib/memcached.js</td></tr><tr class="code">
<td class="docs">
<p>var EventEmitter = require('events').EventEmitter
, Stream = require('net').Stream
, Buffer = require('buffer').Buffer;</p>
<p>var HashRing = require('hashring')
, Connection = require('./connection')
, Utils = require('./utils')
, Manager = Connection.Manager
, IssueLog = Connection.IssueLog;</p>
<p>// The constructor
function Client(args, options){
if(!(this && this.hasOwnProperty && (this instanceof Client))) this = new Client();</p>
<p> var servers = []
, weights = {}
, key;</p>
<p> // Parse down the connection arguments<br></br> switch (Object.prototype.toString.call(args)){
case '[object String]':
servers.push(args);
break;
case '[object Object]':
weights = args;
servers = Object.keys(args);
case '[object Array]':
default:
servers = args;
break;
}</p>
<p> if (!servers.length) throw new Error('No servers where supplied in the arguments');</p>
<p> // merge with global and user config
Utils.merge(this, Client.config);
Utils.merge(this, options);
EventEmitter.call(this);</p>
<p> this.servers = servers;
this.HashRing = new HashRing(args, this.algorithm);
this.connections = {};
this.issues = [];
};</p>
<p>// Allows users to configure the memcached globally or per memcached client
Client.config = {
maxKeySize: 251 // max key size allowed by Memcached
, maxExpiration: 2592000 // max expiration duration allowed by Memcached
, maxValue: 1048576 // max length of value allowed by Memcached</p>
<p>, algorithm: 'crc32' // hashing algorithm that is used for key mapping </p>
<p>, poolSize: 10 // maximal parallel connections
, reconnect: 18000000 // if dead, attempt reconnect each xx ms
, timeout: 5000 // after x ms the server should send a timeout if we can't connect
, retries: 5 // amount of retries before server is dead
, retry: 30000 // timeout between retries, all call will be marked as cache miss
, remove: false // remove server if dead if false, we will attempt to reconnect
, redundancy: false // allows you do re-distribute the keys over a x amount of servers
, keyCompression: true // compress keys if they are to large (md5)
, debug: false // Output the commands and responses
};</p>
<p>// There some functions we don't want users to touch so we scope them
(function(nMemcached){
const LINEBREAK = '\r\n'
, NOREPLY = ' noreply'
, FLUSH = 1E3
, BUFFER = 1E2
, CONTINUE = 1E1
, FLAG<em>JSON = 1<<1
, FLAG</em>BINARY = 2<<1;</p>
<p> var memcached = nMemcached.prototype = new EventEmitter
, private = {}
, undefined;</p>
<p> // Creates or generates a new connection for the give server, the callback will receive the connection
// if the operation was successful
memcached.connect = function connect(server, callback){
// server is dead, bail out
if (server in this.issues && this.issues[server].failed) return callback(false, false);</p>
<pre><code>// fetch from connection pool
if (server in this.connections) return this.connections[server].allocate(callback);
// No connection factory created yet, so we must build one
var serverTokens = /(.*):(\d+){1,}$/.exec(server).reverse()
, memcached = this;
serverTokens.pop();
this.connections[server] = new Manager(server, this.poolSize, function(callback){
var S = new Stream
, Manager = this;
// config the Stream
S.setTimeout(memcached.timeout);
S.setNoDelay(true);
S.metaData = [];
S.responseBuffer = "";
S.bufferArray = [];
S.server = server;
S.tokens = serverTokens;
// Add the event listeners
Utils.fuse(S, {
connect: function streamConnect(){ callback(false, this) }
, close: function streamClose(){ Manager.remove(this) }
, error: function streamError(err){ memcached.connectionIssue(err, S, callback) }
, data: Utils.curry(memcached, private.buffer, S)
, timeout: function streamTimeout(){ Manager.remove(this) }
, end: S.end
});
// connect the net.Stream [port, hostname]
S.connect.apply(S, serverTokens);
return S;
});
// now that we have setup our connection factory we can allocate a new connection
this.connections[server].allocate(callback);</code></pre>
<p> };</p>
<p> // Creates a multi stream, so it's easier to query agains
// multiple memcached servers.
memcached.multi = function memcachedMulti(keys, callback){
var map = {}
, memcached = this
, servers
, i;</p>
<pre><code>// gets all servers based on the supplied keys,
// or just gives all servers if we don't have keys
if (keys){
keys.forEach(function fetchMultipleServers(key){
var server = memcached.HashRing.getNode(key);
if (map[server]){
map[server].push(key);
} else {
map[server] = [key];
}
});
// store the servers
servers = Object.keys(map);
} else {
servers = this.servers;
}
i = servers.length;
while(i--){
callback.call(this, servers[i], map[servers[i]], i, servers.length);
}</code></pre>
<p> };</p>
<p> // Executes the command on the net.Stream, if no server is supplied it will use the query.key to get
// the server from the HashRing
memcached.command = function memcachedCommand(queryCompiler, server){</p>
<pre><code>// generate a regular query,
var query = queryCompiler()
, redundancy = this.redundancy && this.redundancy < this.servers.length
, queryRedundancy = query.redundancyEnabled
, memcached = this;
// validate the arguments
if (query.validation && !Utils.validateArg(query, this)) return;
// fetch servers
server = server ? server : redundancy && queryRedundancy ? (redundancy = this.HashRing.createRange(query.key, (this.redundancy + 1), true)).shift() : this.HashRing.getNode(query.key);
// check if the server is still alive
if (server in this.issues && this.issues[server].failed) return query.callback && query.callback(false, false);
Client.config.debug && console.log(query.command);
this.connect(server, function allocateMemcachedConnection(error, S){
// check for issues
if (!S) return query.callback && query.callback(false, false);
if (error) return query.callback && query.callback(error);
if (S.readyState !== 'open') return query.callback && query.callback('Connection readyState is set to ' + S.readySate);
// used for request timing
query.start = Date.now();
S.metaData.push(query);
S.write(query.command + LINEBREAK);
});
// if we have redundancy enabled and the query is used for redundancy, than we are going loop over
// the servers, check if we can reach them, and connect to the correct net connection.
// because all redundancy queries are executed with "no reply" we do not need to store the callback
// as there will be no value to parse.
if (redundancy && queryRedundancy){
queryRedundancy = queryCompiler(queryRedundancy);
redundancy.forEach(function(server){
if (server in memcached.issues && memcached.issues[server].failed) return;
memcached.connect(server, function allocateMemcachedConnection(error, S){
if (!S || error || S.readyState !== 'open') return;
S.write(queryRedundancy.command + LINEBREAK);
});
})
}</code></pre>
<p> };</p>
<p> // Logs all connection issues, and handles them off. Marking all requests as cache misses.
memcached.connectionIssue = function connectionIssue(error, S, callback){
// end connection and mark callback as cache miss
if (S && S.end) S.end();
if (callback) callback(false, false);</p>
<pre><code>var issues
, server = S.server
, memcached = this;
// check for existing issue logs, or create a new log
if (server in this.issues){
issues = this.issues[server];
} else {
issues = this.issues[server] = new IssueLog({
server: server
, tokens: S.tokens
, reconnect: this.reconnect
, retries: this.retries
, retry: this.retry
, remove: this.remove
});
// proxy the events
Utils.fuse(issues, {
issue: function(details){ memcached.emit('issue', details) }
, failure: function(details){ memcached.emit('failure', details) }
, reconnecting: function(details){ memcached.emit('reconnecting', details) }
, reconnected: function(details){ memcached.emit('reconnect', details) }
, remove: function(details){
// emit event and remove servers
memcached.emit('remove', details);
memcached.connections[server].end();
if (this.failOverServers && this.failOverServers.length){
memcached.HashRing.replaceServer(server, this.failOverServers.shift());
} else {
memcached.HashRing.removeServer(server);
}
}
});
}
// log the issue
issues.log(error);</code></pre>
<p> };</p>
<p> // Kills all active connections
memcached.end = function endMemcached(){
var memcached = this;
Object.keys(this.connections).forEach(function closeConnection(key){
memcached.connections[key].free(0)
});
};</p>
<p> // These do not need to be publicly available as it's one of the most important
// parts of the whole client, the parser commands:
private.parsers = {
// handle error responses
'NOT<em>FOUND': function(tokens, dataSet, err){ return [CONTINUE, false] }
, 'NOT</em>STORED': function(tokens, dataSet, err){ return [CONTINUE, false] }
, 'ERROR': function(tokens, dataSet, err){ err.push('Received an ERROR response'); return [FLUSH, false] }
, 'CLIENT<em>ERROR': function(tokens, dataSet, err){ err.push(tokens.splice(1).join(' ')); return [CONTINUE, false] }
, 'SERVER</em>ERROR': function(tokens, dataSet, err, queue, S, memcached){ memcached.connectionIssue(tokens.splice(1).join(' '), S); return [CONTINUE, false] }</p>
<pre><code>// keyword based responses</code></pre>
<p> , 'STORED': function(tokens, dataSet){ return [CONTINUE, true] }
, 'DELETED': function(tokens, dataSet){ return [CONTINUE, true] }
, 'OK': function(tokens, dataSet){ return [CONTINUE, true] }
, 'EXISTS': function(tokens, dataSet){ return [CONTINUE, false] }
, 'END': function(tokens, dataSet, err, queue){ if (!queue.length) queue.push(false); return [FLUSH, true] }</p>
<pre><code>// value parsing:</code></pre>
<p> , 'VALUE': function(tokens, dataSet, err, queue){
var key = tokens[1]
, flag = +tokens[2]
, expire = tokens[3]
, cas = tokens[4]
, multi = this.metaData[0] && this.metaData[0].multi || cas ? {} : false
, tmp;</p>
<pre><code> switch (flag){
case FLAG_JSON:
dataSet = JSON.parse(dataSet);
break;
case FLAG_BINARY:
tmp = new Buffer(dataSet.length);
tmp.write(dataSet, 0, 'binary');
dataSet = tmp;
break;
}
// Add to queue as multiple get key key key key key returns multiple values
if (!multi){
queue.push(dataSet);
} else {
multi[key] = dataSet;
if (cas) multi.cas = cas;
queue.push(multi);
}
return [BUFFER, false]
}</code></pre>
<p> , 'INCRDECR': function(tokens){ return [CONTINUE, +tokens[1]] }
, 'STAT': function(tokens, dataSet, err, queue){
queue.push([tokens[1], /^\d+$/.test(tokens[2]) ? +tokens[2] : tokens[2]]);
return [BUFFER, true]
}
, 'VERSION': function(tokens, dataSet){
var versionTokens = /(\d+)(?:.)(\d+)(?:.)(\d+)$/.exec(tokens.pop());</p>
<pre><code> return [CONTINUE, {
server: this.server
, version: versionTokens[0]
, major: versionTokens[1] || 0
, minor: versionTokens[2] || 0
, bugfix: versionTokens[3] || 0
}];
}</code></pre>
<p> , 'ITEM': function(tokens, dataSet, err, queue){
queue.push({
key: tokens[1]
, b: +tokens[2].substr(1)
, s: +tokens[4]
});
return [BUFFER, false]
}
};</p>
<p> // Parses down result sets
private.resultParsers = {
// combines the stats array, in to an object
'stats': function(resultSet){
var response = {};</p>
<pre><code> // add references to the retrieved server
response.server = this.server;
// Fill the object
resultSet.forEach(function(statSet){
response[statSet[0]] = statSet[1];
});
return response;
}
// the settings uses the same parse format as the regular stats</code></pre>
<p> , 'stats settings': function(){ return private.resultParsers.stats.apply(this, arguments) }
// Group slabs by slab id
, 'stats slabs': function(resultSet){
var response = {};</p>
<pre><code> // add references to the retrieved server
response.server = this.server;
// Fill the object
resultSet.forEach(function(statSet){
var identifier = statSet[0].split(':');
if (!response[identifier[0]]) response[identifier[0]] = {};
response[identifier[0]][identifier[1]] = statSet[1];
});
return response;
}</code></pre>
<p> , 'stats items': function(resultSet){
var response = {};</p>
<pre><code> // add references to the retrieved server
response.server = this.server;
// Fill the object
resultSet.forEach(function(statSet){
var identifier = statSet[0].split(':');
if (!response[identifier[1]]) response[identifier[1]] = {};
response[identifier[1]][identifier[2]] = statSet[1];
});
return response;
}</code></pre>
<p> };</p>
<p> // Generates a RegExp that can be used to check if a chunk is memcached response identifier
private.allCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + '|\d' + ')');
private.bufferedCommands = new RegExp('^(?:' + Object.keys(private.parsers).join('|') + ')');</p>
<p> // When working with large chunks of responses, node chunks it in to pieces. So we might have
// half responses. So we are going to buffer up the buffer and user our buffered buffer to query
// against. Also when you execute allot of .writes to the same stream, node will combine the responses
// in to one response stream. With no indication where it had cut the data. So it can be it cuts inside the value response,
// or even right in the middle of a line-break, so we need to make sure, the last piece in the buffer is a LINEBREAK
// because that is all what is sure about the Memcached Protocol, all responds end with them.
private.buffer = function BufferBuffer(S, BufferStream){
S.responseBuffer += BufferStream;</p>
<pre><code>Client.config.debug && console.log(S.responseBuffer);
// only call transform the data once we are sure, 100% sure, that we valid response ending
if (S.responseBuffer.substr(S.responseBuffer.length - 2) === LINEBREAK){
var chunks = S.responseBuffer.split(LINEBREAK);
S.responseBuffer = ""; // clear!
this.rawDataReceived(S, S.bufferArray = S.bufferArray.concat(chunks));
} </code></pre>
<p> };</p>
<p> // The actual parsers function that scan over the responseBuffer in search of Memcached response
// identifiers. Once we have found one, we will send it to the dedicated parsers that will transform
// the data in a human readable format, deciding if we should queue it up, or send it to a callback fn.
memcached.rawDataReceived = function rawDataReceived(S){
var queue = []
, token
, tokenSet
, dataSet = ''
, resultSet
, metaData
, err = []
, tmp;</p>
<pre><code>while(S.bufferArray.length && private.allCommands.test(S.bufferArray[0])){
token = S.bufferArray.shift();
tokenSet = token.split(' ');
// special case for digit only's these are responses from INCR and DECR
if (/\d+/.test(tokenSet[0])) tokenSet.unshift('INCRDECR');
// special case for value, it's required that it has a second response!
// add the token back, and wait for the next response, we might be handling a big
// ass response here.
if (tokenSet[0] == 'VALUE' && S.bufferArray.indexOf('END') == -1){
return S.bufferArray.unshift(token);
}
// check for dedicated parser
if (private.parsers[tokenSet[0]]){
// fetch the response content
while(S.bufferArray.length){
if (private.bufferedCommands.test(S.bufferArray[0])) break;
dataSet += S.bufferArray.shift();
};
resultSet = private.parsers[tokenSet[0]].call(S, tokenSet, dataSet || token, err, queue, this);
// check how we need to handle the resultSet response
switch(resultSet.shift()){
case BUFFER:
break;
case FLUSH:
metaData = S.metaData.shift();
resultSet = queue;
// if we have a callback, call it
if (metaData && metaData.callback){
metaData.execution = Date.now() - metaData.start;
metaData.callback.call(
metaData, err.length ? err : err[0],
// see if optional parsing needs to be applied to make the result set more readable
private.resultParsers[metaData.type] ? private.resultParsers[metaData.type].call(S, resultSet, err) :
!Array.isArray(queue) || queue.length > 1 ? queue : queue[0]
);
}
queue.length = err.length = 0;
break;
case CONTINUE:
default:
metaData = S.metaData.shift();
if (metaData && metaData.callback){
metaData.execution = Date.now() - metaData.start;
metaData.callback.call(metaData, err.length > 1 ? err : err[0], resultSet[0]);
}
err.length = 0;
break;
}
} else {
// handle unkown responses
metaData = S.metaData.shift();
if (metaData && metaData.callback){
metaData.execution = Date.now() - metaData.start;
metaData.callback.call(metaData, 'Unknown response from the memcached server: "' + token + '"', false);
}
}
// cleanup
dataSet = ''
tokenSet = metaData = undefined;
// check if we need to remove an empty item from the array, as splitting on /r/n might cause an empty
// item at the end..
if (S.bufferArray[0] === '') S.bufferArray.shift();
};</code></pre>
<p> };</p>
<p> // Small wrapper function that only executes errors when we have a callback
private.errorResponse = function errorResponse(error, callback){
if (typeof callback == 'function') callback(error, false);</p>
<pre><code>return false;</code></pre>
<p> };</p>
<p> // This is where the actual Memcached API layer begins:
memcached.get = function get(key, callback){
if (Array.isArray(key)) return this.getMulti.apply(this, arguments);</p>
<pre><code>this.command(function getCommand(noreply){ return {
key: key
, callback: callback
, validate: [['key', String], ['callback', Function]]
, type: 'get'
, command: 'get ' + key
}});</code></pre>
<p> };</p>
<p> // the difference between get and gets is that gets, also returns a cas value
// and gets doesn't support multi-gets at this moment.
memcached.gets = function get(key, callback){
this.command(function getCommand(noreply){ return {
key: key
, callback: callback
, validate: [['key', String], ['callback', Function]]
, type: 'gets'
, command: 'gets ' + key
}});
};</p>
<p> // Handles get's with multiple keys
memcached.getMulti = function getMulti(keys, callback){
var memcached = this
, responses = {}
, errors = []
, calls</p>
<pre><code> // handle multiple responses and cache them untill we receive all.
, handle = function(err, results){
if (err) errors.push(err);
// add all responses to the array
(Array.isArray(results) ? results : [results]).forEach(function(value){ Utils.merge(responses, value) });
if (!--calls) callback(errors.length ? errors : false, responses);
};
this.multi(keys, function(server, key, index, totals){
if (!calls) calls = totals;
memcached.command(function getMultiCommand(noreply){ return {
callback: handle
, multi:true
, type: 'get'
, command: 'get ' + key.join(' ')
}},
server
);
});</code></pre>
<p> };</p>
<p> // As all command nearly use the same syntax we are going to proxy them all to this
// function to ease maintenance. This is possible because most set commands will use the same
// syntax for the Memcached server. Some commands do not require a lifetime and a flag, but the
// memcached server is smart enough to ignore those.
private.setters = function setters(type, validate, key, value, lifetime, callback, cas){
var flag = 0
, memcached = this
, valuetype = typeof value
, length;</p>
<pre><code>if (Buffer.isBuffer(value)){
flag = FLAG_BINARY;
value = value.toString('binary');
} else if (valuetype !== 'string' && valuetype !== 'number'){
flag = FLAG_JSON;
value = JSON.stringify(value);
} else {
value = value.toString();
}
length = Buffer.byteLength(value);
if (length > memcached.maxValue) return private.errorResponse('The length of the value is greater than ' + memcached.maxValue, callback);
memcached.command(function settersCommand(noreply){ return {
key: key
, callback: callback
, lifetime: lifetime
, value: value
, cas: cas
, validate: validate
, type: type
, redundancyEnabled: true
, command: [type, key, flag, lifetime, length].join(' ') +
(cas ? ' ' + cas : '') +
(noreply ? NOREPLY : '') +
LINEBREAK + value
}});</code></pre>
<p> };</p>
<p> // Curry the function and so we can tell the type our private set function
memcached.set = Utils.curry(false, private.setters, 'set', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
memcached.replace = Utils.curry(false, private.setters, 'replace', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);
memcached.add = Utils.curry(false, private.setters, 'add', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]]);</p>
<p> memcached.cas = function checkandset(key, value, cas, lifetime, callback){
private.setters.call(this, 'cas', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, lifetime, callback, cas);
};</p>
<p> memcached.append = function append(key, value, callback){
private.setters.call(this, 'append', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
};</p>
<p> memcached.prepend = function prepend(key, value, callback){
private.setters.call(this, 'prepend', [['key', String], ['lifetime', Number], ['value', String], ['callback', Function]], key, value, 0, callback);
};</p>
<p> // Small handler for incr and decr's
private.incrdecr = function incrdecr(type, key, value, callback){
this.command(function incredecrCommand(noreply){ return {
key: key
, callback: callback
, value: value
, validate: [['key', String], ['value', Number], ['callback', Function]]
, type: type
, redundancyEnabled: true
, command: [type, key, value].join(' ') +
(noreply ? NOREPLY : '')
}});
};</p>
<p> // Curry the function and so we can tell the type our private incrdecr
memcached.increment = memcached.incr = Utils.curry(false, private.incrdecr, 'incr');
memcached.decrement = memcached.decr = Utils.curry(false, private.incrdecr, 'decr');</p>
<p> // Deletes the keys from the servers
memcached.del = function del(key, callback){
this.command(function deleteCommand(noreply){ return {
key: key
, callback: callback
, validate: [['key', String], ['callback', Function]]
, type: 'delete'
, redundancyEnabled: true
, command: 'delete ' + key +
(noreply ? NOREPLY : '')
}});
};
memcached['delete'] = memcached.del;</p>
<p> // Small wrapper that handle single keyword commands such as FLUSH ALL, VERSION and STAT
private.singles = function singles(type, callback){
var memcached = this
, responses = []
, errors = []
, calls</p>
<pre><code> // handle multiple servers
, handle = function(err, results){
if (err) errors.push(err);
if (results) responses = responses.concat(results);
// multi calls should ALWAYS return an array!
if (!--calls) callback(errors, responses);
};
this.multi(false, function(server, keys, index, totals){
if (!calls) calls = totals;
memcached.command(function singlesCommand(noreply){ return {
callback: handle
, type: type
, command: type
}},
server
);
});</code></pre>
<p> };</p>
<p> // Curry the function and so we can tell the type our private singles
memcached.version = Utils.curry(false, private.singles, 'version');
memcached.flush = Utils.curry(false, private.singles, 'flush_all');
memcached.stats = Utils.curry(false, private.singles, 'stats');
memcached.settings = Utils.curry(false, private.singles, 'stats settings');
memcached.slabs = Utils.curry(false, private.singles, 'stats slabs');
memcached.items = Utils.curry(false, private.singles, 'stats items');</p>
<p> // You need to use the items dump to get the correct server and slab settings
// see simple_cachedump.js for an example
memcached.cachedump = function cachedump(server, slabid, number, callback){
this.command(function cachedumpCommand(noreply){ return {
callback: callback
, number: number
, slabid: slabid
, validate: [['number', Number], ['slabid', Number], ['callback', Function]]
, type: 'stats cachedump'
, command: 'stats cachedump ' + slabid + ' ' + number
}},
server
);
};</p>
<p>})(Client);</p>
<p>module.exports = Client;</p>
</td>
<td class="code">
</td>
</tr><tr class="filename"><td><h2 id="lib/utils.js"><a href="#">utils</a></h2></td><td>lib/utils.js</td></tr><tr class="code">
<td class="docs">
<p>var CreateHash = require('crypto').createHash;</p>
<p>exports.validateArg = function validateArg(args, config){
var toString = Object.prototype.toString
, err
, callback;</p>
<p> args.validate.forEach(function(tokens){
var key = tokens[0]
, value = args[key];</p>
<pre><code>switch(tokens[1]){
case Number:
if (toString.call(value) !== '[object Number]') err = 'Argument "' + key + '" is not a valid Number.';
break;
case Boolean:
if (toString.call(value) !== '[object Boolean]') err = 'Argument "' + key + '" is not a valid Boolean.';
break;
case Array:
if (toString.call(value) !== '[object Array]') err = 'Argument "' + key + '" is not a valid Array.';
break;
case Object:
if (toString.call(value) !== '[object Object]') err = 'Argument "' + key + '" is not a valid Object.';
break;
case Function:
if (toString.call(value) !== '[object Function]') err = 'Argument "' + key + '" is not a valid Function.';
break;
case String:
if (toString.call(value) !== '[object String]') err = 'Argument "' + key + '" is not a valid String.';
if (!err && key == 'key' && value.length > config.maxKeySize){
if (config.keyCompression){
args[key] = CreateHash('md5').update(value).digest('hex');
// also make sure you update the command
config.command.replace(new RegExp('^(' + value + ')'), args[key]);
} else {
err = 'Argument "' + key + '" is longer than the maximum allowed length of ' + config.maxKeySize;
}
}
break;
default:
if (toString.call(value) == '[object global]' && !tokens[2]) err = 'Argument "' + key + '" is not defined.';
}</code></pre>
<p> });</p>
<p> if (err){
if(callback) callback(err, false);
return false;
}</p>
<p> return true;
};</p>
<p>// currys a function
exports.curry = function curry(context, func){
var copy = Array.prototype.slice
, args = copy.call(arguments, 2);</p>
<p> return function(){
return func.apply(context || this, args.concat(copy.call(arguments)));
}
};</p>
<p>// a small util to use an object for eventEmitter
exports.fuse = function fuse(target, handlers){
for(var i in handlers)
if (handlers.hasOwnProperty(i)){
target.on(i, handlers[i]);
}
};</p>
<p>// merges a object's proppertys / values with a other object
exports.merge = function merge(target, obj){
for(var i in obj){
target[i] = obj[i];
}</p>
<p> return target;
};</p>
<p>// a small items iterator
exports.Iterator = function iterator(collection, callback){
var arr = Array.isArray(collection)
, keys = !arr ? Object.keys(collection) : false
, index = 0
, maximum = arr ? collection.length : keys.length
, self = this;</p>
<p> // returns next item
this.next = function(){
var obj = arr ? collection[index] : { key: keys[index], value: collection[keys[index]] };
callback(obj, index++, collection, self);
};</p>
<p> // check if we have more items
this.hasNext = function(){
return index < maximum;
};
};</p>
</td>
<td class="code">
</td>
</tr><tr class="filename"><td><h2 id="lib/connection.js"><a href="#">connection</a></h2></td><td>lib/connection.js</td></tr><tr class="code">
<td class="docs">
<p>var EventEmitter = require('events').EventEmitter
, Spawn = require('child_process').spawn
, Utils = require('./utils');</p>
<p>exports.Manager = ConnectionManager; // connection pooling
exports.IssueLog = IssueLog; // connection issue handling
exports.Available = ping; // connection availablity</p>
<p>function ping(host, callback){
var pong = Spawn('ping', [host]);</p>
<p> pong.stdout.on('data', function(data) {
callback(false, data.toString().split('\n')[0].substr(14));
pong.kill();
});</p>
<p> pong.stderr.on('data', function(data) {
callback(data.toString().split('\n')[0].substr(14), false);
pong.kill();
});
};</p>
<p>function IssueLog(args){
this.config = args;
this.messages = [];
this.failed = false;</p>
<p> this.totalRetries = 0;
this.totalReconnectsAttempted = 0;
this.totalReconnectsSuccess = 0;</p>
<p> Utils.merge(this, args);
EventEmitter.call(this);
};</p>
<p>var issues = IssueLog.prototype = new EventEmitter;</p>
<p>issues.log = function(message){
var issue = this;</p>
<p> this.failed = true;
this.messages.push(message || 'No message specified');</p>
<p> if (this.retries){
setTimeout(Utils.curry(issue, issue.attemptRetry), this.retry);
return this.emit('issue', this.details);
}</p>
<p> if (this.remove) return this.emit('remove', this.details)</p>
<p> setTimeout(Utils.curry(issue, issue.attemptReconnect), this.reconnect);
};</p>
<p>Object.defineProperty(issues, 'details', {
get: function(){
var res = {};</p>
<pre><code>res.server = this.server;
res.tokens = this.tokens;
res.messages = this.messages;
if (this.retries){
res.retries = this.retries;
res.totalRetries = this.totalRetries
} else {
res.totalReconnectsAttempted = this.totalReconnectsAttempted;
res.totalReconnectsSuccess = this.totalReconnectsSuccess;
res.totalReconnectsFailed = this.totalReconnectsAttempted - this.totalReconnectsSuccess;
res.totalDownTime = (res.totalReconnectsFailed * this.reconnect) + (this.totalRetries * this.retry);
}
return res;</code></pre>
<p> }
});</p>
<p>issues.attemptRetry = function(){
this.totalRetries++;
this.retries--;
this.failed = false;
};</p>
<p>issues.attemptReconnect = function(){
var issue = this;
this.totalReconnectsAttempted++;
this.emit('reconnecting', this.details);</p>
<p> // Ping the server
ping(this.tokens[1], function(err){
// still no access to the server
if (err){
this.messages.push(message || 'No message specified');
return setTimeout(Utils.curry(issue, issue.attemptReconnect), issue.reconnect);
}</p>
<pre><code>issue.emit('reconnected', issue.details);
issue.totalReconnectsSuccess++;
issue.messages.length = 0;
issue.failed = false;
// we connected again, so we are going through the whole cycle again
Utils.merge(issue, JSON.parse(JSON.stringify(issue.config)));</code></pre>
<p> });
};</p>
<p>function ConnectionManager(name, limit, constructor){
this.name = name;
this.total = limit;
this.factory = constructor;
this.connections = [];
};</p>
<p>var Manager = ConnectionManager.prototype;</p>
<p>Manager.allocate = function(callback){
var total
, i = total = this.connections.length
, Manager = this;</p>
<p> // check for available
while(i--){
if (this.isAvailable(this.connections[i])){
return callback(false, this.connections[i]);
}
}</p>
<p> // create new
if (total < this.total){
return this.connections.push(this.factory.apply(this, arguments));
}</p>
<p> // wait untill the next event loop tick, to try again
process.nextTick(function(){Manager.allocate(callback)});
};</p>
<p>Manager.isAvailable = function(connection){
var readyState = connection.readyState;
return (readyState == 'open' || readyState == 'writeOnly') && !(connection.<em>writeQueue && connection.</em>writeQueue.length);
};</p>
<p>Manager.remove = function(connection){
var position = this.connections.indexOf(connection);</p>
<p> if (position !== -1) this.connections.splice(position, 1);</p>
<p> if (connection.readyState && connection.readyState !== 'closed' && connection.end) connection.end();
};</p>
<p>Manager.free = function(keep){
var save = 0
, connection;</p>
<p> while(this.connections.length){
connection = this.connections.shift();
if(save < keep && this.isAvailable(this.connection[0])){
save++
continue;
}</p>
<pre><code>this.remove(connection);</code></pre>
<p> }
};</p>
</td>
<td class="code">
</td>
</tr> </body>
</html></tbody></table>