Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cassandra-driver / test / integration / short / client-each-row-tests.js
Size: Mime:
"use strict";
var assert = require('assert');
var util = require('util');

var helper = require('../../test-helper.js');
var Client = require('../../../lib/client.js');
var types = require('../../../lib/types');
var utils = require('../../../lib/utils.js');
var errors = require('../../../lib/errors.js');
var vit = helper.vit;

describe('Client', function () {
  this.timeout(120000);
  describe('#eachRow(query, params, {prepare: 0})', function () {
    before(helper.ccmHelper.start(1));
    after(helper.ccmHelper.remove);
    it('should callback per row and the end callback', function (done) {
      var client = newInstance();
      var query = helper.queries.basic;
      var counter = 0;
      //fail if its preparing
      //noinspection JSAccessibilityCheck
      client._getPrepared = function () {throw new Error('Prepared should not be called');};
      client.eachRow(query, [], {prepare: false}, function (n, row) {
        assert.strictEqual(n, 0);
        assert.ok(row instanceof types.Row, null);
        counter++;
      }, function (err) {
        assert.ifError(err);
        assert.strictEqual(counter, 1);
        done();
      });
    });
    it('should allow calls without end callback', function (done) {
      var client = newInstance();
      var query = helper.queries.basic;
      client.eachRow(query, [], {}, function (n, row) {
        assert.strictEqual(n, 0);
        assert.ok(row instanceof types.Row, null);
        done();
      });
    });
    it('should end callback when no rows', function (done) {
      var client = newInstance();
      var query = helper.queries.basicNoResults;
      var counter = 0;
      client.eachRow(query, [], {}, function () {
        counter++;
      }, function (err) {
        assert.ifError(err);
        assert.strictEqual(counter, 0);
        done();
      });
    });
    it('should end callback when VOID result', function (done) {
      var client = newInstance();
      var keyspace = helper.getRandomName('ks');
      var query = helper.createKeyspaceCql(keyspace, 1);
      var counter = 0;
      client.eachRow(query, [], {}, function () {
        counter++;
      }, function (err) {
        assert.ifError(err);
        assert.strictEqual(counter, 0);
        done();
      });
    });
    it('should call rowCallback per each row', function (done) {
      var client = newInstance();
      var keyspace = helper.getRandomName('ks');
      var table = keyspace + '.' + helper.getRandomName('table');
      var length = 300;
      var noop = function () {};
      var counter = 0;
      utils.series([
        function createKs(next) {
          client.eachRow(helper.createKeyspaceCql(keyspace, 1), [], noop, helper.waitSchema(client, next));
        },
        function createTable(next) {
          client.eachRow(helper.createTableCql(table), [], noop, helper.waitSchema(client, next));
        },
        function insert(next) {
          var query = 'INSERT INTO %s (id, text_sample) VALUES (%s, \'text%s\')';
          utils.timesSeries(length, function (n, timesNext) {
            client.eachRow(util.format(query, table, types.Uuid.random(), n), [], noop, timesNext);
          }, next);
        },
        function select(next) {
          client.eachRow(util.format('SELECT * FROM %s', table), [], function (n, row) {
            assert.strictEqual(n, counter++);
            assert.ok(row instanceof types.Row);
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(counter, length);
            //rowLength should be exposed
            assert.strictEqual(counter, result.rowLength);
            next();
          });
        }], done);
    });
    it('should fail if non-existent profile provided', function (done) {
      var client = newInstance();
      utils.series([
        function queryWithBadProfile(next) {
          var counter = 0;
          client.eachRow(helper.queries.basicNoResults, [], {executionProfile: 'none'}, function() {
            counter++;
          }, function (err) {
            assert.ok(err);
            helper.assertInstanceOf(err, errors.ArgumentError);
            assert.strictEqual(counter, 0);
            next();
          });
        },
        client.shutdown.bind(client)
      ], done);
    });
    vit('2.0', 'should autoPage', function (done) {
      var keyspace = helper.getRandomName('ks');
      var table = keyspace + '.' + helper.getRandomName('table');
      var client = newInstance();
      var noop = function () {};
      utils.series([
        function createKs(next) {
          client.eachRow(helper.createKeyspaceCql(keyspace, 1), [], noop, helper.waitSchema(client, next));
        },
        function createTable(next) {
          client.eachRow(helper.createTableCql(table), [], noop, helper.waitSchema(client, next));
        },
        function insertData(seriesNext) {
          var query = util.format('INSERT INTO %s (id, text_sample) VALUES (?, ?)', table);
          utils.times(100, function (n, next) {
            client.eachRow(query, [types.Uuid.random(), n.toString()], noop, next);
          }, seriesNext);
        },
        function selectDataMultiplePages(seriesNext) {
          //It should fetch 3 times, a total of 100 rows (45+45+10)
          var query = util.format('SELECT * FROM %s', table);
          var rowCount = 0;
          client.eachRow(query, [], {fetchSize: 45, autoPage: true}, function () {
            rowCount++;
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(rowCount, 100);
            assert.strictEqual(rowCount, result.rowLength);
            seriesNext();
          });
        },
        function selectDataOnePage(seriesNext) {
          //It should fetch 1 time, a total of 100 rows (even if asked more)
          var query = util.format('SELECT * FROM %s', table);
          var rowCount = 0;
          client.eachRow(query, [], {fetchSize: 2000, autoPage: true}, function () {
            rowCount++;
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(rowCount, 100);
            assert.strictEqual(rowCount, result.rowLength);
            seriesNext();
          });
        }
      ], done);
    });
  });
  describe('#eachRow(query, params, {prepare: 1})', function () {
    var keyspace = helper.getRandomName('ks');
    var table = keyspace + '.' + helper.getRandomName('table');
    var noop = function () {};
    before(function (done) {
      var client = newInstance();
      utils.series([
        helper.ccmHelper.start(3, {
          jvmArgs: ['-Dcassandra.wait_for_tracing_events_timeout_secs=-1']
        }),
        function (next) {
          client.eachRow(helper.createKeyspaceCql(keyspace, 3), [], noop, next);
        },
        function (next) {
          client.eachRow(helper.createTableCql(table), [], noop, next);
        }
      ], done);
    });
    after(helper.ccmHelper.remove);
    it('should callback per row and the end callback', function (done) {
      var client = newInstance();
      var query = helper.queries.basic;
      var counter = 0;
      //noinspection JSAccessibilityCheck
      var originalGetPrepared = client._getPrepared;
      var prepareCalled = false;
      //noinspection JSAccessibilityCheck
      client._getPrepared = function () {
        prepareCalled = true;
        originalGetPrepared.apply(client, arguments);
      };
      client.eachRow(query, [], {prepare: true}, function (n, row) {
        assert.strictEqual(n, 0);
        assert.ok(row instanceof types.Row, null);
        counter++;
      }, function (err) {
        assert.ifError(err);
        assert.strictEqual(counter, 1);
        assert.strictEqual(prepareCalled, true);
        done();
      });
    });
    it('should call rowCallback per each row', function (done) {
      var client = newInstance();
      var keyspace = helper.getRandomName('ks');
      var table = keyspace + '.' + helper.getRandomName('table');
      var length = 500;
      var noop = function () {};
      var counter = 0;
      utils.series([
        function createKs(next) {
          client.eachRow(helper.createKeyspaceCql(keyspace, 3), [], {prepare: true}, noop, helper.waitSchema(client, next));
        },
        function createTable(next) {
          client.eachRow(helper.createTableCql(table), [], {prepare: true}, noop, helper.waitSchema(client, next));
        },
        function insert(next) {
          var query = 'INSERT INTO %s (id, text_sample) VALUES (%s, \'text%s\')';
          utils.timesSeries(length, function (n, timesNext) {
            client.eachRow(util.format(query, table, types.Uuid.random(), n), [], {prepare: true}, noop, timesNext);
          }, next);
        },
        function select(next) {
          client.eachRow(util.format('SELECT * FROM %s', table), [], {prepare: true}, function (n, row) {
            assert.strictEqual(n, counter++);
            assert.ok(row instanceof types.Row);
          }, function (err) {
            assert.ifError(err);
            assert.strictEqual(counter, length);
            next();
          });
        }], done);
    });
    it('should allow maps with float values NaN and infinite values', function (done) {
      var client = newInstance({ keyspace: keyspace});
      var values = [
        [ 'finite', { val: 1 }],
        [ 'NaN', { val: NaN }],
        [ 'Infinite', { val: Number.POSITIVE_INFINITY }],
        [ 'Negative Infinite', { val: Number.NEGATIVE_INFINITY }]
      ];
      var queryOptions = { prepare: true, consistency: types.consistencies.quorum };
      var expectedValues = {};
      utils.series([
        client.connect.bind(client),
        function createTable(next) {
          var query = 'CREATE TABLE tbl_map_floats (id text PRIMARY KEY, data map<text, float>)';
          client.execute(query, next);
        },
        function insertData(next) {
          var query = 'INSERT INTO tbl_map_floats (id, data) VALUES (?, ?)';
          utils.eachSeries(values, function (params, eachNext) {
            expectedValues[params[0]] = params[1].val;
            client.execute(query, params, queryOptions, eachNext);
          }, next);
        },
        function retrieveData(next) {
          client.eachRow('SELECT * FROM tbl_map_floats', [], queryOptions, function (n, row) {
            var expected = expectedValues[row['id']];
            if (isNaN(expected)) {
              assert.ok(isNaN(row['data'].val));
            }
            else {
              assert.strictEqual(row['data'].val, expected);
            }
          }, function (err, result) {
            assert.ifError(err);
            assert.ok(result);
            assert.strictEqual(result.rowLength, values.length);
            next();
          });
        },
        client.shutdown.bind(client)
      ], done);
    });
    vit('2.0', 'should autoPage on parallel different tables', function (done) {
      var keyspace = helper.getRandomName('ks');
      var table1 = keyspace + '.' + helper.getRandomName('table');
      var table2 = keyspace + '.' + helper.getRandomName('table');
      var client = newInstance({ queryOptions: { consistency: types.consistencies.quorum }});
      var noop = function () {};
      utils.series([
        function createKs(next) {
          client.eachRow(helper.createKeyspaceCql(keyspace, 3), [], noop, helper.waitSchema(client, next));
        },
        function createTable(next) {
          client.eachRow(helper.createTableCql(table1), [], noop, helper.waitSchema(client, next));
        },
        function createTable(next) {
          client.eachRow(helper.createTableCql(table2), [], noop, helper.waitSchema(client, next));
        },
        function insertData(seriesNext) {
          var query = util.format('INSERT INTO %s (id, text_sample) VALUES (?, ?)', table1);
          utils.timesLimit(200, 25, function (n, next) {
            client.eachRow(query, [types.Uuid.random(), n.toString()], {prepare: 1}, noop, next);
          }, seriesNext);
        },
        function insertData(seriesNext) {
          var query = util.format('INSERT INTO %s (id, int_sample) VALUES (?, ?)', table2);
          utils.timesLimit(135, 25, function (n, next) {
            client.eachRow(query, [types.Uuid.random(), n+1], {prepare: 1}, noop, next);
          }, seriesNext);
        },
        function selectDataMultiplePages(seriesNext) {
          utils.parallel([
            function (parallelNext) {
              var query = util.format('SELECT * FROM %s', table1);
              var rowCount = 0;
              client.eachRow(query, [], {fetchSize: 39, autoPage: true, prepare: true}, function (n, row) {
                assert.ok(row['text_sample']);
                rowCount++;
              }, function (err, result) {
                validateResult(err, result, rowCount, 200, 39);
                parallelNext();
              });
            },
            function (parallelNext) {
              var query = util.format('SELECT * FROM %s', table2);
              var rowCount = 0;
              client.eachRow(query, [], { fetchSize: 23, autoPage: true, prepare: true }, function (n, row) {
                rowCount++;
                assert.ok(row['int_sample']);
              }, function (err, result) {
                validateResult(err, result, rowCount, 135, 23);
                parallelNext();
              });
            }
          ], seriesNext);
        }
      ], done);
      function validateResult(err, result, rowCount, expectedLength){
        assert.ifError(err);
        assert.strictEqual(rowCount, expectedLength);
        assert.strictEqual(rowCount, result.rowLength);
      }
    });
    vit('2.0', 'should use pageState and fetchSize', function (done) {
      var client = newInstance({queryOptions: {consistency: types.consistencies.quorum}});
      var metaPageState;
      var pageState;
      utils.series([
        helper.toTask(insertTestData, null, client, table, 131),
        function selectData(seriesNext) {
          //Only fetch 70
          var counter = 0;
          client.eachRow(util.format('SELECT * FROM %s', table), [], {prepare: 1, fetchSize: 70}, function () {
            counter++;
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(counter, 70);
            assert.strictEqual(result.rowLength, counter);
            pageState = result.pageState;
            metaPageState = result.meta.pageState;
            seriesNext();
          });
        },
        function selectDataRemaining(seriesNext) {
          //The remaining
          var counter = 0;
          client.eachRow(util.format('SELECT * FROM %s', table), [], {prepare: 1, fetchSize: 70, pageState: pageState}, function (n, row) {
            assert.ok(row);
            counter++;
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(result.rowLength, 61);
            assert.strictEqual(counter, result.rowLength);
            seriesNext();
          });
        },
        function selectDataRemainingWithMetaPageState(seriesNext) {
          //The remaining
          var counter = 0;
          client.eachRow(util.format('SELECT * FROM %s', table), [], {prepare: 1, fetchSize: 70, pageState: metaPageState}, function (n, row) {
            assert.ok(row);
            counter++;
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(result.rowLength, 61);
            assert.strictEqual(counter, result.rowLength);
            seriesNext();
          });
        }
      ], done);
    });
    vit('2.0', 'should expose result.nextPage() method', function (done) {
      var client = newInstance({queryOptions: {consistency: types.consistencies.quorum}});
      var pageState;
      var nextPageRows;
      utils.series([
        client.connect.bind(client),
        helper.toTask(insertTestData, null, client, table, 110),
        function selectData(seriesNext) {
          //Only fetch 60 the first time, 50 the following
          client.eachRow(util.format('SELECT * FROM %s', table), [], {prepare: 1, fetchSize: 60}, function (n, row) {
            if (nextPageRows) {
              nextPageRows.push(row);
            }
          }, function (err, result) {
            assert.ifError(err);
            helper.assertInstanceOf(result, types.ResultSet);
            if (!nextPageRows) {
              //the first time, it should have a next page
              assert.strictEqual(typeof result.nextPage, 'function');
              assert.strictEqual(typeof result.pageState, 'string');
              nextPageRows = [];
              pageState = result.pageState;
              //call to retrieve the following page rows.
              result.nextPage();
              return;
            }
            //the following times, there shouldn't be any additional page
            assert.strictEqual(typeof result.nextPage, 'undefined');
            assert.equal(result.pageState, null);
            seriesNext();
          });
        },
        function selectDataRemaining(seriesNext) {
          //Select the remaining with pageState and compare the results.
          var counter = 0;
          client.eachRow(util.format('SELECT * FROM %s', table), [], {prepare: 1, fetchSize: 100, pageState: pageState}, function (n, row) {
            assert.ok(row);
            counter++;
            assert.strictEqual(row['id'].toString(), nextPageRows[n]['id'].toString());
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(result.rowLength, 50);
            assert.strictEqual(counter, result.rowLength);
            seriesNext();
          });
        },
        client.shutdown.bind(client)
      ], done);
    });
    vit('2.0', 'should not expose result.nextPage() method when no more rows', function (done) {
      var client = newInstance({queryOptions: {consistency: types.consistencies.quorum}});
      var counter = 0;
      var rowLength = 10;
      utils.series([
        client.connect.bind(client),
        helper.toTask(insertTestData, null, client, table, rowLength),
        function assertNextPageNull(next) {
          client.eachRow(util.format('SELECT * FROM %s', table), [], {prepare: 1, fetchSize: 1000}, function () {
            counter++;
          }, function (err, result) {
            assert.ifError(err);
            assert.strictEqual(result.rowLength, rowLength);
            assert.strictEqual(counter, result.rowLength);
            assert.strictEqual(typeof result.nextPage, 'undefined');
            next();
          });
        },
        client.shutdown.bind(client)
      ], done);
    });
    it('should retrieve the trace id when queryTrace flag is set', function (done) {
      var client = newInstance({queryOptions: {consistency: types.consistencies.quorum}});
      var id = types.Uuid.random();
      utils.series([
        client.connect.bind(client),
        function selectNotExistent(next) {
          var query = util.format('SELECT * FROM %s WHERE id = ?', table);
          var called = 0;
          client.eachRow(query, [types.Uuid.random()], {prepare: true, traceQuery: true}, function () {
            called++;
          }, function (err, result) {
            assert.ifError(err);
            assert.ok(result);
            assert.strictEqual(called, 0);
            helper.assertInstanceOf(result.info.traceId, types.Uuid);
            next();
          });
        },
        function insertQuery(next) {
          var query = util.format('INSERT INTO %s (id) VALUES (?)', table);
          var called = 0;
          client.eachRow(query, [id], { prepare: true, traceQuery: true}, function () {
            called++;
          }, function (err, result) {
            assert.ifError(err);
            assert.ok(result);
            assert.strictEqual(called, 0);
            helper.assertInstanceOf(result.info.traceId, types.Uuid);
            next();
          });
        },
        function selectSingleRow(next) {
          var query = util.format('SELECT * FROM %s WHERE id = ?', table);
          var called = 0;
          client.eachRow(query, [id], { prepare: true, traceQuery: true}, function () {
            called++;
          }, function (err, result) {
            assert.ifError(err);
            assert.ok(result);
            assert.strictEqual(called, 1);
            assert.strictEqual(result.rowLength, 1);
            helper.assertInstanceOf(result.info.traceId, types.Uuid);
            next();
          });
        }
      ], done);
    });
    helper.vit('2.2', 'should include the warning in the ResultSet', function (done) {
      var client = newInstance();
      var loggedMessage = false;
      client.on('log', function (level, className, message) {
        if (loggedMessage || level !== 'warning') {
          return;
        }
        message = message.toLowerCase();
        if (message.indexOf('batch') >= 0 && message.indexOf('exceeding')) {
          loggedMessage = true;
        }
      });
      var query = util.format(
        "BEGIN UNLOGGED BATCH INSERT INTO %s (id, text_sample) VALUES (:id1, :sample)\n" +
        "INSERT INTO %s (id, text_sample) VALUES (:id2, :sample) APPLY BATCH",
        table,
        table
      );
      var params = { id1: types.Uuid.random(), id2: types.Uuid.random(), sample: utils.stringRepeat('c', 2562) };
      client.eachRow(query, params, {prepare: true}, function () {
        
      }, function (err, result) {
        assert.ifError(err);
        assert.ok(result.info.warnings);
        assert.strictEqual(result.info.warnings.length, 1);
        helper.assertContains(result.info.warnings[0], 'batch');
        helper.assertContains(result.info.warnings[0], 'exceeding');
        assert.ok(loggedMessage);
        client.shutdown(done);
      });
    });
    if (!helper.isWin()) {
      vit('2.0', 'should retrieve large result sets in parallel', function (done) {
        insertSelectTest(table, 50000, 20, 50000, { prepare: true }, done);
      });
      vit('2.0', 'should query multiple times in parallel with query tracing enabled', function (done) {
        insertSelectTest(table, 50000, 2000, 10, { prepare: true, traceQuery: true }, done);
      });
    }
  });
});

/**
 * @returns {Client}
 */
function newInstance(options) {
  options = options || {};
  options = utils.deepExtend(options, helper.baseOptions);
  return new Client(options);
}

function insertTestData(client, table, length, callback) {
  utils.series([
    function truncate(seriesNext) {
      client.eachRow('TRUNCATE ' + table, [], helper.noop, seriesNext);
    },
    function insertData(seriesNext) {
      var query = util.format('INSERT INTO %s (id, text_sample) VALUES (?, ?)', table);
      var value = utils.stringRepeat('abcdefghij', 10);
      utils.timesLimit(length, 100, function (n, next) {
        client.eachRow(query, [types.Uuid.random(), value], {prepare: 1}, helper.noop, next);
      }, seriesNext);
    }
  ], callback);
}

function insertSelectTest(table, rowLength, times, selectLimit, selectOptions, done) {
  var client = newInstance({
    queryOptions: {
      consistency: types.consistencies.quorum,
      fetchSize: rowLength
    },
    socketOptions: {
      readTimeout: 100000
    }
  });
  client.on('log', helper.log(['warning', 'error']));
  var query = util.format('SELECT * FROM %s LIMIT %d', table, selectLimit);
  utils.series([
    client.connect.bind(client),
    helper.toTask(insertTestData, null, client, table, rowLength),
    function selectData(seriesNext) {
      utils.timesLimit(times, 5, function (n, timesNext) {
        var counter = 0;
        client.eachRow(query, [], selectOptions, function (n, row) {
          assert.ok(row);
          counter++;
        }, function (err, result) {
          assert.ifError(err);
          assert.strictEqual(result.rowLength, counter);
          timesNext();
        });
      }, seriesNext);
    }
  ], helper.finish(client, done));
}