# Repository URL to install this package:
# Version: 0.3.30
import json
import collections
import subprocess
import sys
import uuid
import yaml
import requests
import xml.etree.ElementTree as ET
import os.path
import multiprocessing
import traceback
from distutils.version import StrictVersion
from termcolor import colored
from httplib import OK
import docker.utils.ports
from . import VERSION
# Keys of an application's YAML definition that are forwarded to Docker
# as part of a container's 'HostConfig' rather than as regular
# container options.
HOST_CONFIG_KEYS = (
    'publish_all_ports',
    'restart_policy',
    'extra_hosts',
    'port_bindings',
    'mem_limit',
)
class NiceRepr(object):
    """Mixin that provides a ``ClassName(attr=value, ...)`` style ``repr``.

    Every instance attribute reported by ``vars(self)`` is shown, except
    those whose name appears in the optional ``__repr_blacklist__``
    attribute of the object.
    """

    def __repr__(self):
        hidden = getattr(self, '__repr_blacklist__', ())
        shown = [
            '{k}={v!r}'.format(k=name, v=value)
            for name, value in vars(self).items()
            if name not in hidden
        ]
        return '{cls}({props})'.format(
            cls=type(self).__name__,
            props=', '.join(shown),
        )
def normpath(pth):
    """Return ``pth`` as a normalised, absolute path.

    Environment variables are expanded first, then a leading ``~``;
    the result is normalised and finally made absolute.
    """
    expanded = os.path.expandvars(pth)
    expanded = os.path.expanduser(expanded)
    return os.path.abspath(os.path.normpath(expanded))
def memoize(f):
    '''
    Basic memoize decorator. Returns previously computed values
    based on arguments.

    Results are stored in a dict kept on the wrapped function as
    ``f._memo_calls``, keyed by the positional arguments plus the
    sorted keyword arguments.  The returned wrapper keeps the wrapped
    function's name and docstring.

    NOTE: will raise a TypeError if any argument values are non-hashable.
    '''
    # Imported locally so this block does not depend on a new
    # module-level import.
    import functools

    @functools.wraps(f)
    def memoized(*a, **k):
        call_key = (a, tuple(sorted(k.items())))
        # The cache is (re)created lazily so that deleting
        # ``f._memo_calls`` simply resets the memo.
        if not hasattr(f, '_memo_calls'):
            f._memo_calls = {}
        cache = f._memo_calls
        if call_key not in cache:
            cache[call_key] = f(*a, **k)
        return cache[call_key]
    return memoized
def autodict():
    """Return a ``defaultdict`` whose missing keys become nested autodicts.

    This allows assigning to arbitrarily deep keys without creating the
    intermediate levels first.
    """
    nested = collections.defaultdict(autodict)
    return nested
def flatten(lst):
    """Concatenate the items of every iterable in ``lst`` into one new list.

    Only one level of nesting is removed.
    """
    flat = []
    for sublist in lst:
        flat.extend(sublist)
    return flat
def get_host(name_or_ip, hosts):
    """Return the single host whose ``name`` or ``ip`` equals ``name_or_ip``.

    :param name_or_ip: host name or IP address to look for.
    :param hosts: iterable of objects exposing ``name`` and ``ip``.
    :returns: the matching host, or ``None`` when nothing matches.
    :raises Exception: when more than one host matches; the message lists
        the ``(name, ip)`` pairs of the hosts that matched.
    """
    # A list (not ``filter``) so truthiness and ``len`` work on Python 3 too.
    matching = [h for h in hosts if name_or_ip in (h.name, h.ip)]
    if not matching:
        return None
    if len(matching) > 1:
        raise Exception(
            "Too many hosts match {!r}: {!r}".format(
                name_or_ip, [(h.name, h.ip) for h in matching]
            )
        )
    return matching[0]
def list_containers(host, grep=None, show_all=False, show_restart=False, quiet=False):
    """Return a newline-separated listing of the containers on ``host``.

    With ``quiet`` set, only the container ids are listed.  Otherwise the
    first line is the host (yellow) followed by one green line per
    container.
    """
    found = host.get_containers(grep, show_all, show_restart)
    if quiet:
        return "\n".join(c.container_id for c in found)
    lines = [colored(str(host), 'yellow')]
    lines.extend(colored(str(c), 'green') for c in found)
    return "\n".join(lines)
def rm_containers(host, force, container_ids):
    """Remove ``container_ids`` from ``host`` and return the host's result.

    ``force`` is passed through to ``host.remove_containers`` as a keyword.
    """
    outcome = host.remove_containers(container_ids, force=force)
    return outcome
def start_containers(host, container_ids):
    """Start ``container_ids`` on ``host`` and return the host's result."""
    outcome = host.start_containers(container_ids)
    return outcome
def stop_containers(host, container_ids):
    """Stop ``container_ids`` on ``host`` and return the host's result."""
    outcome = host.stop_containers(container_ids)
    return outcome
def restart_containers(host, container_ids):
    """Restart ``container_ids`` on ``host`` and return the host's result."""
    outcome = host.restart_containers(container_ids)
    return outcome
def get_images(host, name=None):
    """Return a generator of JSON strings, one per image on ``host``.

    ``host.images(name)`` is called immediately; each entry it returns is
    serialised with ``json.dumps`` as the generator is consumed.
    """
    images = host.images(name)
    return (json.dumps(image) for image in images)
def update_pass():
    """Run ``pass git pull`` and echo what the command printed.

    :raises Exception: when the ``pass`` executable cannot be launched
        (``OSError`` from ``Popen``), or when the command exits with a
        non-zero status; in the latter case the message carries the
        command's stderr.
    """
    print("Updating secrets in Pass...")
    # Only launching the process can signal a missing executable, so that
    # is the only statement guarded by the OSError handler.
    try:
        proc = subprocess.Popen(
            ['pass', 'git', 'pull'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
    except OSError:
        raise Exception("Pass not installed, please install pass")
    # communicate() returns (stdout, stderr) -- in that order.
    stdout, stderr = proc.communicate()
    print(stdout)
    print(stderr)
    if proc.returncode != 0:
        raise Exception("Failed to update Pass: {}".format(stderr))
def get_existing_ports(containers):
    """Collect the ``'PublicPort'`` of every port entry of every container.

    Containers with an empty ``ports`` sequence contribute nothing; a port
    entry without a ``'PublicPort'`` key contributes ``None``.
    """
    return [
        port.get('PublicPort')
        for container in containers
        if len(container.ports) > 0
        for port in container.ports
    ]
def build_port_bindings_from_dicts(ports):
    """Convert dict-style port definitions into docker port bindings.

    :param ports: mapping whose values are dicts with a required
        ``'privateport'`` and optional ``'ip'`` and ``'publicports'``
        (a list, of which only the first entry is used); may be ``None``
        or empty.
    :returns: ``{}`` when ``ports`` is ``None`` or empty, otherwise the
        result of ``docker.utils.ports.build_port_bindings`` applied to
        ``'ip:public:private'`` strings built from each definition.
    :raises KeyError: when a definition has no ``'privateport'``.
    """
    if not ports:
        return {}
    simple_ports = []
    for port in ports.values():
        # 'publicports' may be missing, None or an empty list; all of
        # these mean "no explicit public port".
        public_ports = port.get('publicports') or ['']
        simple_ports.append('{}:{}:{}'.format(
            port.get('ip') or '',
            public_ports[0],
            str(port['privateport'])
        ))
    return docker.utils.ports.build_port_bindings(simple_ports)
def run_containers(host, image, container_type, number_containers, wait):
    """Create and start ``number_containers`` containers of ``image``.

    :param host: host object; this function uses its ``environment``,
        ``ip``, ``create_container``, ``get_containers`` and ``wait``.
    :param image: image reference such as ``docker/docker-index:0.28.0``
        (must contain a ``/``).
    :param container_type: section of the application definition to use;
        also becomes part of each container name.
    :param number_containers: how many containers to create.
    :param wait: when true, wait for every container after starting them.
    :returns: mixed, as in the original design -- a list with the result of
        each ``container.start()``; or, when ``wait`` is true, a dict
        mapping container id to ``host.wait(...)``; or the string
        ``"Couldn't find container."`` if a freshly created container is
        not reported by the host.
    """
    env_vars = build_envvars(host, image, container_type)
    container_details = get_container_details(host, container_type, image)
    # replace docker/docker-index:0.28.0
    # with docker-index-0.28.0
    name = "{}-{}".format(host.environment, image.replace(":", "-").split("/")[1])

    # The port bindings do not depend on the individual container, so they
    # are computed once instead of once per container.
    ports = container_details.get('ports')
    if isinstance(ports, list):
        port_bindings = docker.utils.ports.build_port_bindings(ports)
    else:
        port_bindings = build_port_bindings_from_dicts(ports)
    # Fix 'host' binds to use the host's IP address
    for binds in port_bindings.values():
        for idx, bind in enumerate(binds):
            if isinstance(bind, tuple) and len(bind) == 2 and bind[0] == 'host':
                binds[idx] = (host.ip, bind[1])
    container_details['port_bindings'] = port_bindings
    host_details = dict(
        (k, container_details.get(k))
        for k in HOST_CONFIG_KEYS
    )

    created = []
    for _ in range(number_containers):
        unique_container_name = "{}_{}-{}".format(name, container_type, uuid.uuid1().hex[:8])
        created.append(host.create_container(
            image,
            container_details.get('command'),
            environment=env_vars,
            host_config=docker.utils.create_host_config(**host_details),
            ports=list(port_bindings.keys()),
            name=unique_container_name
        ))

    ret = []
    for c_id in [c.get('Id') for c in created]:
        containers = host.get_containers(show_all=True)
        # Look the container up by id.  (The previous ``list.index`` lookup
        # raised ValueError, which the ``except IndexError`` never caught.)
        container = next(
            (c for c in containers if c.container_id == c_id), None
        )
        if container is None:
            return "Couldn't find container."
        ret.append(container.start())
        print("Starting container: {}".format(c_id))
    if wait:
        # XXX: mixed return types are bad
        print('Waiting for containers')
        return dict((c['Id'], host.wait(c)) for c in created)
    return ret
def build_envvars(host, image, container_type):
    """Build the environment-variable dict for a container of ``image``.

    The variables are read from the YAML file that
    ``config.get_environment_details`` points to.  Every value whose
    string form starts with ``pass show`` is replaced by the stripped
    output of running that command.  ``SERVICE_22_NAME`` is always added,
    and ``SERVICE_80_NAME`` as well when ``container_type`` is ``'web'``.

    :param host: host object; only ``host.environment`` is used.
    :param image: ``name:version`` image reference (exactly one ``:``).
    :param container_type: container type, e.g. ``'web'``.
    :returns: dict of environment variables.
    :raises Exception: when the environment file cannot be read or parsed.
    """
    # TODO: move to config or elsewhere? or pass in values from config
    from . import config
    print("Decrypting Secrets from Pass...")
    # 'docker/docker-index:0.28.0-nicktest'
    image, _version = image.split(':')
    environment = host.environment
    path = config.get_environment_details(image, environment)
    print("Constructing environment from {}".format(path))
    try:
        with open(path, 'r') as f:
            # NOTE(review): yaml.load can construct arbitrary Python
            # objects; consider yaml.safe_load if these files are not
            # fully trusted.
            env_vars = yaml.load(f)
    except Exception as e:
        # ``Exception`` (not a bare except) so Ctrl-C / SystemExit still
        # propagate; the cause is included to make failures diagnosable.
        raise Exception("Error loading env_vars: `{}`\n{}".format(path, e))
    if env_vars is None:
        # An empty YAML document loads as None.
        env_vars = {}
    # Iterate over a snapshot because values are replaced in the loop.
    for k, v in list(env_vars.items()):
        if str(v).startswith('pass show'):
            env_vars[k] = subprocess.check_output(v.split(" ")).strip()
    service_name = config.get_service_name(image)
    consul_env_vars = {'SERVICE_22_NAME': "{}-ssh".format(service_name)}
    if container_type == 'web':
        consul_env_vars['SERVICE_80_NAME'] = service_name
    env_vars.update(consul_env_vars)
    print("Finished decrypting secrets")
    return env_vars
def get_container_details(host, container_type, image):
    """Return the ``container_type`` section of an application's YAML file.

    The file is located with ``config.get_application_details`` using the
    image name (the part of ``image`` before the ``:``) and
    ``host.environment``.  Raises ``Exception`` when the file cannot be
    opened or parsed.
    """
    # TODO: move to config or elsewhere? or pass in values from config
    from . import config
    # TODO: standardize image/label parsing
    image_name, _tag = image.split(':')
    path = config.get_application_details(image_name, host.environment)
    print("Gathering app config from {}".format(path))
    try:
        with open(path, 'r') as handle:
            # NOTE(review): yaml.load is unsafe on untrusted input.
            app_details = yaml.load(handle)
    except Exception as e:
        raise Exception("Error loading `{}`\n{}".format(path, e))
    return app_details.get(container_type)
def check_for_update():
    """Compare the installed version with the newest one on the package index.

    Best effort: nothing is returned and no exception escapes.  Messages
    are written to stderr when the installed version is older or newer
    than the newest published one, or when the check itself fails.
    """
    try:
        # NOTE(review): the access token is embedded in this URL; consider
        # moving it to configuration.
        resp = requests.get('https://pypi.fury.io/docker-io/hubboss?auth=sSKjtzxT1CdrpaZaEdcw', timeout=5)
        # Check the status before trying to parse the body.
        if resp.status_code != OK:
            raise Exception(
                "unexpected HTTP status {}".format(resp.status_code)
            )
        html = ET.fromstring(resp.content)
        # Anchor texts are expected to end in '-<version>.tar.gz'.
        version_list = [
            StrictVersion(anchor.text.split("-")[-1].replace(".tar.gz", ""))
            for anchor in html.findall('body/a')
        ]
        if not version_list:
            raise Exception("no released versions found in the index page")
        latest_version = max(version_list)
        current_version = StrictVersion(VERSION)
        if current_version > latest_version:
            sys.stderr.write("Currently using newer version than available on GemFury\n")
        elif current_version < latest_version:
            sys.stderr.write(
                "HubBoss-{} is installed. Consider upgrading to `{}`.".format(current_version, latest_version)
                + "\n"
            )
    except Exception as exc:
        # Deliberately broad: an update check must never break the tool.
        sys.stderr.write("Failed to check for updates. You may not have the latest version.\n")
        sys.stderr.write("The exception encountered was: {!r}".format(exc) + "\n")
def remove_dupes(iterable):
    """Yield the items of ``iterable`` in order, skipping repeats.

    Items must be hashable, since already-yielded items are kept in a set.
    """
    seen = set()
    for item in iterable:
        if item not in seen:
            seen.add(item)
            yield item
class MultiJSONDecoder(json.decoder.JSONDecoder):
    """JSON decoder whose ``decode`` yields several concatenated documents."""

    def decode(self, s, _w=json.decoder.WHITESPACE.match):
        """
        Generate a series of decoded objects from the JSON string.

        Whitespace before, between and after the documents is skipped.
        """
        total = len(s)
        pos = 0
        while pos != total:
            start = _w(s, pos).end()
            obj, pos = self.raw_decode(s, idx=start)
            # Swallow trailing whitespace so the loop can end cleanly.
            pos = _w(s, pos).end()
            yield obj
def json_loads_multi(data, **kw):
    '''
    Like ``json.loads``, but uses ``MultiJSONDecoder`` unless ``cls`` is given.

    >>> s = '{}{}{}'
    >>> list(json_loads_multi(s)) == [{}, {}, {}]
    True
    '''
    options = kw
    options.setdefault('cls', MultiJSONDecoder)
    result = json.loads(data, **options)
    return result
def json_load_multi(fp, **kw):
    '''
    Like ``json.load``, but uses ``MultiJSONDecoder`` unless ``cls`` is given.

    >>> import io
    >>> s = io.StringIO(u'{}{}{}')
    >>> list(json_load_multi(s)) == [{}, {}, {}]
    True
    '''
    options = kw
    options.setdefault('cls', MultiJSONDecoder)
    result = json.load(fp, **options)
    return result
class ParallelHostOperation(object):
    """Run one function against several hosts, one worker process per host.

    Usage: ``start(function, args, kwargs)`` followed by ``finish()``,
    which returns ``{host_name: (result, error)}``.
    """

    # Sentinel placed on the queues to mark the end of the data.
    STOP = 'STOP'

    def __init__(self, hosts):
        """Remember ``hosts`` and create the work/result queues."""
        self.hosts = hosts
        self.work_queue = multiprocessing.Queue()
        self.done_queue = multiprocessing.Queue()
        self.processes = []
        self.num_workers = len(self.hosts)

    def start(self, function, args, kwargs):
        """Queue one work item per host and start one worker per host.

        ``function`` is called in the workers as
        ``function(host, *args, **kwargs)``.
        """
        for host in self.hosts:
            # The host is handed over as a dict and rebuilt in the worker.
            self.processes.append(Process(
                target=self.work_loop,
                name=host.name,
                args=(self.work_queue, self.done_queue, function, host.to_dict())
            ))
            self.work_queue.put((args, kwargs))
        # One STOP per worker so every work loop terminates.
        for _ in self.processes:
            self.work_queue.put(self.STOP)
        for proc in self.processes:
            proc.start()

    def finish(self):
        """Wait for all workers and return ``{host_name: (result, error)}``."""
        for p in self.processes:
            try:
                p.join()
            except Exception:
                # import traceback; traceback.print_exc()
                p.terminate()
        # Mark the end of the results so the iteration below stops.
        self.done_queue.put(self.STOP)
        return dict(
            (host_name, (p_stdout, p_stderr))
            for (host_name, p_stdout, p_stderr)
            in iter(self.done_queue.get, self.STOP)
        )

    def work_loop(self, work_queue, done_queue, function, host_dict):
        """Worker body: process work items until ``STOP`` is read.

        NOTE(review): all workers read from the same ``work_queue``, so
        nothing here guarantees that each worker takes exactly one item.
        """
        # TODO: restructure imports to break cycle
        from . import hosts
        host = hosts.Host.from_dict(host_dict)
        for args, kwargs in iter(work_queue.get, self.STOP):
            stdout, stderr = self.do_work(function, host, *args, **kwargs)
            done_queue.put((host.name, json.loads(stdout), json.loads(stderr)))
        return True

    def do_work(self, function, host, *args, **kwargs):
        """Call ``function(host, *args, **kwargs)`` and JSON-encode the outcome.

        Returns ``(result_json, error_json)``.  On success the error part
        encodes ``None``; when the call raises, the result part encodes
        ``None`` and the error is a dict with ``cls``, ``message`` and
        ``traceback``.
        """
        result = None
        error = None
        try:
            result = function(host, *args, **kwargs)
        except Exception as e:
            error = dict(
                cls=type(e).__name__,
                message=str(e),
                traceback=traceback.format_exc(),
            )
        return json.dumps(result), json.dumps(error)
# Thanks to http://blog.schettino72.net/posts/python-code-coverage-multiprocessing.html
def coverage_multiprocessing_process(): # pragma: no cover
    """
    Build a ``multiprocessing.Process`` subclass that records coverage data.

    Returns ``None`` when the ``coverage`` package cannot be imported or
    when ``Collector._collectors`` is empty; otherwise returns a subclass
    whose ``_bootstrap`` starts a coverage object before running the
    regular bootstrap and stops and saves it afterwards.
    """
    try:
        import coverage
    except ImportError:
        # give up monkey-patching if coverage not installed
        return
    from coverage.collector import Collector
    try:
        from coverage.control import coverage as coverage_control
    except ImportError:
        # This should work with newer versions of the coverage package
        from coverage.control import Coverage as coverage_control
    # detect if coverage was running in forked process
    if Collector._collectors:
        class Process_WithCoverage(multiprocessing.Process):
            def _bootstrap(self):
                # data_suffix=True is passed so each process saves its own
                # data -- presumably to a distinct file; confirm against
                # the coverage documentation.
                cov = coverage_control(data_suffix=True)
                cov.start()
                try:
                    return multiprocessing.Process._bootstrap(self)
                finally:
                    # Always stop and save, even if the bootstrap raises.
                    cov.stop()
                    cov.save()
        return Process_WithCoverage
# Use the coverage-aware Process subclass when one was built; otherwise
# fall back to the stock multiprocessing.Process.
ProcessCoverage = coverage_multiprocessing_process()
Process = multiprocessing.Process if ProcessCoverage is None else ProcessCoverage