Repository URL to install this package:
|
Version:
0.3.30 ▾
|
import types
import tempfile
from multiprocessing import current_process
from . import hosts
from . import config
def write_output(obj, f):
try:
if obj is None:
return True
elif isinstance(obj, basestring):
f.write(obj)
elif isinstance(obj, types.GeneratorType) or isinstance(obj, list):
for line in obj:
write_output(line, f)
elif isinstance(obj, dict):
for k, v in obj.iteritems():
write_output("{}: {}\n".format(k, v), f)
else:
raise Exception("unknown result; can't write to disk. result was: {!r}".format(obj))
except Exception as e:
raise Exception("Failed to write output. \n{}".format(e.message))
return True
def worker(work_queue, done_queue, function):
try:
for hostname, url, environment, ip, kwargs in iter(work_queue.get, 'STOP'):
# multiprocessing doesn't support shared state, so we have to construct the
# host objects again based upon the url passed in the queue
host = hosts.Host(name=hostname, base_url=url, tls=config.config.get_tls(url), environment=environment, ip=ip)
try:
if kwargs is not None:
ret = function(host, **kwargs)
else:
ret = function(host)
except Exception as e:
import traceback; traceback.print_exc()
done_queue.put("Failed to call `{}`: '{}'".format(function.__name__, e.message))
continue
try:
with tempfile.NamedTemporaryFile(delete=False) as output:
# TODO: send JSON blobs back
write_output(ret, output)
done_queue.put(output.name)
except Exception as e:
done_queue.put(e.message)
continue
except Exception as e:
import traceback; traceback.print_exc()
done_queue.put("{} failed with: {}".format(current_process().name, e.message))
return True