#!/usr/bin/env python

# Copyright 2018 RethinkDB
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file incorporates work covered by the following copyright:
# Copyright 2010-2016 RethinkDB, all rights reserved.

from __future__ import print_function

import csv
import ctypes
import datetime
import json
import multiprocessing
import numbers
import optparse
import os
import platform
import signal
import sys
import tempfile
import time
import traceback
from multiprocessing.queues import SimpleQueue

import six

from rethinkdb import errors, query, utils_common
from rethinkdb.logger import default_logger

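# Python 2/3 compatibility: Python 3 has no `unicode` builtin, so fall back to `str`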
try:
    unicode
except NameError:
    unicode = str


usage = """rethinkdb export [-c HOST:PORT] [-p] [--password-file FILENAME] [--tls-cert filename] [-d DIR]
      [-e (DB | DB.TABLE)]...
      [--format (csv | json | ndjson)] [--fields FIELD,FIELD...] [--delimiter CHARACTER]
      [--clients NUM]"""
help_description = '`rethinkdb export` exports data from a RethinkDB cluster into a directory'
help_epilog = '''
EXAMPLES:
rethinkdb export -c mnemosyne:39500
  Export all data from a cluster running on host 'mnemosyne' with a client port at 39500.

rethinkdb export -e test -d rdb_export
  Export only the 'test' database on a local cluster into a named directory.

rethinkdb export -c hades -e test.subscribers -p
  Export a specific table from a cluster running on host 'hades' which requires a password.

rethinkdb export --format csv -e test.history --fields time,message --delimiter ';'
  Export a specific table from a local cluster in CSV format with the fields 'time' and 'message',
  using a semicolon as field delimiter (rather than a comma).

rethinkdb export --fields id,value -e test.data
  Export a specific table from a local cluster in JSON format with only the fields 'id' and 'value'.
'''


def parse_options(argv, prog=None):
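    """Parse and validate the command-line options for `rethinkdb export`."""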
    if platform.system() == "Windows" or platform.system().lower().startswith('cygwin'):
        # no colons in name
        default_dir = "rethinkdb_export_%s" % datetime.datetime.today().strftime("%Y-%m-%dT%H-%M-%S")
    else:
        # "
        default_dir = "rethinkdb_export_%s" % datetime.datetime.today().strftime("%Y-%m-%dT%H:%M:%S")

    parser = utils_common.CommonOptionsParser(usage=usage, description=help_description, epilog=help_epilog, prog=prog)

    parser.add_option("-d", "--directory", dest="directory", metavar="DIRECTORY", default=default_dir,
                      help='directory to output to (default: rethinkdb_export_DATE_TIME)', type="new_file")
    parser.add_option(
        "-e",
        "--export",
        dest="db_tables",
        metavar="DB|DB.TABLE",
        default=[],
        help='limit dump to the given database or table (may be specified multiple times)',
        action="append",
        type="db_table")
    parser.add_option("--fields", dest="fields", metavar="<FIELD>,...", default=None,
                      help='export only specified fields (required for CSV format)')
    parser.add_option(
        "--format",
        dest="format",
        metavar="json|csv|ndjson",
        default="json",
        help='format to write (default: json; ndjson is newline-delimited JSON)',
        type="choice",
        choices=[
            'json',
            'csv',
            'ndjson'])
    parser.add_option(
        "--clients",
        dest="clients",
        metavar="NUM",
        default=3,
        help='number of tables to export simultaneously (default: 3)',
        type="pos_int")
    parser.add_option(
        "--read-outdated",
        dest="outdated",
        default=False,
        help='use outdated read mode',
        action="store_true")

    csvGroup = optparse.OptionGroup(parser, 'CSV options')
    csvGroup.add_option("--delimiter", dest="delimiter", metavar="CHARACTER", default=None,
                        help="character to be used as field delimiter, or '\\t' for tab (default: ',')")
    parser.add_option_group(csvGroup)

    options, args = parser.parse_args(argv)

    # -- Check validity of arguments

    if len(args) != 0:
        parser.error("No positional arguments supported. Unrecognized option(s): %s" % args)

    if options.fields:
        if len(options.db_tables) != 1 or options.db_tables[0].table is None:
            parser.error("The --fields option can only be used when exporting a single table")
        options.fields = options.fields.split(",")

    # - format specific validation

    if options.format == "csv":
        if options.fields is None:
            parser.error("CSV files require the '--fields' option to be specified.")

        if options.delimiter is None:
            options.delimiter = ","
        elif options.delimiter == "\\t":
            options.delimiter = "\t"
        elif len(options.delimiter) != 1:
            parser.error("Specify exactly one character for the --delimiter option: %s" % options.delimiter)
    else:
        if options.delimiter:
            parser.error("--delimiter option is only valid for CSV file formats")

    # -

    return options


def json_writer(filename, fields, task_queue, error_queue, format):
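    """Drain rows from task_queue into `filename` as a JSON array (or as
    newline-delimited JSON when format is 'ndjson') until a StopIteration
    sentinel is received, reporting any failure through error_queue."""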
    try:
        with open(filename, "w") as out:
            first = True
            if format != "ndjson":
                out.write("[")
            item = task_queue.get()
            while not isinstance(item, StopIteration):
                row = item[0]
                if fields is not None:
                    for key in list(row.keys()):
                        if key not in fields:
                            del row[key]
                if first:
                    if format == "ndjson":
                        out.write(json.dumps(row))
                    else:
                        out.write("\n" + json.dumps(row))
                    first = False
                elif format == "ndjson":
                    out.write("\n" + json.dumps(row))
                else:
                    out.write(",\n" + json.dumps(row))

                item = task_queue.get()
            if format != "ndjson":
                out.write("\n]\n")
    except BaseException:
        ex_type, ex_class, tb = sys.exc_info()
        error_queue.put((ex_type, ex_class, traceback.extract_tb(tb)))

        # Read until the exit task so the readers do not hang on pushing onto the queue
        while not isinstance(task_queue.get(), StopIteration):
            pass


def csv_writer(filename, fields, delimiter, task_queue, error_queue):
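    """Drain rows from task_queue into `filename` as CSV, writing only the
    requested fields, until a StopIteration sentinel is received, and report
    any failure through error_queue."""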
    try:
        with open(filename, "w") as out:
            out_writer = csv.writer(out, delimiter=delimiter)
            out_writer.writerow(fields)

            item = task_queue.get()
            while not isinstance(item, StopIteration):
                row = item[0]
                info = []
                # If the value is a simple type, write it directly; otherwise, serialize it as JSON
                for field in fields:
                    if field not in row:
                        info.append(None)
                    elif isinstance(row[field], numbers.Number):
                        info.append(str(row[field]))
                    elif isinstance(row[field], str):
                        info.append(row[field])
                    elif isinstance(row[field], unicode):
                        info.append(row[field].encode('utf-8'))
                    else:
                        if str == unicode:
                            info.append(json.dumps(row[field]))
                        else:
                            info.append(json.dumps(row[field]).encode('utf-8'))
                out_writer.writerow(info)
                item = task_queue.get()
    except BaseException:
        ex_type, ex_class, tb = sys.exc_info()
        error_queue.put((ex_type, ex_class, traceback.extract_tb(tb)))

        # Read until the exit task so the readers do not hang on pushing onto the queue
        while not isinstance(task_queue.get(), StopIteration):
            pass


def export_table(db, table, directory, options, error_queue, progress_info, sindex_counter, hook_counter, exit_event):
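    """Export a single table: fetch its metadata, start a writer process for
    the chosen format, and stream rows ordered by primary key into the
    writer's queue, resuming from the last key on connection errors."""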
    signal.signal(signal.SIGINT, signal.SIG_DFL)  # prevent signal handlers from being set in child processes

    writer = None

    has_write_hooks = utils_common.check_minimum_version(options, '2.3.7', False)

    try:
        # -- get table info

        table_info = options.retryQuery('table info: %s.%s' % (db, table), query.db(db).table(table).info())

        # Rather than just the index names, store all index information
        table_info['indexes'] = options.retryQuery(
            'table index data %s.%s' % (db, table),
            query.db(db).table(table).index_status(),
            run_options={'binary_format': 'raw'}
        )

        if has_write_hooks:
            table_info['write_hook'] = options.retryQuery(
                'table write hook data %s.%s' % (db, table),
                query.db(db).table(table).get_write_hook(),
                run_options={'binary_format': 'raw'})

            if table_info['write_hook'] is not None:
                hook_counter.value += 1

        with open(os.path.join(directory, db, table + '.info'), 'w') as info_file:
            info_file.write(json.dumps(table_info) + "\n")
        with sindex_counter.get_lock():
            sindex_counter.value += len(table_info["indexes"])
        # -- start the writer
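        # multiprocessing.queues.SimpleQueue requires an explicit context argument on Python 3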
        if six.PY3:
            ctx = multiprocessing.get_context(multiprocessing.get_start_method())
            task_queue = SimpleQueue(ctx=ctx)
        else:
            task_queue = SimpleQueue()

        if options.format == "json":
            filename = directory + "/%s/%s.json" % (db, table)
            writer = multiprocessing.Process(
                target=json_writer,
                args=(
                    filename,
                    options.fields,
                    task_queue,
                    error_queue,
                    options.format))
        elif options.format == "csv":
            filename = directory + "/%s/%s.csv" % (db, table)
            writer = multiprocessing.Process(
                target=csv_writer,
                args=(
                    filename,
                    options.fields,
                    options.delimiter,
                    task_queue,
                    error_queue))
        elif options.format == "ndjson":
            filename = directory + "/%s/%s.ndjson" % (db, table)
            writer = multiprocessing.Process(
                target=json_writer,
                args=(
                    filename,
                    options.fields,
                    task_queue,
                    error_queue,
                    options.format))
        else:
            raise RuntimeError("unknown format type: %s" % options.format)
        writer.start()

        # -- read in the data source

        # -

        lastPrimaryKey = None
        read_rows = 0
        run_options = {
            "time_format": "raw",
            "binary_format": "raw"
        }
        if options.outdated:
            run_options["read_mode"] = "outdated"
        cursor = options.retryQuery(
            'initial cursor for %s.%s' % (db, table),
            query.db(db).table(table).order_by(index=table_info["primary_key"]),
            run_options=run_options
        )
        while not exit_event.is_set():
            try:
                for row in cursor:
                    # bail on exit
                    if exit_event.is_set():
                        break

                    # add to the output queue
                    task_queue.put([row])
                    lastPrimaryKey = row[table_info["primary_key"]]
                    read_rows += 1

                    # Update the progress every 20 rows
                    if read_rows % 20 == 0:
                        progress_info[0].value = read_rows

                else:
                    # Export is done - since we used estimates earlier, update the actual table size
                    progress_info[0].value = read_rows
                    progress_info[1].value = read_rows
                    break

            except (errors.ReqlTimeoutError, errors.ReqlDriverError):
                # connection problem, re-setup the cursor
                try:
                    cursor.close()
                except errors.ReqlError as exc:
                    default_logger.exception(exc)

                cursor = options.retryQuery(
                    'backup cursor for %s.%s' % (db, table),
                    query.db(db).table(table).between(
                        lastPrimaryKey, query.maxval, left_bound="open"
                    ).order_by(index=table_info["primary_key"]),
                    run_options=run_options)

    except (errors.ReqlError, errors.ReqlDriverError) as ex:
        error_queue.put((RuntimeError, RuntimeError(ex.message), traceback.extract_tb(sys.exc_info()[2])))
    except BaseException:
        ex_type, ex_class, tb = sys.exc_info()
        error_queue.put((ex_type, ex_class, traceback.extract_tb(tb)))
    finally:
        if writer and writer.is_alive():
            task_queue.put(StopIteration())
            writer.join()


def abort_export(signum, frame, exit_event, interrupt_event):
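    """SIGINT handler: ask all workers to stop and record that the export was interrupted."""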
    interrupt_event.set()
    exit_event.set()

# We sum up the row count from all tables for total percentage completion
#  This is because table exports can be staggered when there are not enough clients
#  to export all of them at once.  As a result, the progress bar will not necessarily
#  move at the same rate for different tables.


def update_progress(progress_info, options):
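    """Aggregate the per-table progress counters and print overall completion."""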
    rows_done = 0
    total_rows = 1
    for current, max_count in progress_info:
        curr_val = current.value
        max_val = max_count.value
        if curr_val < 0:
            # There is a table that hasn't finished counting yet, we can't report progress
            rows_done = 0
            break
        else:
            rows_done += curr_val
            total_rows += max_val

    if not options.quiet:
        utils_common.print_progress(float(rows_done) / total_rows, indent=4)


def run_clients(options, workingDir, db_table_set):
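    """Export every table in db_table_set into workingDir, running at most
    options.clients table exports at a time, and report errors and a summary."""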
    # Spawn one client for each db.table, up to options.clients at a time
    exit_event = multiprocessing.Event()
    processes = []
    if six.PY3:
        ctx = multiprocessing.get_context(multiprocessing.get_start_method())
        error_queue = SimpleQueue(ctx=ctx)
    else:
        error_queue = SimpleQueue()
    interrupt_event = multiprocessing.Event()
    sindex_counter = multiprocessing.Value(ctypes.c_longlong, 0)
    hook_counter = multiprocessing.Value(ctypes.c_longlong, 0)

    signal.signal(signal.SIGINT, lambda a, b: abort_export(a, b, exit_event, interrupt_event))
    export_errors = []

    try:
        progress_info = []
        arg_lists = []
        for db, table in db_table_set:

            tableSize = int(options.retryQuery("count", query.db(db).table(table).info()['doc_count_estimates'].sum()))

            progress_info.append((multiprocessing.Value(ctypes.c_longlong, 0),
                                  multiprocessing.Value(ctypes.c_longlong, tableSize)))
            arg_lists.append((db, table,
                              workingDir,
                              options,
                              error_queue,
                              progress_info[-1],
                              sindex_counter,
                              hook_counter,
                              exit_event,
                              ))

        # Wait for all tables to finish
        while processes or arg_lists:
            time.sleep(0.1)

            while not error_queue.empty():
                exit_event.set()  # Stop immediately if an error occurs
                export_errors.append(error_queue.get())

            processes = [process for process in processes if process.is_alive()]

            if len(processes) < options.clients and len(arg_lists) > 0:
                new_process = multiprocessing.Process(target=export_table, args=arg_lists.pop(0))
                new_process.start()
                processes.append(new_process)

            update_progress(progress_info, options)

        # If we were successful, make sure 100% progress is reported
        # (rows could have been deleted which would result in being done at less than 100%)
        if len(export_errors) == 0 and not interrupt_event.is_set() and not options.quiet:
            utils_common.print_progress(1.0, indent=4)

        # Continue past the progress output line and print total rows processed
        def plural(num, text, plural_text):
            return "%d %s" % (num, text if num == 1 else plural_text)

        if not options.quiet:
            print("\n    %s exported from %s, with %s, and %s" %
                  (plural(sum([max(0, info[0].value) for info in progress_info]), "row", "rows"),
                   plural(len(db_table_set), "table", "tables"),
                   plural(sindex_counter.value, "secondary index", "secondary indexes"),
                   plural(hook_counter.value, "hook function", "hook functions")
                   ))
    finally:
        signal.signal(signal.SIGINT, signal.SIG_DFL)

    if interrupt_event.is_set():
        raise RuntimeError("Interrupted")

    if len(export_errors) != 0:
        # multiprocessing queues don't handle tracebacks, so they've already been stringified in the queue
        for error in export_errors:
            print("%s" % error[1], file=sys.stderr)
            if options.debug:
                print("%s traceback: %s" % (error[0].__name__, error[2]), file=sys.stderr)
        raise RuntimeError("Errors occurred during export")


def run(options):
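    """Resolve the tables to export, build the output directory structure in a
    temporary location, run the export clients, and move the result into place."""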
    # Make sure this isn't a pre-`reql_admin` cluster - which could result in data loss
    # if the user has a database named 'rethinkdb'
    utils_common.check_minimum_version(options, '1.6')

    # get the complete list of tables
    db_table_set = set()
    all_tables = [utils_common.DbTable(x['db'], x['name']) for x in options.retryQuery(
        'list tables', query.db('rethinkdb').table('table_config').pluck(['db', 'name']))]
    if not options.db_tables:
        db_table_set = set(all_tables)  # default to all tables
    else:
        all_databases = options.retryQuery('list dbs', query.db_list().filter(query.row.ne('rethinkdb')))
        for db_table in options.db_tables:
            db, table = db_table

            if db == 'rethinkdb':
                raise AssertionError('Cannot export tables from the system database')

            if db not in all_databases:
                raise RuntimeError("Error: Database '%s' not found" % db)

            if table is None:  # This is just a db name, implicitly selecting all tables in that db
                db_table_set.update(set([x for x in all_tables if x.db == db]))
            else:
                if utils_common.DbTable(db, table) not in all_tables:
                    raise RuntimeError("Error: Table not found: '%s.%s'" % (db, table))
                db_table_set.add(db_table)

    # Determine the actual number of client processes we'll have
    options.clients = min(options.clients, len(db_table_set))

    # create the working directory and its structure
    parent_dir = os.path.dirname(options.directory)
    if os.path.exists(parent_dir):
        if not os.path.isdir(parent_dir):
            raise RuntimeError("Output parent directory is not a directory: %s" % parent_dir)
    else:
        try:
            os.makedirs(parent_dir)
        except OSError as e:
            raise optparse.OptionValueError("Unable to create parent directory for %s: %s" % (parent_dir, e.strerror))
    working_dir = tempfile.mkdtemp(
        prefix=os.path.basename(options.directory) + '_partial_',
        dir=os.path.dirname(options.directory))
    try:
        for db in set([database for database, _ in db_table_set]):
            os.makedirs(os.path.join(working_dir, str(db)))
    except OSError as e:
        raise RuntimeError("Failed to create temporary directory (%s): %s" % (e.filename, e.strerror))

    # Run the export
    run_clients(options, working_dir, db_table_set)

    # Move the temporary directory structure over to the original output directory
    try:
        if os.path.isdir(options.directory):
            os.rmdir(options.directory)  # an empty directory is created here when using _dump
        elif os.path.exists(options.directory):
            raise Exception('There was a file at the output location: %s' % options.directory)
        os.rename(working_dir, options.directory)
    except OSError as e:
        raise RuntimeError(
            "Failed to move temporary directory to output directory (%s): %s" %
            (options.directory, e.strerror))


def main(argv=None, prog=None):
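    """Entry point for `rethinkdb export`: parse options, run the export, and return an exit code."""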
    options = parse_options(argv or sys.argv[1:], prog=prog)

    start_time = time.time()
    try:
        run(options)
    except Exception as ex:
        if options.debug:
            traceback.print_exc()
        print(ex, file=sys.stderr)
        return 1
    if not options.quiet:
        print("  Done (%.2f seconds)" % (time.time() - start_time))
    return 0


if __name__ == "__main__":
    sys.exit(main())