Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
hubboss / boss / utils.py
Size: Mime:
import collections
import functools
import json
import multiprocessing
import os.path
import subprocess
import sys
import traceback
import uuid
import xml.etree.ElementTree as ET

from distutils.version import StrictVersion
from httplib import OK

import docker.utils.ports
import requests
import yaml

from termcolor import colored

from . import VERSION


# Options in yaml definitions of applications that
# are passed as part of the 'HostConfig' of a container
# (forwarded verbatim to docker.utils.create_host_config in run_containers).
HOST_CONFIG_KEYS = (
    'publish_all_ports',
    'restart_policy',
    'extra_hosts',
    'port_bindings',
    'mem_limit'
)


class NiceRepr(object):
    """Mixin giving subclasses a repr of the form ``ClassName(attr=value, ...)``.

    Attributes listed in an optional ``__repr_blacklist__`` tuple on the
    class (or instance) are omitted from the output.
    """
    def __repr__(self):
        hidden = getattr(self, '__repr_blacklist__', ())
        parts = []
        for key, value in vars(self).items():
            if key in hidden:
                continue
            parts.append('{k}={v!r}'.format(k=key, v=value))
        return '{cls}({props})'.format(
            cls=type(self).__name__,
            props=', '.join(parts)
        )


def normpath(pth):
    """Fully resolve *pth*: expand env vars and ``~``, normalize separators,
    and make it absolute relative to the current working directory."""
    expanded = os.path.expandvars(pth)
    expanded = os.path.expanduser(expanded)
    return os.path.abspath(os.path.normpath(expanded))


def memoize(f):
    '''
    Basic memoize decorator. Returns previously computed values
    based on arguments.

    The cache lives on the wrapped function as ``f._memo_calls`` (same
    location as before, but initialized once instead of re-checked with
    hasattr on every call). ``functools.wraps`` preserves the wrapped
    function's name/docstring for debugging and introspection.

    NOTE: will raise a TypeError if any argument values are non-hashable.
    '''
    f._memo_calls = {}

    @functools.wraps(f)
    def memoized(*a, **k):
        # Keyword args are sorted so f(a=1, b=2) and f(b=2, a=1) share a key.
        call_key = (a, tuple(sorted(k.items())))
        cache = f._memo_calls
        if call_key not in cache:
            cache[call_key] = f(*a, **k)
        return cache[call_key]

    return memoized


def autodict():
    """Return an infinitely-nesting defaultdict: any missing key produces
    another autodict, so ``d['a']['b']['c'] = 1`` works without setup."""
    return collections.defaultdict(autodict)


def flatten(lst):
    """Concatenate one level of nesting: [[1, 2], [3]] -> [1, 2, 3]."""
    flat = []
    for sub in lst:
        flat.extend(sub)
    return flat


def get_host(name_or_ip, hosts):
    """Return the single host from *hosts* whose name or ip equals *name_or_ip*.

    Returns None when nothing matches; raises Exception when the match is
    ambiguous (more than one host).

    FIX: the original used ``filter(...)`` and then ``len(matching)`` —
    that only works on Python 2 where filter returns a list; on Python 3
    filter returns an iterator and both the truthiness test and len() break.
    A list comprehension is correct on both.
    """
    matching = [h for h in hosts if name_or_ip in (h.name, h.ip)]
    if not matching:
        return None

    if len(matching) == 1:
        return matching[0]

    raise Exception(
        "Too many hosts match {!r}: {!r}".format(
            name_or_ip, [(h.name, h.ip) for h in hosts]
        )
    )


def list_containers(host, grep=None, show_all=False, show_restart=False, quiet=False):
    """Render the containers on *host* as a newline-joined string.

    With quiet=True, only container ids are emitted; otherwise the host name
    (yellow) is followed by one colored line (green) per container.
    """
    found = host.get_containers(grep, show_all, show_restart)
    if quiet:
        return "\n".join(c.container_id for c in found)
    lines = [colored(str(host), 'yellow')]
    for c in found:
        lines.append(colored(str(c), 'green'))
    return "\n".join(lines)


def rm_containers(host, force, container_ids):
    """Remove *container_ids* on *host*, forwarding the force flag.
    Thin delegation to ``host.remove_containers``."""
    removed = host.remove_containers(container_ids, force=force)
    return removed


def start_containers(host, container_ids):
    """Start *container_ids* on *host* (delegates to ``host.start_containers``)."""
    result = host.start_containers(container_ids)
    return result


def stop_containers(host, container_ids):
    """Stop *container_ids* on *host* (delegates to ``host.stop_containers``)."""
    result = host.stop_containers(container_ids)
    return result


def restart_containers(host, container_ids):
    """Restart *container_ids* on *host* (delegates to ``host.restart_containers``)."""
    result = host.restart_containers(container_ids)
    return result


def get_images(host, name=None):
    """Yield each image record reported by *host*, serialized to a JSON string."""
    return (json.dumps(image) for image in host.images(name))


def update_pass():
    """Refresh the local Pass password store by running ``pass git pull``.

    Prints the subprocess output; raises Exception when pass is not
    installed (OSError from Popen) or when the pull exits non-zero.

    FIX: the original unpacked ``stderr, stdout = proc.communicate()`` —
    communicate() returns (stdout, stderr) in that order, so the two
    streams were swapped. The failure message keeps stderr content, which
    is what it effectively contained before the fix.
    """
    try:
        print("Updating secrets in Pass...")
        proc = subprocess.Popen(
            ['pass', 'git', 'pull'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        stdout, stderr = proc.communicate()
        print(stdout)
        print(stderr)
        if proc.returncode != 0:
            # NOTE(review): assumes Python 2 str pipes; on Python 3 these
            # would be bytes and need decoding before concatenation.
            raise Exception("Failed to update Pass: " + stderr)
    except OSError:
        raise Exception("Pass not installed, please install pass")


def get_existing_ports(containers):
    """Collect the 'PublicPort' value of every port mapping across *containers*.

    Containers with no port mappings contribute nothing; the result is a
    single flat list in container order.
    """
    return [
        mapping.get('PublicPort')
        for container in containers
        for mapping in container.ports
    ]

def build_port_bindings_from_dicts(ports):
    """Convert dict-style port specs from the application yaml into docker
    port bindings.

    Each value of *ports* is a dict with 'privateport' and optional 'ip' /
    'publicports'; they are flattened into "ip:public:private" strings and
    handed to docker.utils.ports.build_port_bindings. None yields {}.
    """
    if ports is None:
        return {}

    specs = []
    for spec in ports.values():
        ip = spec.get('ip', '')
        public = spec.get('publicports', [''])[0]
        private = str(spec['privateport'])
        specs.append('{}:{}:{}'.format(ip, public, private))

    return docker.utils.ports.build_port_bindings(specs)


def run_containers(host, image, container_type, number_containers, wait):
    """Create and start *number_containers* containers of *image* on *host*.

    Builds env vars and container details from the yaml config, creates each
    container under a unique name, then starts them. With wait=True, blocks
    and returns a dict of container id -> wait result; otherwise returns the
    list of start() results (or an error string — mixed return types, see XXX).

    FIXES vs. original:
    - the inner ``enumerate`` loop reused ``i``, shadowing the outer
      container-count loop variable;
    - ``list.index`` raises ValueError, not IndexError, so the
      "Couldn't find container." branch was unreachable and a missing
      container crashed with an uncaught ValueError;
    - ``xrange`` replaced with ``range`` (identical behavior here, works
      on Python 2 and 3).
    """
    env_vars = build_envvars(host, image, container_type)
    container_details = get_container_details(host, container_type, image)

    # e.g. docker/docker-index:0.28.0 -> <environment>-docker-index-0.28.0
    name = "{}-{}".format(host.environment, image.replace(":", "-").split("/")[1])
    container_ids = []
    for _ in range(number_containers):
        unique_container_name = "{}_{}-{}".format(name, container_type, uuid.uuid1().hex[:8])

        # List-style port specs go straight to docker; dict-style specs
        # are converted first.
        if isinstance(container_details.get('ports'), list):
            container_details['port_bindings'] = docker.utils.ports.build_port_bindings(container_details['ports'])
        else:
            container_details['port_bindings'] = build_port_bindings_from_dicts(container_details.get('ports'))

        # Fix 'host' binds to use the host's IP address
        for port, binds in container_details['port_bindings'].items():
            for bind_idx, bind in enumerate(binds):
                if isinstance(bind, tuple) and len(bind) == 2 and bind[0] == 'host':
                    binds[bind_idx] = (host.ip, bind[1])

        host_details = dict(
            (k, container_details.get(k))
            for k in HOST_CONFIG_KEYS
        )

        container_ids.append(host.create_container(
            image,
            container_details.get('command'),
            environment=env_vars,
            host_config=docker.utils.create_host_config(**host_details),
            ports=container_details['port_bindings'].keys(),
            name=unique_container_name
        ))

    ret = []

    for c_id in [x.get('Id') for x in container_ids]:
        containers = host.get_containers(show_all=True)
        try:
            idx = [container.container_id for container in containers].index(c_id)
        except ValueError:
            return "Couldn't find container."
        container = containers[idx]

        ret.append(container.start())
        print("Starting container: {}".format(c_id))

    if wait:
        # XXX: mixed return types are bad
        print('Waiting for containers')
        return dict((c_id['Id'], host.wait(c_id)) for c_id in container_ids)

    return ret


def build_envvars(host, image, container_type):
    """Build the environment-variable dict for a container of *image*.

    Loads the env yaml for the image/environment, resolves any value of the
    form "pass show ..." by invoking pass, and adds the Consul SERVICE_*
    registration variables (SSH always; HTTP for 'web' containers).

    FIXES vs. original:
    - the bare ``except:`` around the yaml load also swallowed
      SystemExit/KeyboardInterrupt; narrowed to ``except Exception``;
    - ``iteritems()`` replaced with ``items()`` (same behavior on
      Python 2, also valid on Python 3).
    """
    # TODO: move to config or elsewhere? or pass in values from config
    from . import config

    print("Decrypting Secrets from Pass...")
    image, version = image.split(':')
    # 'docker/docker-index:0.28.0-nicktest'

    environment = host.environment
    path = config.get_environment_details(image, environment)
    print("Constructing environment from {}".format(path))
    try:
        with open(path, 'r') as f:
            # NOTE(review): yaml.load without an explicit Loader can execute
            # arbitrary tags; prefer yaml.safe_load if these files are ever
            # not fully trusted.
            env_vars = yaml.load(f)
    except Exception:
        raise Exception("Error loading env_vars: `{}`".format(path))
    # Values like "pass show some/secret" are replaced by the secret itself.
    for k, v in env_vars.items():
        if str(v).startswith('pass show'):
            env_vars[k] = subprocess.check_output(v.split(" ")).strip()

    consul_env_vars = {'SERVICE_22_NAME': "{}-ssh".format(config.get_service_name(image))}
    if container_type == 'web':
        consul_env_vars['SERVICE_80_NAME'] = config.get_service_name(image)

    env_vars.update(consul_env_vars)

    print("Finished decrypting secrets")
    return env_vars


def get_container_details(host, container_type, image):
    """Load the application yaml for *image* in *host*'s environment and
    return the section for *container_type* (None if absent)."""
    # TODO: move to config or elsewhere? or pass in values from config
    from . import config

    # TODO: standardize image/label parsing
    app_name, _version = image.split(':')
    path = config.get_application_details(app_name, host.environment)
    print("Gathering app config from {}".format(path))
    try:
        with open(path, 'r') as f:
            app_details = yaml.load(f)
    except Exception as e:
        raise Exception("Error loading `{}`\n{}".format(path, e))

    return app_details.get(container_type)


def check_for_update():
    """Check GemFury's package index for a newer HubBoss release.

    Warns on stderr when a newer (or, oddly, older) version is available.
    Never raises: any failure only prints a warning.

    FIXES vs. original:
    - a non-OK HTTP status used to hit ``pass`` and fall through to
      parsing the body anyway; now it aborts via the except handler;
    - ``print >>sys.stderr`` statements replaced with ``sys.stderr.write``
      (identical output, valid on Python 2 and 3).
    """
    try:
        resp = requests.get('https://pypi.fury.io/docker-io/hubboss?auth=sSKjtzxT1CdrpaZaEdcw', timeout=5)
        if resp.status_code != OK:
            raise Exception("Unexpected status code: {}".format(resp.status_code))

        # The index page is an HTML listing; each <a> text ends in
        # "<name>-<version>.tar.gz".
        html = ET.fromstring(resp.content)
        version_list = [
            StrictVersion(anchor.text.split("-")[-1].replace(".tar.gz", ""))
            for anchor in html.findall('body/a')
        ]
        version_list.sort()

        latest_version = version_list[-1]
        current_version = StrictVersion(VERSION)

        if current_version > latest_version:
            sys.stderr.write("Currently using newer version than available on GemFury\n")
        elif current_version < latest_version:
            sys.stderr.write("HubBoss-{} is installed. Consider upgrading to `{}`.\n".format(current_version, latest_version))
    except Exception as exc:
        sys.stderr.write("Failed to check for updates. You may not have the latest version.\n")
        sys.stderr.write("The exception encountered was: {!r}\n".format(exc))


def remove_dupes(iterable):
    """Yield the items of *iterable* in order, emitting each value only the
    first time it appears. Items must be hashable."""
    seen = set()
    for item in iterable:
        if item not in seen:
            seen.add(item)
            yield item


class MultiJSONDecoder(json.decoder.JSONDecoder):
    """JSONDecoder whose decode() handles a stream of concatenated JSON
    documents (e.g. '{}{}{}') instead of exactly one."""

    def decode(self, s, _w=json.decoder.WHITESPACE.match):
        """
        Generate a series of decoded objects from the JSON string.
        """
        pos = 0
        length = len(s)

        while pos < length:
            # Skip whitespace, decode the next document, skip trailing
            # whitespace so the loop terminates exactly at end-of-string.
            obj, pos = self.raw_decode(s, idx=_w(s, pos).end())
            pos = _w(s, pos).end()
            yield obj


def json_loads_multi(data, **kw):
    '''
    Decode a string of concatenated JSON documents into an iterator of objects.

    >>> s = '{}{}{}'
    >>> list(json_loads_multi(s)) == [{}, {}, {}]
    True
    '''
    if 'cls' not in kw:
        kw['cls'] = MultiJSONDecoder
    return json.loads(data, **kw)


def json_load_multi(fp, **kw):
    '''
    Decode a file of concatenated JSON documents into an iterator of objects.

    >>> import io
    >>> s = io.StringIO('{}{}{}')
    >>> list(json_load_multi(s)) == [{}, {}, {}]
    True
    '''
    if 'cls' not in kw:
        kw['cls'] = MultiJSONDecoder
    return json.load(fp, **kw)


class ParallelHostOperation(object):
    """Run one host-level function against many hosts in parallel.

    Usage: construct with the hosts, call start(function, args, kwargs),
    then finish() to collect {host_name: (result, error)} where result and
    error are the JSON-decoded values produced by do_work.

    Uses one worker process per host (the module-level ``Process`` alias,
    which may be a coverage-aware subclass), a shared work queue fed with
    one (args, kwargs) item per host, and a done queue for results.
    """

    # Sentinel pushed onto the queues to tell consumers to stop iterating.
    STOP = 'STOP'

    def __init__(self, hosts):
        self.hosts = hosts
        # Shared queues: work items flow to the workers, results flow back.
        self.work_queue = multiprocessing.Queue()
        self.done_queue = multiprocessing.Queue()
        self.processes = []
        self.num_workers = len(self.hosts)

    def start(self, function, args, kwargs):
        """Spawn one worker per host and enqueue the same (args, kwargs)
        work item once per host, followed by one STOP sentinel per worker."""
        for host in self.hosts:
            # Host objects are passed as plain dicts (host.to_dict()) so they
            # pickle cleanly across the process boundary; the worker rebuilds
            # them with hosts.Host.from_dict.
            self.processes.append(Process(
                target=self.work_loop,
                name=host.name,
                args=(self.work_queue, self.done_queue, function, host.to_dict())
            ))
            self.work_queue.put((args, kwargs))

        # One STOP per worker so every work_loop's iter() terminates.
        for proc in self.processes:
            self.work_queue.put(self.STOP)

        for proc in self.processes:
            proc.start()

    def finish(self):
        """Join all workers and drain the done queue.

        Returns {host_name: (stdout_json, stderr_json)} — i.e. the
        (result, error) pair each worker put for its host.
        """
        for p in self.processes:
            try:
                p.join()
            except Exception as e:
                # import traceback; traceback.print_exc()
                p.terminate()

        # Terminate our own drain loop below; safe because all workers have
        # already exited and pushed their results.
        self.done_queue.put(self.STOP)

        return dict(
            (host_name, (p_stdout, p_stderr))
            for (host_name, p_stdout, p_stderr)
            in iter(self.done_queue.get, self.STOP)
        )

    def work_loop(self, work_queue, done_queue, function, host_dict):
        """Worker-process body: rebuild the Host, consume work items until a
        STOP sentinel, and push (host_name, result, error) per item.

        NOTE(review): the ``done_queue`` parameter is ignored in favor of
        ``self.done_queue`` — works here because ``self`` travels to the
        child process along with the target, but the two should be unified.
        The 'STOP' literal matches self.STOP.
        """
        # TODO: restructure imports to break cycle

        from . import hosts
        host = hosts.Host.from_dict(host_dict)
        for args, kwargs in iter(work_queue.get, 'STOP'):
            stdout, stderr = self.do_work(function, host, *args, **kwargs)
            self.done_queue.put((host.name, json.loads(stdout), json.loads(stderr)))

        return True

    def do_work(self, function, host, *args, **kwargs):
        """Invoke function(host, *args, **kwargs), catching any exception.

        Returns a (result_json, error_json) pair; exactly one of the decoded
        values is non-None. Results are JSON-serialized so they cross the
        process boundary without pickling arbitrary objects.
        """
        result = None
        error = None
        try:
            result = function(host, *args, **kwargs)
        except Exception as e:
            error = dict(
                cls=type(e).__name__,
                message=str(e),
                traceback=traceback.format_exc(),
            )

        return json.dumps(result), json.dumps(error)


# Thanks to http://blog.schettino72.net/posts/python-code-coverage-multiprocessing.html
def coverage_multiprocessing_process():  # pragma: no cover
    try:
        import coverage
    except ImportError:
        # give up monkey-patching if coverage not installed
        return

    from coverage.collector import Collector
    try:
        from coverage.control import coverage as coverage_control
    except ImportError:
        # This should work with newer versions of the coverage package
        from coverage.control import Coverage as coverage_control
    # detect if coverage was running in forked process
    if Collector._collectors:
        class Process_WithCoverage(multiprocessing.Process):
            def _bootstrap(self):
                cov = coverage_control(data_suffix=True)
                cov.start()
                try:
                    return multiprocessing.Process._bootstrap(self)
                finally:
                    cov.stop()
                    cov.save()
        return Process_WithCoverage

# Pick the coverage-aware Process subclass when coverage is collecting in
# this process; otherwise fall back to the stock multiprocessing.Process.
# Both module-level names are preserved for any external users.
ProcessCoverage = coverage_multiprocessing_process()
Process = ProcessCoverage if ProcessCoverage is not None else multiprocessing.Process