Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
workloadmgr / workloadmgr / workflows / hadoopworkflow.py
Size: Mime:
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2013 TrilioData, Inc.
# All Rights Reserved.

'''
Workflow for taking snapshot of mongodb instances
'''

import contextlib
import os
import random
import sys
import time

import datetime
import paramiko
import uuid

from IPy import IP
from taskflow import engines
from taskflow.utils import misc
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import graph_flow as gf
from taskflow import task
from taskflow import flow

from workloadmgr.openstack.common.gettextutils import _
from workloadmgr.openstack.common import log as logging
from workloadmgr.compute import nova
import workloadmgr.context as context
from workloadmgr import utils
from oslo_messaging._drivers import amqp

from . import vmtasks
from . import workflow

LOG = logging.getLogger(__name__)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


def InitFlow(store):
    pass

#from workloadmgr import exception
#
# Function that create the connection to the mongos server to be use as
# primary source of information in order to pick the servers. host, port
# user and password should be fed into the flow and other tasks should use
# them as needed.
#


def connect_server(host, port, user, password):
    try:
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.WarningPolicy)
        client.connect(host, port, user, password, timeout=120)
        LOG.debug(_('Connected to ' + host + ' on port ' + str(port) + '...'))

    except Exception as e:
        LOG.error(
            _('There was an error connecting to hadoop namenode. Error %s. Try again...'),
            str(e))
    return client


def gethadoopcfg(connection):
    stdin, stdout, stderr = connection.exec_command(
        "hadoop/bin/hadoop dfsadmin -report")
    dfsout = stdout.read(),

    # parse dfs output for hadoop nodes
    dfsout = dfsout[0].split("\n")
    line = '-------------------------------------------------'
    hcluster = dfsout[:dfsout.index(line)]
    dfsout = dfsout[dfsout.index(line) + 1:]
    totalnode = int(dfsout.pop(0).split(":")[1].strip().split(" ")[0])

    hcfg = {}
    for h in hcluster:
        if len(h.split(":")) == 2:
            hcfg[h.split(":")[0]] = h.split(":")[1]

    dfsout.pop(0)
    datanodes = []
    for _ in range(totalnode):
        x = dfsout.index("")
        node = dfsout[:x]
        datanodes.append(node)
        x = x + 2
        dfsout = dfsout[x:]

    hnodes = []
    for node in datanodes:
        dnode = {}
        for s in node:
            dnode[s.split(":")[0].strip()] = s.split(":")[1]
        hnodes.append(dnode)

    return {'hconfig': hcfg, 'hnodes': hnodes}


def getnodenames(connection):
    stdin, stdout, stderr = connection.exec_command(
        "hadoop/bin/hadoop dfsadmin -report")
    dfsout = stdout.read(),

    # parse dfs output for hadoop nodes
    dfsout = dfsout[0].split("\n")
    line = '-------------------------------------------------'
    hcluster = dfsout[:dfsout.index(line)]
    dfsout = dfsout[dfsout.index(line) + 1:]
    totalnode = int(dfsout.pop(0).split(":")[1].strip().split(" ")[0])

    dfsout.pop(0)
    datanodes = []
    for _ in range(totalnode):
        x = dfsout.index("")
        node = dfsout[:x]
        datanodes.append(node)
        x = x + 2
        dfsout = dfsout[x:]

    hnodes = []
    for node in datanodes:
        dnode = {}
        for s in node:
            dnode[s.split(":")[0]] = s.split(":")[1]
        hnodes.append(dnode)

    nodenames = []
    for node in hnodes:
        nodenames.append(node["Name"].strip())

    return nodenames


def get_hadoop_nodes(cntx, host, port, username, password):
    #
    # Creating connection to hadoop namenode
    #
    connection = connect_server(host, port, username, password)
    LOG.debug(_('Connected to hadoop name server: ' + host))

    #
    # Getting sharding information
    #
    nodenames = getnodenames(connection)
    LOG.debug(_('Discovered hadoop nodes: ' + str(nodenames)))

    #
    # Resolve the node name to VMs
    # Usually Hadoop spits out nodes IP addresses. These
    # IP addresses need to be resolved to VM IDs by
    # querying the VM objects from nova
    #
    ips = {}
    for name in nodenames:
        # if the name is host name, resolve it to IP address
        try:
            IP(name)
            ips[name] = 1
        except Exception as e:
            # we got hostnames
            import socket
            ips[socket.gethostbyname(name)] = 1

    # call nova list
    compute_service = nova.API(production=True)
    instances = compute_service.get_servers(cntx, admin=True)
    hypervisors = compute_service.get_hypervisors(cntx)

    vms = []
    # call nova interface-list <instanceid> to build the list of instances ids
    # if node names are host names then lookup the VMid based on the
    for instance in instances:
        ifs = instance.addresses
        for addr in instance.addresses:
            # IP Addresses
            ifs = instance.addresses[addr]
            for _if in ifs:
                if _if['addr'] in ips:
                    # this is our vm
                    hypervisor_hostname = None
                    hypervisor_type = None
                    for hypervisor in hypervisors:
                        if hypervisor.hypervisor_hostname == instance.__dict__[
                                'OS-EXT-SRV-ATTR:hypervisor_hostname']:
                            hypervisor_hostname = hypervisor.hypervisor_hostname
                            hypervisor_type = hypervisor.hypervisor_type
                            break

                    utils.append_unique(
                        vms,
                        {
                            'vm_id': instance.id,
                            'vm_name': instance.name,
                            'vm_metadata': instance.metadata,
                            'vm_flavor_id': instance.flavor['id'],
                            'vm_power_state': instance.__dict__['OS-EXT-STS:power_state'],
                            'hypervisor_hostname': hypervisor_hostname,
                            'hypervisor_type': hypervisor_type},
                        key="vm_id")
    return vms


class EnableSafemode(task.Task):

    def execute(self, Namenode, NamenodeSSHPort, Username, Password):
        self.client = connect_server(
            Namenode, int(NamenodeSSHPort), Username, Password)
        # Make sure the cluster is set to safemode

        LOG.debug(_('EnableSafemode:' + Namenode))
        stdin, stdout, stderr = self.client.exec_command(
            "hadoop/bin/hadoop dfsadmin -safemode get")
        try:
            stdout.read().index("ON")
            safemode = True
        except Exception as e:
            safemode = False

        stdin, stdout, stderr = self.client.exec_command(
            "hadoop/bin/hadoop dfsadmin -safemode enter")
        stdout.read()

        return safemode

    def revert(self, *args, **kwargs):
        try:
            # Read profile level from the flow record?
            if not isinstance(kwargs['result'], misc.Failure):
                if not kwargs['result']:
                    # our workflow set the cluster to safemode
                    # revert back
                    stdin, stdout, stderr = self.client.exec_command(
                        "hadoop/bin/hadoop dfsadmin -safemode leave")
                    stdout.read()
        except Exception as ex:
            LOG.exception(ex)
        finally:
            pass


class DisableSafemode(task.Task):

    def execute(self, Namenode, NamenodeSSHPort, Username, Password, safemode):
        if not safemode:
            self.client = connect_server(
                Namenode, int(NamenodeSSHPort), Username, Password)
            # Make sure the cluster is set to safemode

            LOG.debug(_('DisableSafemode:' + Namenode))
            stdin, stdout, stderr = self.client.exec_command(
                "hadoop/bin/hadoop dfsadmin -safemode leave")
            mode = stdout.read()
            LOG.debug(_(mode))


class HadoopWorkflow(workflow.Workflow):
    """"
      Hadoop Workflow
    """

    def __init__(self, name, store):
        super(HadoopWorkflow, self).__init__(name)
        self._store = store

    #
    # Hadoop workflowflow is a linear flow
    # :param host - hadoop name node
    # : port - ssh port of hadoop
    # : usename/password - username and password to authenticate to the namenode
    #
    def initflow(self, composite=False):
        cntx = amqp.RpcContext.from_dict(self._store['context'])
        self._store['instances'] = get_hadoop_nodes(cntx, self._store['Namenode'], int(
            self._store['NamenodeSSHPort']), self._store['Username'], self._store['Password'])
        for index, item in enumerate(self._store['instances']):
            self._store['instance_' + str(index)] = item

        snapshotvms = lf.Flow('hadoopwf')

        # Enable safemode on the namenode
        snapshotvms.add(EnableSafemode('EnableSafemore', provides='safemode'))

        # This is an unordered pausing of VMs. This flow is created in
        # common tasks library. This routine takes instance ids from
        # openstack. Workload manager should provide the list of
        # instance ids
        snapshotvms.add(vmtasks.UnorderedPauseVMs(self._store['instances']))

        # This is again unorder snapshot of VMs. This flow is implemented in
        # common tasks library
        snapshotvms.add(vmtasks.UnorderedSnapshotVMs(self._store['instances']))

        # This is an unordered pausing of VMs.
        snapshotvms.add(vmtasks.UnorderedUnPauseVMs(self._store['instances']))

        # enable profiling to the level before the flow started
        snapshotvms.add(DisableSafemode('DisableSafemode'))

        super(HadoopWorkflow, self).initflow(snapshotvms, composite=composite)

    def topology(self):

        LOG.debug(_('Connecting to hadoop nameserver ' +
                    self._store['Namenode']))
        connection = connect_server(self._store['Namenode'], int(
            self._store['NamenodeSSHPort']), self._store['Username'], self._store['Password'])
        LOG.debug(_('Connected to hadoop name server: ' +
                    self._store['Namenode']))

        hadoopcfg = gethadoopcfg(connection)

        # Covert the topology into generic topology that can be
        # returned as restful payload

        hadoopstatus = []
        hadoopstatus.append(
            ['Capacity', hadoopcfg['hconfig']['Configured Capacity']])
        hadoopstatus.append(
            ['Remaining', hadoopcfg['hconfig']['DFS Remaining']])
        hadoopstatus.append(['Used', hadoopcfg['hconfig']['DFS Used']])

        for node in hadoopcfg['hnodes']:
            node["name"] = node.pop("Name")
            node["status"] = str(node["Decommission Status"])
            node["input"] = []
            node["input"].append(['Capacity', node['Configured Capacity']])
            node["input"].append(['Used', node['DFS Used']])
            node["input"].append(['Remaining', node['DFS Remaining']])

        hadoop = {
            "name": "Hadoop",
            "children": hadoopcfg['hnodes'],
            "input": hadoopstatus}
        return dict(topology=hadoop)

    def details(self):
        # workflow details based on the
        # current topology, number of VMs etc
        def recurseflow(item):
            if isinstance(item, task.Task):
                taskdetails = {
                    'name': item._name.split("_")[0],
                    'type': 'Task'}
                taskdetails['input'] = []
                if len(item._name.split('_')) == 2:
                    nodename = item._name.split("_")[1]
                    for n in nodes['instances']:
                        if n['vm_id'] == nodename:
                            nodename = n['vm_name']
                    taskdetails['input'] = [['vm', nodename]]
                return taskdetails

            flowdetails = {}
            flowdetails['name'] = str(item).split("==")[0]
            flowdetails['type'] = str(item).split('.')[2]
            flowdetails['children'] = []
            for it in item:
                flowdetails['children'].append(recurseflow(it))

            return flowdetails

        nodes = self.discover()
        workflow = recurseflow(self._flow)
        return dict(workflow=workflow)

    def discover(self):
        cntx = amqp.RpcContext.from_dict(self._store['context'])
        instances = get_hadoop_nodes(cntx,
                                     self._store['Namenode'],
                                     int(self._store['NamenodeSSHPort']),
                                     self._store['Username'],
                                     self._store['Password'])
        for instance in instances:
            del instance['hypervisor_hostname']
            del instance['hypervisor_type']
        return dict(instances=instances)

    def execute(self):
        if self._store['source_platform'] == "vmware":
            compute_service = nova.API(production=True)
            search_opts = {}
            search_opts['deep_discover'] = '1'
            cntx = amqp.RpcContext.from_dict(self._store['context'])
            compute_service.get_servers(cntx, search_opts=search_opts)
        vmtasks.CreateVMSnapshotDBEntries(
            self._store['context'],
            self._store['instances'],
            self._store['snapshot'])
        result = engines.run(
            self._flow,
            engine_conf='parallel',
            backend={
                'connection': self._store['connection']},
            store=self._store)


"""
#test code
import json

#HadoopWorkflow Requires the following inputs in store:

store = {
    'connection':'mysql://root:project1@10.6.255.110/workloadmgr?charset=utf8',
    # Instanceids will to be discovered automatically
    'Namenode': 'hadoop1',            # namenode
    'NamenodeSSHPort': '22',                   # ssh port of namenode
    'Username': 'hduser',         # namenode user
    'Password': 'ubuntu',         # namenode password if ssh key is not set
}

#c = nova.novaclient(None, production=True, admin=True);
#context = context.RequestContext("1c4c73dfb2724ef7a4e659999db40be5", #admin user id
                                 #c.client.projectid,
                                 #is_admin = True,
                                 #auth_token=c.client.auth_token)
#store["context"] = context.__dict__
#store["context"]["conf"] = None
hwf = HadoopWorkflow("testflow", store)
#print(json.dumps(hwf.discover()))
#hwf.initflow()
#print(json.dumps(hwf.details()))
print(json.dumps(hwf.topology()))

#result = engines.load(hwf._flow, engine_conf='parallel', backend={'connection':'mysql://root:project1@10.6.255.110/workloadmgr?charset=utf8'}, store=store)

#print(hwf.execute())
"""