Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hubboss / boss / decorators.py
Size: Mime:
import functools
from multiprocessing import Queue

# TODO: config should not be necessary for a basic functional decorator.
from . import utils
from . import config
from . import worker


class parallel(object):
    """Decorator that runs a task in one worker process per target host.

    ``parallel(task)(func)`` returns a callable with the signature
    ``wrapper(hosts, app, **kwargs)``.  For each resolved host it queues the
    tuple ``(name, base_url, environment, ip, func(**kwargs))`` and starts one
    ``utils.Process`` executing ``worker.worker(work_queue, done_queue, task)``.
    """

    def __init__(self, task):
        # Handed to worker.worker as its third argument in every process.
        self.task = task

    def __call__(self, func):
        """Wrap ``func`` so that it is fanned out across hosts.

        ``func`` is called once per resolved host with the wrapper's keyword
        arguments; its return value becomes the last element of that host's
        work item.

        The returned wrapper accepts:
            hosts: a single host/group name or an iterable of names.
            app: object exposing ``app.config.hosts`` (only consulted for
                names that are not host groups).
            **kwargs: forwarded unchanged to ``func``.

        The wrapper returns a list of the items found on the done queue, and
        raises ``ValueError`` if a non-group name cannot be resolved.
        """
        # `basestring` only exists on Python 2; fall back to `str` elsewhere
        # so a bare host name is still recognised as a single host.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        @functools.wraps(func)
        def wrapper(hosts, app, **kwargs):
            work_queue = Queue()
            done_queue = Queue()

            if isinstance(hosts, string_types):
                hosts = [hosts]

            # Resolve every name to concrete host objects and queue one work
            # item per host.
            queued = 0
            for host_name in hosts:
                if host_name.lower() in config.get_host_groups():
                    targets = config.get_hosts_for_group(host_name)
                else:
                    host = utils.get_host(
                        host_name, utils.flatten(app.config.hosts.values())
                    )
                    if not host:
                        raise ValueError("Couldn't find host")
                    targets = [host]

                for target in targets:
                    work_queue.put((
                        target.name,
                        target.base_url,
                        target.environment,
                        target.ip,
                        func(**kwargs),
                    ))
                    queued += 1

            # One process per queued work item.
            processes = []
            for _ in range(queued):
                proc = utils.Process(
                    target=worker.worker,
                    args=(work_queue, done_queue, self.task),
                )
                proc.start()
                processes.append(proc)

            # One 'STOP' sentinel per process, queued behind the real work.
            for _ in processes:
                work_queue.put('STOP')

            # NOTE(review): the workers are joined before done_queue is read.
            # The multiprocessing docs warn that a child which has put items
            # on a queue may not exit until they are consumed, so very large
            # results could stall here -- confirm result sizes stay small or
            # drain done_queue first once worker.worker's contract is known.
            for proc in processes:
                try:
                    proc.join()
                except Exception as e:
                    print(e)
                    proc.terminate()

            # Terminate the result stream, then read everything up to the
            # sentinel.
            done_queue.put('STOP')
            return list(iter(done_queue.get, 'STOP'))

        return wrapper