Repository URL to install this package:
|
Version:
3.0.0.dev0 ▾
|
import asyncio
import itertools
import logging
import subprocess
import textwrap
import types
from typing import List
class SubprocessCalledProcessError(subprocess.CalledProcessError):
    """A ``subprocess.CalledProcessError`` whose message shows truncated output.

    ``str()`` of this error contains the standard ``CalledProcessError``
    message followed by, at most, the last ``LAST_N_LINES`` lines of the
    captured stdout and stderr (each section is omitted when empty).
    """

    # Maximum number of trailing output lines included in the message.
    LAST_N_LINES = 50

    def __init__(self, *args, cmd_index=None, **kwargs):
        """Create the error.

        Args:
            *args: Positional arguments forwarded to ``CalledProcessError``.
            cmd_index: Optional index of the failed command; when it is not
                None, it is mentioned in the first line of the message.
            **kwargs: Keyword arguments forwarded to ``CalledProcessError``.
        """
        self.cmd_index = cmd_index
        super().__init__(*args, **kwargs)

    @staticmethod
    def _get_last_n_line(str_data: str, last_n_lines: int) -> str:
        """Return the last ``last_n_lines`` lines of ``str_data``.

        Args:
            str_data: The text to truncate.
            last_n_lines: Number of trailing lines to keep. A negative value
                disables truncation and returns ``str_data`` unchanged.

        Returns:
            The kept lines joined with a newline; an empty string when
            ``last_n_lines`` is 0.
        """
        if last_n_lines < 0:
            return str_data
        if last_n_lines == 0:
            # lines[-0:] is lines[0:], i.e. *every* line, so zero has to be
            # handled explicitly.
            return ""
        lines = str_data.strip().split("\n")
        return "\n".join(lines[-last_n_lines:])

    def __str__(self):
        """Build the message: header, base message, then output tails."""
        parts = []
        if self.cmd_index is not None:
            parts.append(
                f"Run cmd[{self.cmd_index}] failed with the following details."
            )
        parts.append(super().__str__())

        for name, text in (("stdout", self.stdout), ("stderr", self.stderr)):
            if not text:
                # Skip streams that were not captured or produced nothing.
                continue
            subtitle = f"Last {self.LAST_N_LINES} lines of {name}:"
            tail = self._get_last_n_line(text, self.LAST_N_LINES).strip()
            parts.append(f"{subtitle}\n{textwrap.indent(tail, ' ' * 4)}")
        return "\n".join(parts)
async def check_output_cmd(
    cmd: List[str],
    *,
    logger: logging.Logger,
    cmd_index_gen: types.GeneratorType = itertools.count(1),
    **kwargs,
) -> str:
    """Run a command asynchronously and return its combined output.

    stderr is redirected into stdout, so the returned text contains both
    streams. The output is decoded as UTF-8; bytes that are not valid UTF-8
    are replaced with U+FFFD instead of raising, so that the output and the
    return code of the command are never lost to a decoding error.

    Args:
        cmd: The command line as a sequence of program arguments. The
            program to execute is the first item. It is unpacked into
            ``asyncio.create_subprocess_exec``, so it must be a sequence
            of arguments rather than a single command string.
        logger: The logger used to report the command and its output.
        cmd_index_gen: Iterator yielding the index used to label this
            command in logs and errors. The default counter is created
            once at definition time and therefore shared by every call
            that does not pass its own.
        kwargs: Extra keyword arguments passed to
            ``asyncio.create_subprocess_exec``.

    Returns:
        The decoded output (stdout and stderr combined) of cmd.

    Raises:
        SubprocessCalledProcessError: If the return code of cmd is not 0.
            The return code is in the ``returncode`` attribute and the
            output in the ``output`` attribute.
        RuntimeError: If starting the process or reading its output raised
            anything other than a cancellation; the original exception is
            chained as the cause.
        asyncio.CancelledError: Re-raised untouched when the task is
            cancelled.
    """
    cmd_index = next(cmd_index_gen)
    logger.info("Run cmd[%s] %s", cmd_index, repr(cmd))

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            **kwargs,
        )
        # Use communicate instead of polling stdout:
        #   * Avoid deadlocks due to streams pausing reading or writing and
        #     blocking the child process. Please refer to:
        #     https://docs.python.org/3/library/asyncio-subprocess.html#asyncio.asyncio.subprocess.Process.stderr
        #   * Avoid mixing multiple outputs of concurrent cmds.
        raw_output, _ = await proc.communicate()
    except asyncio.exceptions.CancelledError:
        # Since Python 3.9, when cancelled, the inner coroutine needs to
        # propagate the cancellation as it is for asyncio to time out
        # properly: https://bugs.python.org/issue40607
        # A bare ``raise`` keeps the original traceback intact.
        raise
    except BaseException as e:
        raise RuntimeError(f"Run cmd[{cmd_index}] got exception.") from e
    else:
        # errors="replace": a command emitting non-UTF-8 bytes must not turn
        # into a UnicodeDecodeError that hides its output and exit status.
        stdout = raw_output.decode("utf-8", errors="replace")
        if stdout:
            logger.info("Output of cmd[%s]: %s", cmd_index, stdout)
        else:
            logger.info("No output for cmd[%s]", cmd_index)
        if proc.returncode != 0:
            raise SubprocessCalledProcessError(
                proc.returncode, cmd, output=stdout, cmd_index=cmd_index
            )
        return stdout
    finally:
        if proc is not None:
            # Make sure the child never outlives this call (e.g. on
            # cancellation). Killing an already-exited process is harmless.
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            # Wait for the process to exit so it gets reaped.
            await proc.wait()