# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 TrilioData, Inc.
# All Rights Reserved.
'''
Workflow for taking snapshots of MongoDB instances
'''
import contextlib
import os
import random
import sys
import time
import json
import datetime
import paramiko
import uuid
from tempfile import mkstemp
import subprocess
from taskflow import engines
from taskflow.utils import misc
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import graph_flow as gf
from taskflow import task
from taskflow import flow
from workloadmgr.openstack.common.gettextutils import _
from workloadmgr.openstack.common import log as logging
from workloadmgr.compute import nova
import workloadmgr.context as context
from workloadmgr.db.workloadmgrdb import WorkloadMgrDB
from workloadmgr import utils
from workloadmgr import exception
from workloadmgr import settings
from oslo_messaging._drivers import amqp
from . import vmtasks
from . import workflow
import pymongo
from pymongo import MongoClient, ReadPreference
LOG = logging.getLogger(__name__)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
#
# Function that creates the connection to the mongos server, to be used as
# the primary source of information when picking the servers. The host, port,
# user and password should be fed into the flow; other tasks should use
# them as needed.
#
def connect_server(host, port, user, password, verbose=False):
try:
connection = None
if user != '':
auth = 'mongodb://' + user + ':' + \
password + '@' + host + ':' + str(port)
connection = MongoClient(auth)
else:
auth = ''
connection = MongoClient(host, int(port))
if verbose:
            LOG.debug(_('Connected to ' + host + ' on port ' +
                        str(port) + '...'))
except Exception as ex:
        LOG.error(_('There was an error connecting to MongoDB. Try again...'))
        LOG.exception(ex)
if ex.__class__.__name__ == 'ConnectionFailure':
            error = _('Failed to connect to MongoDB node %s') % (str(host))
elif ex.__class__.__name__ == 'ConfigurationError':
error = _(
'Wrong username/password entered for MongoDB node %s') % (str(host))
else:
            error = _('Failed to connect to MongoDB node %s') % (str(host))
raise exception.ErrorOccurred(reason=error)
return connection
def isShardedCluster(conn):
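    """Return True if 'conn' points at a sharded cluster (mongos).

    Heuristic: the 'ismaster' reply from a replica set member carries
    'primary'/'secondary' fields, while a mongos reply does not.
    """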
try:
status = conn.admin.command("ismaster")
return not ('primary' in status and 'secondary' in status)
except Exception as ex:
LOG.exception(ex)
        raise exception.ErrorOccurred(reason=_(
            "Cannot connect to mongos server. Check database settings in "
            "the Credentials tab and try again"))
def getShards(conn):
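    """Return a cursor over the shard documents in config.shards."""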
try:
db = conn.config
collection = db.shards
shards = collection.find()
return shards
except Exception as e:
        LOG.error(
            'There was an error getting shards: ' +
            str(e) +
            '. Try again...')
class DisableProfiling(task.Task):
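    """Disable profiling on the config servers.

    Returns the previous profiling level as 'proflevel' so that
    EnableProfiling (and revert) can restore it after the snapshot.
    """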
def execute(self, DBHost, DBPort, DBUser, DBPassword):
self.client = connect_server(DBHost, DBPort, DBUser, DBPassword)
        # Make sure profiling is disabled, but also return the current
        # profiling level so that revert() and the EnableProfiling task
        # can restore it later
LOG.debug(_('DisableProfiling:'))
dbmap = self.client.admin.command('getShardMap')
cfgsrvs = dbmap['map']['config'].split(',')
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
try:
self.cfgclient = connect_server(cfghost, int(cfgport),
DBUser, DBPassword)
proflevel = self.cfgclient.admin.profiling_level()
                # disable profiling
self.cfgclient.admin.set_profiling_level(pymongo.OFF)
return proflevel
            except BaseException:
                LOG.debug(_('"' + cfghost + '" appears to be offline'))
LOG.error(_("Cannot find config server to disable profiling. \
Make sure your mongodb cluster is up and running"))
raise Exception(_("Cannot find config server to disable profiling. \
Make sure your mongodb cluster is up and running"))
def revert(self, *args, **kwargs):
try:
            # Restore the profiling level returned by execute()
if not isinstance(kwargs['result'], misc.Failure):
self.cfgclient.admin.set_profiling_level(kwargs['result'])
except Exception as ex:
LOG.exception(ex)
finally:
pass
class EnableProfiling(task.Task):
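    """Restore the profiling level saved by DisableProfiling."""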
def execute(self, DBHost, DBPort, DBUser, DBPassword, proflevel):
LOG.debug(_('EnableProfiling'))
self.client = connect_server(DBHost, DBPort, DBUser, DBPassword)
dbmap = self.client.admin.command('getShardMap')
cfgsrvs = dbmap['map']['config'].split(',')
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
try:
self.cfgclient = connect_server(cfghost, int(cfgport),
DBUser, DBPassword)
                # Restore the profiling level saved before the snapshot
self.cfgclient.admin.set_profiling_level(proflevel)
return
            except BaseException:
                LOG.debug(_('"' + cfghost + '" appears to be offline'))
LOG.error(_("Cannot enable profiling. \
Make sure your mongodb cluster is up and running"))
raise Exception(_("Cannot enable profiling. \
Make sure your mongodb cluster is up and running"))
class PauseDBInstance(task.Task):
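    """Flush a secondary replica with fsync and hold its write lock."""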
def execute(self, h, DBUser, DBPassword):
LOG.debug(_('PauseDBInstance'))
        # Flush the database and hold the write lock on the instance
host_info = h['secondaryReplica'].split(':')
LOG.debug(_(host_info))
self.client = connect_server(
host_info[0], int(
host_info[1]), DBUser, DBPassword)
self.client.fsync(lock=True)
        # TODO: wait until the fsync operation is complete
def revert(self, *args, **kwargs):
try:
# Resume DB
if self.client.is_locked:
self.client.unlock()
except Exception as ex:
LOG.exception(ex)
finally:
pass
class ResumeDBInstance(task.Task):
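    """Release the write lock taken by PauseDBInstance."""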
def execute(self, h, DBUser, DBPassword):
LOG.debug(_('ResumeDBInstance'))
host_info = h['secondaryReplica'].split(':')
LOG.debug(_(host_info))
self.client = connect_server(
host_info[0], int(
host_info[1]), DBUser, DBPassword)
self.client.unlock()
class PauseBalancer(task.Task):
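    """Stop the cluster balancer and wait until it releases its lock."""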
def execute(self, DBHost, DBPort, DBUser, DBPassword):
LOG.debug(_('PauseBalancer'))
self.client = connect_server(DBHost, DBPort, DBUser, DBPassword)
# Pause the DB
dbmap = self.client.admin.command('getShardMap')
cfgsrvs = dbmap['map']['config'].split(',')
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
try:
self.client = connect_server(
cfghost, cfgport, DBUser, DBPassword)
db = self.client.config
                timeout = int(settings.get_settings().get(
                    'mongodb_stop_balancer_timeout', '300'))
currtime = time.time()
db.settings.update({'_id': 'balancer'}, {
'$set': {'stopped': True}}, True)
balancer_info = db.locks.find_one({'_id': 'balancer'})
while int(str(balancer_info['state'])) > 0 and\
time.time() - currtime < timeout:
time.sleep(5)
LOG.debug(_('\t\twaiting for balancer to stop...'))
balancer_info = db.locks.find_one({'_id': 'balancer'})
                if int(str(balancer_info['state'])) > 0:
                    LOG.error(_("Cannot stop the balancer within the "
                                "mongodb_stop_balancer_timeout(%d) interval") %
                              timeout)
                    raise Exception(_("Cannot stop the balancer within the "
                                      "mongodb_stop_balancer_timeout(%d) interval") %
                                    timeout)
return
            except BaseException:
                LOG.debug(_('"' + cfghost + '" appears to be offline'))
LOG.error(_("Cannot pause balancer. \
Make sure your mongodb cluster is up and running"))
raise Exception(_("Cannot pause balancer. \
Make sure your mongodb cluster is up and running"))
def revert(self, *args, **kwargs):
try:
            # Restart the balancer
db = self.client.config
db.settings.update({'_id': 'balancer'}, {
'$set': {'stopped': False}}, True)
except Exception as ex:
LOG.exception(ex)
finally:
pass
class ResumeBalancer(task.Task):
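    """Restart the cluster balancer stopped by PauseBalancer."""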
def execute(self, DBHost, DBPort, DBUser, DBPassword):
LOG.debug(_('ResumeBalancer'))
self.client = connect_server(DBHost, DBPort, DBUser, DBPassword)
        # Restart the balancer
dbmap = self.client.admin.command('getShardMap')
cfgsrvs = dbmap['map']['config'].split(',')
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
try:
self.client = connect_server(
cfghost, cfgport, DBUser, DBPassword)
db = self.client.config
db.settings.update({'_id': 'balancer'}, {
'$set': {'stopped': False}}, True)
return
            except BaseException:
                LOG.debug(_('"' + cfghost + '" appears to be offline'))
LOG.error(_("Cannot resume balancer. \
Make sure your mongodb cluster is up and running"))
raise Exception(_("Cannot resume balancer. \
Make sure your mongodb cluster is up and running"))
class ShutdownConfigServer(task.Task):
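    """Shut down one of the config servers over ssh.

    Returns the chosen config server and its command line
    ('getCmdLineOpts') so ResumeConfigServer and revert can restart it.
    """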
    #
    # Sample output of db.runCommand('getShardMap'):
    # {
    #     'map' : {
    #         'node2:27021' : 'node2:27021',
    #         'node3:27021' : 'node3:27021',
    #         'node4:27021' : 'node4:27021',
    #         'config' : 'node2:27019,node3:27019,node4:27019',
    #         'shard0000' : 'node2:27021',
    #         'shard0001' : 'node3:27021',
    #         'shard0002' : 'node4:27021'
    #     },
    #     'ok' : 1
    # }
    #
def execute(self, DBHost, DBPort, DBUser, DBPassword,
HostUsername, HostPassword, HostSSHPort=22, RunAsRoot=False):
# Get the list of config servers
# shutdown one of them
self.client = connect_server(DBHost, DBPort, DBUser, DBPassword)
dbmap = self.client.admin.command('getShardMap')
cfgsrvs = dbmap['map']['config'].split(',')
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
try:
self.cfgclient = connect_server(cfghost, int(cfgport),
DBUser, DBPassword)
cmdlineopts = self.cfgclient.admin.command('getCmdLineOpts')
command = 'mongod --shutdown --port ' + cfgport + ' --configsvr'
if RunAsRoot:
command = 'sudo ' + command
LOG.debug(_('ShutdownConfigServer'))
try:
client = paramiko.SSHClient()
client.load_system_host_keys()
if HostPassword == '':
client.set_missing_host_key_policy(
paramiko.WarningPolicy())
else:
client.set_missing_host_key_policy(
paramiko.AutoAddPolicy())
client.connect(
cfghost,
port=HostSSHPort,
username=HostUsername,
password=HostPassword,
timeout=120)
stdin, stdout, stderr = client.exec_command(
command, timeout=120)
LOG.debug(_(stdout.read()))
finally:
client.close()
                # Return the config server address and its command line
                # options so that the server can be restarted later
return cfgsrv, cmdlineopts
            except BaseException:
                LOG.debug(_('"' + cfghost + '" appears to be offline'))
LOG.error(_("Cannot shutdown configsrv. \
Make sure your mongodb cluster is up and running"))
raise Exception(_("Cannot shutdown configsrv. \
Make sure your mongodb cluster is up and running"))
    def revert(self, *args, **kwargs):
        client = None
        try:
            # Make sure all config servers are resumed
            if not isinstance(kwargs['result'], misc.Failure):
                cfgsrv, cfgsrvcmdline = kwargs['result']
                cfghost = cfgsrv.split(':')[0]
                # ssh into the cfg host and restart the config server
                # with its original command line
                command = ''
                for c in cfgsrvcmdline['argv']:
                    command = command + c + ' '
                client = paramiko.SSHClient()
                client.load_system_host_keys()
                if kwargs['HostPassword'] == '':
                    client.set_missing_host_key_policy(
                        paramiko.WarningPolicy())
                else:
                    client.set_missing_host_key_policy(
                        paramiko.AutoAddPolicy())
                client.connect(
                    cfghost,
                    port=kwargs['HostSSHPort'],
                    username=kwargs['HostUsername'],
                    password=kwargs['HostPassword'],
                    timeout=120)
                stdin, stdout, stderr = client.exec_command(
                    command, timeout=120)
                LOG.debug(_(stdout.read()))
                LOG.debug(_('ShutdownConfigServer:revert'))
        except Exception as ex:
            LOG.exception(ex)
        finally:
            if client:
                client.close()
class ResumeConfigServer(task.Task):
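    """Restart the config server stopped by ShutdownConfigServer."""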
def execute(self, cfgsrv, cfgsrvcmdline, HostUsername,
HostPassword, HostSSHPort=22, RunAsRoot=False):
        # Make sure all config servers are resumed:
        # ssh into the cfg host and restart the config server
        # with its original command line
        LOG.debug(_('ResumeConfigServer'))
        cfghost = cfgsrv.split(':')[0]
        command = ''
        for c in cfgsrvcmdline['argv']:
            command = command + c + ' '
        if RunAsRoot:
            command = 'sudo ' + command
        client = None
        try:
            client = paramiko.SSHClient()
            client.load_system_host_keys()
            if HostPassword == '':
                client.set_missing_host_key_policy(paramiko.WarningPolicy())
            else:
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(
                cfghost,
                port=HostSSHPort,
                username=HostUsername,
                password=HostPassword,
                timeout=120)
            stdin, stdout, stderr = client.exec_command(command, timeout=120)
            LOG.debug(_(stdout.read()))
        finally:
            if client:
                client.close()
# Assume there is no ordering dependency between instances;
# pause each VM in parallel.
def PauseDBInstances(hosts_list):
flow = uf.Flow('PauseDBInstances')
    for index, h in enumerate(hosts_list):
flow.add(
PauseDBInstance(
'PauseDBInstance_' +
h['secondaryReplica'],
rebind=[
'secondary_' +
str(index),
'DBUser',
'DBPassword']))
return flow
def ResumeDBInstances(hosts_list):
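    """Build an unordered flow that resumes all paused secondary replicas."""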
flow = uf.Flow('ResumeDBInstances')
    for index, h in enumerate(hosts_list):
flow.add(
ResumeDBInstance(
'ResumeDBInstance_' +
h['secondaryReplica'],
rebind=[
'secondary_' +
str(index),
'DBUser',
'DBPassword']))
return flow
def secondaryhosts_to_backup(
cntx, host, port, username, password, preferredgroup):
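    """Pick one secondary replica per replica set to back up.

    Honors the optional 'preferredgroup' JSON list of preferred replica
    members and falls back to the first healthy secondary otherwise.
    """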
#
# Creating connection to mongos server
#
LOG.debug(_('Connecting to mongos server ' + host))
connection = connect_server(host, port, username, password)
pgroup = []
if preferredgroup:
pgroup = json.loads(preferredgroup)
#
# Getting the secondaries list
#
hosts_to_backup = []
if isShardedCluster(connection):
#
# Getting sharding information
#
LOG.debug(_('Getting sharding configuration'))
shards = getShards(connection)
for s in shards:
hosts = str(s['host'])
hosts = hosts.replace(str(s['_id']), '').strip()
hosts = hosts.replace('/', '').strip()
# print('Getting secondary from hosts in ', hosts)
# Get the replica set for each shard
if username != '':
c = pymongo.MongoClient(
"mongodb://" + username + ":" + password + "@" + hosts,
read_preference=ReadPreference.SECONDARY)
else:
c = pymongo.MongoClient(
hosts, read_preference=ReadPreference.SECONDARY)
status = c.admin.command('replSetGetStatus')
# If user specified preferred group, backup only those
# replicas
if preferredgroup and len(pgroup) > 0:
preferredreplica = None
for member in pgroup:
if member['replica'] == status['set']:
preferredreplica = member['name']
# Select a replica member only when user specifies a replica
if preferredreplica:
for m in status['members']:
if m['name'] != preferredreplica:
continue
if m['stateStr'] == 'SECONDARY':
hosts_to_backup.append({'replicaSetName': status['set'],
'secondaryReplica': m['name']})
else:
LOG.error(_(preferredreplica +
" state is " +
m['stateStr'] +
". Will pick next secondary for backup"))
for m in status['members']:
if m['stateStr'] == 'SECONDARY':
hosts_to_backup.append({'replicaSetName': status['set'],
'secondaryReplica': m['name']})
break
break
else:
# if user did not specify preferred group, backup entire
# cluster
for m in status['members']:
if m['stateStr'] == 'SECONDARY':
hosts_to_backup.append({'replicaSetName': status['set'],
'secondaryReplica': m['name']})
break
else:
status = connection.admin.command('replSetGetStatus')
preferredreplica = None
if preferredgroup and len(pgroup) > 0:
for member in pgroup:
if member['replica'] == status['set']:
preferredreplica = member['name']
# Select a replica member only when user specifies a replica
if preferredreplica:
for m in status['members']:
if m['name'] != preferredreplica:
continue
if m['stateStr'] == 'SECONDARY':
hosts_to_backup.append({'replicaSetName': status['set'],
'secondaryReplica': m['name']})
else:
LOG.error(_(preferredreplica + " state is " +
m['stateStr'] +
". Will pick next secondary for backup"))
for m in status['members']:
if m['stateStr'] == 'SECONDARY':
hosts_to_backup.append({'replicaSetName': status['set'],
'secondaryReplica': m['name']})
break
break
else:
# if user did not specify preferred group, backup entire cluster
for m in status['members']:
if m['stateStr'] == 'SECONDARY':
hosts_to_backup.append({'replicaSetName': status['set'],
'secondaryReplica': m['name']})
break
if len(hosts_to_backup) == 0:
raise Exception(_("Could not identify any hosts to backup. \
Please make sure mongodb cluster is in a stable \
and try again"))
return hosts_to_backup
def get_vms(cntx, dbhost, dbport, mongodbusername,
mongodbpassword, sshport,
hostusername, hostpassword):
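    """Map the mongodb hosts to nova instances by matching MAC addresses
    gathered over ssh against the instance metadata.
    """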
#
# Creating connection to mongos server
#
LOG.debug(_('Connecting to mongos server ' + dbhost))
connection = connect_server(
dbhost,
dbport,
mongodbusername,
mongodbpassword)
#
# Getting sharding information
#
LOG.debug(_('Getting sharding configuration'))
shards = getShards(connection)
#
# Getting the secondaries list
#
hostnames = {}
# This is a sharded cluster
if isShardedCluster(connection):
for s in shards:
hosts = str(s['host'])
hosts = hosts.replace(str(s['_id']), '').strip()
hosts = hosts.replace('/', '').strip()
for h in hosts.split(','):
hostname = h.split(':')[0]
if hostname not in hostnames:
hostnames[hostname] = 1
# Add config servers to the mix
dbmap = connection.admin.command('getShardMap')
cfgsrvs = dbmap['map']['config'].split(',')
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
if cfghost not in hostnames:
hostnames[cfghost] = 1
if dbhost not in hostnames:
hostnames[dbhost] = 1
else:
# this is a replica set
status = connection.admin.command('replSetGetStatus')
# If user specified preferred group, backup only those
# replicas
for m in status['members']:
hostname = m['name'].split(":")[0]
hostnames[hostname] = 1
interfaces = {}
for hostname in hostnames:
try:
mac_addresses = utils.get_mac_addresses(
hostname, sshport, username=hostusername, password=hostpassword, timeout=120)
for mac in mac_addresses:
interfaces[mac.lower()] = hostname
except Exception as ex:
LOG.exception(ex)
LOG.info(
_('"' + hostname + '" appears to be offline. Cannot exec ifconfig'))
if len(interfaces) == 0:
        LOG.info(
            _("Unable to log in to VMs to discover MAC addresses. "
              "Please check username/password and try again."))
        raise Exception(
            _("Unable to log in to VMs to discover MAC addresses. "
              "Please check username/password and try again."))
# query VM by ethernet and get instance info here
# call nova list
compute_service = nova.API(production=True)
instances = compute_service.get_servers(cntx, admin=True)
vms = []
# call nova interface-list <instanceid> to build the list of instances ids
for instance in instances:
# The following logic helps for VMware VMs. For OpenStack instances,
# look at the instance interfaces.
for addr in json.loads(instance.metadata['networks']):
# IP Addresses
# this is our vm
if addr['macAddress'].lower() in interfaces:
hypervisor_hostname = None
hypervisor_type = "VMware vCenter Server"
clustername = "Unknown"
if 'cluster' in instance.metadata and instance.metadata['cluster']:
if json.loads(instance.metadata['cluster']):
clusprop = json.loads(instance.metadata['cluster'])[0]
clustername = clusprop['name']
hypervisor_hostname = clustername
utils.append_unique(vms,
{'vm_id': instance.id,
'vm_name': instance.name,
'vm_metadata': instance.metadata,
'vm_flavor_id': instance.flavor['id'],
'hostname': interfaces[addr['macAddress'].lower()],
'vm_power_state': instance.__dict__['OS-EXT-STS:power_state'],
'hypervisor_hostname': hypervisor_hostname,
'hypervisor_type': hypervisor_type},
"vm_id")
break
if len(vms) == 0:
        LOG.info(
            _("No VMs are discovered in tvault inventory. Please run discover and try again"))
        raise Exception(
            _("No VMs are discovered in tvault inventory. Please run discover and try again"))
return vms
"""
MongoDBWorkflow Requires the following inputs in store:
'connection': FLAGS.sql_connection, # taskflow persistence connection
'context': context_dict, # context dictionary
'snapshot': snapshot, # snapshot dictionary
# Instanceids will to be discovered automatically
'host': 'mongodb1', # one of the nodes of mongodb cluster
'port': 27017, # listening port of mongos service
'username': 'ubuntu', # mongodb admin user
'password': 'ubuntu', # mongodb admin password
'hostusername': 'ubuntu', # username on the host for ssh operations
'hostpassword': '', # username on the host for ssh operations
'sshport' : 22, # ssh port that defaults to 22
'usesudo' : True, # use sudo when shutdown and restart of mongod instances
"""
class MongoDBWorkflow(workflow.Workflow):
"""
MongoDB workflow
"""
def __init__(self, name, store):
super(MongoDBWorkflow, self).__init__(name)
self._store = store
def find_first_alive_node(self):
        # Iterate through all hosts and pick the first one that is alive
if 'hostnames' in self._store:
for host in [self._store['DBHost']] + \
json.loads(self._store['hostnames']):
try:
connection = connect_server(host,
int(self._store['DBPort']),
self._store['DBUser'],
self._store['DBPassword'])
self._store['DBHost'] = host
LOG.debug(_('Chose "' + host + '" for mongodb connection'))
return
                except BaseException:
                    LOG.debug(_('"' + host + '" appears to be offline'))
#LOG.warning(_( 'MongoDB cluster appears to be offline'))
    #
    # The MongoDB flow is a directed acyclic flow.
    # :param host - one of the mongodb nodes in the cluster that has the
    #               mongos service running; it is used to discover the
    #               mongodb shards, their replicas etc.
    # :param port - port at which the mongos service is running
    # :param username/password - credentials to authenticate to the database
    #
def initflow(self, composite=False):
connection = connect_server(
self._store['DBHost'],
self._store['DBPort'],
self._store['DBUser'],
self._store['DBPassword'])
isMongos = isShardedCluster(connection)
self.find_first_alive_node()
cntx = amqp.RpcContext.from_dict(self._store['context'])
instances = get_vms(cntx, self._store['DBHost'],
self._store['DBPort'],
self._store['DBUser'],
self._store['DBPassword'],
self._store['HostSSHPort'],
self._store['HostUsername'],
self._store['HostPassword'])
self._store['topology'] = self.topology()
hosts_to_backup = secondaryhosts_to_backup(
cntx,
self._store['DBHost'],
self._store['DBPort'],
self._store['DBUser'],
self._store['DBPassword'],
self._store['preferredgroup'])
self._store['instances'] = []
# Filter the VMs based on the preferred secondary replica
        for vm in instances:
            for srep in hosts_to_backup:
                if srep['secondaryReplica'].split(':')[0] == vm['hostname']:
                    self._store['instance_' + vm['vm_id']] = vm
                    self._store['instances'].append(vm)
                    break
for index, item in enumerate(hosts_to_backup):
self._store['secondary_' + str(index)] = item
# Add config server if the server is not already part
# of the instances
if isMongos:
dbmap = connection.admin.command('getShardMap')
cfgsrvs = dbmap['map']['config'].split(',')
cfgincluded = False
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
for inst in self._store['instances']:
if inst['hostname'] == cfghost:
cfgincluded = True
break
if cfgincluded:
break
if not cfgincluded:
cfgadded = False
for index, vm in enumerate(instances):
for cfgsrv in cfgsrvs:
cfghost = cfgsrv.split(':')[0]
cfgport = cfgsrv.split(':')[1]
if cfghost == vm['hostname']:
self._store['instance_' + vm['vm_id']] = vm
self._store['instances'].append(vm)
cfgadded = True
break
if cfgadded:
break
snapshotvms = lf.Flow('mongodbwf')
        # Add the disable profiling task first; stopping the balancer
        # fails if the profiler is running
if isMongos:
snapshotvms.add(
DisableProfiling(
'DisableProfiling',
provides='proflevel'))
snapshotvms.add(PauseBalancer('PauseBalancer'))
# This will be a flow that needs to be added to mongo db flow.
# This is a flow that pauses all related VMs in unordered pattern
snapshotvms.add(PauseDBInstances(hosts_to_backup))
if isMongos:
snapshotvms.add(
ShutdownConfigServer(
'ShutdownConfigServer',
provides=(
'cfgsrv',
'cfgsrvcmdline')))
# This is an unordered pausing of VMs. This flow is created in
# common tasks library. This routine takes instance ids from
# openstack. Workload manager should provide the list of
# instance ids
snapshotvms.add(vmtasks.UnorderedPauseVMs(self._store['instances']))
        # This is again an unordered snapshot of VMs. This flow is
        # implemented in the common tasks library
snapshotvms.add(vmtasks.UnorderedSnapshotVMs(self._store['instances']))
# This is an unordered pausing of VMs.
snapshotvms.add(vmtasks.UnorderedUnPauseVMs(self._store['instances']))
# Restart the config servers so metadata changes can happen
if isMongos:
snapshotvms.add(ResumeConfigServer('ResumeConfigServer'))
        # unlock all locked replicas so they start receiving updates from the
        # primary and eventually get back in sync with it
snapshotvms.add(ResumeDBInstances(hosts_to_backup))
if isMongos:
snapshotvms.add(ResumeBalancer('ResumeBalancer'))
# enable profiling to the level before the flow started
snapshotvms.add(EnableProfiling('EnableProfiling'))
super(MongoDBWorkflow, self).initflow(snapshotvms, composite=composite)
def get_databases(self):
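        """Discover the databases in the cluster by running the
        mongodbnodes.py helper out of process and parsing its JSON output.
        """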
LOG.info(_('Enter get_databases'))
outfile_path = ''
errfile_path = ''
try:
cntx = amqp.RpcContext.from_dict(self._store['context'])
db = WorkloadMgrDB().db
if 'snapshot' in self._store:
db.snapshot_update(
cntx, self._store['snapshot']['id'], {
'progress_msg': 'Discovering MongoDB Databases'})
fh, outfile_path = mkstemp()
os.close(fh)
fh, errfile_path = mkstemp()
os.close(fh)
cmdspec = [
"python",
"/opt/stack/workloadmgr/workloadmgr/workflows/mongodbnodes.py",
"--config-file",
"/etc/workloadmgr/workloadmgr.conf",
"--defaultnode",
self._store['DBHost'],
"--port",
                str(self._store['HostSSHPort']),
"--username",
self._store['HostUsername'],
"--password",
"******",
"--dbport",
self._store["DBPort"],
"--dbuser",
self._store["DBUser"],
"--dbpassword",
self._store["DBPassword"],
]
if self._store.get('hostnames', None):
hosts = ""
for host in json.loads(self._store.get('hostnames', "")):
hosts += host + ';'
cmdspec.extend(["--addlnodes", hosts])
cmdspec.extend(["--outfile", outfile_path,
"--errfile", errfile_path,
])
            cmd = " ".join(cmdspec)
            LOG.debug(_('Executing: ' + cmd))
            # substitute the real credentials only after logging the command
            for idx, opt in enumerate(cmdspec):
                if opt == "--password":
                    cmdspec[idx + 1] = self._store['HostPassword']
                elif opt == "--dbpassword":
                    cmdspec[idx + 1] = self._store['DBPassword']
process = subprocess.Popen(cmdspec, shell=False)
stdoutdata, stderrdata = process.communicate()
if process.returncode != 0:
reason = 'Error discovering MongoDB Databases'
try:
with open(errfile_path, 'r') as fh:
reason = fh.read()
if len(reason) == 0:
reason = 'Error discovering MongoDB Databases'
LOG.info(
_('Error discovering MongoDB Databases: ' + reason))
os.remove(errfile_path)
finally:
raise exception.ErrorOccurred(reason=reason)
databases = None
with open(outfile_path, 'r') as fh:
databases = json.loads(fh.read())
LOG.info(_('Discovered MongoDB Databases: ' + str(databases)))
return databases
finally:
if os.path.isfile(outfile_path):
os.remove(outfile_path)
if os.path.isfile(errfile_path):
os.remove(errfile_path)
LOG.info(_('Exit get_databases'))
def topology(self):
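        """Return the cluster topology: each replica set and its members."""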
# Discover the shards
# Discover the replicaset of each shard
# since mongodb supports javascript and json
# this implementation should be pretty
        # straightforward
# discover the number of VMs part of this flow
# This usually boils down to number of shards of the cluster
#
# Creating connection to mongos server
#
self.find_first_alive_node()
LOG.debug(_('Connecting to mongos server ' + self._store['DBHost']))
connection = connect_server(
self._store['DBHost'],
self._store['DBPort'],
self._store['DBUser'],
self._store['DBPassword'])
replicas = []
replicahosts = {}
if isShardedCluster(connection):
#
# Getting sharding information
#
LOG.debug(_('Getting sharding configuration'))
shards = getShards(connection)
# Get the replica set for each shard
for s in shards:
hosts = str(s['host'])
hosts = hosts.replace(str(s['_id']), '').strip()
hosts = hosts.replace('/', '').strip()
replicahosts[s['_id']] = hosts
else:
# Get the replica set for each shard
status = connection.admin.command('replSetGetStatus')
s = []
for m in status['members']:
s.append(m['name'])
replicahosts[status['set']] = ",".join(s)
for replica, hosts in replicahosts.items():
LOG.debug(_('Getting secondary from hosts in ' + hosts +
" for replica " + replica))
if self._store['DBUser'] != '':
repl = pymongo.MongoClient(
"mongodb://" +
self._store['DBUser'] +
":" +
self._store['DBPassword'] +
"@" +
hosts,
read_preference=ReadPreference.SECONDARY)
else:
repl = pymongo.MongoClient(
hosts, read_preference=ReadPreference.SECONDARY)
status = repl.admin.command('replSetGetStatus')
replstatus = {}
replstatus["date"] = str(status["date"])
replstatus["children"] = []
replstatus["name"] = status.pop("set")
replstatus["status"] = "OK:" + str(status["ok"])
replstatus["input"] = []
replstatus["input"].append([])
replstatus["input"][0].append("myState")
replstatus["input"][0].append(status["myState"])
replstatus['children'] = []
for m in status["members"]:
replchild = {}
replchild['name'] = m['name']
replchild['configVersion'] = m['configVersion']
replchild["optimeDate"] = m["optimeDate"].strftime("%B %d, %Y")
replchild["status"] = m.pop("stateStr")
replchild["uptime"] = m.pop("uptime")
if ('electionDate' in m):
m.pop('electionDate')
if ('electionTime' in m):
m.pop('electionTime')
if ("lastHeartbeatRecv" in m):
replchild["lastHeartbeatRecv"] = str(
m["lastHeartbeatRecv"])
if ("lastHeartbeat" in m):
replchild["lastHeartbeat"] = str(m["lastHeartbeat"])
if ("optime" in m):
replchild["optime"] = m['optime'].as_datetime(
).strftime("%B %d, %Y")
replchild["input"] = []
replchild["input"].append([])
replchild["input"].append([])
replchild["input"].append([])
if ("syncingTo" in m):
replchild["input"][0].append("syncingTo")
replchild["input"][0].append(m["syncingTo"])
replchild["input"][1].append("state")
replchild["input"][1].append(m["state"])
replchild["input"][2].append("health")
replchild["input"][2].append(m["health"])
replstatus['children'].append(replchild)
replicas.append(replstatus)
        # Convert the topology into a generic topology that can be
        # returned as a restful payload
mongodb = {"name": "MongoDB", "children": replicas, "input": []}
return dict(topology=mongodb,
databases=self.get_databases()['databases'])
def details(self):
# workflow details based on the
# current topology, number of VMs etc
def recurseflow(item):
if isinstance(item, task.Task):
taskdetails = {
'name': item._name.split("_")[0],
'type': 'Task'}
taskdetails['input'] = []
if len(item._name.split('_')) == 2:
nodename = item._name.split("_")[1]
for n in nodes['instances']:
if n['vm_id'] == nodename:
nodename = n['vm_name']
taskdetails['input'] = [['vm', nodename]]
return taskdetails
flowdetails = {}
flowdetails['name'] = str(item).split("==")[0]
flowdetails['type'] = str(item).split('.')[2]
flowdetails['children'] = []
for it in item:
flowdetails['children'].append(recurseflow(it))
return flowdetails
nodes = self.discover()
workflow = recurseflow(self._flow)
return dict(workflow=workflow)
def discover(self):
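        """Return the nova instances and the topology this workflow covers."""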
self.find_first_alive_node()
cntx = amqp.RpcContext.from_dict(self._store['context'])
instances = get_vms(cntx, self._store['DBHost'],
self._store['DBPort'], self._store['DBUser'],
self._store['DBPassword'],
self._store['HostSSHPort'],
self._store['HostUsername'],
self._store['HostPassword'])
for instance in instances:
del instance['hypervisor_hostname']
del instance['hypervisor_type']
return dict(instances=instances, topology=self.topology())
def execute(self):
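        """Create the snapshot DB entries and run the snapshot flow."""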
if self._store['source_platform'] == "vmware":
compute_service = nova.API(production=True)
search_opts = {}
search_opts['deep_discover'] = '1'
cntx = amqp.RpcContext.from_dict(self._store['context'])
compute_service.get_servers(cntx, search_opts=search_opts)
self.find_first_alive_node()
vmtasks.CreateVMSnapshotDBEntries(
self._store['context'],
self._store['instances'],
self._store['snapshot'])
result = engines.run(
self._flow,
engine_conf='parallel',
backend={
'connection': self._store['connection']},
store=self._store)
# workloadmgr --os-auth-url http://$KEYSTONE_AUTH_HOST:5000/v2.0
# --os-tenant-name admin --os-username admin --os-password
# $ADMIN_PASSWORD workload-type-create --metadata HostUsername=string
# --metadata HostPassword=password --metadata HostSSHPort=string
# --metadata DBHost=string --metadata DBPort=string --metadata
# DBUser=string --metadata DBPassword=password --metadata
# RunAsRoot=boolean --metadata capabilities='discover:topology'
# --display-name "MongoDB" --display-description "MongoDB workload
# description" --is-public True
"""
#test code
import json
#MongoDBWorkflow Requires the following inputs in store:
store = {
    'connection':'mysql://root:project1@10.6.255.110/workloadmgr?charset=utf8',
    #'context': context_dict, # context dictionary
    #'snapshot': snapshot, # snapshot dictionary
    # Instance ids will be discovered automatically
    'DBHost': 'mongodb1', # one of the nodes of the mongodb cluster
    'DBPort': 27017, # listening port of the mongos service
    'DBUser': 'ubuntu', # mongodb admin user
    'DBPassword': 'ubuntu', # mongodb admin password
    'HostUsername': 'ubuntu', # username on the hosts for ssh operations
    'HostPassword': '', # password on the hosts for ssh operations
    'HostSSHPort' : 22, # ssh port that defaults to 22
    'RunAsRoot' : True, # use sudo when shutting down and restarting mongod instances
}
c = nova.novaclient(None, production=True, admin=True);
context = context.RequestContext("fc5e4f521b6a464ca401c456d59a3f61",
c.client.projectid,
is_admin = True,
auth_token=c.client.auth_token)
store["context"] = context.__dict__
mwf = MongoDBWorkflow("testflow", store)
print(json.dumps(mwf.details()))
print(json.dumps(mwf.discover()))
print(json.dumps(mwf.topology()))
result = engines.load(mwf._flow, engine_conf='parallel', backend={'connection':'mysql://root:project1@10.6.255.110/workloadmgr?charset=utf8'}, store=store)
print(mwf.execute())
"""