Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / joblib   python

Repository URL to install this package:

/ externals / loky / process_executor.py

###############################################################################
# Re-implementation of the ProcessPoolExecutor more robust to faults
#
# author: Thomas Moreau and Olivier Grisel
#
# adapted from concurrent/futures/process_pool_executor.py (17/02/2017)
#  * Backport for python2.7/3.3,
#  * Add an extra management thread to detect queue_management_thread failures,
#  * Improve the shutdown process to avoid deadlocks,
#  * Add timeout for workers,
#  * More robust pickling process.
#
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The follow diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     +--------+     | 4, result |    |         |
|          |     | ...        |                    | 3, except |    |         |
+----------+     +------------+                    +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""


__author__ = 'Thomas Moreau (thomas.moreau.2010@gmail.com)'


import os
import gc
import sys
import struct
import weakref
import warnings
import itertools
import traceback
import threading
from time import time
import multiprocessing as mp
from functools import partial
from pickle import PicklingError

from . import _base
from .backend import get_context
from .backend.compat import queue
from .backend.compat import wait
from .backend.compat import set_cause
from .backend.context import cpu_count
from .backend.queues import Queue, SimpleQueue, Full
from .backend.reduction import set_loky_pickler, get_loky_pickler_name
from .backend.utils import recursive_terminate, get_exitcodes_terminated_worker

try:
    from concurrent.futures.process import BrokenProcessPool as _BPPException
except ImportError:
    _BPPException = RuntimeError


# Compatibility for python2.7
if sys.version_info[0] == 2:
    ProcessLookupError = OSError


# Workers are created as daemon threads and processes. This is done to allow
# the interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False

# Mechanism to prevent infinite process spawning. When a worker of a
# ProcessPoolExecutor nested in MAX_DEPTH Executor tries to create a new
# Executor, a LokyRecursionError is raised
MAX_DEPTH = int(os.environ.get("LOKY_MAX_DEPTH", 10))
_CURRENT_DEPTH = 0

# Minimum time interval between two consecutive memory leak protection checks.
_MEMORY_LEAK_CHECK_DELAY = 1.

# Number of bytes of memory usage allowed over the reference process size.
_MAX_MEMORY_LEAK_SIZE = int(1e8)


try:
    from psutil import Process
    _USE_PSUTIL = True

    def _get_memory_usage(pid, force_gc=False):
        if force_gc:
            gc.collect()

        return Process(pid).memory_info().rss

except ImportError:
    _USE_PSUTIL = False


class _ThreadWakeup:
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        if sys.platform == "win32" and sys.version_info[:2] < (3, 4):
            # Compat for python2.7 on windows, where poll return false for
            # b"" messages. Use the slightly larger message b"0".
            self._writer.send_bytes(b"0")
        else:
            self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()


class _ExecutorFlags(object):
    """necessary references to maintain executor states without preventing gc

    It permits to keep the information needed by queue_management_thread
    and crash_detection_thread to maintain the pool without preventing the
    garbage collection of unreferenced executors.
    """
    def __init__(self):

        self.shutdown = False
        self.broken = None
        self.kill_workers = False
        self.shutdown_lock = threading.Lock()

    def flag_as_shutting_down(self, kill_workers=False):
        with self.shutdown_lock:
            self.shutdown = True
            self.kill_workers = kill_workers

    def flag_as_broken(self, broken):
        with self.shutdown_lock:
            self.shutdown = True
            self.broken = broken


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    mp.util.debug("Interpreter shutting down. Waking up queue_manager_threads "
                  "{}".format(items))
    for thread, thread_wakeup in items:
        if thread.is_alive():
            thread_wakeup.wakeup()
    for thread, _ in items:
        thread.join()


# Module variable to register the at_exit call
process_pool_executor_at_exit = None

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


class _RemoteTraceback(Exception):
    """Embed stringification of remote traceback in local traceback
    """
    def __init__(self, tb=None):
        self.tb = tb

    def __str__(self):
        return self.tb


class _ExceptionWithTraceback(BaseException):

    def __init__(self, exc):
        tb = getattr(exc, "__traceback__", None)
        if tb is None:
            _, _, tb = sys.exc_info()
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)


def _rebuild_exc(exc, tb):
    exc = set_cause(exc, _RemoteTraceback(tb))
    return exc


class _WorkItem(object):

    __slots__ = ["future", "fn", "args", "kwargs"]

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _ResultItem(object):

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result


class _CallItem(object):

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

        # Store the current loky_pickler so it is correctly set in the worker
        self.loky_pickler = get_loky_pickler_name()

    def __call__(self):
        set_loky_pickler(self.loky_pickler)
        return self.fn(*self.args, **self.kwargs)

    def __repr__(self):
        return "CallItem({}, {}, {}, {})".format(
            self.work_id, self.fn, self.args, self.kwargs)


class _SafeQueue(Queue):
    """Safe Queue set exception to the future object linked to a job"""
    def __init__(self, max_size=0, ctx=None, pending_work_items=None,
                 running_work_items=None, thread_wakeup=None, reducers=None):
        self.thread_wakeup = thread_wakeup
        self.pending_work_items = pending_work_items
        self.running_work_items = running_work_items
        super(_SafeQueue, self).__init__(max_size, reducers=reducers, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            # format traceback only works on python3
            if isinstance(e, struct.error):
                raised_error = RuntimeError(
                    "The task could not be sent to the workers as it is too "
                    "large for `send_bytes`.")
            else:
                raised_error = PicklingError(
                    "Could not pickle the task to send it to the workers.")
            tb = traceback.format_exception(
                type(e), e, getattr(e, "__traceback__", None))
            raised_error = set_cause(raised_error, _RemoteTraceback(
                                     '\n"""\n{}"""'.format(''.join(tb))))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            self.running_work_items.remove(obj.work_id)
            # work_item can be None if another process terminated. In this
            # case, the queue_manager_thread fails all work_items with
            # BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(raised_error)
                del work_item
            self.thread_wakeup.wakeup()
        else:
            super(_SafeQueue, self)._on_queue_feeder_error(e, obj)


def _get_chunks(chunksize, *iterables):
    """Iterates over zip()ed iterables in chunks. """
    if sys.version_info < (3, 3):
        it = itertools.izip(*iterables)
    else:
        it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def _process_chunk(fn, chunk):
    """Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
Loading ...