Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / _private / runtime_env / utils.py
Size: Mime:
import asyncio
import itertools
import logging
import subprocess
import textwrap
import types
from typing import List


class SubprocessCalledProcessError(subprocess.CalledProcessError):
    """A ``subprocess.CalledProcessError`` whose message shows trimmed output.

    ``str(error)`` contains the standard ``CalledProcessError`` message,
    optionally prefixed with the index of the failed command, followed by the
    last ``LAST_N_LINES`` lines of stdout and stderr (when they are non-empty).
    """

    # Maximum number of trailing output lines included in ``__str__``.
    # A negative value disables trimming.
    LAST_N_LINES = 50

    def __init__(self, *args, cmd_index=None, **kwargs):
        """Create the error.

        Args:
            *args: Positional arguments forwarded to
                ``subprocess.CalledProcessError``.
            cmd_index: Optional index of the failed command; when set it is
                mentioned in the first line of the message.
            **kwargs: Keyword arguments forwarded to
                ``subprocess.CalledProcessError``.
        """
        self.cmd_index = cmd_index
        super().__init__(*args, **kwargs)

    @staticmethod
    def _get_last_n_line(str_data: str, last_n_lines: int) -> str:
        """Return the last ``last_n_lines`` lines of ``str_data``.

        A negative ``last_n_lines`` returns ``str_data`` unchanged. Zero
        returns an empty string. Otherwise surrounding whitespace is stripped
        before the text is split on ``"\\n"``.
        """
        if last_n_lines < 0:
            return str_data
        if last_n_lines == 0:
            # ``lines[-0:]`` is the whole list, so zero needs its own branch.
            return ""
        lines = str_data.strip().split("\n")
        return "\n".join(lines[-last_n_lines:])

    def __str__(self):
        """Build the message: header, base error text, then output tails."""
        str_list = (
            []
            if self.cmd_index is None
            else [f"Run cmd[{self.cmd_index}] failed with the following details."]
        )
        str_list.append(super().__str__())
        out = {
            "stdout": self.stdout,
            "stderr": self.stderr,
        }
        for name, s in out.items():
            if not s:
                continue
            if isinstance(s, (bytes, bytearray)):
                # CalledProcessError may carry raw bytes; decode leniently so
                # that formatting the error never raises on its own.
                s = bytes(s).decode("utf-8", errors="replace")
            subtitle = f"Last {self.LAST_N_LINES} lines of {name}:"
            last_n_line_str = self._get_last_n_line(s, self.LAST_N_LINES).strip()
            str_list.append(
                f"{subtitle}\n{textwrap.indent(last_n_line_str, ' ' * 4)}"
            )
        return "\n".join(str_list)


async def check_output_cmd(
    cmd: List[str],
    *,
    logger: logging.Logger,
    cmd_index_gen: types.GeneratorType = itertools.count(1),
    **kwargs,
) -> str:
    """Run a command asynchronously and return its combined output.

    stderr is redirected into stdout, so the returned text contains both
    streams. The output is decoded as UTF-8; bytes that are not valid UTF-8
    are replaced with U+FFFD instead of raising.

    Args:
        cmd: The program to execute followed by its arguments. The items are
            unpacked into ``asyncio.create_subprocess_exec``.
        logger: The logger used to record the command and its output.
        cmd_index_gen: Iterator yielding the index used to label this command
            in logs and errors. The default ``itertools.count(1)`` is created
            once at definition time and is therefore shared by every call
            that does not pass its own iterator.
        kwargs: Extra keyword arguments passed to
            ``asyncio.create_subprocess_exec``.

    Returns:
        The decoded output of cmd.

    Raises:
        SubprocessCalledProcessError: If the return code of cmd is not 0.
        RuntimeError: If starting the process or reading its output fails.
        asyncio.CancelledError: Re-raised unchanged when the task is cancelled.
    """

    cmd_index = next(cmd_index_gen)
    logger.info("Run cmd[%s] %s", cmd_index, repr(cmd))

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            **kwargs,
        )
        # Use communicate instead of polling stdout:
        #   * Avoid deadlocks due to streams pausing reading or writing and blocking the
        #     child process. Please refer to:
        #     https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process.stderr
        #   * Avoid mixing multiple outputs of concurrent cmds.
        stdout, _ = await proc.communicate()
    except asyncio.CancelledError:
        # since Python 3.9, when cancelled, the inner process needs to throw as it is
        # for asyncio to timeout properly https://bugs.python.org/issue40607
        raise
    except BaseException as e:
        # NOTE(review): this also wraps KeyboardInterrupt/SystemExit in a
        # RuntimeError; kept as-is to preserve the existing exception contract.
        raise RuntimeError(f"Run cmd[{cmd_index}] got exception.") from e
    else:
        # Commands may emit arbitrary bytes; a strict decode here would raise
        # UnicodeDecodeError and hide the command's real result.
        stdout = stdout.decode("utf-8", errors="replace")
        if stdout:
            logger.info("Output of cmd[%s]: %s", cmd_index, stdout)
        else:
            logger.info("No output for cmd[%s]", cmd_index)
        if proc.returncode != 0:
            raise SubprocessCalledProcessError(
                proc.returncode, cmd, output=stdout, cmd_index=cmd_index
            )
        return stdout
    finally:
        if proc is not None:
            # Only kill a process that is still running (e.g. after
            # cancellation or a read failure).
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    # The process exited between the check and the kill.
                    pass
            # Wait process exit.
            await proc.wait()