"""
Helpers for embarrassingly parallel code.
"""
# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
# Copyright: 2010, Gael Varoquaux
# License: BSD 3 clause
from __future__ import division
import os
import sys
from math import sqrt
import functools
import time
import threading
import itertools
from numbers import Integral
import warnings
from ._multiprocessing_helpers import mp
from .format_stack import format_outer_frames
from .logger import Logger, short_format_time
from .my_exceptions import TransportableException
from .disk import memstr_to_bytes
from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
ThreadingBackend, SequentialBackend,
LokyBackend)
from ._compat import _basestring
from .externals.cloudpickle import dumps, loads
from .externals import loky
# Make sure that those two classes are part of the public joblib.parallel API
# so that 3rd party backend implementers can import them from here.
from ._parallel_backends import AutoBatchingMixin # noqa
from ._parallel_backends import ParallelBackendBase # noqa
try:
import queue
except ImportError: # backward compat for Python 2
import Queue as queue
# Built-in backends, keyed by the name accepted by ``parallel_backend`` (and
# by the ``backend`` argument of ``Parallel``).
BACKENDS = dict(
    multiprocessing=MultiprocessingBackend,
    threading=ThreadingBackend,
    sequential=SequentialBackend,
    loky=LokyBackend,
)

# Name of the backend used by default by Parallel outside of any context
# managed by ``parallel_backend``.
DEFAULT_BACKEND = 'loky'
DEFAULT_N_JOBS = 1
# Backend substituted when shared-memory semantics or threads are required.
DEFAULT_THREAD_BACKEND = 'threading'

# Thread-local storage for the ``backend_and_jobs`` pair installed by the
# ``parallel_backend`` context manager: each thread sees its own setting.
_backend = threading.local()

# Accepted values for the ``prefer`` and ``require`` arguments of
# ``get_active_backend``.
VALID_BACKEND_HINTS = ('processes', 'threads', None)
VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)
def _register_dask():
    """Register the Dask backend under the name 'dask'.

    Called on demand when ``parallel_backend("dask")`` is requested and
    'dask' is not yet in ``BACKENDS``.

    Raises
    ------
    ImportError
        With installation hints, when importing/registering the dask
        backend fails with an ImportError.
    """
    try:
        from ._dask import DaskDistributedBackend
        register_parallel_backend('dask', DaskDistributedBackend)
    except ImportError:
        raise ImportError(
            "To use the dask.distributed backend you must install both "
            "the `dask` and distributed modules.\n\n"
            "See https://dask.pydata.org/en/latest/install.html for more "
            "information.")


# Optional backends mapped to the function that registers them; the
# function is called by ``parallel_backend`` when the name is requested but
# not yet present in ``BACKENDS``.
EXTERNAL_BACKENDS = dict(dask=_register_dask)
def get_active_backend(prefer=None, require=None, verbose=0):
    """Return the ``(backend, n_jobs)`` pair that Parallel should use.

    Parameters
    ----------
    prefer : {'processes', 'threads', None}
        Soft hint. Only consulted outside of any ``parallel_backend``
        context: 'threads' swaps the default backend for the thread-based
        one when the default does not report ``uses_threads``.
    require : {'sharedmem', None}
        Hard constraint. 'sharedmem' swaps in the thread-based backend when
        the selected backend does not report ``supports_sharedmem``.
    verbose : int
        When >= 10, print a message if the backend installed with
        ``parallel_backend`` is replaced to honor ``require='sharedmem'``.

    Returns
    -------
    (backend, n_jobs)
        The pair stored by ``parallel_backend`` when it is compatible with
        ``require``; otherwise a freshly built backend with
        ``DEFAULT_N_JOBS``.

    Raises
    ------
    ValueError
        If ``prefer`` or ``require`` is not a recognized value, or when
        ``prefer='processes'`` is combined with ``require='sharedmem'``.
    """
    if prefer not in VALID_BACKEND_HINTS:
        raise ValueError("prefer=%r is not a valid backend hint, "
                         "expected one of %r" % (prefer, VALID_BACKEND_HINTS))
    if require not in VALID_BACKEND_CONSTRAINTS:
        raise ValueError("require=%r is not a valid backend constraint, "
                         "expected one of %r"
                         % (require, VALID_BACKEND_CONSTRAINTS))
    if prefer == 'processes' and require == 'sharedmem':
        raise ValueError("prefer == 'processes' and require == 'sharedmem'"
                         " are inconsistent settings")

    user_choice = getattr(_backend, 'backend_and_jobs', None)
    if user_choice is None:
        # No ``parallel_backend`` context manager is active: build the
        # default backend now, then swap it for the thread-based one if it
        # cannot honor the hard constraint or the soft hint.
        default = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
        has_sharedmem = getattr(default, 'supports_sharedmem', False)
        is_threaded = getattr(default, 'uses_threads', False)
        violates_require = require == 'sharedmem' and not has_sharedmem
        violates_prefer = prefer == 'threads' and not is_threaded
        if violates_require or violates_prefer:
            default = BACKENDS[DEFAULT_THREAD_BACKEND](nesting_level=0)
        return default, DEFAULT_N_JOBS

    # A backend was set by the user with the context manager.
    user_backend, _user_n_jobs = user_choice
    level = user_backend.nesting_level
    has_sharedmem = getattr(user_backend, 'supports_sharedmem', False)
    if require != 'sharedmem' or has_sharedmem:
        # Compatible with the constraint: hand back the stored pair as is.
        return user_choice

    # The user's backend lacks shared memory semantics: fall back to the
    # default thread-based backend at the same nesting level.
    fallback = BACKENDS[DEFAULT_THREAD_BACKEND](nesting_level=level)
    if verbose >= 10:
        print("Using %s as joblib.Parallel backend instead of %s "
              "as the latter does not provide shared memory semantics."
              % (fallback.__class__.__name__,
                 user_backend.__class__.__name__))
    return fallback, DEFAULT_N_JOBS
class parallel_backend(object):
    """Change the default backend used by Parallel inside a with block.

    If ``backend`` is a string it must match a previously registered
    implementation using the ``register_parallel_backend`` function.

    By default the following backends are available:

    - 'loky': single-host, process-based parallelism (used by default),
    - 'threading': single-host, thread-based parallelism,
    - 'multiprocessing': legacy single-host, process-based parallelism.

    'loky' is recommended to run functions that manipulate Python objects.
    'threading' is a low-overhead alternative that is most efficient for
    functions that release the Global Interpreter Lock: e.g. I/O-bound code
    or CPU-bound code in a few calls to native code that explicitly releases
    the GIL.

    In addition, if the `dask` and `distributed` Python packages are
    installed, it is possible to use the 'dask' backend for better
    scheduling of nested parallel calls without over-subscription and
    potentially distribute parallel calls over a networked cluster of
    several hosts.

    Alternatively the backend can be passed directly as an instance.

    By default all available workers will be used (``n_jobs=-1``) unless the
    caller passes an explicit value for the ``n_jobs`` parameter.

    This is an alternative to passing a ``backend='backend_name'`` argument
    to the ``Parallel`` class constructor. It is particularly useful when
    calling into library code that uses joblib internally but does not
    expose the backend argument in its own API.

    Parameters
    ----------
    backend : str or backend instance
        Name of a registered backend (instantiated with ``backend_params``)
        or an already constructed backend instance (``backend_params`` is
        then ignored).
    n_jobs : int, default=-1
        Number of jobs stored alongside the backend.
    inner_max_num_threads : int or None, default=None
        If not None, set as ``backend.inner_max_num_threads``. The backend
        must report ``supports_inner_max_num_threads``, otherwise an
        AssertionError is raised.
    **backend_params
        Keyword arguments forwarded to the backend constructor when
        ``backend`` is a string.

    >>> from operator import neg
    >>> with parallel_backend('threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    Joblib also tries to limit the oversubscription by limiting the number
    of threads usable in some third-party library threadpools like OpenBLAS,
    MKL or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overridden with the ``inner_max_num_threads`` argument which will be
    used to set this limit in the child processes.

    Note that the new backend becomes active as soon as the object is
    constructed; ``unregister`` (also called on ``__exit__``) restores the
    previously active one.

    .. versionadded:: 0.10
    """

    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                 **backend_params):
        if isinstance(backend, _basestring):
            if backend not in BACKENDS and backend in EXTERNAL_BACKENDS:
                # Optional backends (e.g. 'dask') are registered on demand,
                # when requested by name and not yet present in BACKENDS.
                register = EXTERNAL_BACKENDS[backend]
                register()
            backend = BACKENDS[backend](**backend_params)

        if inner_max_num_threads is not None:
            # Explicit check rather than ``assert``: asserts are stripped
            # under ``python -O``, which would silently accept an
            # unsupported setting. AssertionError is still raised for
            # backward compatibility with callers that catch it.
            if not backend.supports_inner_max_num_threads:
                raise AssertionError(
                    "{} does not accept setting the inner_max_num_threads "
                    "argument.".format(backend.__class__.__name__))
            backend.inner_max_num_threads = inner_max_num_threads

        # If the nesting_level of the backend is not set previously, use the
        # nesting level from the previous active backend to set it.
        current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
        if backend.nesting_level is None:
            if current_backend_and_jobs is None:
                nesting_level = 0
            else:
                nesting_level = current_backend_and_jobs[0].nesting_level
            backend.nesting_level = nesting_level

        # Save the backends info and set the active backend.
        self.old_backend_and_jobs = current_backend_and_jobs
        self.new_backend_and_jobs = (backend, n_jobs)

        _backend.backend_and_jobs = (backend, n_jobs)

    def __enter__(self):
        return self.new_backend_and_jobs

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None, so exceptions raised in the with block propagate.
        self.unregister()

    def unregister(self):
        """Restore the backend that was active before this one was set."""
        if self.old_backend_and_jobs is None:
            if getattr(_backend, 'backend_and_jobs', None) is not None:
                del _backend.backend_and_jobs
        else:
            _backend.backend_and_jobs = self.old_backend_and_jobs
# Under Linux or OS X, the default ('fork') start method of multiprocessing
# can cause third party libraries to crash. When ``mp.get_context`` is
# available (Python 3.4+), the JOBLIB_START_METHOD environment variable can
# switch the default start method to 'forkserver' or 'spawn' to avoid this
# issue, albeit at the cost of semantic changes and some additional pool
# instantiation overhead.
DEFAULT_MP_CONTEXT = None
if hasattr(mp, 'get_context'):
    # An unset, empty or whitespace-only value keeps the default context.
    method = os.environ.get('JOBLIB_START_METHOD', '').strip()
    method = method or None
    if method:
        DEFAULT_MP_CONTEXT = mp.get_context(method=method)
class BatchedCalls(object):
    """Wrap a sequence of (func, args, kwargs) tuples as a single callable.

    Calling the instance runs every wrapped call in order, with
    ``parallel_backend(backend, n_jobs=n_jobs)`` active so that the batch's
    backend becomes the default for nested calls, and returns the list of
    results.

    Parameters
    ----------
    iterator_slice : iterable of (func, args, kwargs)
        The calls to batch; consumed eagerly into ``self.items``.
    backend_and_jobs : (backend, n_jobs) tuple, or a bare backend
        A bare backend (the form used before 0.12.6, when nested backends
        were returned without n_jobs) is paired with ``n_jobs=None``.
    pickle_cache : dict or None
        Stored as ``self._pickle_cache``; a new empty dict is used if None.
    """

    def __init__(self, iterator_slice, backend_and_jobs, pickle_cache=None):
        self.items = list(iterator_slice)
        self._size = len(self.items)
        if not isinstance(backend_and_jobs, tuple):
            # Backward compatibility: normalize the legacy bare-backend form.
            backend_and_jobs = (backend_and_jobs, None)
        self._backend, self._n_jobs = backend_and_jobs
        self._pickle_cache = {} if pickle_cache is None else pickle_cache

    def __call__(self):
        # Make self._backend the default backend for nested calls, with the
        # n_jobs recorded for this batch (rather than the context manager's
        # default of -1).
        with parallel_backend(self._backend, n_jobs=self._n_jobs):
            return [fn(*fn_args, **fn_kwargs)
                    for fn, fn_args, fn_kwargs in self.items]

    def __len__(self):
        return self._size
###############################################################################
# CPU count that also works when multiprocessing has been disabled via
# the JOBLIB_MULTIPROCESSING environment variable
def cpu_count():
    """Return the number of CPUs.

    Returns 1 when ``mp`` is None (presumably meaning multiprocessing is
    unavailable or disabled), otherwise defers to ``loky.cpu_count()``.
    """
    if mp is not None:
        return loky.cpu_count()
    return 1
###############################################################################
# For verbosity
def _verbosity_filter(index, verbose):
    """Tell whether the progress message for task ``index`` is filtered out.

    Returns True when the message should be skipped. With a falsy
    ``verbose`` everything is skipped; with ``verbose > 10`` nothing is.
    Otherwise index 0 always passes, and later indices pass only when going
    from ``index`` to ``index + 1`` makes ``sqrt(i / lag)`` cross an integer,
    where ``lag = 0.5 * (11 - verbose) ** 2``. The kept indices therefore
    get increasingly far apart (quadratically), and closer together as
    ``verbose`` grows.
    """
    if not verbose:
        return True
    if verbose > 10 or index == 0:
        return False
    lag = .5 * (11 - verbose) ** 2
    return int(sqrt(index / lag)) == int(sqrt((index + 1) / lag))
###############################################################################
def delayed(function, check_pickle=None):
    """Decorator used to capture the arguments of a function.

    ``delayed(f)(*args, **kwargs)`` does not call ``f``; it returns the
    ``(f, args, kwargs)`` tuple so that the call can be executed later
    (e.g. by ``Parallel``).

    Parameters
    ----------
    function : callable
        The function whose calls are captured.
    check_pickle : bool or None, deprecated
        If truthy, ``function`` is serialized with ``dumps`` right away so
        that pickling problems surface early. Passing any value other than
        None emits a DeprecationWarning.

    Returns
    -------
    callable
        A wrapper that returns ``(function, args, kwargs)``. It carries the
        metadata of ``function`` (via ``functools.wraps``) when possible.
    """
    if check_pickle is not None:
        # stacklevel=2 attributes the warning to the caller's code. Without
        # it, the warning points inside joblib, and Python's default filters
        # only display DeprecationWarning when it is attributed to __main__.
        warnings.warn('check_pickle is deprecated in joblib 0.12 and will be'
                      ' removed in 0.13', DeprecationWarning, stacklevel=2)
    # Try to pickle the input function, to catch the problems early when
    # using with multiprocessing:
    if check_pickle:
        dumps(function)

    def delayed_function(*args, **kwargs):
        return function, args, kwargs

    try:
        delayed_function = functools.wraps(function)(delayed_function)
    except AttributeError:
        # functools.wraps fails on some callable objects; keep the plain
        # wrapper in that case.
        pass
    return delayed_function
###############################################################################
class BatchCompletionCallBack(object):
    """Callback used by joblib.Parallel's multiprocessing backend.

    The parent process invokes it whenever a worker process has returned
    the results of a batch of tasks. It bumps the completed-task counter,
    feeds the batch duration to ``backend.batch_completed`` (used to update
    the batch processing duration estimate), prints progress, and, while
    ``parallel._original_iterator`` is still set, schedules the next batch.
    """

    def __init__(self, dispatch_timestamp, batch_size, parallel):
        # ``time.time()`` value recorded when the batch was dispatched.
        self.dispatch_timestamp = dispatch_timestamp
        # Number of tasks in the batch.
        self.batch_size = batch_size
        # The Parallel instance whose state is updated on completion.
        self.parallel = parallel

    def __call__(self, out):
        # ``out`` (the batch output) is not used here.
        owner = self.parallel
        owner.n_completed_tasks += self.batch_size
        elapsed = time.time() - self.dispatch_timestamp
        owner._backend.batch_completed(self.batch_size, elapsed)
        owner.print_progress()
        # Presumably serializes iterator consumption with other code that
        # dispatches under the same lock.
        with owner._lock:
            if owner._original_iterator is not None:
                owner.dispatch_next()
###############################################################################
def register_parallel_backend(name, factory, make_default=False):
"""Register a new Parallel backend factory.
Loading ...