Repository URL to install this package:
|
Version:
1:7.26.0-1 ▾
|
datadog-agent
/
opt
/
datadog-agent
/
embedded
/
lib
/
python3.8
/
site-packages
/
rethinkdb
/
_index_rebuild.py
|
|---|
#!/usr/bin/env python
# Copyright 2018 RethinkDB
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file incorporates work covered by the following copyright:
# Copyright 2010-2016 RethinkDB, all rights reserved.
"""'rethinkdb index-rebuild' recreates outdated secondary indexes in a cluster.
This should be used after upgrading to a newer version of rethinkdb. There
will be a notification in the web UI if any secondary indexes are out-of-date."""
from __future__ import print_function
import sys
import time
import traceback
from rethinkdb import query, utils_common
# Usage line handed to the option parser that parse_options() builds.
usage = "rethinkdb index-rebuild [-c HOST:PORT] [-n NUM] [-r (DB | DB.TABLE)] [--tls-cert FILENAME] [-p] " \
"[--password-file FILENAME]..."
# Epilog text handed to the option parser that parse_options() builds.
# NOTE(review): the "FILE:" entry below does not match anything this command accepts
# (parse_options() rejects every positional argument); it looks like a leftover from
# another command's help text -- confirm before removing.
help_epilog = '''
FILE: the archive file to restore data from
EXAMPLES:
rethinkdb index-rebuild -c mnemosyne:39500
rebuild all outdated secondary indexes from the cluster through the host 'mnemosyne',
one at a time
rethinkdb index-rebuild -r test -r production.users -n 5
rebuild all outdated secondary indexes from a local cluster on all tables in the
'test' database as well as the 'production.users' table, five at a time
'''
# Prefix used for indexes that are being rebuilt. rebuild_indexes() names each temporary
# index TMP_INDEX_PREFIX + <original name> and drops any index carrying this prefix
# before it starts.
TMP_INDEX_PREFIX = '$reql_temp_index$_'
def parse_options(argv, prog=None):
    """Parse the command-line arguments of ``rethinkdb index-rebuild``.

    A ``utils_common.CommonOptionsParser`` is created with the module-level
    ``usage`` and ``help_epilog`` strings, and the options specific to this
    command (``-r/--rebuild``, ``-n`` and ``--force``) are registered on it.

    :param argv: list of argument strings to parse
    :param prog: program name forwarded to the parser (optional)
    :returns: the options object returned by the parser's ``parse_args``
    """
    option_parser = utils_common.CommonOptionsParser(usage=usage, epilog=help_epilog, prog=prog)

    # One (flags, settings) pair per option, registered in this order
    option_specs = (
        (
            ("-r", "--rebuild"),
            dict(
                dest="db_table",
                metavar="DB|DB.TABLE",
                default=[],
                help="databases or tables to rebuild indexes on (default: all, may be specified multiple times)",
                action="append",
                type="db_table",
            ),
        ),
        (
            ("-n",),
            dict(
                dest="concurrent",
                metavar="NUM",
                default=1,
                help="concurrent indexes to rebuild (default: 1)",
                type="pos_int",
            ),
        ),
        (
            ("--force",),
            dict(
                dest="force",
                action="store_true",
                default=False,
                help="rebuild non-outdated indexes",
            ),
        ),
    )
    for flags, settings in option_specs:
        option_parser.add_option(*flags, **settings)

    parsed_options, positional = option_parser.parse_args(argv)

    # This command takes no positional arguments; report the first stray one
    if positional:
        option_parser.error("Error: No positional arguments supported. Unrecognized option '%s'" % positional[0])

    return parsed_options
def rebuild_indexes(options):
    """Rebuild secondary indexes on the selected tables.

    The work is done in four steps:

    1. Expand ``options.db_table`` into concrete (db, table) entries: when it
       is empty every table listed in ``rethinkdb.table_config`` is used, and
       any entry without a table name is replaced by all tables of that
       database.
    2. Drop every existing index whose name starts with ``TMP_INDEX_PREFIX``.
    3. Collect the indexes to rebuild: those reported as outdated, or all
       indexes when ``options.force`` is set.
    4. For at most ``options.concurrent`` indexes at a time, create a
       temporary index from the existing index function, poll its status
       until it is ready, then rename it over the original index.

    :param options: parsed options object; this function reads ``db_table``,
        ``force``, ``quiet`` and ``concurrent``, rewrites ``db_table`` in
        place, and sends every query through
        ``options.retryQuery(description, query)``
    :raises AssertionError: if an index selected for rebuilding is not
        reported by ``index_status`` when its rebuild is started
    """
    # flesh out options.db_table
    if not options.db_table:
        options.db_table = [
            utils_common.DbTable(x['db'], x['name']) for x in
            options.retryQuery('all tables', query.db('rethinkdb').table('table_config').pluck(['db', 'name']))
        ]
    else:
        for db_table in options.db_table[:]:  # work from a copy
            if not db_table[1]:
                # Database-only entry: substitute one entry per table of that database
                options.db_table += [
                    utils_common.DbTable(db_table[0], x) for x in
                    options.retryQuery('table list of %s' % db_table[0], query.db(db_table[0]).table_list())
                ]
                # Remove by value: the entry is not a list index, so
                # `del options.db_table[db_table]` would raise TypeError
                options.db_table.remove(db_table)

    # wipe out any indexes with the TMP_INDEX_PREFIX
    for db, table in options.db_table:
        for index in options.retryQuery('list indexes on %s.%s' % (db, table), query.db(db).table(table).index_list()):
            if index.startswith(TMP_INDEX_PREFIX):
                # `index` is the index name (a string), so the database and table
                # come from the enclosing loop rather than from subscripting it
                options.retryQuery(
                    'drop index: %s.%s:%s' % (db, table, index),
                    query.db(db).table(table).index_drop(index)
                )

    # get the list of indexes to rebuild
    indexes_to_build = []
    for db, table in options.db_table:
        if not options.force:
            indexes = options.retryQuery(
                'get outdated indexes from %s.%s' % (db, table),
                query.db(db).table(table).index_status().filter({'outdated': True}).get_field('index')
            )
        else:
            indexes = options.retryQuery(
                'get all indexes from %s.%s' % (db, table),
                query.db(db).table(table).index_status().get_field('index')
            )
        for index in indexes:
            indexes_to_build.append({'db': db, 'table': table, 'name': index})

    # rebuild selected indexes
    total_indexes = len(indexes_to_build)
    progress_ratio = 0.0
    highest_progress = 0.0
    indexes_in_progress = []

    if not options.quiet:
        print("Rebuilding %d index%s: %s" % (
            total_indexes,
            'es' if total_indexes > 1 else '',
            ", ".join(["`%(db)s.%(table)s:%(name)s`" % i for i in indexes_to_build])
        ))

    while indexes_to_build or indexes_in_progress:
        # Make sure we're running the right number of concurrent index rebuilds
        while indexes_to_build and len(indexes_in_progress) < options.concurrent:
            index = indexes_to_build.pop()
            indexes_in_progress.append(index)
            index['temp_name'] = TMP_INDEX_PREFIX + index['name']
            index['progress'] = 0
            index['ready'] = False

            # Map of index name -> index function for the table as it is right now
            existing_indexes = {
                x['index']: x['function'] for x in options.retryQuery(
                    'existing indexes',
                    query.db(index['db']).table(index['table']).index_status().pluck('index', 'function')
                )
            }

            if index['name'] not in existing_indexes:
                raise AssertionError('{index_name} is not part of existing indexes {indexes}'.format(
                    index_name=index['name'],
                    indexes=', '.join(existing_indexes)
                ))

            # Build the replacement under the temporary name, reusing the original function
            if index['temp_name'] not in existing_indexes:
                options.retryQuery(
                    'create temp index: %(db)s.%(table)s:%(name)s' % index,
                    query.db(index['db']).table(index['table']).index_create(
                        index['temp_name'], existing_indexes[index['name']])
                )

        # Report progress; the displayed value never goes backwards
        highest_progress = max(highest_progress, progress_ratio)
        if not options.quiet:
            utils_common.print_progress(highest_progress)

        # Check the status of indexes in progress
        progress_ratio = 0.0
        for index in indexes_in_progress:
            status = options.retryQuery(
                "progress `%(db)s.%(table)s` index `%(name)s`" % index,
                query.db(index['db']).table(index['table']).index_status(index['temp_name']).nth(0)
            )
            if status['ready']:
                index['ready'] = True
                # Swap the finished temporary index in place of the original
                options.retryQuery(
                    "rename `%(db)s.%(table)s` index `%(name)s`" % index,
                    query.db(index['db']).table(index['table']).index_rename(
                        index['temp_name'], index['name'], overwrite=True)
                )
            else:
                # Each unfinished index contributes its own progress scaled by 1/total
                progress_ratio += float(status.get('progress', 0)) / total_indexes

        indexes_in_progress = [index for index in indexes_in_progress if not index['ready']]
        indexes_completed = total_indexes - len(indexes_to_build) - len(indexes_in_progress)
        progress_ratio += float(indexes_completed) / total_indexes

        if len(indexes_in_progress) == options.concurrent or \
                (indexes_in_progress and not indexes_to_build):
            # Short sleep to keep from killing the CPU
            time.sleep(0.1)

    # Make sure the progress bar says we're done and get past the progress bar line
    if not options.quiet:
        utils_common.print_progress(1.0)
        print("")
def main(argv=None, prog=None):
    """Command-line entry point: parse options, rebuild indexes, report the outcome.

    :param argv: list of argument strings; ``sys.argv[1:]`` is used only when
        this is None
    :param prog: program name forwarded to ``parse_options``
    :returns: 0 on success, 1 if ``rebuild_indexes`` raised an exception
    """
    # Fall back to the process arguments only when no list was supplied. An
    # explicitly passed empty list means "no arguments"; `argv or sys.argv[1:]`
    # would have silently replaced it with the process arguments.
    if argv is None:
        argv = sys.argv[1:]
    options = parse_options(argv, prog=prog)

    start_time = time.time()
    try:
        rebuild_indexes(options)
    except Exception as ex:
        # Top-level boundary: show the traceback only in debug mode and turn
        # the failure into a non-zero exit status
        if options.debug:
            traceback.print_exc()
        if not options.quiet:
            print(ex, file=sys.stderr)
        return 1

    if not options.quiet:
        print("Done (%d seconds)" % (time.time() - start_time))
    return 0
# When executed as a script, run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())