Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cassandra-driver / test / integration / long / reconnection-tests.js
Size: Mime:
"use strict";
var assert = require('assert');
var util = require('util');

var helper = require('../../test-helper.js');
var Client = require('../../../lib/client.js');
var types = require('../../../lib/types');
var utils = require('../../../lib/utils');
var errors = require('../../../lib/errors');
var reconnection = require('../../../lib/policies/reconnection');

describe('reconnection', function () {
  this.timeout(120000);
  describe('when a node is back UP', function () {
    it('should re-prepare on demand', function (done) {
      var dummyClient = new Client(helper.baseOptions);
      var keyspace = helper.getRandomName('ks');
      var table = helper.getRandomName('tbl');
      var insertQuery1 = util.format('INSERT INTO %s (id, text_sample) VALUES (?, ?)', table);
      var insertQuery2 = util.format('INSERT INTO %s (id, int_sample) VALUES (?, ?)', table);
      var client = new Client(utils.extend({
        keyspace: keyspace,
        contactPoints: helper.baseOptions.contactPoints,
        policies: { reconnection: new reconnection.ConstantReconnectionPolicy(100)}
      }));
      utils.series([
        function removeCcm(next) {
          helper.ccmHelper.remove(function () {
            //Ignore error
            next();
          });
        },
        helper.ccmHelper.start(2),
        function createKs(next) {
          dummyClient.execute(helper.createKeyspaceCql(keyspace, 3), helper.waitSchema(dummyClient, next));
        },
        function createTable(next) {
          dummyClient.execute(helper.createTableCql(keyspace + '.' + table), helper.waitSchema(dummyClient, next));
        },
        //Connect using an active keyspace
        client.connect.bind(client),
        function insert1(next) {
          //execute a couple of times to ensure it is prepared on all hosts
          utils.times(15, function (n, timesNext) {
            client.execute(insertQuery1, [types.uuid(), n.toString()], {prepare: 1}, timesNext);
          }, next);
        },
        function killNode(next) {
          helper.ccmHelper.exec(['node1', 'stop', '--not-gently'], next);
        },
        function insert2(next) {
          //Prepare and execute a new Query, that it is NOT prepared on the stopped node :)
          //execute a couple of times to ensure it is prepared on the remaining hosts
          utils.times(3, function (n, timesNext) {
            client.execute(insertQuery2, [types.uuid(), n], {prepare: 1}, timesNext);
          }, next);
        },
        function restart(next) {
          helper.ccmHelper.exec(['node1', 'start'], next);
        },
        function killTheOther(next) {
          helper.ccmHelper.exec(['node2', 'stop'], next);
        },
        function insert1_plus (next) {
          utils.times(15, function (n, timesNext) {
            client.execute(insertQuery1, [types.uuid(), n.toString()], {prepare: 1}, timesNext);
          }, next);
        },
        function insert2_plus (next) {
          utils.times(1, function (n, timesNext) {
            client.execute(insertQuery2, [types.uuid(), n], {prepare: 1}, timesNext);
          }, next);
        }
      ], done);
    });
    it('should reconnect and re-prepare once there is an available host', function (done) {
      var dummyClient = new Client(helper.baseOptions);
      var keyspace = helper.getRandomName('ks');
      var table = helper.getRandomName('tbl');
      var insertQuery1 = util.format('INSERT INTO %s (id, text_sample) VALUES (?, ?)', table);
      var client = new Client({
        keyspace: keyspace,
        contactPoints: helper.baseOptions.contactPoints,
        policies: { reconnection: new reconnection.ConstantReconnectionPolicy(100)}
      });
      utils.series([
        function removeCcm(next) {
          helper.ccmHelper.remove(function () {
            //Ignore error
            next();
          });
        },
        helper.ccmHelper.start(1),
        function createKs(next) {
          dummyClient.execute(helper.createKeyspaceCql(keyspace, 3), helper.waitSchema(dummyClient, next));
        },
        function createTable(next) {
          dummyClient.execute(helper.createTableCql(keyspace + '.' + table), helper.waitSchema(dummyClient, next));
        },
        //Connect using an active keyspace
        client.connect.bind(client),
        function insert1(next) {
          //execute a couple of times to ensure it is prepared on all hosts
          utils.times(15, function (n, timesNext) {
            client.execute(insertQuery1, [types.uuid(), n.toString()], {prepare: 1}, timesNext);
          }, next);
        },
        function killNode(next) {
          helper.ccmHelper.exec(['node1', 'stop'], next);
        },
        function insert2(next) {
          //execute a couple of times to even though the cluster is DOWN
          utils.timesSeries(20, function (n, timesNext) {
            client.execute(insertQuery1, [types.uuid(), n.toString()], {prepare: 1}, function (err) {
              //It should callback in error
              helper.assertInstanceOf(err, errors.NoHostAvailableError);
              setTimeout(timesNext, 100);
            });
          }, next);
        },
        function restart(next) {
          helper.ccmHelper.exec(['node1', 'start'], helper.wait(5000, next));
        },
        function insert1_plus(next) {
          //The cluster should be UP!
          client.execute(util.format('SELECT * from %s', table), function (err, result) {
            assert.ifError(err);
            assert.strictEqual(result.rows.length, 15);
            utils.times(25, function (n, timesNext) {
              client.execute(insertQuery1, [types.uuid(), n.toString()], {prepare: 1}, timesNext);
            }, next);
          });
        },
        function assertResults(next) {
          client.execute(util.format('SELECT * from %s', table), function (err, result) {
            assert.ifError(err);
            assert.ok(result);
            //It should have inserted 15+25
            assert.strictEqual(result.rows.length, 40);
            next();
          });
        }
      ], done);
    });
    it('should re-prepare and execute batches of prepared queries', function (done) {
      var dummyClient = new Client(helper.baseOptions);
      var keyspace = helper.getRandomName('ks');
      var table = helper.getRandomName('tbl');
      var insertQuery1 = util.format('INSERT INTO %s (id, text_sample) VALUES (?, ?)', table);
      var insertQuery2 = util.format('INSERT INTO %s (id, int_sample) VALUES (?, ?)', table);
      var queriedHosts = {};
      var client = new Client(utils.extend({
        keyspace: keyspace,
        contactPoints: helper.baseOptions.contactPoints,
        policies: { reconnection: new reconnection.ConstantReconnectionPolicy(100)},
        //disable heartbeat
        pooling: { heartBeatInterval: 0}
      }));
      utils.series([
        function removeCcm(next) {
          helper.ccmHelper.remove(function () {
            //Ignore error
            next();
          });
        },
        helper.ccmHelper.start(3),
        helper.toTask(dummyClient.execute, dummyClient, helper.createKeyspaceCql(keyspace, 3)),
        helper.toTask(dummyClient.execute, dummyClient, helper.createTableCql(keyspace + '.' + table)),
        //Connect using an active keyspace
        client.connect.bind(client),
        //Kill node1
        helper.toTask(helper.ccmHelper.exec, null, ['node2', 'stop']),
        function (next) {
          setTimeout(next, 5000);
        },
        function insert1(next) {
          utils.times(10, function (n, timesNext) {
            var queries = [
              { query: insertQuery1, params: [types.Uuid.random(), n.toString()]},
              { query: insertQuery2, params: [types.Uuid.random(), n]}
            ];
            client.batch(queries, {prepare: true}, timesNext);
          }, next);
        },
        //restart node1
        helper.toTask(helper.ccmHelper.exec, null, ['node2', 'start']),
        function insertAfterRestart(next) {
          utils.times(15, function (n, timesNext) {
            var queries = [
              { query: insertQuery1, params: [types.Uuid.random(), n.toString()]},
              { query: insertQuery2, params: [types.Uuid.random(), n]}
            ];
            client.batch(queries, {prepare: true}, function (err, result) {
              assert.ifError(err);
              queriedHosts[result.info.queriedHost] = true;
              timesNext();
            });
          }, next);
        }
      ], done);
    });
  });
  describe('when connections are silently dropped', function () {
    it('should callback in err the next request', function (done) {
      //never reconnect
      var client = new Client(utils.extend({}, helper.baseOptions, {policies: {reconnection: new reconnection.ConstantReconnectionPolicy(Number.MAX_VALUE)}}));
      utils.series([
        helper.ccmHelper.start(1),
        client.connect.bind(client),
        function doSomeQueries(next) {
          utils.times(30, function (n, timesNext) {
            client.execute(helper.queries.basic, timesNext);
          }, next);
        },
        function silentlyKillConnections(next) {
          killConnections(client, true);
          setTimeout(next, 2000);
        },
        function issueARequest(next) {
          client.execute('SELECT * FROM system.local', function (err) {
            //The error is expected
            assert.ok(err);
            assert.ok(err.message.indexOf('undefined') < 0);
            assert.ok(Object.keys(err.innerErrors).length > 0);
            next();
          });
        }
      ], function (err) {
        assert.ifError(err);
        helper.ccmHelper.removeIfAny(done);
      });
    });
  });
  describe('when a node is killed during connection initialization', function() {
    it('should properly abort the connection and retry', function(done) {
      var client = newInstance();
      utils.series([
        helper.ccmHelper.start(2),
        function pauseNode2(next) {
          // Pause node2 so establishing connection to it hangs.
          helper.ccmHelper.exec(['node2', 'pause'], next);
        },
        client.connect.bind(client),
        function doSomeQueries(next) {
          // Issue some queries to get a connection attempt on node2.
          utils.times(30, function (n, timesNext) {
            client.execute(helper.queries.basic, function() {
              timesNext();
            });
          });
          // Since queries won't actually complete, wait 5 seconds and then kill node.
          setTimeout(next, 5000);
        },
        function killNode2(next) {
          // Kill node2 non-gently so OS sends a TCP RST on the connection.
          helper.ccmHelper.exec(['node2', 'stop', '--not-gently'], next);
        },
        function startNode2(next) {
          // If we've made it this far without process dying we are in good
          // shape, but validate node2 comes up anyways.
          helper.ccmHelper.startNode(2, next);
        },
        function wait10(next) {
          // Give 10 seconds for node to be marked up.
          setTimeout(next, 10000);
        },
        function isUp(next) {
          // Ensure all hosts are up.
          var hosts = client.hosts.values();
          assert.strictEqual(hosts.length, 2);
          hosts.forEach(function(host) {
            assert.ok(host.isUp());
          });
          next();
        }
      ], function (err) {
        assert.ifError(err);
        helper.ccmHelper.removeIfAny(done);
      });
    });
  });
});

/**
 * @param {Client} client
 * @param {Boolean} destroy
 */
function killConnections(client, destroy) {
  client.hosts.forEach(function (h) {
    h.pool.connections.forEach(function (c) {
      if (destroy) {
        c.netClient.destroy();
      }
      else {
        c.netClient.end();
      }
    });
  });
}

/**
 * @returns {Client}
 */
function newInstance(options) {
  options = options || {};
  options = utils.deepExtend(options, helper.baseOptions);
  return new Client(options);
}