Repository URL to install this package:
|
Version:
0.3.30 ▾
|
from multiprocessing import Queue
# TODO: config should not be necessary for a basic functional decorator.
from . import utils
from . import config
from . import worker
class parallel(object):
def __init__(self, task):
self.task = task
def __call__(self, func):
def wrapper(hosts, app, **kwargs):
work_queue = Queue()
done_queue = Queue()
processes = []
workers = 0
if isinstance(hosts, basestring):
hosts = [hosts, ]
for host_name in hosts:
if host_name.lower() in config.get_host_groups():
for _host in config.get_hosts_for_group(host_name):
work_queue.put((_host.name, _host.base_url, _host.environment, _host.ip, func(**kwargs)))
workers += 1
else:
host = utils.get_host(
host_name, utils.flatten(app.config.hosts.values())
)
if host:
work_queue.put((host.name, host.base_url, host.environment, host.ip, func(**kwargs)))
workers += 1
else:
raise ValueError("Couldn't find host")
for w in xrange(workers):
p = utils.Process(target=worker.worker, args=(work_queue, done_queue, self.task))
p.start()
processes.append(p)
for w in xrange(workers):
work_queue.put('STOP')
for p in processes:
try:
p.join()
except Exception as e:
print e
p.terminate()
done_queue.put('STOP')
return [status for status in iter(done_queue.get, 'STOP')]
return wrapper