Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
workloadmgr / usr / lib / python3 / dist-packages / workloadmgr / triliopssh.py
Size: Mime:
#!/usr/bin/env python

# This file is part of parallel-ssh.

# Copyright (C) 2015 Panos Kittenis

# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation, version 2.1.

# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301  USA

"""Asynchronous parallel SSH library

parallel-ssh uses asychronous network requests - there is *no* multi-threading or multi-processing used.

This is a *requirement* for commands on many (hundreds/thousands/hundreds of thousands) of hosts which would grind a system to a halt simply by having so many processes/threads all wanting to execute if done with multi-threading/processing.

The `libev event loop library <http://software.schmorp.de/pkg/libev.html>`_ is utilised on nix systems. Windows is not supported.

See :mod:`pssh.ParallelSSHClient` and :mod:`pssh.SSHClient` for class documentation.
"""

from gevent import monkey
monkey.patch_all()
import gevent.pool
import warnings
from socket import gaierror as sock_gaierror, error as sock_error
import logging
import paramiko
import os

host_logger = logging.getLogger('pssh.host_logger')
handler = logging.StreamHandler()
host_log_format = logging.Formatter('%(message)s')
handler.setFormatter(host_log_format)
host_logger.addHandler(handler)
host_logger.setLevel(logging.INFO)
DEFAULT_RETRIES = 3

logger = logging.getLogger(__name__)


class UnknownHostException(Exception):
    """Raised when a host is unknown (dns failure)"""
    pass


class ConnectionErrorException(Exception):
    """Raised on error connecting (connection refused/timed out)"""
    pass


class AuthenticationException(Exception):
    """Raised on authentication error (user/password/ssh key error)"""
    pass


class SSHException(Exception):
    """Raised on SSHException error - error authenticating with SSH server"""
    pass


class SSHClient(object):
    """Wrapper class over paramiko.SSHClient with sane defaults
    Honours ~/.ssh/config and /etc/ssh/ssh_config entries for host username \
    overrides"""

    def __init__(self, host,
                 user=None, password=None, port=None,
                 pkey=None, forward_ssh_agent=True,
                 num_retries=DEFAULT_RETRIES, _agent=None, timeout=None,
                 proxy_host=None, proxy_port=22):
        """Connect to host honouring any user set configuration in ~/.ssh/config \
        or /etc/ssh/ssh_config

        :param host: Hostname to connect to
        :type host: str
        :param user: (Optional) User to login as. Defaults to logged in user or \
        user from ~/.ssh/config if set
        :type user: str
        :param password: (Optional) Password to use for login. Defaults to\
        no password
        :type password: str
        :param port: (Optional) Port number to use for SSH connection. Defaults\
        to None which uses SSH default
        :type port: int
        :param pkey: (Optional) Client's private key to be used to connect with
        :type pkey: :mod:`paramiko.PKey`
        :param num_retries: (Optional) Number of retries for connection attempts\
        before the client gives up. Defaults to 3.
        :type num_retries: int
        :param timeout: (Optional) Number of seconds to timout connection attempts\
        before the client gives up. Defaults to 10.
        :type timeout: int
        :param forward_ssh_agent: (Optional) Turn on SSH agent forwarding - \
        equivalent to `ssh -A` from the `ssh` command line utility. \
        Defaults to True if not set.
        :type forward_ssh_agent: bool
        :param _agent: (Optional) Override SSH agent object with the provided. \
        This allows for overriding of the default paramiko behaviour of \
        connecting to local SSH agent to lookup keys with our own SSH agent. \
        Only really useful for testing, hence the internal variable prefix.
        :type _agent: :mod:`paramiko.agent.Agent`
        :param proxy_host: (Optional) SSH host to tunnel connection through \
        so that SSH clients connects to self.host via client -> proxy_host -> host
        :type proxy_host: str
        :param proxy_port: (Optional) SSH port to use to login to proxy host if \
        set. Defaults to 22.
        :type proxy_port: int
        """
        ssh_config = paramiko.SSHConfig()
        _ssh_config_file = os.path.sep.join([os.path.expanduser('~'),
                                             '.ssh',
                                             'config'])
        # Load ~/.ssh/config if it exists to pick up username
        # and host address if set
        if os.path.isfile(_ssh_config_file):
            ssh_config.parse(open(_ssh_config_file))
        host_config = ssh_config.lookup(host)
        resolved_address = (host_config['hostname'] if
                            'hostname' in host_config
                            else host)
        _user = host_config['user'] if 'user' in host_config else None
        user = user if user else _user
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy())
        self.forward_ssh_agent = forward_ssh_agent
        self.client = client
        self.user = user
        self.password = password
        self.pkey = pkey
        self.port = port if port else 22
        self.host = resolved_address
        if _agent:
            self.client._agent = _agent
        self.num_retries = num_retries
        self.timeout = timeout
        self.proxy_host, self.proxy_port = proxy_host, proxy_port
        self.proxy_client = None
        if self.proxy_host and self.proxy_port:
            logger.debug(
                "Proxy configured for destination host %s - Proxy host: %s:%s",
                self.host,
                self.proxy_host,
                self.proxy_port,
            )
            self._connect_tunnel()
        else:
            self._connect(self.client, self.host, self.port)

    def _connect_tunnel(self):
        """Connects to SSH server via an intermediate SSH tunnel server.
        client (me) -> tunnel (ssh server to proxy through) -> \
        destination (ssh server to run command)
        :rtype: `:mod:paramiko.SSHClient` Client to remote SSH destination
        via intermediate SSH tunnel server."""
        self.proxy_client = paramiko.SSHClient()
        self.proxy_client.set_missing_host_key_policy(
            paramiko.MissingHostKeyPolicy())
        self._connect(self.proxy_client, self.proxy_host, self.proxy_port)
        logger.info("Connecting via SSH proxy %s:%s -> %s:%s", self.proxy_host,
                    self.proxy_port, self.host, self.port,)
        proxy_channel = self.proxy_client.get_transport().\
            open_channel('direct-tcpip', (self.host, self.port,),
                         ('127.0.0.1', 0))
        return self._connect(self.client, self.host,
                             self.port, sock=proxy_channel)

    def _connect(self, client, host, port, sock=None, retries=1):
        """Connect to host

        :raises: :mod:`pssh.AuthenticationException` on authentication error
        :raises: :mod:`pssh.UnknownHostException` on DNS resolution error
        :raises: :mod:`pssh.ConnectionErrorException` on error connecting
        :raises: :mod:`pssh.SSHException` on other undefined SSH errors
        """
        try:
            client.connect(host, username=self.user,
                           password=self.password, port=port,
                           pkey=self.pkey,
                           sock=sock, timeout=self.timeout)
        except sock_gaierror as ex:
            logger.error("Could not resolve host '%s' - retry %s/%s" %
                         (host, retries, self.num_retries))
            while retries < self.num_retries:
                gevent.sleep(5)
                return self._connect(client, host, port, sock=sock,
                                     retries=retries + 1)
            """
            raise UnknownHostException("%s - %s - retry %s/%s" %
                                       (str(ex.args[1]), host, retries, self.num_retries))
            """
            logger.info("%s - %s - retry %s/%s" %
                        (str(ex.args[1]), host, retries, self.num_retries))
            raise ConnectionErrorException("Unable to establish connection")
        except sock_error as ex:
            logger.error("Error connecting to host '%s:%s' - retry %s/%s" %
                         (host, port, retries, self.num_retries))
            while retries < self.num_retries:
                gevent.sleep(5)
                return self._connect(client, host, port, sock=sock,
                                     retries=retries + 1)
            error_type = ex.args[1] if len(ex.args) > 1 else ex.args[0]

            """
            raise ConnectionErrorException("%s for host '%s:%s' - retry %s/%s" %
                                           (str(error_type), host, port,
                                           retries, self.num_retries))
            """
            logger.info("%s for host '%s:%s' - retry %s/%s" %
                        (str(error_type), host, port,
                         retries, self.num_retries))
            raise ConnectionErrorException("Unable to establish connection")
        except paramiko.AuthenticationException as ex:
            raise AuthenticationException(ex)
        # SSHException is more general so should be below other types
        # of SSH failure
        except paramiko.SSHException as ex:
            logger.error("General SSH error - %s", ex)
            raise SSHException(ex)

    def exec_command(self, command, sudo=False, user=None, **kwargs):
        """Wrapper to :mod:`paramiko.SSHClient.exec_command`

        Opens a new SSH session with a new pty and runs command with given \
        `kwargs` if any. Greenlet then yields (sleeps) while waiting for \
        command to finish executing or channel to close indicating the same.

        :param command: Shell command to execute
        :type command: str
        :param sudo: (Optional) Run with sudo. Defaults to False
        :type sudo: bool
        :param kwargs: (Optional) Keyword arguments to be passed to remote \
        command
        :type kwargs: dict
        :rtype: Tuple of `(channel, hostname, stdout, stderr)`. \
        Channel is the remote SSH channel, needed to ensure all of stdout has \
        been got, hostname is remote hostname the copy is to, stdout and \
        stderr are buffers containing command output.
        """
        channel = self.client.get_transport().open_session()
        if self.forward_ssh_agent:
            agent_handler = paramiko.agent.AgentRequestHandler(channel)
        channel.get_pty()
        if self.timeout:
            channel.settimeout(self.timeout)
        _stdout, _stderr = channel.makefile('rb'), \
            channel.makefile_stderr('rb')
        stdout, stderr = self._read_output_buffer(_stdout,), \
            self._read_output_buffer(_stderr,
                                     prefix='\t[err]')
        if sudo and not user:
            command = 'sudo -S bash -c "%s"' % command.replace('"', '\\"')
        elif user:
            command = 'sudo -u %s -S bash -c "%s"' % (
                user, command.replace('"', '\\"'),)
        else:
            command = 'bash -c "%s"' % command.replace('"', '\\"')
        stdin = channel.makefile('wb', 8192)
        logger.debug("Running command %s on %s", command, self.host)
        channel.exec_command(command, **kwargs)
        gevent.sleep(.2)
        if sudo or user:
            if stdin.channel.closed is False:  # If stdin is still open then sudo is asking us for a password
                logger.debug("Writing credetials for sudo")
                stdin.write('%s\n' % self.password)
                stdin.flush()
        logger.debug("Command started")
        while not (channel.recv_ready() or channel.closed):
            gevent.sleep(.2)
        return channel, self.host, stdout, stderr

    def _read_output_buffer(self, output_buffer, prefix=''):
        """Read from output buffers and log to host_logger"""
        for line in output_buffer:
            output = line.strip()
            msg = output.replace(self.password, '******')
            host_logger.info("[%s]%s\t%s", self.host, prefix, msg,)
            yield output

    def _make_sftp(self):
        """Make SFTP client from open transport"""
        transport = self.client.get_transport()
        transport.open_session()
        return paramiko.SFTPClient.from_transport(transport)

    def mkdir(self, sftp, directory):
        """Make directory via SFTP channel

        :param sftp: SFTP client object
        :type sftp: :mod:`paramiko.SFTPClient`
        :param directory: Remote directory to create
        :type directory: str

        Catches and logs at error level remote IOErrors on creating directory."""
        try:
            sftp.mkdir(directory)
        except IOError as error:
            logger.error("Error occured creating directory on %s - %s",
                         self.host, error)

    def copy_file(self, local_file, remote_file):
        """Copy local file to host via SFTP/SCP

        Copy is done natively using SFTP/SCP version 2 protocol, no scp command \
        is used or required.

        :param local_file: Local filepath to copy to remote host
        :type local_file: str
        :param remote_file: Remote filepath on remote host to copy file to
        :type remote_file: str
        """
        sftp = self._make_sftp()
        destination = remote_file.split(os.path.sep)
        remote_file = os.path.sep.join(destination)
        destination = destination[:-1]
        for directory in destination:
            try:
                sftp.stat(directory)
            except IOError:
                self.mkdir(sftp, directory)
        try:
            sftp.put(local_file, remote_file)
        except Exception as error:
            logger.error("Error occured copying file to host %s - %s",
                         self.host, error)
        else:
            logger.info("Copied local file %s to remote destination %s:%s",
                        local_file, self.host, remote_file)


class ParallelSSHClient(object):
    """Uses :mod:`pssh.SSHClient`, performs tasks over SSH on multiple hosts in \
    parallel.

    Connections to hosts are established in parallel when ``run_command`` is called,
    therefor any connection and/or authentication exceptions will happen on the
    call to ``run_command`` and need to be caught."""

    def __init__(
            self,
            hosts,
            user=None,
            password=None,
            port=None,
            pkey=None,
            forward_ssh_agent=True,
            num_retries=DEFAULT_RETRIES,
            timeout=None,
            pool_size=10,
            proxy_host=None,
            proxy_port=22):
        """
        :param hosts: Hosts to connect to
        :type hosts: list(str)
        :param user: (Optional) User to login as. Defaults to logged in user or \
        user from ~/.ssh/config or /etc/ssh/ssh_config if set
        :type user: str
        :param password: (Optional) Password to use for login. Defaults to \
        no password
        :type password: str
        :param port: (Optional) Port number to use for SSH connection. Defaults \
        to None which uses SSH default
        :type port: int
        :param pkey: (Optional) Client's private key to be used to connect with
        :type pkey: :mod:`paramiko.PKey`
        :param num_retries: (Optional) Number of retries for connection attempts \
        before the client gives up. Defaults to 3.
        :type num_retries: int
        :param timeout: (Optional) Number of seconds to timout connection \
        attempts before the client gives up. Defaults to 10.
        :type timeout: int
        :param forward_ssh_agent: (Optional) Turn on SSH agent forwarding - \
        equivalent to `ssh -A` from the `ssh` command line utility. \
        Defaults to True if not set.
        :type forward_ssh_agent: bool
        :param pool_size: (Optional) Greenlet pool size. Controls on how many\
        hosts to execute tasks in parallel. Defaults to number of hosts or 10, \
        whichever is lower. Pool size will be *equal to* number of hosts if number\
        of hosts is lower than the pool size specified as that would only \
        increase overhead with no benefits.
        :type pool_size: int
        :param proxy_host: (Optional) SSH host to tunnel connection through \
        so that SSH clients connect to self.host via client -> proxy_host -> \
        host
        :type proxy_host: str
        :param proxy_port: (Optional) SSH port to use to login to proxy host if \
        set. Defaults to 22.
        :type proxy_port: int

        **Example**

        >>> from pssh import ParallelSSHClient, AuthenticationException, \
UnknownHostException, ConnectionErrorException
        >>> client = ParallelSSHClient(['myhost1', 'myhost2'])
        >>> try:
        >>> ... output = client.run_command('ls -ltrh /tmp/aasdfasdf', sudo=True)
        >>> except (AuthenticationException, UnknownHostException, ConnectionErrorException):
        >>> ... return
        >>> # Commands have started executing at this point
        >>> # Exit code will probably not be available immediately
        >>> print output
        >>> {'myhost1': {'exit_code': None,
                         'stdout' : <generator>,
                         'stderr' : <generator>,
                         'cmd' : <greenlet>,
                         },
             'myhost2': {'exit_code': None,
                         'stdout' : <generator>,
                         'stderr' : <generator>,
                         'cmd' : <greenlet>,
            }}
        >>> # Print output as it comes in.
        >>> for host in output: for line in output[host]['stdout']: print line
        [myhost1]     ls: cannot access /tmp/aasdfasdf: No such file or directory
        [myhost2]     ls: cannot access /tmp/aasdfasdf: No such file or directory
        >>> # Retrieve exit code after commands have finished
        >>> # `get_exit_code` will return `None` if command has not finished
        >>> print client.get_exit_code(output[host])
        0

        **Example with specified private key**

        >>> import paramiko
        >>> client_key = paramiko.RSAKey.from_private_key_file('user.key')
        >>> client = ParallelSSHClient(['myhost1', 'myhost2'], pkey=client_key)

        .. note ::

          **Connection persistence**

          Connections to hosts will remain established for the duration of the
          object's life. To close them, just `del` or reuse the object reference.

          >>> client = ParallelSSHClient(['localhost'])
          >>> output = client.run_command('ls -ltrh /tmp/aasdfasdf')
          >>> client.pool.join()

          :netstat: ``tcp        0      0 127.0.0.1:53054         127.0.0.1:22            ESTABLISHED``

          Connection remains active after commands have finished executing. Any \
          additional commands will use the same connection.

          >>> del client

          Connection is terminated.
        """
        self.pool_size = len(hosts) if len(hosts) < pool_size else pool_size
        self.pool = gevent.pool.Pool(size=self.pool_size)
        self.hosts = hosts
        self.user = user
        self.password = password
        self.forward_ssh_agent = forward_ssh_agent
        self.port = port
        self.pkey = pkey
        self.num_retries = num_retries
        self.timeout = timeout
        self.proxy_host, self.proxy_port = proxy_host, proxy_port
        # To hold host clients
        self.host_clients = dict((host, None) for host in hosts)

    def run_command(self, *args, **kwargs):
        """Run command on all hosts in parallel, honoring self.pool_size,
        and return output buffers.

        This function will block until all commands have **started** and
        then return immediately. Any connection and/or authentication exceptions
        will be raised here and need catching.

        :param args: Positional arguments for command
        :type args: tuple
        :param kwargs: Keyword arguments for command
        :type kwargs: dict

        :rtype: Dictionary with host as key as per :mod:`ParallelSSH.get_output`

        :raises: :mod:`pssh.AuthenticationException` on authentication error
        :raises: :mod:`pssh.UnknownHostException` on DNS resolution error
        :raises: :mod:`pssh.ConnectionErrorException` on error connecting
        :raises: :mod:`pssh.SSHException` on other undefined SSH errors

        **Example Usage**

        >>> output = client.run_command('ls -ltrh')

        print stdout for each command:

        >>> for host in output:
        >>>     for line in output[host]['stdout']: print line

        Get exit code after command has finished:

        >>> for host in output:
        >>>     for line in output[host]['stdout']: print line
        >>>     exit_code = client.get_exit_code(output[host])

        Wait for completion, no stdout:

        >>> client.pool.join()

        Capture stdout - **WARNING** - this will store the entirety of stdout
        into memory and may exhaust available memory if command output is
        large enough:

        >>> for host in output:
        >>>     stdout = list(output[host]['stdout'])
        >>>     print "Complete stdout for host %s is %s" % (host, stdout,)

        **Example Output**

        ::

          {'myhost1': {'exit_code': exit code if ready else None,
                       'channel' : SSH channel of command,
                       'stdout'  : <iterable>,
                       'stderr'  : <iterable>,
                       'cmd'     : <greenlet>}}

        """
        for host in self.hosts:
            self.pool.spawn(self._exec_command, host, *args, **kwargs)
        return self.get_output()

    def exec_command(self, *args, **kwargs):
        """Run command on all hosts in parallel, honoring `self.pool_size`

        **Deprecated by** :mod:`pssh.ParallelSSHClient.run_command`

        :param args: Position arguments for command
        :type args: tuple
        :param kwargs: Keyword arguments for command
        :type kwargs: dict

        :rtype: List of :mod:`gevent.Greenlet`"""
        warnings.warn("This method is being deprecated and will be removed in \
future releases - use self.run_command instead", DeprecationWarning)
        return [self.pool.spawn(self._exec_command, host, *args, **kwargs)
                for host in self.hosts]

    def _exec_command(self, host, *args, **kwargs):
        """Make SSHClient, run command on host"""
        if not self.host_clients[host]:
            self.host_clients[host] = SSHClient(
                host,
                user=self.user,
                password=self.password,
                port=self.port,
                pkey=self.pkey,
                forward_ssh_agent=self.forward_ssh_agent,
                num_retries=self.num_retries,
                timeout=self.timeout,
                proxy_host=self.proxy_host,
                proxy_port=self.proxy_port)
        return self.host_clients[host].exec_command(*args, **kwargs)

    def get_output(self, commands=None):
        """Get output from running commands.

        :param commands: (Optional) Override commands to get output from.\
        Uses running commands in pool if not given.
        :type commands: :mod:`gevent.Greenlet`
        :rtype: Dictionary with host as key as in:

        ::

          {'myhost1': {'exit_code': exit code if ready else None,
                       'channel' : SSH channel of command,
                       'stdout'  : <iterable>,
                       'stderr'  : <iterable>,
                       'cmd'     : <greenlet>}}

        Stdout and stderr are also logged via the logger named ``host_logger``
        which is enabled by default.
        ``host_logger`` output can be disabled by removing its handler.

        >>> logger = logging.getLogger('pssh.host_logger')
        >>> for handler in logger.handlers: logger.removeHandler(handler)

        **Example usage**:

        >>> output = client.get_output()
        >>> for host in output: print output[host]['stdout']
        <stdout>
        >>> # Get exit code after command has finished
        >>> self.get_exit_code(output[host])
        0
        """
        if not commands:
            commands = list(self.pool.greenlets)
        output = {}
        for cmd in commands:
            (channel, host, stdout, stderr) = cmd.get(timeout=self.timeout)
            output.setdefault(host, {})
            output[host].update({'exit_code': self._get_exit_code(channel),
                                 'channel': channel,
                                 'stdout': stdout,
                                 'stderr': stderr,
                                 'cmd': cmd, })
        return output

    def get_exit_code(self, host_output):
        """Get exit code from host output if available
        :param host_output: Per host output as returned by `self.get_output`
        :rtype: int or None if exit code not ready"""
        if 'channel' not in host_output:
            logger.error("%s does not look like host output..", host_output,)
            return
        channel = host_output['channel']
        return self._get_exit_code(channel)

    def _get_exit_code(self, channel):
        """Get exit code from channel if ready"""
        if not channel.exit_status_ready():
            return
        channel.close()
        return channel.recv_exit_status()

    def get_stdout(self, greenlet, return_buffers=False):
        """Get/print stdout from greenlet and return exit code for host

        **Deprecated** - use self.get_output() instead.

        :param greenlet: Greenlet object containing an \
        SSH channel reference, hostname, stdout and stderr buffers
        :type greenlet: :mod:`gevent.Greenlet`
        :param return_buffers: Flag to turn on returning stdout and stderr \
        buffers along with exit code. Defaults to off.
        :type return_buffers: bool
        :rtype: Dictionary containing ``{host: {'exit_code': exit code}}`` entry \
        for example ``{'myhost1': {'exit_code': 0}}``
        :rtype: With ``return_buffers=True``: ``{'myhost1': {'exit_code': 0,
                                                             'channel' : None or SSH channel of command if command is still executing,
                                                             'stdout' : <iterable>,
                                                             'stderr' : <iterable>,}}``
        """
        warnings.warn("This method is being deprecated and will be removed in \
future releases - use self.get_output instead", DeprecationWarning)
        gevent.sleep(.2)
        channel, host, stdout, stderr = greenlet.get()
        if channel.exit_status_ready():
            channel.close()
        else:
            logger.debug(
                "Command still executing on get_stdout call - not closing channel and returning None as exit code.")
            # If channel is not closed we cannot get full stdout/stderr so must
            # return buffers
            return_buffers = True
        # Channel must be closed or reading stdout/stderr will block forever
        if not return_buffers and channel.closed:
            for _ in stdout:
                pass
            for _ in stderr:
                pass
            return {host: {'exit_code': channel.recv_exit_status(), }}
        gevent.sleep(.2)
        return {
            host: {
                'exit_code': channel.recv_exit_status() if channel.exit_status_ready() else None,
                'channel': channel if not channel.closed else None,
                'stdout': stdout,
                'stderr': stderr,
            }}

    def copy_file(self, local_file, remote_file):
        """Copy local file to remote file in parallel

        :param local_file: Local filepath to copy to remote host
        :type local_file: str
        :param remote_file: Remote filepath on remote host to copy file to
        :type remote_file: str

        .. note ::
          Remote directories in `remote_file` that do not exist will be
          created as long as permissions allow.

        .. note ::
          Path separation is handled client side so it is possible to copy
          to/from hosts with differing path separators, like from/to Linux
          and Windows.

        :rtype: List(:mod:`gevent.Greenlet`) of greenlets for remote copy \
        commands
        """
        return [self.pool.spawn(self._copy_file, host, local_file, remote_file)
                for host in self.hosts]

    def _copy_file(self, host, local_file, remote_file):
        """Make sftp client, copy file"""
        if not self.host_clients[host]:
            self.host_clients[host] = SSHClient(
                host,
                user=self.user,
                password=self.password,
                port=self.port,
                pkey=self.pkey,
                forward_ssh_agent=self.forward_ssh_agent)
        return self.host_clients[host].copy_file(local_file, remote_file)