Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / joblib   python

Repository URL to install this package:

/ externals / loky / backend / popen_loky_posix.py

###############################################################################
# Popen for LokyProcess.
#
# author: Thomas Moreau and Olivier Grisel
#
import os
import sys
import signal
import pickle
from io import BytesIO

from . import reduction, spawn
from .context import get_spawning_popen, set_spawning_popen
from multiprocessing import util, process

if sys.version_info[:2] < (3, 3):
    ProcessLookupError = OSError

if sys.platform != "win32":
    from . import resource_tracker


__all__ = []

if sys.platform != "win32":
    #
    # Wrapper for an fd used while launching a process
    #

    class _DupFd(object):
        def __init__(self, fd):
            self.fd = reduction._mk_inheritable(fd)

        def detach(self):
            return self.fd

    #
    # Start child process using subprocess.Popen
    #

    __all__.append('Popen')

    class Popen(object):
        method = 'loky'
        DupFd = _DupFd

        def __init__(self, process_obj):
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None
            self._fds = []
            self._launch(process_obj)

        if sys.version_info < (3, 4):
            @classmethod
            def duplicate_for_child(cls, fd):
                popen = get_spawning_popen()
                popen._fds.append(fd)
                return reduction._mk_inheritable(fd)

        else:
            def duplicate_for_child(self, fd):
                self._fds.append(fd)
                return reduction._mk_inheritable(fd)

        def poll(self, flag=os.WNOHANG):
            if self.returncode is None:
                while True:
                    try:
                        pid, sts = os.waitpid(self.pid, flag)
                    except OSError:
                        # Child process not yet created. See #1731717
                        # e.errno == errno.ECHILD == 10
                        return None
                    else:
                        break
                if pid == self.pid:
                    if os.WIFSIGNALED(sts):
                        self.returncode = -os.WTERMSIG(sts)
                    else:
                        assert os.WIFEXITED(sts)
                        self.returncode = os.WEXITSTATUS(sts)
            return self.returncode

        def wait(self, timeout=None):
            if sys.version_info < (3, 3):
                import time
                if timeout is None:
                    return self.poll(0)
                deadline = time.time() + timeout
                delay = 0.0005
                while 1:
                    res = self.poll()
                    if res is not None:
                        break
                    remaining = deadline - time.time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, 0.05)
                    time.sleep(delay)
                return res

            if self.returncode is None:
                if timeout is not None:
                    from multiprocessing.connection import wait
                    if not wait([self.sentinel], timeout):
                        return None
                # This shouldn't block if wait() returned successfully.
                return self.poll(os.WNOHANG if timeout == 0.0 else 0)
            return self.returncode

        def terminate(self):
            if self.returncode is None:
                try:
                    os.kill(self.pid, signal.SIGTERM)
                except ProcessLookupError:
                    pass
                except OSError:
                    if self.wait(timeout=0.1) is None:
                        raise

        def _launch(self, process_obj):

            tracker_fd = resource_tracker._resource_tracker.getfd()

            fp = BytesIO()
            set_spawning_popen(self)
            try:
                prep_data = spawn.get_preparation_data(
                    process_obj._name,
                    getattr(process_obj, "init_main_module", True))
                reduction.dump(prep_data, fp)
                reduction.dump(process_obj, fp)

            finally:
                set_spawning_popen(None)

            try:
                parent_r, child_w = os.pipe()
                child_r, parent_w = os.pipe()
                # for fd in self._fds:
                #     _mk_inheritable(fd)

                cmd_python = [sys.executable]
                cmd_python += ['-m', self.__module__]
                cmd_python += ['--process-name', str(process_obj.name)]
                cmd_python += ['--pipe',
                               str(reduction._mk_inheritable(child_r))]
                reduction._mk_inheritable(child_w)
                reduction._mk_inheritable(tracker_fd)
                self._fds.extend([child_r, child_w, tracker_fd])
                from .fork_exec import fork_exec
                pid = fork_exec(cmd_python, self._fds, env=process_obj.env)
                util.debug("launched python with pid {} and cmd:\n{}"
                           .format(pid, cmd_python))
                self.sentinel = parent_r

                method = 'getbuffer'
                if not hasattr(fp, method):
                    method = 'getvalue'
                with os.fdopen(parent_w, 'wb') as f:
                    f.write(getattr(fp, method)())
                self.pid = pid
            finally:
                if parent_r is not None:
                    util.Finalize(self, os.close, (parent_r,))
                for fd in (child_r, child_w):
                    if fd is not None:
                        os.close(fd)

        @staticmethod
        def thread_is_spawning():
            return True


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser('Command line parser')
    parser.add_argument('--pipe', type=int, required=True,
                        help='File handle for the pipe')
    parser.add_argument('--process-name', type=str, default=None,
                        help='Identifier for debugging purpose')

    args = parser.parse_args()

    info = dict()

    exitcode = 1
    try:
        with os.fdopen(args.pipe, 'rb') as from_parent:
            process.current_process()._inheriting = True
            try:
                prep_data = pickle.load(from_parent)
                spawn.prepare(prep_data)
                process_obj = pickle.load(from_parent)
            finally:
                del process.current_process()._inheriting

        exitcode = process_obj._bootstrap()
    except Exception:
        print('\n\n' + '-' * 80)
        print('{} failed with traceback: '.format(args.process_name))
        print('-' * 80)
        import traceback
        print(traceback.format_exc())
        print('\n' + '-' * 80)
    finally:
        if from_parent is not None:
            from_parent.close()

        sys.exit(exitcode)