Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
dask / docs / source / scheduler-overview.rst
Size: Mime:
:orphan:

Scheduler Overview
==================

After we create a dask graph, we use a scheduler to run it. Dask currently
implements a few different schedulers:

-  ``dask.threaded.get``: a scheduler backed by a thread pool
-  ``dask.multiprocessing.get``: a scheduler backed by a process pool
-  ``dask.get``: a synchronous scheduler, good for debugging
-  ``distributed.Client.get``: a distributed scheduler for executing graphs
   on multiple machines.  This lives in the external distributed_ project.

.. _distributed: https://distributed.dask.org/en/latest/


The ``get`` function
--------------------

The entry point for all schedulers is a ``get`` function. This takes a dask
graph, and a key or list of keys to compute:

.. code-block:: python

   >>> from operator import add

   >>> dsk = {'a': 1,
   ...        'b': 2,
   ...        'c': (add, 'a', 'b'),
   ...        'd': (sum, ['a', 'b', 'c'])}

   >>> get(dsk, 'c')
   3

   >>> get(dsk, 'd')
   6

   >>> get(dsk, ['a', 'b', 'c'])
   [1, 2, 3]


Using ``compute`` methods
-------------------------

When working with dask collections, you will rarely need to
interact with scheduler ``get`` functions directly. Each collection has a
default scheduler, and a built-in ``compute`` method that calculates the output
of the collection:

.. code-block:: python

    >>> import dask.array as da
    >>> x = da.arange(100, chunks=10)
    >>> x.sum().compute()
    4950

The compute method takes a number of keywords:

- ``scheduler``: the name of the desired scheduler as a string (``"threads"``, ``"processes"``, ``"single-threaded"``, etc.), a ``get`` function, or a ``dask.distributed.Client`` object.  Overrides the default for the collection.
- ``**kwargs``: extra keywords to pass on to the scheduler ``get`` function.

See also: :ref:`configuring-schedulers`.


The ``compute`` function
------------------------

You may wish to compute results from multiple dask collections at once.
Similar to the ``compute`` method on each collection, there is a general
``compute`` function that takes multiple collections and returns multiple
results. This merges the graphs from each collection, so intermediate results
are shared:

.. code-block:: python

    >>> y = (x + 1).sum()
    >>> z = (x + 1).mean()
    >>> da.compute(y, z)    # Compute y and z, sharing intermediate results
    (5050, 50.5)

Here the ``x + 1`` intermediate was only computed once, while calling
``y.compute()`` and ``z.compute()`` would compute it twice. For large graphs
that share many intermediates, this can be a big performance gain.

The ``compute`` function works with any dask collection, and is found in
``dask.base``. For convenience it has also been imported into the top level
namespace of each collection.

.. code-block:: python

    >>> from dask.base import compute
    >>> compute is da.compute
    True


.. _configuring-schedulers:

Configuring the schedulers
--------------------------

The dask collections each have a default scheduler:

- ``dask.array`` and ``dask.dataframe`` use the threaded scheduler by default
- ``dask.bag`` uses the multiprocessing scheduler by default.

For most cases, the default settings are good choices. However, sometimes you
may want to use a different scheduler. There are two ways to do this.

1. Using the ``scheduler`` keyword in the ``compute`` method:

    .. code-block:: python

        >>> x.sum().compute(scheduler='processes')

2. Using ``dask.config.set``. This can be used either as a context manager, or to
   set the scheduler globally:

    .. code-block:: python

        # As a context manager
        >>> with dask.config.set(scheduler='processes'):
        ...     x.sum().compute()

        # Set globally
        >>> dask.config.set(scheduler='processes')
        >>> x.sum().compute()


Additionally, each scheduler may take a few extra keywords specific to that
scheduler. For example, the multiprocessing and threaded schedulers each take a
``num_workers`` keyword, which sets the number of processes or threads to use
(defaults to number of cores). This can be set by passing the keyword when
calling ``compute``:

.. code-block:: python

    # Compute with 4 threads
    >>> x.compute(num_workers=4)

Alternatively, the multiprocessing and threaded schedulers will check for a
global pool set with ``dask.config.set``:

.. code-block:: python

    >>> from concurrent.futures import ThreadPoolExecutor
    >>> with dask.config.set(pool=ThreadPoolExecutor(4)):
    ...     x.compute()

The multiprocessing scheduler also supports `different contexts`_ ("spawn",
"forkserver", "fork") which you can set with ``dask.config.set``. The default
context is "spawn", but you can set a different one:

.. code-block:: python

   >>> with dask.config.set({"multiprocessing.context": "forkserver"}):
   ...     x.compute()

.. _different contexts: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

For more information on the individual options for each scheduler, see the
docstrings for each scheduler ``get`` function.


Debugging the schedulers
------------------------

Debugging parallel code can be difficult, as conventional tools such as ``pdb``
don't work well with multiple threads or processes. To get around this when
debugging, we recommend using the synchronous scheduler found at
``dask.get``. This runs everything serially, allowing it to work
well with ``pdb``:

.. code-block:: python

    >>> dask.config.set(scheduler='single-threaded')
    >>> x.sum().compute()    # This computation runs serially instead of in parallel


The shared memory schedulers also provide a set of callbacks that can be used
for diagnosing and profiling. You can learn more about scheduler callbacks and
diagnostics :doc:`here <diagnostics-local>`.


More Information
----------------

- See :doc:`shared` for information on the design of the shared memory
  (threaded or multiprocessing) schedulers
- See distributed_ for information on the distributed memory scheduler