###############################################################################
# Re-implementation of the ProcessPoolExecutor more robust to faults
#
# author: Thomas Moreau and Olivier Grisel
#
# adapted from concurrent/futures/process_pool_executor.py (17/02/2017)
# * Backport for python2.7/3.3,
# * Add an extra management thread to detect queue_management_thread failures,
# * Improve the shutdown process to avoid deadlocks,
# * Add timeout for workers,
# * More robust pickling process.
#
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ProcessPoolExecutor.
The following diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     +--------+     | 4, result |    |         |
|          |     | ...        |                    | 3, except |    |         |
+----------+     +------------+                    +-----------+    +---------+
Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
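
Usage sketch (illustrative only; it assumes this module is importable as
loky.process_executor and relies on the ProcessPoolExecutor class defined
below):

    >>> from loky.process_executor import ProcessPoolExecutor
    >>> with ProcessPoolExecutor(max_workers=2) as executor:
    ...     future = executor.submit(pow, 2, 10)   # registers a _WorkItem
    ...     assert future.result() == 1024         # comes back via "Result Q"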
"""
__author__ = 'Thomas Moreau (thomas.moreau.2010@gmail.com)'
import os
import gc
import sys
import struct
import weakref
import warnings
import itertools
import traceback
import threading
from time import time
import multiprocessing as mp
from functools import partial
from pickle import PicklingError
from . import _base
from .backend import get_context
from .backend.compat import queue
from .backend.compat import wait
from .backend.compat import set_cause
from .backend.context import cpu_count
from .backend.queues import Queue, SimpleQueue, Full
from .backend.reduction import set_loky_pickler, get_loky_pickler_name
from .backend.utils import recursive_terminate, get_exitcodes_terminated_worker
try:
from concurrent.futures.process import BrokenProcessPool as _BPPException
except ImportError:
_BPPException = RuntimeError
# Compatibility for python2.7
if sys.version_info[0] == 2:
ProcessLookupError = OSError
# Workers are created as daemon threads and processes. This is done to allow
# the interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False
# Mechanism to prevent infinite process spawning. When a worker of a
# ProcessPoolExecutor nested at a depth of MAX_DEPTH executors tries to create
# a new Executor, a LokyRecursionError is raised.
MAX_DEPTH = int(os.environ.get("LOKY_MAX_DEPTH", 10))
_CURRENT_DEPTH = 0
# Minimum time interval between two consecutive memory leak protection checks.
_MEMORY_LEAK_CHECK_DELAY = 1.
# Number of bytes of memory usage allowed over the reference process size.
_MAX_MEMORY_LEAK_SIZE = int(1e8)
try:
from psutil import Process
_USE_PSUTIL = True
def _get_memory_usage(pid, force_gc=False):
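        # Return the resident set size (RSS) of the process with the given
        # pid, optionally running gc.collect() first for a more stable reading.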
if force_gc:
gc.collect()
return Process(pid).memory_info().rss
except ImportError:
_USE_PSUTIL = False
class _ThreadWakeup:
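    """Pipe-based helper used to wake up the queue management thread.

    wakeup() writes a small message on the pipe so that a thread polling the
    reader end unblocks immediately; clear() drains any pending messages.
    """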
def __init__(self):
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
self._writer.close()
self._reader.close()
def wakeup(self):
if sys.platform == "win32" and sys.version_info[:2] < (3, 4):
            # Compat for python2.7 on windows, where poll returns False for
            # b"" messages. Use the slightly larger message b"0".
self._writer.send_bytes(b"0")
else:
self._writer.send_bytes(b"")
def clear(self):
while self._reader.poll():
self._reader.recv_bytes()
class _ExecutorFlags(object):
"""necessary references to maintain executor states without preventing gc
It permits to keep the information needed by queue_management_thread
and crash_detection_thread to maintain the pool without preventing the
garbage collection of unreferenced executors.
"""
def __init__(self):
self.shutdown = False
self.broken = None
self.kill_workers = False
self.shutdown_lock = threading.Lock()
def flag_as_shutting_down(self, kill_workers=False):
with self.shutdown_lock:
self.shutdown = True
self.kill_workers = kill_workers
def flag_as_broken(self, broken):
with self.shutdown_lock:
self.shutdown = True
self.broken = broken
def _python_exit():
global _global_shutdown
_global_shutdown = True
items = list(_threads_wakeups.items())
mp.util.debug("Interpreter shutting down. Waking up queue_manager_threads "
"{}".format(items))
for thread, thread_wakeup in items:
if thread.is_alive():
thread_wakeup.wakeup()
for thread, _ in items:
thread.join()
# Module variable to register the at_exit call
process_pool_executor_at_exit = None
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
class _RemoteTraceback(Exception):
"""Embed stringification of remote traceback in local traceback
"""
def __init__(self, tb=None):
self.tb = tb
def __str__(self):
return self.tb
class _ExceptionWithTraceback(BaseException):
def __init__(self, exc):
tb = getattr(exc, "__traceback__", None)
if tb is None:
_, _, tb = sys.exc_info()
tb = traceback.format_exception(type(exc), exc, tb)
tb = ''.join(tb)
self.exc = exc
self.tb = '\n"""\n%s"""' % tb
def __reduce__(self):
return _rebuild_exc, (self.exc, self.tb)
def _rebuild_exc(exc, tb):
exc = set_cause(exc, _RemoteTraceback(tb))
return exc
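# Illustrative round trip (a sketch, Python 3 semantics assumed): when an
# exception is wrapped in _ExceptionWithTraceback before being sent back,
# unpickling it in the parent calls _rebuild_exc and yields the original
# exception with the formatted remote traceback attached as its __cause__:
#
#   >>> import pickle
#   >>> try:
#   ...     1 / 0
#   ... except ZeroDivisionError as e:
#   ...     exc = pickle.loads(pickle.dumps(_ExceptionWithTraceback(e)))
#   >>> isinstance(exc, ZeroDivisionError), isinstance(exc.__cause__, _RemoteTraceback)
#   (True, True)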
class _WorkItem(object):
__slots__ = ["future", "fn", "args", "kwargs"]
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
self.work_id = work_id
self.exception = exception
self.result = result
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id
self.fn = fn
self.args = args
self.kwargs = kwargs
# Store the current loky_pickler so it is correctly set in the worker
self.loky_pickler = get_loky_pickler_name()
def __call__(self):
set_loky_pickler(self.loky_pickler)
return self.fn(*self.args, **self.kwargs)
def __repr__(self):
return "CallItem({}, {}, {}, {})".format(
self.work_id, self.fn, self.args, self.kwargs)
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, ctx=None, pending_work_items=None,
running_work_items=None, thread_wakeup=None, reducers=None):
self.thread_wakeup = thread_wakeup
self.pending_work_items = pending_work_items
self.running_work_items = running_work_items
super(_SafeQueue, self).__init__(max_size, reducers=reducers, ctx=ctx)
def _on_queue_feeder_error(self, e, obj):
if isinstance(obj, _CallItem):
            # formatting the traceback from the exception object only works on
            # Python 3
if isinstance(e, struct.error):
raised_error = RuntimeError(
"The task could not be sent to the workers as it is too "
"large for `send_bytes`.")
else:
raised_error = PicklingError(
"Could not pickle the task to send it to the workers.")
tb = traceback.format_exception(
type(e), e, getattr(e, "__traceback__", None))
raised_error = set_cause(raised_error, _RemoteTraceback(
'\n"""\n{}"""'.format(''.join(tb))))
work_item = self.pending_work_items.pop(obj.work_id, None)
self.running_work_items.remove(obj.work_id)
# work_item can be None if another process terminated. In this
# case, the queue_manager_thread fails all work_items with
# BrokenProcessPool
if work_item is not None:
work_item.future.set_exception(raised_error)
del work_item
self.thread_wakeup.wakeup()
else:
super(_SafeQueue, self)._on_queue_feeder_error(e, obj)
def _get_chunks(chunksize, *iterables):
"""Iterates over zip()ed iterables in chunks. """
if sys.version_info < (3, 3):
it = itertools.izip(*iterables)
else:
it = zip(*iterables)
while True:
chunk = tuple(itertools.islice(it, chunksize))
if not chunk:
return
yield chunk
def _process_chunk(fn, chunk):
"""Processes a chunk of an iterable passed to map.
Runs the function passed to map() on a chunk of the
iterable passed to map.
This function is run in a separate process.
"""
return [fn(*args) for args in chunk]
def _sendback_result(result_queue, work_id, result=None, exception=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
exception=exception))
except BaseException as e:
        # Wrap the unexpected error (e.g. an unpicklable result) with its
        # traceback and report it so the parent can fail the matching future.
        exc = _ExceptionWithTraceback(e)
        result_queue.put(_ResultItem(work_id, exception=exc))