Repository URL to install this package:
|
Version:
0.3.30 ▾
|
import sys
import os
import requests
from ConfigParser import SafeConfigParser, NoOptionError
from . import git_utils
from . import hosts
from . import utils
from . import tls_config
from . import aws_utils
# Coordinates of the shared hubboss config file inside its GitHub repository.
CONFIG_PARTS = {
    'org': 'docker',
    'repo': 'saas-config',
    'path': 'hubboss/hubboss-config.ini',
    'ref': 'master',
}

# GitHub "contents" API URL for the file described by CONFIG_PARTS.
BOSS_CONFIG_URL = (
    'https://api.github.com/repos/{org}/{repo}/contents/{path}?ref={ref}'
    .format(**CONFIG_PARTS)
)
class GithubTokenAuth(requests.auth.AuthBase):
    """Requests auth hook that sends a GitHub token in the Authorization header."""

    def __init__(self, token):
        """Remember *token* for use on every request this object is applied to."""
        super(GithubTokenAuth, self).__init__()
        self.token = token

    def __call__(self, r):
        """Set the ``Authorization: token <token>`` header on request *r* and return it."""
        header_value = 'token {}'.format(self.token)
        r.headers['Authorization'] = header_value
        return r
class BossConfigException(Exception):
    """Exception type declared by this module for configuration errors."""
class BossConfig(SafeConfigParser, object):
sentinel = object()
@classmethod
def default_path(cls):
return utils.normpath(os.environ.get('HUBBOSS_CONFIG', '$HOME/.hubboss/config.ini'))
def __init__(self, path=None):
SafeConfigParser.__init__(self)
object.__init__(self)
if path is None:
path = type(self).default_path()
self.path = utils.normpath(path)
if not os.path.isfile(path):
self.update()
self.read(path)
def get(self, section, option, raw=False, vars=None):
try:
return SafeConfigParser.get(self, section, option, raw, vars)
except NoOptionError:
raise KeyError('{}.{}'.format(section, option))
def default_get(self, section, option, default=sentinel):
try:
return self.get(section, option)
except KeyError:
if default is self.sentinel:
raise
else:
return default
@property
def home_dir(self):
return os.path.dirname(self.path)
def update(self, replace=False):
if replace:
# download and replace the config file
print("Replacing existing config ({})...".format(self.path))
else:
# check if file exists, if it does, raise an error and pass
# if it does not download it.
print("Checking for existing config...")
if os.path.isfile(self.path):
print("Existing config file ({}) found. Not updating".format(self.path))
print("If you want to replace this file, use the `--replace` flag")
return ''
token = os.environ.get('GITHUB_TOKEN', None)
if token is None:
print('Please set GITHUB_TOKEN for access to {url}'.format(url=BOSS_CONFIG_URL))
return ''
config_json = requests.get(BOSS_CONFIG_URL, auth=GithubTokenAuth(token))
r = requests.get(config_json.json()['download_url'])
if not os.path.exists(self.home_dir):
os.makedirs(self.home_dir)
with open(self.path, 'w') as f:
f.write(r.content)
return ''
def keys(self, group):
return iter(x[0] for x in self.items(group))
def values(self, group):
return iter(x[1] for x in self.items(group))
@property
def host_groups(self):
return self.keys('host_groups')
@property
def hosts(self):
return dict(
(host_group, list(self.get_hosts_for_group(host_group)))
for host_group in self.host_groups
)
def get_host(self, name):
name, base_url, ip = self.get_urls_and_ips(short_names=[name]).next()
return hosts.Host(name=name, base_url=base_url, tls=self.get_tls(base_url), ip=ip)
def get_hosts(self, group_name, short_names=None):
url_ip_pairs = list(self.get_urls_and_ips(short_names=short_names))
for short_name, url, ip in url_ip_pairs:
yield hosts.Host(name=short_name, base_url=url, tls=self.get_tls(url), environment=group_name, ip=ip)
def get_urls_and_ips(self, short_names=None):
known_hosts = self.items('hosts')
for short_name, locations in known_hosts:
if not short_names:
yield [short_name] + locations.split(",")
elif short_name in short_names:
yield [short_name] + locations.split(",")
def get_hosts_for_group(self, group_name):
try:
short_names = self.get('host_groups', group_name)
except NoOptionError:
raise NoOptionError('No group named {} in `host_groups`'.format(group_name))
if short_names == 'DYNAMIC':
return self.get_dynamic_hosts(group_name)
else:
short_names = short_names.split(',')
return self.get_hosts(group_name, short_names)
def get_dynamic_hosts(self, group_name):
for host in aws_utils.get_aws_hosts(self, group_name):
yield host
def get_tls(self, url):
return tls_config.TLSProvider.get_tls_config(self, url)
def update_config(replace=False):
    """Run ``update()`` on the module-level ``config``, forwarding *replace*."""
    config.update(replace=replace)
# Module-level configuration instance used by the helper functions below.
# It is built at import time, so importing this module runs
# BossConfig.__init__: the config file is read, after an attempt to download
# it when it does not exist yet.
config = BossConfig()
def get_build_dir():
    """Return the ``build.build_dir`` setting, creating the directory if absent."""
    target = config.get('build', 'build_dir')
    if os.path.exists(target):
        return target
    os.makedirs(target)
    return target
def _get_repo(project):
    """Return the ``git_url`` option from the config section named *project*."""
    git_url = config.get(project, 'git_url')
    return git_url
def get_hub_url():
    """Return the git URL configured in the ``hub`` section."""
    hub_url = _get_repo('hub')
    return hub_url
def get_reghub_url():
    """Return the git URL configured in the ``reghub`` section."""
    reghub_url = _get_repo('reghub')
    return reghub_url
def get_host_groups():
    """Return the module-level config's ``host_groups`` (an iterator of group names)."""
    groups = config.host_groups
    return groups
def get_hosts_for_group(group_name):
    """Return the hosts of *group_name* as resolved by the module-level config."""
    group_hosts = config.get_hosts_for_group(group_name)
    return group_hosts
def checkout_tag(image):
    """Check out the tag named in *image* in that project's working repository.

    *image* must have the form ``<repo_name>:<tag>``; ``repo_name`` selects
    the config section whose ``git_url`` identifies the repository.
    """
    repo_name, tag_name = image.split(':')
    working_repo = git_utils.get_working_repo(
        config.get(repo_name, 'git_url'),
        config.home_dir,
    )
    tag_ref = git_utils.get_tag(working_repo, "{}".format(tag_name))

    # Point HEAD straight at the tag's commit (this detaches HEAD), then
    # bring the index and working tree in line with it.
    working_repo.head.reference = tag_ref.commit
    assert working_repo.head.is_detached
    working_repo.head.reset(index=True, working_tree=True)
def get_container_config_dir(project):
    """Return the ``container_config_dir`` option of section *project*.

    Falls back to ``"container_configs"`` when the lookup fails for any
    ordinary reason (missing option, missing section, ...).
    """
    try:
        return config.get(project, 'container_config_dir')
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except:`` so that
        # KeyboardInterrupt and SystemExit are not swallowed.
        return "container_configs"
def get_environment_details(project, env):
    """Return the path of ``<env>.yml`` in *project*'s container config directory.

    The path is ``<repo dir>/<container config dir>/<env>.yml``, where the
    repo dir comes from ``git_utils.get_repo_dir`` for the project's
    ``git_url`` and the config home directory.
    """
    git_url = config.get(project, 'git_url')
    checkout_root = git_utils.get_repo_dir(git_url, config.home_dir)
    config_subdir = get_container_config_dir(project)
    filename = "{}.yml".format(env)
    return os.path.join(checkout_root, config_subdir, filename)
def get_application_details(project, env):
    """Return the path of ``app-<env>.yml`` in *project*'s container config directory.

    The path is ``<repo dir>/<container config dir>/app-<env>.yml``, where
    the repo dir comes from ``git_utils.get_repo_dir`` for the project's
    ``git_url`` and the config home directory.
    """
    git_url = config.get(project, 'git_url')
    checkout_root = git_utils.get_repo_dir(git_url, config.home_dir)
    config_subdir = get_container_config_dir(project)
    filename = "app-{}.yml".format(env)
    return os.path.join(checkout_root, config_subdir, filename)
def get_git_url(repository):
    """Return the ``git_url`` option from the config section named *repository*."""
    git_url = config.get(repository, 'git_url')
    return git_url
def get_service_name(project):
    """Return the ``service_name`` option of section *project*.

    Raises:
        Exception: with a hint to update the config when the lookup fails.
    """
    try:
        return config.get(project, 'service_name')
    except Exception:
        # Was a bare ``except:``; narrowed so KeyboardInterrupt and
        # SystemExit pass through instead of being reported as a missing
        # service_name.
        raise Exception('Could not find service_name in config. Update config?')