Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
hubboss / boss / tls_config.py
Size: Mime:
import sys
import urlparse
import os
import os.path
import ssl
import docker
import warnings

from . import utils


class TLSProvider(object):
    """Base class for objects that build a docker TLS configuration.

    Subclasses list the option names they need in ``fields``.  Each name is
    read from the ``config_section`` section of the config object and stored
    on the instance under the same attribute name.
    """

    config_section = 'tls'
    fields = ()

    def __init__(self, config):
        """Load every option named in ``fields`` from *config*.

        :param config: object exposing ``get(section, option)``.
        :raises Exception: when one or more fields are ``None`` after
            loading; the message lists the offending field names.
        """
        self.config = config

        for field in self.fields:
            setattr(self, field, config.get(self.config_section, field))

        missing = [x for x in self.fields if getattr(self, x, None) is None]

        if missing:
            raise Exception(
                "The following fields are missing from "
                "the {section!r} config section: {fields}".format(
                    section=self.config_section,
                    fields=', '.join(missing)
                )
            )

    def get_config(self, url):
        """Return the TLS configuration for *url*; subclasses implement it."""
        raise NotImplementedError

    @staticmethod
    def to_docker_tls(client_cert, client_key, server_cert):
        """Wrap certificate paths in a ``docker.tls.TLSConfig``.

        :param client_cert: path to the client certificate.
        :param client_key: path to the client private key.
        :param server_cert: accepted for interface symmetry but currently
            not passed on (see the note below).
        :returns: a ``docker.tls.TLSConfig`` instance.
        """
        # NOTE(review): server verification is switched off and the CA
        # certificate (``ca_cert=server_cert``) is deliberately not handed to
        # TLSConfig, so the daemon's identity is never checked.  The protocol
        # is also pinned to TLS 1.0.  Both look like compatibility
        # workarounds; confirm before tightening, as enabling them may break
        # existing connections.
        return docker.tls.TLSConfig(
            client_cert=(client_cert, client_key),
            verify=False,
            ssl_version=ssl.PROTOCOL_TLSv1
        )

    @classmethod
    def get_tls_config(cls, config, url):
        """Pick the first usable provider and return its config for *url*.

        Providers are tried in order: ``MachineTLSProvider``,
        ``BasicTLSProvider`` and, only when the ``tls``/``env`` option is
        truthy, ``EnvTLSProvider``.  The first one whose constructor succeeds
        is used.

        :param config: object exposing ``get`` and ``default_get``.
        :param url: docker endpoint URL forwarded to ``get_config``.
        :returns: whatever the selected provider's ``get_config`` returns.
        :raises Exception: the error raised by the last candidate when no
            provider could be constructed.
        """
        provider = None

        providers = (MachineTLSProvider, BasicTLSProvider)
        if config.default_get('tls', 'env', None):
            providers += (EnvTLSProvider,)

        last_index = len(providers) - 1
        for index, kls in enumerate(providers):
            try:
                provider = kls(config)
            except Exception:
                # Only the final candidate's failure is reported.  A bare
                # ``raise`` keeps the original traceback without storing
                # ``sys.exc_info()`` in a local variable.
                if index == last_index:
                    raise
                continue
            else:
                break

        if isinstance(provider, EnvTLSProvider):
            warnings.warn("using TLS settings from environment")

        return provider.get_config(url)


class BasicTLSProvider(TLSProvider):
    """TLS provider whose certificate paths are given directly in config.

    The three configured values may contain a ``{}`` placeholder, which is
    filled with the literal text ``$HOME`` before the value is handed to
    ``utils.normpath`` (presumably that helper expands the variable; verify
    against ``utils``).
    """

    fields = ('server_cert', 'client_cert', 'client_key')

    def __init__(self, config):
        """Load the three path options and normalise each of them."""
        super(BasicTLSProvider, self).__init__(config)

        for attr in ('client_cert', 'client_key', 'server_cert'):
            template = getattr(self, attr)
            setattr(self, attr, utils.normpath(template.format('$HOME')))

    def get_config(self, url):
        """Return the docker TLS config; *url* is not consulted."""
        cert_paths = (self.client_cert, self.client_key, self.server_cert)
        return self.to_docker_tls(*cert_paths)


class MachineTLSProvider(TLSProvider):
    """TLS provider that finds certificates in a per-machine directory tree.

    For a given endpoint URL the files are looked up in
    ``<base_dir>/machines/<hostname>``; when that directory does not exist
    (or the URL carries no hostname) ``<base_dir>/certs`` is used instead.
    """

    fields = (
        'base_dir',
        'server_cert_name',
        'client_cert_name',
        'client_key_name',
    )

    def __init__(self, config):
        """Load the directory and file-name options from *config*."""
        # NOTE(review): the base constructor calls setattr() for every entry
        # in ``fields``, so these pre-set values are replaced by whatever
        # ``config.get`` returns.  If they are meant to act as defaults,
        # that only works when ``config.get`` itself supplies them -- confirm.
        self.base_dir = '{}/.docker/machine'
        self.client_cert_name = None
        self.client_key_name = None
        self.server_cert_name = None

        super(MachineTLSProvider, self).__init__(config)

        self.base_dir = utils.normpath(self.base_dir.format('$HOME'))

    def get_config(self, url):
        """Return the docker TLS config for the host named in *url*.

        :param url: docker endpoint URL; only its hostname is used.
        :returns: the result of ``to_docker_tls`` for the located files.
        :raises Exception: when any of the three certificate files is
            missing from the selected directory.
        """
        hostname = urlparse.urlparse(url).hostname
        default_dir = os.path.join(self.base_dir, 'certs')

        # Directories consulted, in order, for the error message below.
        searched = []
        tls_dir = None

        # A URL without a network host yields no hostname; joining ``None``
        # into a path would crash, so go straight to the shared directory.
        if hostname:
            machine_dir = os.path.join(self.base_dir, 'machines', hostname)
            searched.append(machine_dir)
            if os.path.isdir(machine_dir):
                tls_dir = machine_dir

        if tls_dir is None:
            tls_dir = default_dir
            searched.append(default_dir)

        client_cert = os.path.join(tls_dir, self.client_cert_name)
        client_key = os.path.join(tls_dir, self.client_key_name)
        server_cert = os.path.join(tls_dir, self.server_cert_name)

        missing = [
            fname
            for fname in (client_cert, client_key, server_cert)
            if not os.path.isfile(fname)
        ]

        if missing:
            raise Exception(
                "Could not find TLS info for "
                "{host} in {dirs}. Missing files: {missing!r}".format(
                    host=hostname,
                    dirs=searched,
                    missing=missing
                )
            )

        return self.to_docker_tls(client_cert, client_key, server_cert)


class EnvTLSProvider(TLSProvider):
    """TLS provider driven by the ``DOCKER_*`` environment variables.

    TLS counts as enabled whenever ``DOCKER_TLS_VERIFY`` is present in the
    environment, whatever its value.  The certificate directory is then read
    from ``DOCKER_CERT_PATH``.
    """

    def __init__(self, *a, **k):
        """Capture the environment settings, then run the base constructor."""
        env = os.environ
        self.tls_enabled = 'DOCKER_TLS_VERIFY' in env
        self.cert_path = (
            env.get('DOCKER_CERT_PATH') if self.tls_enabled else None
        )

        TLSProvider.__init__(self, *a, **k)

    def get_config(self, url):
        """Return the docker TLS config, or ``None`` without a cert path.

        *url* is not consulted.  The files ``cert.pem``, ``key.pem`` and
        ``ca.pem`` are expected inside the certificate directory.
        """
        cert_dir = self.cert_path
        if cert_dir is None:
            return None

        client_cert = os.path.join(cert_dir, 'cert.pem')
        client_key = os.path.join(cert_dir, 'key.pem')
        ca_cert = os.path.join(cert_dir, 'ca.pem')

        return self.to_docker_tls(client_cert, client_key, ca_cert)