Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from __future__ import absolute_import, unicode_literals

import os
import pipes
import signal
import subprocess
import sys
import time
from contextlib import contextmanager
from threading import Thread

import py

from tox import reporter
from tox.constants import INFO
from tox.exception import InvocationError
from tox.reporter import Verbosity
from tox.util.lock import get_unique_file
from tox.util.stdlib import is_main_thread


class Action(object):
    """Action is an effort to group operations with the same goal (within reporting)"""

    def __init__(
        self,
        name,
        msg,
        args,
        log_dir,
        generate_tox_log,
        command_log,
        popen,
        python,
        suicide_timeout,
        interrupt_timeout,
        terminate_timeout,
    ):
        self.name = name
        self.args = args
        self.msg = msg
        self.activity = self.msg.split(" ", 1)[0]
        self.log_dir = log_dir
        self.generate_tox_log = generate_tox_log
        self.via_popen = popen
        self.command_log = command_log
        self._timed_report = None
        self.python = python
        self.suicide_timeout = suicide_timeout
        self.interrupt_timeout = interrupt_timeout
        self.terminate_timeout = terminate_timeout
        if is_main_thread():
            # python allows only main thread to install signal handlers
            # see https://docs.python.org/3/library/signal.html#signals-and-threads
            self._install_sigterm_handler()

    def __enter__(self):
        msg = "{} {}".format(self.msg, " ".join(map(str, self.args)))
        self._timed_report = reporter.timed_operation(self.name, msg)
        self._timed_report.__enter__()

        return self

    def __exit__(self, type, value, traceback):
        self._timed_report.__exit__(type, value, traceback)

    def setactivity(self, name, msg):
        self.activity = name
        if msg:
            reporter.verbosity0("{} {}: {}".format(self.name, name, msg), bold=True)
        else:
            reporter.verbosity1("{} {}: {}".format(self.name, name, msg), bold=True)

    def info(self, name, msg):
        reporter.verbosity1("{} {}: {}".format(self.name, name, msg), bold=True)

    def popen(
        self,
        args,
        cwd=None,
        env=None,
        redirect=True,
        returnout=False,
        ignore_ret=False,
        capture_err=True,
        callback=None,
        report_fail=True,
    ):
        """this drives an interaction with a subprocess"""
        cwd = py.path.local() if cwd is None else cwd
        cmd_args = [str(x) for x in self._rewrite_args(cwd, args)]
        cmd_args_shell = " ".join(pipes.quote(i) for i in cmd_args)
        stream_getter = self._get_standard_streams(
            capture_err,
            cmd_args_shell,
            redirect,
            returnout,
            cwd,
        )
        exit_code, output = None, None
        with stream_getter as (fin, out_path, stderr, stdout):
            try:
                process = self.via_popen(
                    cmd_args,
                    stdout=stdout,
                    stderr=stderr,
                    cwd=str(cwd),
                    env=os.environ.copy() if env is None else env,
                    universal_newlines=True,
                    shell=False,
                    creationflags=(
                        subprocess.CREATE_NEW_PROCESS_GROUP
                        if sys.platform == "win32"
                        else 0
                        # needed for Windows signal send ability (CTRL+C)
                    ),
                )
            except OSError as exception:
                exit_code = exception.errno
            else:
                if callback is not None:
                    callback(process)
                reporter.log_popen(cwd, out_path, cmd_args_shell, process.pid)
                output = self.evaluate_cmd(fin, process, redirect)
                exit_code = process.returncode
            finally:
                if out_path is not None and out_path.exists():
                    lines = out_path.read_text("UTF-8").split("\n")
                    # first three lines are the action, cwd, and cmd - remove it
                    output = "\n".join(lines[3:])
                try:
                    if exit_code and not ignore_ret:
                        if report_fail:
                            msg = "invocation failed (exit code {:d})".format(exit_code)
                            if out_path is not None:
                                msg += ", logfile: {}".format(out_path)
                                if not out_path.exists():
                                    msg += " warning log file missing"
                            reporter.error(msg)
                            if out_path is not None and out_path.exists():
                                reporter.separator("=", "log start", Verbosity.QUIET)
                                reporter.quiet(output)
                                reporter.separator("=", "log end", Verbosity.QUIET)
                        raise InvocationError(cmd_args_shell, exit_code, output)
                finally:
                    self.command_log.add_command(cmd_args, output, exit_code)
        return output

    def evaluate_cmd(self, input_file_handler, process, redirect):
        try:
            if self.generate_tox_log and not redirect:
                if process.stderr is not None:
                    # prevent deadlock
                    raise ValueError("stderr must not be piped here")
                # we read binary from the process and must write using a binary stream
                buf = getattr(sys.stdout, "buffer", sys.stdout)
                last_time = time.time()
                while True:
                    # we have to read one byte at a time, otherwise there
                    # might be no output for a long time with slow tests
                    data = input_file_handler.read(1)
                    if data:
                        buf.write(data)
                        if b"\n" in data or (time.time() - last_time) > 1:
                            # we flush on newlines or after 1 second to
                            # provide quick enough feedback to the user
                            # when printing a dot per test
                            buf.flush()
                            last_time = time.time()
                    elif process.poll() is not None:
                        if process.stdout is not None:
                            process.stdout.close()
                        break
                    else:
                        time.sleep(0.1)
                        # the seek updates internal read buffers
                        input_file_handler.seek(0, 1)
                input_file_handler.close()
            out, _ = process.communicate()  # wait to finish
        except KeyboardInterrupt as exception:
            reporter.error("got KeyboardInterrupt signal")
            main_thread = is_main_thread()
            while True:
                try:
                    if main_thread:
                        # spin up a new thread to disable further interrupt on main thread
                        stopper = Thread(target=self.handle_interrupt, args=(process,))
                        stopper.start()
                        stopper.join()
                    else:
                        self.handle_interrupt(process)
                except KeyboardInterrupt:
                    continue
                break
            raise exception
        return out

    def handle_interrupt(self, process):
        """A three level stop mechanism for children - INT -> TERM -> KILL"""
        msg = "from {} {{}} pid {}".format(os.getpid(), process.pid)
        if self._wait(process, self.suicide_timeout) is None:
            self.info("KeyboardInterrupt", msg.format("SIGINT"))
            process.send_signal(signal.CTRL_C_EVENT if sys.platform == "win32" else signal.SIGINT)
            if self._wait(process, self.interrupt_timeout) is None:
                self.info("KeyboardInterrupt", msg.format("SIGTERM"))
                process.terminate()
                if self._wait(process, self.terminate_timeout) is None:
                    self.info("KeyboardInterrupt", msg.format("SIGKILL"))
                    process.kill()
                    process.communicate()

    @staticmethod
    def _wait(process, timeout):
        if sys.version_info >= (3, 3):
            # python 3 has timeout feature built-in
            try:
                process.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                pass
        else:
            # on Python 2 we need to simulate it
            delay = 0.01
            while process.poll() is None and timeout > 0:
                time.sleep(delay)
                timeout -= delay
        return process.poll()

    @contextmanager
    def _get_standard_streams(self, capture_err, cmd_args_shell, redirect, returnout, cwd):
        stdout = out_path = input_file_handler = None
        stderr = subprocess.STDOUT if capture_err else None

        if self.generate_tox_log or redirect:
            out_path = self.get_log_path(self.name)
            with out_path.open("wt") as stdout, out_path.open("rb") as input_file_handler:
                msg = "action: {}, msg: {}\ncwd: {}\ncmd: {}\n".format(
                    self.name.replace("\n", " "),
                    self.msg.replace("\n", " "),
                    str(cwd).replace("\n", " "),
                    cmd_args_shell.replace("\n", " "),
                )
                stdout.write(msg)
                stdout.flush()
                input_file_handler.read()  # read the header, so it won't be written to stdout
                yield input_file_handler, out_path, stderr, stdout
                return

        if returnout:
            stdout = subprocess.PIPE

        yield input_file_handler, out_path, stderr, stdout

    def get_log_path(self, actionid):
        log_file = get_unique_file(self.log_dir, prefix=actionid, suffix=".log")
        return log_file

    def _rewrite_args(self, cwd, args):

        executable = None
        if INFO.IS_WIN:
            # shebang lines are not adhered on Windows so if it's a python script
            # pre-pend the interpreter
            ext = os.path.splitext(str(args[0]))[1].lower()
            if ext == ".py":
                executable = str(self.python)
        if executable is None:
            executable = args[0]
            args = args[1:]

        new_args = [executable]

        # to make the command shorter try to use relative paths for all subsequent arguments
        # note the executable cannot be relative as the Windows applies cwd after invocation
        for arg in args:
            if arg and os.path.isabs(str(arg)):
                arg_path = py.path.local(arg)
                if arg_path.exists() and arg_path.common(cwd) is not None:
                    potential_arg = cwd.bestrelpath(arg_path)
                    if len(potential_arg.split("..")) < 2:
                        # just one parent directory accepted as relative path
                        arg = potential_arg
            new_args.append(str(arg))

        return new_args

    def _install_sigterm_handler(self):
        """Handle sigterm as if it were a keyboardinterrupt"""

        def sigterm_handler(signum, frame):
            reporter.error("Got SIGTERM, handling it as a KeyboardInterrupt")
            raise KeyboardInterrupt()

        signal.signal(signal.SIGTERM, sigterm_handler)