Repository URL to install this package:
|
Version:
0.3.30 ▾
|
import docker
from docker.errors import APIError
from . import utils
class Container(utils.NiceRepr):
show_restart = None # bool
config = None # docker.tls.TLSConfig
host = None # boss.hosts.Host
container_id = None # str
created = None # int
command = None # str
labels = None # dict
status = None # str
image = None # str
ports = None # list
names = None # list
def __init__(self, host, config, show_restart, **kwargs):
# Rename 'id' field so as not to conflict with built-in function
self.container_id = kwargs.pop('id', None)
self.host = host
self.config = config
self.show_restart = show_restart
self.__dict__.update(kwargs)
self.client = self.create_client(base_url=self.host, tls=self.config)
def create_client(self, base_url, tls):
return docker.Client(base_url=base_url, tls=tls)
@property
def display_ports(self):
ports = []
for port_dict in self.ports:
ports.append(
"{0}{1}{2}{3}".format(
"{0}->".format(port_dict.get('IP', '')),
"{0}:".format(port_dict.get('PublicPort', '')),
"{0}".format(port_dict.get('PrivatePort', '')),
"/{0}".format(port_dict.get('Type', '')),
)
)
return ", ".join(ports)
@property
def display_names(self):
return self.names[0][1:]
@property
def display_image(self):
return self.image
@property
def display_restart_policy(self):
if self.show_restart:
container_details = self.client.inspect_container(self.container_id)
try:
restart_policy = container_details.get('HostConfig').get('RestartPolicy').get('Name')
except AttributeError:
return ''
return restart_policy
else:
return "N/A"
@property
def display_status(self):
return self.status
def __str__(self):
return u"{0: <13} {1: <8} {2: <38} {3: <60} {4: <20} {5: <30}".format(
self.container_id[:12],
self.display_restart_policy,
self.display_image,
self.display_names,
self.display_status,
self.display_ports,
)
def get_logs(self, stream=True, stdout=True, stderr=True, timestamps=False):
return self.client.logs(container=self.container_id, stream=stream, stdout=stdout, stderr=stderr, timestamps=timestamps)
def pause(self):
return self.client.pause(self.container_id)
def unpause(self):
return self.client.unpause(self.container_id)
def restart(self):
return self.client.restart(self.container_id)
def stop(self):
return self.client.stop(self.container_id)
def top(self):
return self.client.top(self.container_id)
def wait(self):
return self.client.wait(self.container_id)
def start(self):
try:
return self.client.start(self.container_id)
except APIError as e:
print e
return e
def log(self, stdout=True, stderr=True, stream=True, timestamps=False):
return self.client.logs(self.container_id, stdout=stdout, stderr=stderr, stream=stream, timestamps=timestamps)