Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / joblib   python

Repository URL to install this package:

/ parallel.py

"""
Helpers for embarrassingly parallel code.
"""
# Author: Gael Varoquaux < gael dot varoquaux at normalesup dot org >
# Copyright: 2010, Gael Varoquaux
# License: BSD 3 clause

from __future__ import division

import os
import sys
from math import sqrt
import functools
import time
import threading
import itertools
from numbers import Integral
import warnings

from ._multiprocessing_helpers import mp

from .format_stack import format_outer_frames
from .logger import Logger, short_format_time
from .my_exceptions import TransportableException
from .disk import memstr_to_bytes
from ._parallel_backends import (FallbackToBackend, MultiprocessingBackend,
                                 ThreadingBackend, SequentialBackend,
                                 LokyBackend)
from ._compat import _basestring
from .externals.cloudpickle import dumps, loads
from .externals import loky

# Make sure that those two classes are part of the public joblib.parallel API
# so that 3rd party backend implementers can import them from here.
from ._parallel_backends import AutoBatchingMixin  # noqa
from ._parallel_backends import ParallelBackendBase  # noqa

try:
    import queue
except ImportError:  # backward compat for Python 2
    import Queue as queue

BACKENDS = {
    'multiprocessing': MultiprocessingBackend,
    'threading': ThreadingBackend,
    'sequential': SequentialBackend,
    'loky': LokyBackend,
}
# name of the backend used by default by Parallel outside of any context
# managed by ``parallel_backend``.
DEFAULT_BACKEND = 'loky'
DEFAULT_N_JOBS = 1
DEFAULT_THREAD_BACKEND = 'threading'

# Thread local value that can be overridden by the ``parallel_backend`` context
# manager
_backend = threading.local()

VALID_BACKEND_HINTS = ('processes', 'threads', None)
VALID_BACKEND_CONSTRAINTS = ('sharedmem', None)


def _register_dask():
    """ Register Dask Backend if called with parallel_backend("dask") """
    try:
        from ._dask import DaskDistributedBackend
        register_parallel_backend('dask', DaskDistributedBackend)
    except ImportError:
        msg = ("To use the dask.distributed backend you must install both "
               "the `dask` and distributed modules.\n\n"
               "See https://dask.pydata.org/en/latest/install.html for more "
               "information.")
        raise ImportError(msg)


EXTERNAL_BACKENDS = {
    'dask': _register_dask,
}


def get_active_backend(prefer=None, require=None, verbose=0):
    """Return the active default backend"""
    if prefer not in VALID_BACKEND_HINTS:
        raise ValueError("prefer=%r is not a valid backend hint, "
                         "expected one of %r" % (prefer, VALID_BACKEND_HINTS))
    if require not in VALID_BACKEND_CONSTRAINTS:
        raise ValueError("require=%r is not a valid backend constraint, "
                         "expected one of %r"
                         % (require, VALID_BACKEND_CONSTRAINTS))

    if prefer == 'processes' and require == 'sharedmem':
        raise ValueError("prefer == 'processes' and require == 'sharedmem'"
                         " are inconsistent settings")
    backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
    if backend_and_jobs is not None:
        # Try to use the backend set by the user with the context manager.
        backend, n_jobs = backend_and_jobs
        nesting_level = backend.nesting_level
        supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
        if require == 'sharedmem' and not supports_sharedmem:
            # This backend does not match the shared memory constraint:
            # fallback to the default thead-based backend.
            sharedmem_backend = BACKENDS[DEFAULT_THREAD_BACKEND](
                nesting_level=nesting_level)
            if verbose >= 10:
                print("Using %s as joblib.Parallel backend instead of %s "
                      "as the latter does not provide shared memory semantics."
                      % (sharedmem_backend.__class__.__name__,
                         backend.__class__.__name__))
            return sharedmem_backend, DEFAULT_N_JOBS
        else:
            return backend_and_jobs

    # We are outside of the scope of any parallel_backend context manager,
    # create the default backend instance now.
    backend = BACKENDS[DEFAULT_BACKEND](nesting_level=0)
    supports_sharedmem = getattr(backend, 'supports_sharedmem', False)
    uses_threads = getattr(backend, 'uses_threads', False)
    if ((require == 'sharedmem' and not supports_sharedmem) or
            (prefer == 'threads' and not uses_threads)):
        # Make sure the selected default backend match the soft hints and
        # hard constraints:
        backend = BACKENDS[DEFAULT_THREAD_BACKEND](nesting_level=0)
    return backend, DEFAULT_N_JOBS


class parallel_backend(object):
    """Change the default backend used by Parallel inside a with block.

    If ``backend`` is a string it must match a previously registered
    implementation using the ``register_parallel_backend`` function.

    By default the following backends are available:

    - 'loky': single-host, process-based parallelism (used by default),
    - 'threading': single-host, thread-based parallelism,
    - 'multiprocessing': legacy single-host, process-based parallelism.

    'loky' is recommended to run functions that manipulate Python objects.
    'threading' is a low-overhead alternative that is most efficient for
    functions that release the Global Interpreter Lock: e.g. I/O-bound code or
    CPU-bound code in a few calls to native code that explicitly releases the
    GIL.

    In addition, if the `dask` and `distributed` Python packages are installed,
    it is possible to use the 'dask' backend for better scheduling of nested
    parallel calls without over-subscription and potentially distribute
    parallel calls over a networked cluster of several hosts.

    Alternatively the backend can be passed directly as an instance.

    By default all available workers will be used (``n_jobs=-1``) unless the
    caller passes an explicit value for the ``n_jobs`` parameter.

    This is an alternative to passing a ``backend='backend_name'`` argument to
    the ``Parallel`` class constructor. It is particularly useful when calling
    into library code that uses joblib internally but does not expose the
    backend argument in its own API.

    >>> from operator import neg
    >>> with parallel_backend('threading'):
    ...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
    ...
    [-1, -2, -3, -4, -5]

    Warning: this function is experimental and subject to change in a future
    version of joblib.

    Joblib also tries to limit the oversubscription by limiting the number of
    threads usable in some third-party library threadpools like OpenBLAS, MKL
    or OpenMP. The default limit in each worker is set to
    ``max(cpu_count() // effective_n_jobs, 1)`` but this limit can be
    overwritten with the ``inner_max_num_threads`` argument which will be used
    to set this limit in the child processes.

    .. versionadded:: 0.10

    """
    def __init__(self, backend, n_jobs=-1, inner_max_num_threads=None,
                 **backend_params):
        if isinstance(backend, _basestring):
            if backend not in BACKENDS and backend in EXTERNAL_BACKENDS:
                register = EXTERNAL_BACKENDS[backend]
                register()

            backend = BACKENDS[backend](**backend_params)

        if inner_max_num_threads is not None:
            msg = ("{} does not accept setting the inner_max_num_threads "
                   "argument.".format(backend.__class__.__name__))
            assert backend.supports_inner_max_num_threads, msg
            backend.inner_max_num_threads = inner_max_num_threads

        # If the nesting_level of the backend is not set previously, use the
        # nesting level from the previous active_backend to set it
        current_backend_and_jobs = getattr(_backend, 'backend_and_jobs', None)
        if backend.nesting_level is None:
            if current_backend_and_jobs is None:
                nesting_level = 0
            else:
                nesting_level = current_backend_and_jobs[0].nesting_level

            backend.nesting_level = nesting_level

        # Save the backends info and set the active backend
        self.old_backend_and_jobs = current_backend_and_jobs
        self.new_backend_and_jobs = (backend, n_jobs)

        _backend.backend_and_jobs = (backend, n_jobs)

    def __enter__(self):
        return self.new_backend_and_jobs

    def __exit__(self, type, value, traceback):
        self.unregister()

    def unregister(self):
        if self.old_backend_and_jobs is None:
            if getattr(_backend, 'backend_and_jobs', None) is not None:
                del _backend.backend_and_jobs
        else:
            _backend.backend_and_jobs = self.old_backend_and_jobs


# Under Linux or OS X the default start method of multiprocessing
# can cause third party libraries to crash. Under Python 3.4+ it is possible
# to set an environment variable to switch the default start method from
# 'fork' to 'forkserver' or 'spawn' to avoid this issue albeit at the cost
# of causing semantic changes and some additional pool instantiation overhead.
DEFAULT_MP_CONTEXT = None
if hasattr(mp, 'get_context'):
    method = os.environ.get('JOBLIB_START_METHOD', '').strip() or None
    if method is not None:
        DEFAULT_MP_CONTEXT = mp.get_context(method=method)


class BatchedCalls(object):
    """Wrap a sequence of (func, args, kwargs) tuples as a single callable"""

    def __init__(self, iterator_slice, backend_and_jobs, pickle_cache=None):
        self.items = list(iterator_slice)
        self._size = len(self.items)
        if isinstance(backend_and_jobs, tuple):
            self._backend, self._n_jobs = backend_and_jobs
        else:
            # this is for backward compatibility purposes. Before 0.12.6,
            # nested backends were returned without n_jobs indications.
            self._backend, self._n_jobs = backend_and_jobs, None
        self._pickle_cache = pickle_cache if pickle_cache is not None else {}

    def __call__(self):
        # Set the default nested backend to self._backend but do not set the
        # change the default number of processes to -1
        with parallel_backend(self._backend, n_jobs=self._n_jobs):
            return [func(*args, **kwargs)
                    for func, args, kwargs in self.items]

    def __len__(self):
        return self._size


###############################################################################
# CPU count that works also when multiprocessing has been disabled via
# the JOBLIB_MULTIPROCESSING environment variable
def cpu_count():
    """Return the number of CPUs."""
    if mp is None:
        return 1

    return loky.cpu_count()


###############################################################################
# For verbosity

def _verbosity_filter(index, verbose):
    """ Returns False for indices increasingly apart, the distance
        depending on the value of verbose.

        We use a lag increasing as the square of index
    """
    if not verbose:
        return True
    elif verbose > 10:
        return False
    if index == 0:
        return False
    verbose = .5 * (11 - verbose) ** 2
    scale = sqrt(index / verbose)
    next_scale = sqrt((index + 1) / verbose)
    return (int(next_scale) == int(scale))


###############################################################################
def delayed(function, check_pickle=None):
    """Decorator used to capture the arguments of a function."""
    if check_pickle is not None:
        warnings.warn('check_pickle is deprecated in joblib 0.12 and will be'
                      ' removed in 0.13', DeprecationWarning)
    # Try to pickle the input function, to catch the problems early when
    # using with multiprocessing:
    if check_pickle:
        dumps(function)

    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    try:
        delayed_function = functools.wraps(function)(delayed_function)
    except AttributeError:
        " functools.wraps fails on some callable objects "
    return delayed_function


###############################################################################
class BatchCompletionCallBack(object):
    """Callback used by joblib.Parallel's multiprocessing backend.

    This callable is executed by the parent process whenever a worker process
    has returned the results of a batch of tasks.

    It is used for progress reporting, to update estimate of the batch
    processing duration and to schedule the next batch of tasks to be
    processed.

    """
    def __init__(self, dispatch_timestamp, batch_size, parallel):
        self.dispatch_timestamp = dispatch_timestamp
        self.batch_size = batch_size
        self.parallel = parallel

    def __call__(self, out):
        self.parallel.n_completed_tasks += self.batch_size
        this_batch_duration = time.time() - self.dispatch_timestamp

        self.parallel._backend.batch_completed(self.batch_size,
                                               this_batch_duration)
        self.parallel.print_progress()
        with self.parallel._lock:
            if self.parallel._original_iterator is not None:
                self.parallel.dispatch_next()


###############################################################################
def register_parallel_backend(name, factory, make_default=False):
    """Register a new Parallel backend factory.
Loading ...