Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
#!/usr/bin/env python

# Copyright 2018 RethinkDB
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file incorporates work covered by the following copyright:
# Copyright 2010-2016 RethinkDB, all rights reserved.


"""'rethinkdb index-rebuild' recreates outdated secondary indexes in a cluster.
  This should be used after upgrading to a newer version of rethinkdb.  There
  will be a notification in the web UI if any secondary indexes are out-of-date."""

from __future__ import print_function

import sys
import time
import traceback

from rethinkdb import query, utils_common

usage = "rethinkdb index-rebuild [-c HOST:PORT] [-n NUM] [-r (DB | DB.TABLE)] [--tls-cert FILENAME] [-p] " \
        "[--password-file FILENAME]..."
help_epilog = '''
FILE: the archive file to restore data from

EXAMPLES:
rethinkdb index-rebuild -c mnemosyne:39500
  rebuild all outdated secondary indexes from the cluster through the host 'mnemosyne',
  one at a time

rethinkdb index-rebuild -r test -r production.users -n 5
  rebuild all outdated secondary indexes from a local cluster on all tables in the
  'test' database as well as the 'production.users' table, five at a time
'''

# Prefix used for indexes that are being rebuilt
TMP_INDEX_PREFIX = '$reql_temp_index$_'


def parse_options(argv, prog=None):
    parser = utils_common.CommonOptionsParser(usage=usage, epilog=help_epilog, prog=prog)

    parser.add_option(
        "-r",
        "--rebuild",
        dest="db_table",
        metavar="DB|DB.TABLE",
        default=[],
        help="databases or tables to rebuild indexes on (default: all, may be specified multiple times)",
        action="append",
        type="db_table")
    parser.add_option(
        "-n",
        dest="concurrent",
        metavar="NUM",
        default=1,
        help="concurrent indexes to rebuild (default: 1)",
        type="pos_int")
    parser.add_option("--force", dest="force", action="store_true", default=False, help="rebuild non-outdated indexes")

    options, args = parser.parse_args(argv)

    # Check validity of arguments
    if len(args) != 0:
        parser.error("Error: No positional arguments supported. Unrecognized option '%s'" % args[0])

    return options


def rebuild_indexes(options):

    # flesh out options.db_table
    if not options.db_table:
        options.db_table = [
            utils_common.DbTable(x['db'], x['name']) for x in
            options.retryQuery('all tables', query.db('rethinkdb').table('table_config').pluck(['db', 'name']))
        ]
    else:
        for db_table in options.db_table[:]:  # work from a copy
            if not db_table[1]:
                options.db_table += [
                    utils_common.DbTable(db_table[0], x) for x in options.retryQuery('table list of %s' % db_table[0],
                                                                                     query.db(db_table[0]).table_list())
                ]
                del options.db_table[db_table]

    # wipe out any indexes with the TMP_INDEX_PREFIX
    for db, table in options.db_table:
        for index in options.retryQuery('list indexes on %s.%s' % (db, table), query.db(db).table(table).index_list()):
            if index.startswith(TMP_INDEX_PREFIX):
                options.retryQuery(
                    'drop index: %s.%s:%s' %
                    (db,
                     table,
                     index),
                    query.db(
                        index['db']).table(
                        index['table']).index_drop(
                        index['name']))

    # get the list of indexes to rebuild
    indexes_to_build = []
    for db, table in options.db_table:
        indexes = None
        if not options.force:
            indexes = options.retryQuery('get outdated indexes from %s.%s' % (db, table), query.db(
                db).table(table).index_status().filter({'outdated': True}).get_field('index'))
        else:
            indexes = options.retryQuery('get all indexes from %s.%s' %
                                         (db, table), query.db(db).table(table).index_status().get_field('index'))
        for index in indexes:
            indexes_to_build.append({'db': db, 'table': table, 'name': index})

    # rebuild selected indexes

    total_indexes = len(indexes_to_build)
    indexes_completed = 0
    progress_ratio = 0.0
    highest_progress = 0.0
    indexes_in_progress = []

    if not options.quiet:
        print("Rebuilding %d index%s: %s" % (total_indexes, 'es' if total_indexes > 1 else '',
                                             ", ".join(["`%(db)s.%(table)s:%(name)s`" % i for i in indexes_to_build])))

    while len(indexes_to_build) > 0 or len(indexes_in_progress) > 0:
        # Make sure we're running the right number of concurrent index rebuilds
        while len(indexes_to_build) > 0 and len(indexes_in_progress) < options.concurrent:
            index = indexes_to_build.pop()
            indexes_in_progress.append(index)
            index['temp_name'] = TMP_INDEX_PREFIX + index['name']
            index['progress'] = 0
            index['ready'] = False

            existing_indexes = dict(
                (x['index'],
                 x['function']) for x in options.retryQuery(
                    'existing indexes',
                    query.db(
                        index['db']).table(
                        index['table']).index_status().pluck(
                        'index',
                        'function')))

            if index['name'] not in existing_indexes:
                raise AssertionError('{index_name} is not part of existing indexes {indexes}'.format(
                    index_name=index['name'],
                    indexes=', '.join(existing_indexes)
                ))

            if index['temp_name'] not in existing_indexes:
                options.retryQuery('create temp index: %(db)s.%(table)s:%(name)s' % index, query.db(index['db']).table(
                    index['table']).index_create(index['temp_name'], existing_indexes[index['name']]))

        # Report progress
        highest_progress = max(highest_progress, progress_ratio)
        if not options.quiet:
            utils_common.print_progress(highest_progress)

        # Check the status of indexes in progress
        progress_ratio = 0.0
        for index in indexes_in_progress:
            status = options.retryQuery(
                "progress `%(db)s.%(table)s` index `%(name)s`" % index,
                query.db(index['db']).table(index['table']).index_status(index['temp_name']).nth(0)
            )
            if status['ready']:
                index['ready'] = True
                options.retryQuery(
                    "rename `%(db)s.%(table)s` index `%(name)s`" %
                    index,
                    query.db(
                        index['db']).table(
                        index['table']).index_rename(
                        index['temp_name'],
                        index['name'],
                        overwrite=True))
            else:
                progress_ratio += status.get('progress', 0) / total_indexes

        indexes_in_progress = [index for index in indexes_in_progress if not index['ready']]
        indexes_completed = total_indexes - len(indexes_to_build) - len(indexes_in_progress)
        progress_ratio += float(indexes_completed) / total_indexes

        if len(indexes_in_progress) == options.concurrent or \
           (len(indexes_in_progress) > 0 and len(indexes_to_build) == 0):
            # Short sleep to keep from killing the CPU
            time.sleep(0.1)

    # Make sure the progress bar says we're done and get past the progress bar line
    if not options.quiet:
        utils_common.print_progress(1.0)
        print("")


def main(argv=None, prog=None):
    options = parse_options(argv or sys.argv[1:], prog=prog)
    start_time = time.time()
    try:
        rebuild_indexes(options)
    except Exception as ex:
        if options.debug:
            traceback.print_exc()
        if not options.quiet:
            print(ex, file=sys.stderr)
        return 1
    if not options.quiet:
        print("Done (%d seconds)" % (time.time() - start_time))
    return 0


if __name__ == "__main__":
    sys.exit(main())