Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / joblib   python

Repository URL to install this package:

Version: 0.14.1 

/ externals / loky / _base.py

###############################################################################
# Backport concurrent.futures for python2.7/3.3
#
# author: Thomas Moreau and Olivier Grisel
#
# adapted from concurrent/futures/_base.py (17/02/2017)
#  * Do not use yield from
#  * Use old super syntax
#
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

import sys
import time
import logging
import threading
import collections


if sys.version_info[:2] >= (3, 3):

    from concurrent.futures import wait, as_completed
    from concurrent.futures import TimeoutError, CancelledError
    from concurrent.futures import Executor, Future as _BaseFuture

    from concurrent.futures import FIRST_EXCEPTION
    from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED

    from concurrent.futures._base import LOGGER
    from concurrent.futures._base import PENDING, RUNNING, CANCELLED
    from concurrent.futures._base import CANCELLED_AND_NOTIFIED, FINISHED
else:

    # Logger shared by the whole futures backport.
    LOGGER = logging.getLogger("concurrent.futures")

    # Values accepted by wait(return_when=...).
    FIRST_COMPLETED = 'FIRST_COMPLETED'
    FIRST_EXCEPTION = 'FIRST_EXCEPTION'
    ALL_COMPLETED = 'ALL_COMPLETED'
    # Private return condition used only by as_completed().
    _AS_COMPLETED = '_AS_COMPLETED'

    # Future lifecycle states (internal to the futures package).
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    CANCELLED = 'CANCELLED'  # cancelled by the user ...
    # ... and a worker has since called _Waiter.add_cancelled().
    CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
    FINISHED = 'FINISHED'

    # Every state a future can be in.
    _FUTURE_STATES = [
        PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED,
    ]

    # Human-readable label for each state; both cancelled states share the
    # same label.
    _STATE_TO_DESCRIPTION_MAP = {
        PENDING: "pending",
        RUNNING: "running",
        CANCELLED: "cancelled",
        CANCELLED_AND_NOTIFIED: "cancelled",
        FINISHED: "finished",
    }

    class Error(Exception):
        """Common ancestor of every exception raised for futures."""

    class CancelledError(Error):
        """Signals that the Future had been cancelled."""

    class TimeoutError(Error):
        """Signals that an operation ran past its deadline."""

    class _Waiter(object):
        """Base waiter holding the event that wait()/as_completed() block on.

        Each ``add_*`` hook only records the future in ``finished_futures``;
        this base class never sets ``event`` itself, subclasses decide when
        to do so.
        """

        def __init__(self):
            self.finished_futures = []
            self.event = threading.Event()

        def add_result(self, future):
            """Record ``future`` as finished."""
            recorded = self.finished_futures
            recorded.append(future)

        def add_exception(self, future):
            """Record ``future`` as finished."""
            recorded = self.finished_futures
            recorded.append(future)

        def add_cancelled(self, future):
            """Record ``future`` as finished."""
            recorded = self.finished_futures
            recorded.append(future)

    class _AsCompletedWaiter(_Waiter):
        """Waiter behind as_completed(): signals the event on every report.

        ``lock`` is held while a future is recorded and the event is set;
        as_completed() takes the same lock when it drains
        ``finished_futures`` and clears the event.
        """

        def __init__(self):
            super(_AsCompletedWaiter, self).__init__()
            self.lock = threading.Lock()

        def add_result(self, future):
            """Record ``future`` and signal the event, under ``lock``."""
            parent = super(_AsCompletedWaiter, self)
            with self.lock:
                parent.add_result(future)
                self.event.set()

        def add_exception(self, future):
            """Record ``future`` and signal the event, under ``lock``."""
            parent = super(_AsCompletedWaiter, self)
            with self.lock:
                parent.add_exception(future)
                self.event.set()

        def add_cancelled(self, future):
            """Record ``future`` and signal the event, under ``lock``."""
            parent = super(_AsCompletedWaiter, self)
            with self.lock:
                parent.add_cancelled(future)
                self.event.set()

    class _FirstCompletedWaiter(_Waiter):
        """Waiter behind wait(return_when=FIRST_COMPLETED).

        Any report (result, exception or cancellation) records the future
        and sets the event straight away.
        """

        def add_result(self, future):
            """Record ``future`` and signal the event."""
            parent = super(_FirstCompletedWaiter, self)
            parent.add_result(future)
            self.event.set()

        def add_exception(self, future):
            """Record ``future`` and signal the event."""
            parent = super(_FirstCompletedWaiter, self)
            parent.add_exception(future)
            self.event.set()

        def add_cancelled(self, future):
            """Record ``future`` and signal the event."""
            parent = super(_FirstCompletedWaiter, self)
            parent.add_cancelled(future)
            self.event.set()

    class _AllCompletedWaiter(_Waiter):
        """Waiter behind wait() for FIRST_EXCEPTION and ALL_COMPLETED.

        Counts down ``num_pending_calls`` and sets the event when the count
        reaches zero. With ``stop_on_exception`` true, a reported exception
        sets the event immediately instead of decrementing the count.
        """

        def __init__(self, num_pending_calls, stop_on_exception):
            super(_AllCompletedWaiter, self).__init__()
            self.lock = threading.Lock()
            self.stop_on_exception = stop_on_exception
            self.num_pending_calls = num_pending_calls

        def _decrement_pending_calls(self):
            """Decrement the counter under ``lock``; signal once it hits 0."""
            with self.lock:
                remaining = self.num_pending_calls - 1
                self.num_pending_calls = remaining
                if not remaining:
                    self.event.set()

        def add_result(self, future):
            """Record ``future`` and count it as completed."""
            super(_AllCompletedWaiter, self).add_result(future)
            self._decrement_pending_calls()

        def add_exception(self, future):
            """Record ``future``; signal at once if stopping on exceptions."""
            super(_AllCompletedWaiter, self).add_exception(future)
            if not self.stop_on_exception:
                self._decrement_pending_calls()
                return
            self.event.set()

        def add_cancelled(self, future):
            """Record ``future`` and count it as completed."""
            super(_AllCompletedWaiter, self).add_cancelled(future)
            self._decrement_pending_calls()

    class _AcquireFutures(object):
        """Context manager acquiring the ``_condition`` of several futures.

        The futures are sorted by ``id()`` so that every user of this class
        acquires the conditions in the same order (presumably to avoid
        lock-ordering deadlocks between concurrent callers).
        """

        def __init__(self, futures):
            ordered = list(futures)
            ordered.sort(key=id)
            self.futures = ordered

        def __enter__(self):
            # Acquire each future's condition, in id() order.
            for fut in self.futures:
                fut._condition.acquire()

        def __exit__(self, *args):
            # Release each condition, iterating in the same order.
            for fut in self.futures:
                fut._condition.release()

    def _create_and_install_waiters(fs, return_when):
        """Build the waiter matching ``return_when`` and attach it to ``fs``.

        Args:
            fs: The futures to attach the waiter to; the waiter is appended
                to each future's ``_waiters`` list.
            return_when: One of _AS_COMPLETED, FIRST_COMPLETED,
                FIRST_EXCEPTION or ALL_COMPLETED.

        Returns:
            The newly created waiter.

        Raises:
            ValueError: If ``return_when`` is not one of the values above.
        """
        if return_when == _AS_COMPLETED:
            waiter = _AsCompletedWaiter()
        elif return_when == FIRST_COMPLETED:
            waiter = _FirstCompletedWaiter()
        else:
            # Count the futures that have not yet reached a final, notified
            # state.
            settled_states = [CANCELLED_AND_NOTIFIED, FINISHED]
            pending_count = 0
            for candidate in fs:
                if candidate._state not in settled_states:
                    pending_count += 1

            if return_when == FIRST_EXCEPTION:
                stop_early = True
            elif return_when == ALL_COMPLETED:
                stop_early = False
            else:
                raise ValueError("Invalid return condition: %r" % return_when)
            waiter = _AllCompletedWaiter(pending_count,
                                         stop_on_exception=stop_early)

        for target in fs:
            target._waiters.append(waiter)

        return waiter

    def as_completed(fs, timeout=None):
        """An iterator over the given futures that yields each as it completes.

        This is a generator: nothing below runs (including the computation
        of the deadline) until the first item is requested.

        Args:
            fs: The sequence of Futures (possibly created by different
                Executors) to iterate over.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator that yields the given Futures as they complete
            (finished or cancelled). If any given Futures are duplicated, they
            will be returned once.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
        """
        # Absolute deadline; only defined when a timeout was requested.
        if timeout is not None:
            end_time = timeout + time.time()

        # Deduplicate so that each future is yielded at most once.
        fs = set(fs)
        # Hold every future's condition while taking the snapshot of the
        # already-completed futures and installing the waiter.
        with _AcquireFutures(fs):
            finished = set(
                    f for f in fs
                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

        try:
            # First hand out everything that was already complete.
            for future in finished:
                yield future

            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    # Remaining time until the deadline.
                    wait_timeout = end_time - time.time()
                    if wait_timeout < 0:
                        raise TimeoutError('%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

                waiter.event.wait(wait_timeout)

                # Under the waiter's lock, take the batch of newly finished
                # futures, reset the list and clear the event for the next
                # round.
                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()

                for future in finished:
                    yield future
                    pending.remove(future)

        finally:
            # Always detach the waiter from every future, whether the
            # iteration completed, timed out or was abandoned.
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)

    # Result type of wait(): a (done, not_done) pair of sets.
    DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', ['done', 'not_done'])

    def wait(fs, timeout=None, return_when=ALL_COMPLETED):
        """Wait for the futures in the given sequence to complete.

        Args:
            fs: The sequence of Futures (possibly created by different
                Executors) to wait upon. Any iterable is accepted; duplicated
                futures are only considered once.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            return_when: Indicates when this function should return. The
                options are:

                FIRST_COMPLETED - Return when any future finishes or is
                                cancelled.
                FIRST_EXCEPTION - Return when any future finishes by raising an
                                exception. If no future raises an exception
                                then it is equivalent to ALL_COMPLETED.
                ALL_COMPLETED -   Return when all futures finish or are
                                cancelled.

        Returns:
            A named 2-tuple of sets. The first set, named 'done', contains the
            futures that completed (is finished or cancelled) before the wait
            completed. The second set, named 'not_done', contains uncompleted
            futures.

        Raises:
            ValueError: If return_when is not one of the documented options
                (raised when the waiter is created).
        """
        # Deduplicate up front. With a repeated future, len(done) could never
        # equal len(fs) and the waiter's pending count would not match the
        # number of notifications, so the call could block forever. This also
        # makes one-shot iterators safe to pass.
        fs = set(fs)
        with _AcquireFutures(fs):
            done = set(f for f in fs
                       if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            not_done = fs - done

            if (return_when == FIRST_COMPLETED) and done:
                return DoneAndNotDoneFutures(done, not_done)
            elif (return_when == FIRST_EXCEPTION) and done:
                if any(f for f in done
                       if not f.cancelled() and f.exception() is not None):
                    return DoneAndNotDoneFutures(done, not_done)

            if len(done) == len(fs):
                return DoneAndNotDoneFutures(done, not_done)

            waiter = _create_and_install_waiters(fs, return_when)

        try:
            waiter.event.wait(timeout)
        finally:
            # Detach the waiter even if the wait is interrupted, so it does
            # not stay registered on the futures.
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)

        done.update(waiter.finished_futures)
        return DoneAndNotDoneFutures(done, fs - done)

    class _BaseFuture(object):
        """Represents the result of an asynchronous computation."""

        def __init__(self):
            """Initializes the future. Should not be called by clients.

            The future starts in the PENDING state, with no result, no
            exception, no installed waiters and no done-callbacks.
            """
            # Condition protecting this future; acquired by _AcquireFutures,
            # by wait()/as_completed() when detaching waiters, and by
            # __repr__.
            self._condition = threading.Condition()
            self._state = PENDING
            self._result = None
            self._exception = None
            # Waiter objects appended by _create_and_install_waiters().
            self._waiters = []
            # NOTE(review): presumably callables to run on completion; the
            # code that uses this list is outside the visible source.
            self._done_callbacks = []

        def __repr__(self):
            """Return a debug string with the class, id and state.

            For a finished future the string also names the type of the
            raised exception or of the returned result.
            """
            with self._condition:
                cls_name = self.__class__.__name__
                if self._state != FINISHED:
                    return '<%s at %#x state=%s>' % (
                        cls_name,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state])

                description = _STATE_TO_DESCRIPTION_MAP[self._state]
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        cls_name,
                        id(self),
                        description,
                        self._exception.__class__.__name__)
                return '<%s at %#x state=%s returned %s>' % (
                    cls_name,
                    id(self),
                    description,
                    self._result.__class__.__name__)

        def cancel(self):
            """Cancel the future if possible.
Loading ...