Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hubboss / boss / worker.py
Size: Mime:
import types
import tempfile
from multiprocessing import current_process
from . import hosts
from . import config


def write_output(obj, f):
    """Serialize a worker result into the open, writable file object ``f``.

    Supported shapes of ``obj``:

    * ``None`` -- nothing is written.
    * a string -- written verbatim.
    * a list or generator -- every element is written recursively, in order.
    * a dict -- every item is written as one ``key: value`` line terminated
      by a newline.

    Returns:
        ``True`` once everything has been written.

    Raises:
        Exception: if ``obj`` (or any nested element) has an unsupported type
            or the underlying write fails. The message starts with
            ``"Failed to write output. "`` exactly once, followed by the text
            of the underlying error.
    """
    # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere so
    # the type check below does not blow up with a NameError.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    def emit(item):
        # Recursive worker. Kept separate from the error wrapping below so a
        # failure deep inside a nested result is prefixed only once.
        if item is None:
            return
        if isinstance(item, string_types):
            f.write(item)
        elif isinstance(item, (types.GeneratorType, list)):
            for element in item:
                emit(element)
        elif isinstance(item, dict):
            for key, value in item.items():
                f.write("{}: {}\n".format(key, value))
        else:
            raise Exception("unknown result; can't write to disk. result was: {!r}".format(item))

    try:
        emit(obj)
    except Exception as e:
        # str(e) rather than the deprecated e.message: the latter is empty for
        # multi-argument exceptions (e.g. IOError(errno, strerror)) and does
        # not exist at all on Python 3.
        raise Exception("Failed to write output. \n{}".format(str(e)))

    return True


def worker(work_queue, done_queue, function):
    """Run ``function`` against every host job on ``work_queue``.

    Each queue item is a ``(hostname, url, environment, ip, kwargs)`` tuple;
    the sentinel string ``'STOP'`` ends the loop. For every item a
    ``hosts.Host`` is rebuilt and ``function(host, **kwargs)`` is called
    (``function(host)`` when ``kwargs`` is ``None``). The return value is
    written to a temporary file with ``write_output`` and the path of that
    file is put on ``done_queue``. When the call or the write fails, an error
    message string is put on ``done_queue`` instead.

    Args:
        work_queue: queue yielding job tuples, terminated by ``'STOP'``.
        done_queue: queue receiving one result (file path or error text) per
            job, plus a final error message if the loop itself crashes.
        function: callable taking the host as first positional argument.

    Returns:
        Always ``True``.
    """
    import os
    import traceback

    try:
        for hostname, url, environment, ip, kwargs in iter(work_queue.get, 'STOP'):
            # multiprocessing doesn't support shared state, so we have to construct the
            # host objects again based upon the url passed in the queue
            host = hosts.Host(
                name=hostname,
                base_url=url,
                tls=config.config.get_tls(url),
                environment=environment,
                ip=ip,
            )

            try:
                if kwargs is not None:
                    ret = function(host, **kwargs)
                else:
                    ret = function(host)
            except Exception as e:
                traceback.print_exc()
                # Not every callable has __name__ (e.g. functools.partial);
                # fall back to repr() so the error report itself cannot fail.
                function_name = getattr(function, '__name__', repr(function))
                # str(e) instead of the deprecated e.message, which is empty
                # for multi-argument exceptions.
                done_queue.put("Failed to call `{}`: '{}'".format(function_name, str(e)))
                continue

            output_name = None
            try:
                # NOTE(review): the default mode is binary ('w+b'); that suits
                # Python 2 str results, but text results would need mode='w'
                # on Python 3 -- confirm before porting.
                with tempfile.NamedTemporaryFile(delete=False) as output:
                    output_name = output.name
                    # TODO: send JSON blobs back
                    write_output(ret, output)
            except Exception as e:
                # delete=False means nobody else cleans up; drop the partial
                # file, since its name is never reported to the consumer.
                if output_name is not None:
                    try:
                        os.remove(output_name)
                    except OSError:
                        # Best-effort cleanup; the write error is what matters.
                        pass
                done_queue.put(str(e))
            else:
                # Publish the path only after the file is closed (and thereby
                # flushed), so the consumer never reads an incomplete file.
                done_queue.put(output_name)

    except Exception as e:
        traceback.print_exc()
        done_queue.put("{} failed with: {}".format(current_process().name, str(e)))
    return True