Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tvault_configurator / workflows / cassnodes.py
Size: Mime:

import sys
import getopt
import socket
import itertools
import re
import json
from workloadmgr.triliopssh import ParallelSSHClient, AuthenticationException
from workloadmgr.triliopssh import UnknownHostException, ConnectionErrorException, SSHException
from workloadmgr.openstack.common import log as logging
from workloadmgr import autolog
from workloadmgr import exception
from workloadmgr.openstack.common.gettextutils import _
from workloadmgr import flags
from workloadmgr import settings

LOG = logging.getLogger(__name__)
Logger = autolog.Logger(LOG)


def _split_host_list(hosts):
    """Split a ';'-separated host string into a list of non-empty names.

    Surrounding whitespace is stripped and every empty entry is dropped, so
    'a;;b;' and ' a ; b ' both give ['a', 'b']. A falsy value gives [].
    """
    if not hosts:
        return []
    return [h.strip() for h in hosts.split(";") if h.strip()]


@autolog.log_method(Logger)
def find_alive_nodes(defaultnode, SSHPort, Username, Password, addlnodes=None):
    """Return the list of candidate Cassandra hosts that accept an SSH session.

    The Cassandra service itself need not be running: this only checks
    that 'nodetool status' can be launched on a host over SSH. A non-zero
    exit code does not make a host count as dead; only an exception raised
    by pssh_exec_command() does.

    The hosts named in ``addlnodes`` (a ';'-separated string) are tried
    first, all in one parallel call. If ``addlnodes`` names no host,
    ``defaultnode`` is used instead. If the combined call fails, every
    host (plus ``defaultnode``) is retried individually and only the ones
    that answer are kept.

    :param defaultnode: last known node, used as a fallback host.
    :param SSHPort: SSH port (anything int() accepts).
    :param Username: SSH user name.
    :param Password: SSH password.
    :param addlnodes: optional ';'-separated list of additional hosts.
    :returns: list of host names that answered.
    :raises exception.InvalidState: no host information is available.
    :raises AuthenticationException: re-raised unchanged if the combined
        call raises it.
    :raises exception.ErrorOccurred: authentication failed on a single host
        during the per-host retry.
    :raises Exception: no host could be reached.
    """
    error_msg = 'Unknown Error'

    nodes = _split_host_list(addlnodes)
    if not nodes:
        LOG.info(
            _("'addlnodes' is empty. Defaulting to defaultnode %s atribute") %
            defaultnode)
        nodes = _split_host_list(defaultnode)
        if not nodes:
            raise exception.InvalidState(
                "Cassandra workload is in invalid state. Do not have any node information set")
        addlnodes = defaultnode

    nodelist = []
    try:
        pssh_exec_command(nodes,
                          int(SSHPort),
                          Username,
                          Password,
                          "nodetool status")
        nodelist = list(nodes)
    except AuthenticationException:
        # Wrong credentials will not get better by retrying host by host.
        raise
    except Exception as ex:
        error_msg = _("Failed to execute '%s' on host(s) '%s' with error: %s") % (
            'nodetool status', str(addlnodes), str(ex))
        LOG.info(error_msg)

        # Retry each host on its own so one unreachable host does not
        # hide the ones that are up; also try the default node.
        candidates = list(nodes)
        if defaultnode and defaultnode not in candidates:
            candidates.append(defaultnode)

        for host in candidates:
            try:
                LOG.info(_('Connecting to Cassandra node %s') % host)
                pssh_exec_command([host],
                                  int(SSHPort),
                                  Username,
                                  Password,
                                  "nodetool status")
                LOG.info(_("Selected '%s' for Cassandra nodetool") % host)
                nodelist.append(host)
            except AuthenticationException as ex:
                error_msg = _("Failed to execute '%s' on host '%s' with error: %s") % (
                    'nodetool status', host, str(ex))
                raise exception.ErrorOccurred(reason=error_msg)
            except Exception as ex:
                # Best effort: an unreachable host is simply skipped.
                error_msg = _("Failed to execute '%s' on host '%s' with error: %s") % (
                    'nodetool status', host, str(ex))
                LOG.info(error_msg)

    if not nodelist:
        LOG.info(error_msg)
        raise Exception(error_msg)

    LOG.info(_("Seed nodes of the Cassandra cluster are '%s'") % str(nodelist))
    return nodelist


def _mask_secret(text, secret):
    """Return ``text`` with every occurrence of ``secret`` replaced by '******'.

    ``text`` is returned unchanged when ``secret`` is empty (replacing ''
    would insert the mask between every character) or when the two values
    cannot be combined (e.g. bytes vs. text).
    """
    if not secret:
        return text
    try:
        return text.replace(secret, '******')
    except (AttributeError, TypeError, UnicodeError):
        return text


@autolog.log_method(Logger)
def pssh_exec_command(hosts, port, user, password, command, sudo=False):
    """Run ``command`` on all ``hosts`` in parallel over SSH.

    :param hosts: host names/addresses to connect to.
    :param port: SSH port (anything int() accepts).
    :param user: SSH user name.
    :param password: SSH password.
    :param command: command line to run on every host.
    :param sudo: forwarded to ParallelSSHClient.run_command().
    :returns: the run_command() result keyed by host. Each entry's 'stdout'
        is replaced by a list of its lines, minus any line that is exactly
        ``password``.
    :raises: whatever connecting or running raised (AuthenticationException,
        UnknownHostException, ConnectionErrorException, ...), after logging
        it. The original exception type is preserved so that callers can
        tell an authentication failure apart from an unreachable host.
    """
    try:
        LOG.info(_("pssh_exec_command - hosts: %s") % (str(hosts)))
        timeout = settings.get_settings().get('cassandra_discovery_timeout', '120')
        client = ParallelSSHClient(
            hosts,
            user=user,
            password=password,
            port=int(port),
            timeout=int(timeout))
        LOG.info(_("pssh_exec_command: %s") % (command))
        output = client.run_command(command, sudo=sudo)

        # If any host failed (e.g. "command not found"), dump the remote
        # environment once so PATH problems can be diagnosed.
        if any(output[host]['exit_code'] for host in output):
            envoutput = client.run_command('env')
            LOG.info(_("Environment dump:"))
            for h in envoutput:
                for line in envoutput[h]['stdout']:
                    LOG.info(_("[%s]: %s") % (h, _mask_secret(line, password)))

        # Materialize and log every host's output for diagnostics. A line
        # that is exactly the password is dropped; any other occurrence of
        # the password is masked in the log (returned lines stay intact).
        for host in output:
            kept = []
            for line in output[host]['stdout']:
                if password == line:
                    continue
                kept.append(line)
                LOG.info(_("[%s]\t%s") % (host, _mask_secret(line, password)))
            output[host]['stdout'] = kept

    except Exception as ex:
        LOG.exception(ex)
        # Re-raise the original exception rather than wrapping it in a bare
        # Exception: callers such as find_alive_nodes() catch
        # AuthenticationException specifically.
        raise

    return output


@autolog.log_method(Logger)
def getclusterinfo(hosts, port, username, password):
    """Return the output of 'nodetool describecluster' as a dict.

    The command is run on ``hosts`` and the output of the first host that
    exited successfully is parsed: every line containing a ':' becomes one
    entry, keyed by the stripped text before the first ':' and valued by
    the stripped remainder (which may itself contain ':', e.g. IPv6
    addresses in schema-version lines).

    :raises exception.ErrorOccurred: no host ran the command successfully.
    """
    output = pssh_exec_command(
        hosts,
        port,
        username,
        password,
        "nodetool describecluster")
    for host in output:
        exit_code = output[host]['exit_code']
        if exit_code:
            LOG.info(_("'nodetool describecluster' on %s cannot be executed. Error %s") % (
                host, str(exit_code)))
            continue

        clusterinfo = {}
        for line in output[host]['stdout']:
            # Split on the first ':' only so values keep their own colons.
            key, sep, value = line.partition(":")
            if not sep:
                continue
            clusterinfo[key.strip()] = value.strip()

        return clusterinfo

    msg = _("Failed to execute 'nodetool describecluster' successfully.")
    LOG.error(msg)
    raise exception.ErrorOccurred(msg)


def _parse_nodetool_status(lines):
    """Parse the lines of 'nodetool status' into a list of node dicts.

    Sample output::

        Datacenter: 17
        ==============
        Status=Up/Down
        |/ State=Normal/Leaving/Joining/Moving
        --  Address      Load       Owns (effective)  Host ID                               Token                                    Rack
        UN  172.17.17.2  55.56 KB   0.2%              7d62d900-f99d-4b88-8012-f06cb639fc02  0                                        17
        UN  172.17.17.4  76.59 KB   100.0%            75917649-6caa-4c66-b003-71c0eb8c09e8  -9210152678340971410                     17

    Every node row (one starting with a status/state code such as 'UN' or
    'DN') becomes a dict that maps the tokens of the '--' header row
    ('--', 'Address', 'Load', 'Owns(effective)', 'HostID', ...) to the
    row's values, plus 'Data Center' set to the enclosing 'Datacenter:'
    heading. Rows seen before any header row, or without an address, are
    skipped.
    """
    node_states = ("UN", "UL", "UJ", "UM", "DN", "DL", "DJ", "DM")
    nodes = []
    columns = None
    currentdc = ""
    for line in lines:
        # Join multi-word values and headers into single tokens so that a
        # plain whitespace split lines each value up with its header:
        # "55.56 KB" -> "55.56KB" (also KiB/MiB/GiB/TiB and "bytes"),
        # "Owns (effective)" -> "Owns(effective)", "Host ID" -> "HostID".
        line = re.sub(r'(\d) ([KMGTP]i?B|bytes)\b', r'\1\2', line)
        line = line.replace(" (", "(")
        line = line.replace(" ID", "ID")

        if line.startswith("--"):
            columns = line.split()
            continue

        if line.startswith("Datacenter"):
            currentdc = line.partition(':')[2].strip()
            continue

        desc = line.split()
        if not desc or desc[0] not in node_states:
            continue
        if columns is None:
            # A node row before any '--' header cannot be mapped to columns.
            continue

        node = {'Data Center': currentdc}
        # zip() tolerates rows wrapped onto several lines that therefore
        # carry fewer values than there are headers.
        node.update(zip(columns, desc))
        if 'Address' not in node:
            continue
        nodes.append(node)
    return nodes


@autolog.log_method(Logger)
def discovercassandranodes(hosts, port, username, password):
    """Discover the nodes of the Cassandra ring reachable through ``hosts``.

    'nodetool status' is run on ``hosts`` and the output of the first host
    that succeeds describes the ring. Nodes marked down are returned as
    parsed. For every other node, 'nodetool info' is run on that node and
    its "key : value" fields are merged into the node dict; nodes on which
    'nodetool info' fails are left out.

    :returns: (cassandranodes, clusterinfo), where clusterinfo is the
        result of getclusterinfo(hosts, ...).
    :raises exception.ErrorOccurred: no node could be parsed from
        'nodetool status'.
    """
    LOG.info(_('Enter discovercassandranodes'))

    nodelist = []
    output = pssh_exec_command(
        hosts,
        port,
        username,
        password,
        "nodetool status")
    for host in output:
        exit_code = output[host]['exit_code']
        if exit_code is not None and exit_code != 0:
            LOG.info(_("'nodetool status' on %s cannot be executed. Error %s") % (
                host, str(exit_code)))
            continue
        # One healthy host's view of the ring is enough.
        nodelist = _parse_nodetool_status(output[host]['stdout'])
        break

    if not nodelist:
        msg = _('Failed to connect to Cassandra cluster. Please check the status of the cluster and try the operation again')
        LOG.error(msg)
        raise exception.ErrorOccurred(msg)

    down_states = ("DN", "DL", "DJ", "DM")
    cassandranodes = []
    availablenodes = []
    # Nodes marked down go straight into the result; the others are
    # queried below with 'nodetool info' for more details.
    for n in nodelist:
        if n['--'] in down_states:
            LOG.info(_("'%s' is marked down") % n['Address'])
            cassandranodes.append(n)
        else:
            availablenodes.append(n['Address'])

    # 'nodetool info' sample output:
    #   Token            : (invoke with -T/--tokens to see all 256 tokens)
    #   ID               : f64ced33-2c01-40a3-9979-cf0a0b60d7af
    #   Gossip active    : true
    #   Load             : 148.13 KB
    #   Heap Memory (MB) : 78.66 / 992.00
    #   Data Center      : 17
    #   Rack             : 17
    output = {}
    if availablenodes:
        output = pssh_exec_command(
            availablenodes,
            port,
            username,
            password,
            "nodetool info")
    for host in output:
        exit_code = output[host]['exit_code']
        if exit_code is not None and exit_code != 0:
            LOG.info(
                _("Cannot execute 'nodetool info' on %s. Error: ") %
                (host))
            for line in output[host]['stdout']:
                LOG.info(_("%s") % (line))
            continue

        node = next((n for n in nodelist if n['Address'] == host), None)
        if node is None:
            # Never attach one host's info to another node's entry.
            LOG.info(_("'nodetool info' output from %s matches no node of 'nodetool status'") % host)
            continue

        for line in output[host]['stdout']:
            # Split on the first ':' only so values keep their own colons.
            key, sep, value = line.partition(":")
            if sep:
                node[key.strip()] = value.strip()
        cassandranodes.append(node)

    LOG.info(_('Discovered Cassandra Nodes: %s') % str(len(cassandranodes)))
    LOG.info(_('Discovered Cassandra Nodes: %s') % str(cassandranodes))
    LOG.info(_('Exit discovercassandranodes'))

    clusterinfo = getclusterinfo(hosts, port, username, password)

    return cassandranodes, clusterinfo


def _resolve_node_ip(address):
    """Return ``address`` as an IP address string.

    IPv4 and IPv6 literals are returned unchanged; anything else is treated
    as a host name and resolved with socket.gethostbyname().
    """
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, address)
        except (AttributeError, socket.error, ValueError):
            continue
        return address
    return socket.gethostbyname(address)


def _assign_mac_addresses(preferrednodes, ips, port, username, password):
    """Store the MAC addresses reported by each host in node['MacAddresses'].

    :param preferrednodes: node dicts; matched to hosts by 'IPAddress'.
    :param ips: list of IP addresses to query.
    :raises Exception: ifconfig failed on a host or printed no MAC address.
    """
    output = pssh_exec_command(
        ips,
        port,
        username,
        password,
        "ifconfig | grep -o -E '([[:xdigit:]]{1,2}:){5}[[:xdigit:]]{1,2}'",
        sudo=True)
    for host in output:
        lines = output[host]['stdout']
        for line in lines:
            LOG.info(_('%s') % line)
        exit_code = output[host]['exit_code']
        if exit_code:
            LOG.info(_('ifconfig failed on host %s') % host)
            raise Exception(
                _("ifconfig failed on host '%s' Error Code %s") %
                (host, str(exit_code)))

        mac_addresses = []
        for line in lines:
            mac_address = line.lower()
            LOG.info(
                _("Found mac address %s on host %s") %
                (mac_address, host))
            mac_addresses.append(mac_address)
        if not mac_addresses:
            LOG.info(
                _("Strange... No MAC addresses were detected on host %s") %
                (host))
            raise Exception(
                _("No MAC addresses were detected on host %s") %
                (host))

        for node in preferrednodes:
            if node['IPAddress'] == host:
                node['MacAddresses'] = mac_addresses
                break


def _detect_root_partition_types(preferrednodes, ips, port, username, password):
    """Set node['root_partition_type'] from the device shown by 'df /'.

    For every host, the first '/...' token of the 'df /' output (normally
    the device backing '/') is passed to 'lvdisplay' under sudo: exit code
    0 or None means "lvm", anything else "Linux". Every failure is logged
    and leaves the node's existing value untouched (best effort).
    """
    try:
        output = pssh_exec_command(
            ips, port, username, password, 'df /')
        for host in output:
            if output[host]['exit_code']:
                LOG.info(_('"df /" on host %s') % host)
                for line in output[host]['stdout']:
                    LOG.info(_('%s') % line)
                continue

            for line in output[host]['stdout']:
                try:
                    m = re.search(r'(/[^\s]+)\s', str(line))
                    if not m:
                        continue
                    mp = m.group(1)

                    lvoutput = pssh_exec_command(
                        [host], port, username, password, "lvdisplay " + mp, sudo=True)
                    lv_exit_code = lvoutput[host]['exit_code']
                    # %s rather than %d: the exit code may be None.
                    LOG.info(_('lvdisplay: return value %s') % lv_exit_code)
                    LOG.info(_('lvdisplay: output\n'))
                    for lvline in lvoutput[host]['stdout']:
                        # Keep the password out of the log.
                        if password:
                            lvline = lvline.replace(password, '******')
                        LOG.info(_('%s') % lvline)

                    for node in preferrednodes:
                        if node['IPAddress'] == host:
                            if lv_exit_code is None or lv_exit_code == 0:
                                node['root_partition_type'] = "lvm"
                            else:
                                node['root_partition_type'] = "Linux"
                            LOG.info(
                                _('%s: root partition is %s') %
                                (node['IPAddress'], node['root_partition_type']))
                            break

                except Exception as ex:
                    LOG.info(
                        _("Cannot execute lvdisplay command on %s") %
                        host)
                    LOG.exception(ex)
    except Exception as ex:
        LOG.info(_("Failed to find partition type on %s") % str(ips))
        LOG.exception(ex)


@autolog.log_method(Logger)
def get_cassandra_nodes(alivenodes, port, username, password,
                        preferredgroups=None, findpartitiontype=False):
    """Return the Cassandra nodes to work on, enriched with host details.

    :param alivenodes: reachable hosts used to query the ring.
    :param preferredgroups: optional ';'-separated data-center names. When
        it names at least one data center, only nodes of those data centers
        are returned as preferred nodes.
    :param findpartitiontype: when True, detect whether each live node's
        root file system is on LVM.
    :returns: (preferrednodes, allnodes, clusterinfo). Every preferred node
        gets 'IPAddress' and 'root_partition_type' ("lvm" unless detected
        otherwise); nodes that are not marked down also get 'MacAddresses'
        from ifconfig.
    :raises exception.InvalidState: more than half of the preferred nodes
        are down.
    :raises Exception: ifconfig failed or reported no MAC address on a node.
    """
    LOG.info(_('Enter get_cassandra_nodes'))
    down_states = ("DN", "DL", "DJ", "DM")
    try:
        allnodes, clusterinfo = discovercassandranodes(
            alivenodes, port, username, password)

        # Keep only the nodes of the preferred data centers, if any.
        datacenters = []
        if preferredgroups:
            datacenters = [dc.strip() for dc in preferredgroups.split(';')
                           if dc.strip()]
        if datacenters:
            preferrednodes = [node for dc in datacenters for node in allnodes
                              if node['Data Center'] == dc]
            downnodes = len([node for node in preferrednodes
                             if node['--'] in down_states])

            # Assumes every keyspace has a replication factor of 2 or more;
            # this should eventually be determined automatically.
            if downnodes > len(preferrednodes) / 2:
                raise exception.InvalidState(
                    _("More than half the nodes are down in the Data Center %s. "
                      "Choose a new data center or fix the current data center before "
                      "doing backup") % ', '.join(datacenters))
        else:
            preferrednodes = allnodes

        # nodetool may report host names; resolve every node to an IP
        # address and collect the distinct IPs of nodes that are up.
        live_ips = []
        for node in preferrednodes:
            node['IPAddress'] = _resolve_node_ip(node['Address'])
            if node['--'] not in down_states and node['IPAddress'] not in live_ips:
                live_ips.append(node['IPAddress'])

        if live_ips:
            _assign_mac_addresses(preferrednodes, live_ips, port, username, password)

        for node in preferrednodes:
            node['root_partition_type'] = "lvm"

        if findpartitiontype is True and live_ips:
            _detect_root_partition_types(
                preferrednodes, live_ips, port, username, password)

        LOG.info(_('Preferred Cassandra Nodes: %s') % str(len(preferrednodes)))
        for node in preferrednodes:
            LOG.info(_('%s') % str(node))
        LOG.info(_('Exit get_cassandra_nodes'))

        return preferrednodes, allnodes, clusterinfo
    except Exception as ex:
        LOG.info(_("Unexpected Error in get_cassandra_nodes"))
        LOG.exception(ex)
        raise

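# Example call (note the nested quoting needed to embed a CQL string literal):
#exec_cqlsh_command(['cass1'], 22, 'ubuntu', 'project1', 'SELECT * FROM system.schema_keyspaces where keyspace_name=\'"\'Keyspace1\'"\'')
# The fragment this appends to cmd is
#   SELECT * FROM system.schema_keyspaces where keyspace_name=\'"\'Keyspace1\'"\';
# which, once the remote shell removes the quoting, runs in cqlsh as
#   SELECT * FROM system.schema_keyspaces where keyspace_name='Keyspace1';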


@autolog.log_method(Logger)
def exec_cqlsh_command(hosts, port, user, password, cqlshcommand):
    """Run one CQL statement through cqlsh on every host in ``hosts``.

    The statement (with a ';' appended) is echoed into the fixed file
    /tmp/tvault-keyspace and executed with 'cqlsh <hosts[0]> -f ...'; every
    host therefore talks to the first host's Cassandra.

    ``cqlshcommand`` is embedded without escaping inside a double-quoted
    string that itself sits in a single-quoted 'bash -c' argument, so the
    caller must pre-escape quotes and shell metacharacters.

    :returns: the pssh_exec_command() result keyed by host.
    """
    pieces = [
        'bash -c \'echo "',
        cqlshcommand,
        ';" > /tmp/tvault-keyspace ; cqlsh ',
        hosts[0],
        ' -f /tmp/tvault-keyspace\'',
    ]
    shell_line = ''.join(pieces)
    return pssh_exec_command(hosts, port, user, password, shell_line)


@autolog.log_method(Logger)
def get_keyspaces(alivenodes, port, username, password):
    """Return the non-system keyspaces of the cluster as a list of dicts.

    'SELECT * FROM system.schema_keyspaces' is run through cqlsh on each
    node of ``alivenodes`` in turn, and the first usable answer is parsed.
    Each table row becomes a dict keyed by the column headers. Keyspaces
    named 'system', 'system_traces' or 'dse_system' (case-insensitive) are
    left out, as are blank rows and rows without a 'keyspace_name' value.

    :returns: list of keyspace dicts, or [] when no node gave a usable answer.
    """
    excluded = ('system', 'system_traces', 'dse_system')
    for alive in alivenodes:
        output = exec_cqlsh_command(
            [alive],
            port,
            username,
            password,
            'SELECT * FROM system.schema_keyspaces')

        for host in output:
            lines = output[host]['stdout']
            # Check the length before indexing so empty output cannot raise.
            if len(lines) < 5 or "Connection error" in lines[0]:
                continue

            # Drop the first line and the last three, leaving the header
            # row, a separator row and the data rows. Presumably the
            # trailing lines are blank / "(N rows)" / blank - TODO confirm.
            table = lines[1:-3]
            fields = [f.strip() for f in table[0].split('|')]

            keyspaces = []
            for row in table[2:]:
                if not row.strip():
                    continue
                values = [v.strip() for v in row.split('|')]
                ksdict = dict(zip(fields, values))
                name = ksdict.get('keyspace_name')
                if name is None:
                    continue
                if name.lower() not in excluded:
                    keyspaces.append(ksdict)

            return keyspaces

    return []


@autolog.log_method(Logger)
def main(argv):
    """Command-line entry point: discover the Cassandra cluster, write JSON.

    Long options: --defaultnode, --port, --username, --password,
    --addlnodes, --preferredgroups, --findpartitiontype ('True' enables it),
    --outfile (JSON result) and --errfile (error text). --port, --username
    and --password are required.

    ``outfile`` is truncated first and then receives the clusterinfo dict
    from getclusterinfo() extended with 'preferrednodes', 'allnodes' and
    'keyspaces'. On any failure the error (preceded by the usage text for
    option errors) is written to ``errfile`` and the process exits with 1.
    """
    errfile = '/tmp/cassnodes_errors.txt'
    try:
        outfile = '/tmp/cassnodes_output.txt'
        defaultnode = None
        port = None
        username = None
        password = None
        addlnodes = None
        preferredgroups = None
        findpartitiontype = False

        opts, _args = getopt.getopt(argv, "", ["defaultnode=", "port=", "username=", "password=",
                                               "addlnodes=", "preferredgroups=", "findpartitiontype=", "outfile=", "errfile="])
        for opt, arg in opts:
            if opt == '--defaultnode':
                defaultnode = arg
            elif opt == '--port':
                port = arg
            elif opt == '--username':
                username = arg
            elif opt == '--password':
                password = arg
            elif opt == '--addlnodes':
                addlnodes = arg
            elif opt == '--preferredgroups':
                preferredgroups = arg
            elif opt == '--findpartitiontype':
                findpartitiontype = (arg == 'True')
            elif opt == '--outfile':
                outfile = arg
            elif opt == '--errfile':
                errfile = arg

        # Report a missing mandatory option with the usage text instead of
        # failing later with an UnboundLocalError/TypeError.
        missing = [name for name, value in (('--port', port),
                                            ('--username', username),
                                            ('--password', password))
                   if value is None]
        if missing:
            raise getopt.GetoptError(
                "missing required option(s): %s" % ', '.join(missing))

        # Truncate the output file up front so a failed run leaves no
        # stale result behind.
        with open(outfile, 'w'):
            pass

        alivenodes = find_alive_nodes(
            defaultnode, port, username, password, addlnodes)
        cassandranodes, allnodes, clusterinfo = get_cassandra_nodes(alivenodes, port, username, password,
                                                                    preferredgroups=preferredgroups,
                                                                    findpartitiontype=findpartitiontype)

        clusterinfo['preferrednodes'] = cassandranodes
        clusterinfo['allnodes'] = allnodes
        clusterinfo['keyspaces'] = get_keyspaces(
            alivenodes, port, username, password)

        with open(outfile, 'w') as outfilehandle:
            outfilehandle.write(json.dumps(clusterinfo))

    except getopt.GetoptError as ex:
        LOG.exception(ex)
        usage = _(
            "Usage: cassnodes.py --config-file /etc/workloadmgr/workloadmgr.conf --defaultnode cassandra1 "
            "--port 22 --username ubuntu --password password "
            "--addlnodes 'cassandra1;cassandra2;cassandra3' --preferredgroups 'DC1;DC2' "
            "--findpartitiontype False --outfile /tmp/cassnodes.txt --errfile /tmp/cassnodes_errors.txt")
        LOG.info(usage)
        with open(errfile, 'w') as errfilehandle:
            errfilehandle.write(usage)
            errfilehandle.write(str(ex))
        sys.exit(1)
    except Exception as ex:
        LOG.exception(ex)
        with open(errfile, 'w') as errfilehandle:
            errfilehandle.write(str(ex))
        sys.exit(1)


if __name__ == "__main__":
    # Script entry point. The command line is expected to start with the
    # two tokens '--config-file <path>' (see the usage text in main());
    # everything after them is handed to main().
    # NOTE(review): only sys.argv[1:2] (the single '--config-file' token,
    # without its path) is passed to flags.parse_args. How parse_args
    # treats this one-element list is defined outside this file - confirm
    # that the config path is actually picked up.
    flags.parse_args(sys.argv[1:2])
    logging.setup("workloadmgr")
    # Rebind the module-level loggers now that logging is configured.
    # The @autolog.log_method(Logger) decorators above were evaluated at
    # import time and keep the Logger object they were given then; code
    # that reads the global LOG at call time uses this new logger.
    LOG = logging.getLogger('workflows.cassnodes')
    Logger = autolog.Logger(LOG)
    main(sys.argv[3:])