# Repository URL to install this package: (not shown)
# Version: 0.3.30
import sys
import urlparse
import os
import os.path
import ssl
import docker
import warnings
from . import utils
class TLSProvider(object):
    """Base class for objects that build a docker TLS configuration.

    Subclasses list the option names they need in ``fields``. Each name is
    read from the ``config_section`` section of the supplied config object
    and stored on the instance under the same attribute name.
    """

    config_section = 'tls'
    fields = ()

    def __init__(self, config):
        """Load every name in ``fields`` from ``config``.

        :param config: object exposing ``get(section, option)``.
        :raises Exception: when at least one field is ``None`` after loading;
            the message names the section and the missing fields.
        """
        self.config = config
        for field in self.fields:
            setattr(self, field, config.get(self.config_section, field))

        missing = [x for x in self.fields if getattr(self, x, None) is None]
        if missing:
            # The trailing space after "from" matters: the two literals are
            # concatenated by the parser.
            raise Exception(
                "The following fields are missing from "
                "the {section!r} config section: {fields}".format(
                    section=self.config_section,
                    fields=', '.join(missing)
                )
            )

    def get_config(self, url):
        """Return the TLS configuration for ``url``; subclasses implement it."""
        raise NotImplementedError

    @staticmethod
    def to_docker_tls(client_cert, client_key, server_cert):
        """Wrap a client certificate/key pair in a ``docker.tls.TLSConfig``.

        :param client_cert: path to the client certificate.
        :param client_key: path to the client private key.
        :param server_cert: accepted for interface symmetry but currently not
            passed on to ``TLSConfig``.
        """
        # NOTE(review): verify=False disables server verification and
        # server_cert is ignored (passing it as ca_cert was disabled at some
        # point). Left unchanged because callers may rely on it -- confirm
        # whether verification can be turned back on.
        return docker.tls.TLSConfig(
            client_cert=(client_cert, client_key),
            verify=False,
            ssl_version=ssl.PROTOCOL_TLSv1
        )

    @classmethod
    def get_tls_config(cls, config, url):
        """Build the TLS configuration for ``url`` from the first usable provider.

        Candidates are tried in order: ``MachineTLSProvider``,
        ``BasicTLSProvider`` and, only when ``config.default_get('tls', 'env',
        None)`` is truthy, ``EnvTLSProvider``. The first one whose constructor
        does not raise is used.

        :param config: config object handed to each provider constructor.
        :param url: URL passed to the chosen provider's ``get_config``.
        :returns: whatever the chosen provider's ``get_config(url)`` returns.
        :raises Exception: the error raised by the last candidate when every
            candidate fails to construct.
        """
        providers = (MachineTLSProvider, BasicTLSProvider)
        if config.default_get('tls', 'env', None):
            providers += (EnvTLSProvider,)

        provider = None
        final_index = len(providers) - 1
        for index, kls in enumerate(providers):
            try:
                provider = kls(config)
            except Exception:
                # The last candidate can only be reached when all earlier
                # ones failed, so re-raising here reports "every provider
                # failed" with the original traceback intact. A bare raise
                # is valid on both Python 2 and 3 and avoids keeping a
                # traceback object alive in a local variable.
                if index == final_index:
                    raise
            else:
                break

        if isinstance(provider, EnvTLSProvider):
            warnings.warn("using TLS settings from environment")
        return provider.get_config(url)
class BasicTLSProvider(TLSProvider):
    """Provider whose certificate and key locations come straight from config.

    The three configured values are treated as ``str.format`` templates that
    receive ``'$HOME'`` as their single positional argument.
    """

    fields = ('server_cert', 'client_cert', 'client_key')

    def __init__(self, config):
        """Load the three paths from ``config`` and normalise each of them."""
        super(BasicTLSProvider, self).__init__(config)
        # Processing order: client certificate, client key, server certificate.
        # presumably utils.normpath expands the '$HOME' placeholder -- confirm
        # in utils.
        for attr in ('client_cert', 'client_key', 'server_cert'):
            template = getattr(self, attr)
            setattr(self, attr, utils.normpath(template.format('$HOME')))

    def get_config(self, url):
        """Return the docker TLS config; ``url`` is not consulted."""
        tls_paths = (self.client_cert, self.client_key, self.server_cert)
        return self.to_docker_tls(*tls_paths)
class MachineTLSProvider(TLSProvider):
    """Provider that locates certificates in a per-machine directory tree.

    Under ``base_dir`` it prefers ``machines/<hostname>`` and falls back to
    the shared ``certs`` directory when no such machine directory exists.
    """

    fields = (
        'base_dir',
        'server_cert_name',
        'client_cert_name',
        'client_key_name',
    )

    def __init__(self, config):
        """Load the field values from ``config`` and normalise ``base_dir``.

        :raises Exception: from the base initializer when a field is missing.
        """
        # These assignments only guarantee the attributes exist; the base
        # initializer assigns every field from config afterwards.
        # NOTE(review): that means the '{}/.docker/machine' default is
        # replaced by whatever config.get returns -- confirm that is intended.
        self.base_dir = '{}/.docker/machine'
        self.client_cert_name = None
        self.client_key_name = None
        self.server_cert_name = None
        super(MachineTLSProvider, self).__init__(config)
        self.base_dir = utils.normpath(self.base_dir.format('$HOME'))

    def get_config(self, url):
        """Return the docker TLS config for the host named in ``url``.

        :param url: docker endpoint URL; its hostname selects the machine
            directory. A URL without a hostname uses the shared ``certs``
            directory.
        :raises Exception: when any of the three certificate/key files is
            absent from the directory that was searched.
        """
        hostname = urlparse.urlparse(url).hostname
        default_dir = os.path.join(self.base_dir, 'certs')

        tls_dir = default_dir
        # hostname is None for URLs that carry no network location; joining
        # None into a path would raise an unrelated error, so only look for a
        # machine directory when there is a name to look up.
        if hostname is not None:
            machine_dir = os.path.join(self.base_dir, 'machines', hostname)
            if os.path.isdir(machine_dir):
                tls_dir = machine_dir

        client_cert = os.path.join(tls_dir, self.client_cert_name)
        client_key = os.path.join(tls_dir, self.client_key_name)
        server_cert = os.path.join(tls_dir, self.server_cert_name)

        missing = [
            fname
            for fname in (client_cert, client_key, server_cert)
            if not os.path.isfile(fname)
        ]
        if missing:
            # Report the directory that was actually searched, in a
            # deterministic form.
            raise Exception(
                "Could not find TLS info for "
                "{host} in {dirs}. Missing files: {missing!r}".format(
                    host=hostname,
                    dirs=[tls_dir],
                    missing=missing
                )
            )
        return self.to_docker_tls(client_cert, client_key, server_cert)
class EnvTLSProvider(TLSProvider):
    """Provider driven by the ``DOCKER_TLS_VERIFY`` / ``DOCKER_CERT_PATH``
    environment variables.

    It inherits the empty ``fields`` tuple, so nothing is read from the
    config section.
    """

    def __init__(self, *a, **k):
        """Snapshot the relevant environment variables, then run the base init."""
        environ = os.environ
        # Environment values are always strings, so a presence test is the
        # same as checking that the looked-up value is not None.
        # NOTE(review): any value, including an empty string, counts as
        # enabled here -- confirm that matches the intended semantics.
        self.tls_enabled = 'DOCKER_TLS_VERIFY' in environ
        self.cert_path = (
            environ.get('DOCKER_CERT_PATH') if self.tls_enabled else None
        )
        TLSProvider.__init__(self, *a, **k)

    def get_config(self, url):
        """Return the docker TLS config, or ``None`` when no cert path is known.

        ``url`` is not consulted. The files are expected to be named
        ``cert.pem``, ``key.pem`` and ``ca.pem`` inside ``cert_path``.
        """
        cert_dir = self.cert_path
        if cert_dir is None:
            return None
        client_cert, client_key, ca_cert = (
            os.path.join(cert_dir, pem_name)
            for pem_name in ('cert.pem', 'key.pem', 'ca.pem')
        )
        return self.to_docker_tls(client_cert, client_key, ca_cert)