@cython.no_gc_clear
cdef class UVProcess(UVHandle):
"""Abstract class; wrapper over uv_process_t handle."""
def __cinit__(self):
self.uv_opt_env = NULL
self.uv_opt_args = NULL
self._returncode = None
self._pid = None
self._fds_to_close = set()
self._preexec_fn = None
self._restore_signals = True
cdef _init(self, Loop loop, list args, dict env,
cwd, start_new_session,
_stdin, _stdout, _stderr, # std* can be defined as macros in C
pass_fds, debug_flags, preexec_fn, restore_signals):
global __forking
global __forking_loop
cdef int err
self._start_init(loop)
self._handle = <uv.uv_handle_t*> \
PyMem_RawMalloc(sizeof(uv.uv_process_t))
if self._handle is NULL:
self._abort_init()
raise MemoryError()
# Too early to call _finish_init, but still a lot of work to do.
# Let's set handle.data to NULL, so in case something goes wrong,
# callbacks have a chance to avoid casting *something* into UVHandle.
self._handle.data = NULL
try:
self._init_options(args, env, cwd, start_new_session,
_stdin, _stdout, _stderr)
restore_inheritable = set()
if pass_fds:
for fd in pass_fds:
if not os_get_inheritable(fd):
restore_inheritable.add(fd)
os_set_inheritable(fd, True)
except:
self._abort_init()
raise
if __forking or loop.active_process_handler is not None:
# Our pthread_atfork handlers won't work correctly when
# another loop is forking in another thread (even though
# GIL should help us to avoid that.)
self._abort_init()
raise RuntimeError(
'Racing with another loop to spawn a process.')
self._errpipe_read, self._errpipe_write = os_pipe()
try:
os_set_inheritable(self._errpipe_write, True)
self._preexec_fn = preexec_fn
self._restore_signals = restore_signals
loop.active_process_handler = self
__forking = 1
__forking_loop = loop
_PyImport_AcquireLock()
err = uv.uv_spawn(loop.uvloop,
<uv.uv_process_t*>self._handle,
&self.options)
__forking = 0
__forking_loop = None
loop.active_process_handler = None
if _PyImport_ReleaseLock() < 0:
# See CPython/posixmodule.c for details
self._abort_init()
raise RuntimeError('not holding the import lock')
if err < 0:
self._abort_init()
raise convert_error(err)
self._finish_init()
os_close(self._errpipe_write)
if preexec_fn is not None:
errpipe_data = bytearray()
while True:
# XXX: This is a blocking code that has to be
# rewritten (using loop.connect_read_pipe() or
# otherwise.)
part = os_read(self._errpipe_read, 50000)
errpipe_data += part
if not part or len(errpipe_data) > 50000:
break
finally:
os_close(self._errpipe_read)
try:
os_close(self._errpipe_write)
except OSError:
# Might be already closed
pass
# asyncio caches the PID in BaseSubprocessTransport,
# so that the transport knows what the PID was even
# after the process is finished.
self._pid = (<uv.uv_process_t*>self._handle).pid
for fd in restore_inheritable:
os_set_inheritable(fd, False)
fds_to_close = self._fds_to_close
self._fds_to_close = None
for fd in fds_to_close:
os_close(fd)
if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK:
time_sleep(1)
if preexec_fn is not None and errpipe_data:
# preexec_fn has raised an exception. The child
# process must be dead now.
try:
exc_name, exc_msg = errpipe_data.split(b':', 1)
exc_name = exc_name.decode()
exc_msg = exc_msg.decode()
except:
self._close()
raise subprocess_SubprocessError(
'Bad exception data from child: {!r}'.format(
errpipe_data))
exc_cls = getattr(__builtins__, exc_name,
subprocess_SubprocessError)
exc = subprocess_SubprocessError(
'Exception occurred in preexec_fn.')
exc.__cause__ = exc_cls(exc_msg)
self._close()
raise exc
cdef _after_fork(self):
# See CPython/_posixsubprocess.c for details
cdef int err
if self._restore_signals:
_Py_RestoreSignals()
PyOS_AfterFork()
err = uv.uv_loop_fork(self._loop.uvloop)
if err < 0:
raise convert_error(err)
if self._preexec_fn is not None:
try:
gc_disable()
self._preexec_fn()
except BaseException as ex:
try:
with open(self._errpipe_write, 'wb') as f:
f.write(str(ex.__class__.__name__).encode())
f.write(b':')
f.write(str(ex.args[0]).encode())
finally:
system._exit(255)
else:
os_close(self._errpipe_write)
else:
os_close(self._errpipe_write)
cdef _close_after_spawn(self, int fd):
if self._fds_to_close is None:
raise RuntimeError(
'UVProcess._close_after_spawn called after uv_spawn')
self._fds_to_close.add(fd)
def __dealloc__(self):
if self.uv_opt_env is not NULL:
PyMem_RawFree(self.uv_opt_env)
self.uv_opt_env = NULL
if self.uv_opt_args is not NULL:
PyMem_RawFree(self.uv_opt_args)
self.uv_opt_args = NULL
cdef char** __to_cstring_array(self, list arr):
cdef:
int i
int arr_len = len(arr)
bytes el
char **ret
if UVLOOP_DEBUG:
assert arr_len > 0
ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *))
if ret is NULL:
raise MemoryError()
for i in range(arr_len):
el = arr[i]
# NB: PyBytes_AsSptring doesn't copy the data;
# we have to be careful when the "arr" is GCed,
# and it shouldn't be ever mutated.
ret[i] = PyBytes_AsString(el)
ret[arr_len] = NULL
return ret
cdef _init_options(self, list args, dict env, cwd, start_new_session,
_stdin, _stdout, _stderr):
memset(&self.options, 0, sizeof(uv.uv_process_options_t))
self._init_env(env)
self.options.env = self.uv_opt_env
self._init_args(args)
self.options.file = self.uv_opt_file
self.options.args = self.uv_opt_args
if start_new_session:
self.options.flags |= uv.UV_PROCESS_DETACHED
if cwd is not None:
try:
# Lookup __fspath__ manually, as os.fspath() isn't
# available on Python 3.5.
fspath = type(cwd).__fspath__
except AttributeError:
pass
else:
cwd = fspath(cwd)
if isinstance(cwd, str):
cwd = PyUnicode_EncodeFSDefault(cwd)
if not isinstance(cwd, bytes):
raise ValueError('cwd must be a str or bytes object')
self.__cwd = cwd
self.options.cwd = PyBytes_AsString(self.__cwd)
self.options.exit_cb = &__uvprocess_on_exit_callback
self._init_files(_stdin, _stdout, _stderr)
cdef _init_args(self, list args):
cdef:
bytes path
int an = len(args)
if an < 1:
raise ValueError('cannot spawn a process: args are empty')
self.__args = args.copy()
for i in range(an):
arg = args[i]
if isinstance(arg, str):
self.__args[i] = PyUnicode_EncodeFSDefault(arg)
elif not isinstance(arg, bytes):
raise TypeError('all args must be str or bytes')
path = self.__args[0]
self.uv_opt_file = PyBytes_AsString(path)
self.uv_opt_args = self.__to_cstring_array(self.__args)
cdef _init_env(self, dict env):
if env is not None and len(env):
self.__env = list()
for key in env:
val = env[key]
if isinstance(key, str):
key = PyUnicode_EncodeFSDefault(key)
elif not isinstance(key, bytes):
raise TypeError(
'all environment vars must be bytes or str')
if isinstance(val, str):
val = PyUnicode_EncodeFSDefault(val)
elif not isinstance(val, bytes):
raise TypeError(
'all environment values must be bytes or str')
self.__env.append(key + b'=' + val)
self.uv_opt_env = self.__to_cstring_array(self.__env)
else:
self.__env = None
cdef _init_files(self, _stdin, _stdout, _stderr):
self.options.stdio_count = 0
cdef _kill(self, int signum):
cdef int err
self._ensure_alive()
err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum)
if err < 0:
raise convert_error(err)
cdef _on_exit(self, int64_t exit_status, int term_signal):
if term_signal:
# From Python docs:
# A negative value -N indicates that the child was
# terminated by signal N (POSIX only).
self._returncode = -term_signal
else:
self._returncode = exit_status
self._close()
DEF _CALL_PIPE_DATA_RECEIVED = 0
DEF _CALL_PIPE_CONNECTION_LOST = 1
DEF _CALL_PROCESS_EXITED = 2
DEF _CALL_CONNECTION_LOST = 3
@cython.no_gc_clear
cdef class UVProcessTransport(UVProcess):
def __cinit__(self):
self._exit_waiters = []
self._protocol = None
self._init_futs = []
self._pending_calls = []
self._stdio_ready = 0
self._stdin = self._stdout = self._stderr = None
self.stdin_proto = self.stdout_proto = self.stderr_proto = None
self._finished = 0
cdef _on_exit(self, int64_t exit_status, int term_signal):
UVProcess._on_exit(self, exit_status, term_signal)
Loading ...