Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / uvloop   python

Repository URL to install this package:

Version: 0.9.1 

/ handles / process.pyx

@cython.no_gc_clear
cdef class UVProcess(UVHandle):
    """Abstract class; wrapper over uv_process_t handle."""

    def __cinit__(self):
        self.uv_opt_env = NULL
        self.uv_opt_args = NULL
        self._returncode = None
        self._pid = None
        self._fds_to_close = set()
        self._preexec_fn = None
        self._restore_signals = True

    cdef _init(self, Loop loop, list args, dict env,
               cwd, start_new_session,
               _stdin, _stdout, _stderr,  # std* can be defined as macros in C
               pass_fds, debug_flags, preexec_fn, restore_signals):

        global __forking
        global __forking_loop

        cdef int err

        self._start_init(loop)

        self._handle = <uv.uv_handle_t*> \
                            PyMem_RawMalloc(sizeof(uv.uv_process_t))
        if self._handle is NULL:
            self._abort_init()
            raise MemoryError()

        # Too early to call _finish_init, but still a lot of work to do.
        # Let's set handle.data to NULL, so in case something goes wrong,
        # callbacks have a chance to avoid casting *something* into UVHandle.
        self._handle.data = NULL

        try:
            self._init_options(args, env, cwd, start_new_session,
                               _stdin, _stdout, _stderr)

            restore_inheritable = set()
            if pass_fds:
                for fd in pass_fds:
                    if not os_get_inheritable(fd):
                        restore_inheritable.add(fd)
                        os_set_inheritable(fd, True)
        except:
            self._abort_init()
            raise

        if __forking or loop.active_process_handler is not None:
            # Our pthread_atfork handlers won't work correctly when
            # another loop is forking in another thread (even though
            # GIL should help us to avoid that.)
            self._abort_init()
            raise RuntimeError(
                'Racing with another loop to spawn a process.')

        self._errpipe_read, self._errpipe_write = os_pipe()
        try:
            os_set_inheritable(self._errpipe_write, True)

            self._preexec_fn = preexec_fn
            self._restore_signals = restore_signals

            loop.active_process_handler = self
            __forking = 1
            __forking_loop = loop

            _PyImport_AcquireLock()

            err = uv.uv_spawn(loop.uvloop,
                              <uv.uv_process_t*>self._handle,
                              &self.options)

            __forking = 0
            __forking_loop = None
            loop.active_process_handler = None

            if _PyImport_ReleaseLock() < 0:
                # See CPython/posixmodule.c for details
                self._abort_init()
                raise RuntimeError('not holding the import lock')

            if err < 0:
                self._abort_init()
                raise convert_error(err)

            self._finish_init()

            os_close(self._errpipe_write)

            if preexec_fn is not None:
                errpipe_data = bytearray()
                while True:
                    # XXX: This is a blocking code that has to be
                    # rewritten (using loop.connect_read_pipe() or
                    # otherwise.)
                    part = os_read(self._errpipe_read, 50000)
                    errpipe_data += part
                    if not part or len(errpipe_data) > 50000:
                        break

        finally:
            os_close(self._errpipe_read)
            try:
                os_close(self._errpipe_write)
            except OSError:
                # Might be already closed
                pass

        # asyncio caches the PID in BaseSubprocessTransport,
        # so that the transport knows what the PID was even
        # after the process is finished.
        self._pid = (<uv.uv_process_t*>self._handle).pid

        for fd in restore_inheritable:
            os_set_inheritable(fd, False)

        fds_to_close = self._fds_to_close
        self._fds_to_close = None
        for fd in fds_to_close:
            os_close(fd)

        if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK:
            time_sleep(1)

        if preexec_fn is not None and errpipe_data:
            # preexec_fn has raised an exception.  The child
            # process must be dead now.
            try:
                exc_name, exc_msg = errpipe_data.split(b':', 1)
                exc_name = exc_name.decode()
                exc_msg = exc_msg.decode()
            except:
                self._close()
                raise subprocess_SubprocessError(
                    'Bad exception data from child: {!r}'.format(
                        errpipe_data))
            exc_cls = getattr(__builtins__, exc_name,
                              subprocess_SubprocessError)

            exc = subprocess_SubprocessError(
                'Exception occurred in preexec_fn.')
            exc.__cause__ = exc_cls(exc_msg)
            self._close()
            raise exc

    cdef _after_fork(self):
        # See CPython/_posixsubprocess.c for details
        cdef int err

        if self._restore_signals:
            _Py_RestoreSignals()

        PyOS_AfterFork()

        err = uv.uv_loop_fork(self._loop.uvloop)
        if err < 0:
            raise convert_error(err)

        if self._preexec_fn is not None:
            try:
                gc_disable()
                self._preexec_fn()
            except BaseException as ex:
                try:
                    with open(self._errpipe_write, 'wb') as f:
                        f.write(str(ex.__class__.__name__).encode())
                        f.write(b':')
                        f.write(str(ex.args[0]).encode())
                finally:
                    system._exit(255)
            else:
                os_close(self._errpipe_write)
        else:
            os_close(self._errpipe_write)

    cdef _close_after_spawn(self, int fd):
        if self._fds_to_close is None:
            raise RuntimeError(
                'UVProcess._close_after_spawn called after uv_spawn')
        self._fds_to_close.add(fd)

    def __dealloc__(self):
        if self.uv_opt_env is not NULL:
            PyMem_RawFree(self.uv_opt_env)
            self.uv_opt_env = NULL

        if self.uv_opt_args is not NULL:
            PyMem_RawFree(self.uv_opt_args)
            self.uv_opt_args = NULL

    cdef char** __to_cstring_array(self, list arr):
        cdef:
            int i
            int arr_len = len(arr)
            bytes el

            char **ret

        if UVLOOP_DEBUG:
            assert arr_len > 0

        ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *))
        if ret is NULL:
            raise MemoryError()

        for i in range(arr_len):
            el = arr[i]
            # NB: PyBytes_AsSptring doesn't copy the data;
            # we have to be careful when the "arr" is GCed,
            # and it shouldn't be ever mutated.
            ret[i] = PyBytes_AsString(el)

        ret[arr_len] = NULL
        return ret

    cdef _init_options(self, list args, dict env, cwd, start_new_session,
                       _stdin, _stdout, _stderr):

        memset(&self.options, 0, sizeof(uv.uv_process_options_t))

        self._init_env(env)
        self.options.env = self.uv_opt_env

        self._init_args(args)
        self.options.file = self.uv_opt_file
        self.options.args = self.uv_opt_args

        if start_new_session:
            self.options.flags |= uv.UV_PROCESS_DETACHED

        if cwd is not None:
            try:
                # Lookup __fspath__ manually, as os.fspath() isn't
                # available on Python 3.5.
                fspath = type(cwd).__fspath__
            except AttributeError:
                pass
            else:
                cwd = fspath(cwd)

            if isinstance(cwd, str):
                cwd = PyUnicode_EncodeFSDefault(cwd)
            if not isinstance(cwd, bytes):
                raise ValueError('cwd must be a str or bytes object')

            self.__cwd = cwd
            self.options.cwd = PyBytes_AsString(self.__cwd)

        self.options.exit_cb = &__uvprocess_on_exit_callback

        self._init_files(_stdin, _stdout, _stderr)

    cdef _init_args(self, list args):
        cdef:
            bytes path
            int an = len(args)

        if an < 1:
            raise ValueError('cannot spawn a process: args are empty')

        self.__args = args.copy()
        for i in range(an):
            arg = args[i]
            if isinstance(arg, str):
                self.__args[i] = PyUnicode_EncodeFSDefault(arg)
            elif not isinstance(arg, bytes):
                raise TypeError('all args must be str or bytes')

        path = self.__args[0]
        self.uv_opt_file = PyBytes_AsString(path)
        self.uv_opt_args = self.__to_cstring_array(self.__args)

    cdef _init_env(self, dict env):
        if env is not None and len(env):
            self.__env = list()
            for key in env:
                val = env[key]

                if isinstance(key, str):
                    key = PyUnicode_EncodeFSDefault(key)
                elif not isinstance(key, bytes):
                    raise TypeError(
                        'all environment vars must be bytes or str')

                if isinstance(val, str):
                    val = PyUnicode_EncodeFSDefault(val)
                elif not isinstance(val, bytes):
                    raise TypeError(
                        'all environment values must be bytes or str')

                self.__env.append(key + b'=' + val)

            self.uv_opt_env = self.__to_cstring_array(self.__env)
        else:
            self.__env = None

    cdef _init_files(self, _stdin, _stdout, _stderr):
        self.options.stdio_count = 0

    cdef _kill(self, int signum):
        cdef int err
        self._ensure_alive()
        err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum)
        if err < 0:
            raise convert_error(err)

    cdef _on_exit(self, int64_t exit_status, int term_signal):
        if term_signal:
            # From Python docs:
            #    A negative value -N indicates that the child was
            #    terminated by signal N (POSIX only).
            self._returncode = -term_signal
        else:
            self._returncode = exit_status

        self._close()


DEF _CALL_PIPE_DATA_RECEIVED = 0
DEF _CALL_PIPE_CONNECTION_LOST = 1
DEF _CALL_PROCESS_EXITED = 2
DEF _CALL_CONNECTION_LOST = 3


@cython.no_gc_clear
cdef class UVProcessTransport(UVProcess):
    def __cinit__(self):
        self._exit_waiters = []
        self._protocol = None

        self._init_futs = []
        self._pending_calls = []
        self._stdio_ready = 0

        self._stdin = self._stdout = self._stderr = None
        self.stdin_proto = self.stdout_proto = self.stderr_proto = None

        self._finished = 0

    cdef _on_exit(self, int64_t exit_status, int term_signal):
        UVProcess._on_exit(self, exit_status, term_signal)
Loading ...