Skip to content

Commit 848a5e7

Browse files
authored
Make async executors work in Jupyter notebooks (#661)
1 parent 4fce79a commit 848a5e7

File tree

4 files changed

+22
-4
lines changed

4 files changed

+22
-4
lines changed

cubed/runtime/executors/dask.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from cubed.runtime.pipeline import visit_node_generations, visit_nodes
2323
from cubed.runtime.types import Callback, CubedPipeline, DagExecutor
2424
from cubed.runtime.utils import (
25+
asyncio_run,
2526
execution_stats,
2627
gensym,
2728
handle_callbacks,
@@ -170,7 +171,7 @@ def execute_dag(
170171
**kwargs,
171172
) -> None:
172173
merged_kwargs = {**self.kwargs, **kwargs}
173-
asyncio.run(
174+
asyncio_run(
174175
async_execute_dag(
175176
dag,
176177
callbacks=callbacks,

cubed/runtime/executors/local.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from cubed.runtime.pipeline import visit_node_generations, visit_nodes
1818
from cubed.runtime.types import Callback, CubedPipeline, DagExecutor, TaskEndEvent
1919
from cubed.runtime.utils import (
20+
asyncio_run,
2021
execution_stats,
2122
execution_timing,
2223
handle_callbacks,
@@ -271,7 +272,7 @@ def execute_dag(
271272
**kwargs,
272273
) -> None:
273274
merged_kwargs = {**self.kwargs, **kwargs}
274-
asyncio.run(
275+
asyncio_run(
275276
async_execute_dag(
276277
dag,
277278
callbacks=callbacks,
@@ -310,7 +311,7 @@ def execute_dag(
310311
**kwargs,
311312
) -> None:
312313
merged_kwargs = {**self.kwargs, **kwargs}
313-
asyncio.run(
314+
asyncio_run(
314315
async_execute_dag(
315316
dag,
316317
callbacks=callbacks,

cubed/runtime/executors/modal.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from cubed.runtime.pipeline import visit_node_generations, visit_nodes
1717
from cubed.runtime.types import Callback, DagExecutor
1818
from cubed.runtime.utils import (
19+
asyncio_run,
1920
execute_with_stats,
2021
handle_callbacks,
2122
handle_operation_start_callbacks,
@@ -261,7 +262,7 @@ def execute_dag(
261262
**kwargs,
262263
) -> None:
263264
merged_kwargs = {**self.kwargs, **kwargs}
264-
asyncio.run(
265+
asyncio_run(
265266
async_execute_dag(
266267
dag,
267268
callbacks=callbacks,

cubed/runtime/utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import asyncio
12
import time
3+
from concurrent.futures import ThreadPoolExecutor
24
from contextlib import nullcontext
35
from functools import partial
46
from itertools import islice
@@ -115,6 +117,19 @@ def handle_callbacks(callbacks, result, stats):
115117
[callback.on_task_end(event) for callback in callbacks]
116118

117119

120+
# Like asyncio.run(), but works in a Jupyter notebook
121+
# Based on https://stackoverflow.com/a/75341431
122+
def asyncio_run(coro):
123+
try:
124+
asyncio.get_running_loop() # Triggers RuntimeError if no running event loop
125+
except RuntimeError:
126+
return asyncio.run(coro)
127+
else:
128+
# Create a separate thread so we can block before returning
129+
with ThreadPoolExecutor(1) as pool:
130+
return pool.submit(lambda: asyncio.run(coro)).result()
131+
132+
118133
# this will be in Python 3.12 https://docs.python.org/3.12/library/itertools.html#itertools.batched
119134
def batched(iterable, n):
120135
# batched('ABCDEFG', 3) --> ABC DEF G

0 commit comments

Comments
 (0)