Repository URL to install this package:
|
Version:
2022.10.0 ▾
|
from dask.callbacks import Callback
from dask.local import get_sync
from dask.threaded import get as get_threaded
from dask.utils_test import add
def test_start_callback():
flag = [False]
class MyCallback(Callback):
def _start(self, dsk):
flag[0] = True
with MyCallback():
get_sync({"x": 1}, "x")
assert flag[0] is True
def test_start_state_callback():
flag = [False]
class MyCallback(Callback):
def _start_state(self, dsk, state):
flag[0] = True
assert dsk["x"] == 1
assert len(state["cache"]) == 1
with MyCallback():
get_sync({"x": 1}, "x")
assert flag[0] is True
def test_finish_always_called():
flag = [False]
class MyCallback(Callback):
def _finish(self, dsk, state, errored):
flag[0] = True
assert errored
dsk = {"x": (lambda: 1 / 0,)}
# `raise_on_exception=True`
try:
with MyCallback():
get_sync(dsk, "x")
except Exception as e:
assert isinstance(e, ZeroDivisionError)
assert flag[0]
# `raise_on_exception=False`
flag[0] = False
try:
with MyCallback():
get_threaded(dsk, "x")
except Exception as e:
assert isinstance(e, ZeroDivisionError)
assert flag[0]
# KeyboardInterrupt
def raise_keyboard():
raise KeyboardInterrupt()
dsk = {"x": (raise_keyboard,)}
flag[0] = False
try:
with MyCallback():
get_sync(dsk, "x")
except BaseException as e:
assert isinstance(e, KeyboardInterrupt)
assert flag[0]
def test_nested_schedulers():
class MyCallback(Callback):
def _start(self, dsk):
self.dsk = dsk
def _pretask(self, key, dsk, state):
assert key in self.dsk
inner_callback = MyCallback()
inner_dsk = {"x": (add, 1, 2), "y": (add, "x", 3)}
def nested_call(x):
assert not Callback.active
with inner_callback:
return get_threaded(inner_dsk, "y") + x
outer_callback = MyCallback()
outer_dsk = {"a": (nested_call, 1), "b": (add, "a", 2)}
with outer_callback:
get_threaded(outer_dsk, "b")
assert not Callback.active
assert outer_callback.dsk == outer_dsk
assert inner_callback.dsk == inner_dsk
assert not Callback.active
def test_add_remove_mutates_not_replaces():
assert not Callback.active
with Callback():
assert Callback.active
assert not Callback.active