Skip to content

Commit 845af47

Browse files
committed
Cancel asyncio tasks when exiting the open_loop() block
1 parent b93c320 commit 845af47

File tree

3 files changed

+110
-5
lines changed

3 files changed

+110
-5
lines changed

newsfragments/91.feature.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
Exiting an ``async with trio_asyncio.open_loop():`` block now cancels
2+
any asyncio tasks that are still running in the background, like
3+
:func:`asyncio.run` does, so that they have a chance to clean up
4+
resources by running async context managers and ``finally``
5+
blocks. Previously such tasks would simply be abandoned to the garbage
6+
collector, resulting in potential deadlocks and stderr spew. Note that,
7+
like :func:`asyncio.run`, we *do* still abandon any tasks that are
8+
started during this finalization phase and outlive the existing tasks.

tests/test_trio_asyncio.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import pytest
22
import sys
3+
import types
34
import asyncio
5+
import trio
46
from async_generator import async_generator, yield_
57
import trio_asyncio
68

@@ -47,3 +49,54 @@ async def test_get_running_loop():
4749
pass # Python 3.6
4850
else:
4951
assert get_running_loop() == loop
52+
53+
54+
@pytest.mark.trio
55+
async def test_tasks_get_cancelled():
56+
record = []
57+
tasks = []
58+
59+
@types.coroutine
60+
def aio_yield():
61+
yield
62+
63+
async def aio_sleeper(key):
64+
try:
65+
await asyncio.sleep(10)
66+
record.append("expired")
67+
finally:
68+
try:
69+
# Prove that we're still running in the aio loop, not
70+
# some GC pass
71+
await aio_yield()
72+
finally:
73+
record.append(key)
74+
if "early" in key:
75+
tasks.append(asyncio.ensure_future(aio_sleeper("aio late")))
76+
asyncio.get_event_loop().run_trio_task(trio_sleeper, "trio late")
77+
78+
async def trio_sleeper(key):
79+
try:
80+
await trio.sleep_forever()
81+
finally:
82+
await trio.lowlevel.cancel_shielded_checkpoint()
83+
record.append(key)
84+
85+
async with trio_asyncio.open_loop() as loop:
86+
tasks.append(asyncio.ensure_future(aio_sleeper("aio early")))
87+
loop.run_trio_task(trio_sleeper, "trio early")
88+
89+
assert record[0] == "aio early"
90+
assert set(record[1:]) == {"trio early", "trio late"}
91+
assert len(tasks) == 2 and tasks[0].done() and not tasks[1].done()
92+
93+
# Suppress "Task was destroyed but it was pending!" message
94+
tasks[1]._log_traceback = False
95+
tasks[1]._log_destroy_pending = False
96+
97+
# Suppress the "coroutine ignored GeneratorExit" message
98+
while True:
99+
try:
100+
tasks[1]._coro.throw(SystemExit)
101+
except SystemExit:
102+
break

trio_asyncio/_loop.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -377,11 +377,33 @@ async def open_loop(queue_len=None):
377377
"""Returns a Trio-flavored async context manager which provides
378378
an asyncio event loop running on top of Trio.
379379
380+
The context manager evaluates to a new `TrioEventLoop` object.
381+
380382
Entering the context manager is not enough on its own to immediately
381383
run asyncio code; it just provides the context that makes running that
382384
code possible. You additionally need to wrap any asyncio functions
383385
that you want to run in :func:`aio_as_trio`.
384386
387+
Exiting the context manager will attempt to do an orderly shutdown
388+
of the tasks it contains, analogously to :func:`asyncio.run`.
389+
asyncio-flavored tasks are cancelled and awaited first, then
390+
Trio-flavored tasks that were started using
391+
:meth:`~BaseTrioEventLoop.trio_as_future` or
392+
:meth:`~BaseTrioEventLoop.run_trio_task`. All
393+
:meth:`~asyncio.loop.call_soon` callbacks that are submitted
394+
before exiting the context manager will run before starting
395+
this shutdown sequence, and all callbacks that are submitted
396+
before the last task exits will run before the loop closes.
397+
The exact point at which the loop stops running callbacks is
398+
not specified.
399+
400+
.. warning:: As with :func:`asyncio.run`, asyncio-flavored tasks
401+
that are started *after* exiting the context manager (such as by
402+
another task as it unwinds) may or may not be cancelled, and will
403+
be abandoned if they survive the shutdown sequence. This may lead
404+
to unclosed resources, stderr spew about "coroutine ignored
405+
GeneratorExit", etc. Trio-flavored tasks do not have this hazard.
406+
385407
Example usage::
386408
387409
async def async_main(*args):
@@ -408,12 +430,11 @@ async def async_main(*args):
408430
await loop._main_loop_init(tasks_nursery)
409431
await loop_nursery.start(loop._main_loop)
410432
yield loop
411-
tasks_nursery.cancel_scope.cancel()
412433

413-
# Allow all submitted run_trio() tasks calls a chance
414-
# to start before the tasks_nursery closes, unless the
415-
# loop stops (due to someone else calling stop())
416-
# before that:
434+
# Allow all already-submitted tasks a chance to start
435+
# (and then immediately be cancelled), unless the loop
436+
# stops (due to someone else calling stop()) before
437+
# that.
417438
async with trio.open_nursery() as sync_nursery:
418439
sync_nursery.cancel_scope.shield = True
419440

@@ -425,6 +446,29 @@ async def wait_for_sync():
425446

426447
await loop.wait_stopped()
427448
sync_nursery.cancel_scope.cancel()
449+
450+
# Cancel and wait on all currently-running asyncio tasks.
451+
# Like asyncio.run(), we don't bother cancelling and waiting
452+
# on any additional tasks that these tasks start as they
453+
# unwind.
454+
if sys.version_info >= (3, 7):
455+
aio_tasks = asyncio.all_tasks(loop)
456+
else:
457+
aio_tasks = {t for t in asyncio.Task.all_tasks(loop) if not t.done()}
458+
if aio_tasks:
459+
# Start one Trio task to wait for each still-running
460+
# asyncio task. This provides better exception
461+
# propagation than using asyncio.gather().
462+
async with trio.open_nursery() as aio_cancel_nursery:
463+
for task in aio_tasks:
464+
aio_cancel_nursery.start_soon(run_aio_future, task)
465+
aio_cancel_nursery.cancel_scope.cancel()
466+
467+
# If there are any trio_as_aio tasks still going after
468+
# the cancellation of asyncio tasks above, this will
469+
# cancel them, and exiting the tasks_nursery block
470+
# will wait for them to exit.
471+
tasks_nursery.cancel_scope.cancel()
428472
finally:
429473
try:
430474
await loop._main_loop_exit()

0 commit comments

Comments
 (0)