Skip to content

Commit 1c0704d

Browse files
authored
Merge pull request #108 from oremanj/cancel-aio-tasks
Cancel and join asyncio tasks when exiting an open_loop() block
2 parents dd4476b + 50617c8 commit 1c0704d

File tree

4 files changed

+108
-10
lines changed

4 files changed

+108
-10
lines changed

newsfragments/91.feature.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
Exiting an ``async with trio_asyncio.open_loop():`` block now cancels
2+
any asyncio tasks that are still running in the background, like
3+
:func:`asyncio.run` does, so that they have a chance to clean up
4+
resources by running async context managers and ``finally``
5+
blocks. Previously such tasks would simply be abandoned to the garbage
6+
collector, resulting in potential deadlocks and stderr spew. Note that,
7+
like :func:`asyncio.run`, we *do* still abandon any tasks that are
8+
started during this finalization phase and outlive the existing tasks.

tests/interop/test_calls.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
import asyncio
33
import trio
4+
import trio.testing
45
import sniffio
56
from trio_asyncio import aio_as_trio, trio_as_aio, run_aio_generator
67
from tests import aiotest
@@ -331,16 +332,13 @@ async def dly_asyncio(hold, seen):
331332
seen.flag |= 1
332333
await hold.wait()
333334

334-
async def cancel_soon(nursery):
335-
await trio.sleep(0.01)
336-
nursery.cancel_scope.cancel()
337-
338335
hold = asyncio.Event()
339336
seen = Seen()
340337

341338
async with trio.open_nursery() as nursery:
342339
nursery.start_soon(async_gen_to_list, run_aio_generator(loop, dly_asyncio(hold, seen)))
343-
nursery.start_soon(cancel_soon, nursery)
340+
await trio.testing.wait_all_tasks_blocked()
341+
nursery.cancel_scope.cancel()
344342
assert nursery.cancel_scope.cancel_called
345343
assert seen.flag == 1
346344

tests/test_trio_asyncio.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import pytest
22
import sys
3+
import types
34
import asyncio
5+
import trio
46
from async_generator import async_generator, yield_
57
import trio_asyncio
68

@@ -47,3 +49,53 @@ async def test_get_running_loop():
4749
pass # Python 3.6
4850
else:
4951
assert get_running_loop() == loop
52+
53+
54+
@pytest.mark.trio
55+
async def test_tasks_get_cancelled():
56+
record = []
57+
tasks = []
58+
59+
@types.coroutine
60+
def aio_yield():
61+
yield
62+
63+
async def aio_sleeper(key):
64+
try:
65+
await asyncio.sleep(10)
66+
record.append("expired")
67+
finally:
68+
try:
69+
# Prove that we're still running in the aio loop, not
70+
# some GC pass
71+
await aio_yield()
72+
finally:
73+
record.append(key)
74+
if "early" in key:
75+
tasks.append(asyncio.ensure_future(aio_sleeper("aio late")))
76+
asyncio.get_event_loop().run_trio_task(trio_sleeper, "trio late")
77+
78+
async def trio_sleeper(key):
79+
try:
80+
await trio.sleep_forever()
81+
finally:
82+
await trio.lowlevel.cancel_shielded_checkpoint()
83+
record.append(key)
84+
85+
async with trio_asyncio.open_loop() as loop:
86+
tasks.append(asyncio.ensure_future(aio_sleeper("aio early")))
87+
loop.run_trio_task(trio_sleeper, "trio early")
88+
89+
assert set(record) == {"aio early", "trio early", "trio late"}
90+
assert len(tasks) == 2 and tasks[0].done() and not tasks[1].done()
91+
92+
# Suppress "Task was destroyed but it was pending!" message
93+
tasks[1]._log_traceback = False
94+
tasks[1]._log_destroy_pending = False
95+
96+
# Suppress the "coroutine ignored GeneratorExit" message
97+
while True:
98+
try:
99+
tasks[1]._coro.throw(SystemExit)
100+
except SystemExit:
101+
break

trio_asyncio/_loop.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from async_generator import asynccontextmanager
1414

1515
from ._async import TrioEventLoop
16+
from ._util import run_aio_future
1617
from ._deprecate import warn_deprecated
1718

1819
try:
@@ -375,11 +376,33 @@ async def open_loop(queue_len=None):
375376
"""Returns a Trio-flavored async context manager which provides
376377
an asyncio event loop running on top of Trio.
377378
379+
The context manager evaluates to a new `TrioEventLoop` object.
380+
378381
Entering the context manager is not enough on its own to immediately
379382
run asyncio code; it just provides the context that makes running that
380383
code possible. You additionally need to wrap any asyncio functions
381384
that you want to run in :func:`aio_as_trio`.
382385
386+
Exiting the context manager will attempt to do an orderly shutdown
387+
of the tasks it contains, analogously to :func:`asyncio.run`.
388+
Both asyncio-flavored tasks and Trio-flavored tasks (the latter
389+
started using :meth:`~BaseTrioEventLoop.trio_as_future`,
390+
:meth:`~BaseTrioEventLoop.run_trio_task`, :func:`trio_as_aio`,
391+
etc) are cancelled simultaneously, and the loop waits for them to
392+
exit in response to this cancellation before proceeding. All
393+
:meth:`~asyncio.loop.call_soon` callbacks that are submitted
394+
before exiting the context manager will run before starting this
395+
shutdown sequence, and all callbacks that are submitted before the
396+
last task exits will run before the loop closes. The exact point
397+
at which the loop stops running callbacks is not specified.
398+
399+
.. warning:: As with :func:`asyncio.run`, asyncio-flavored tasks
400+
that are started *after* exiting the context manager (such as by
401+
another task as it unwinds) may or may not be cancelled, and will
402+
be abandoned if they survive the shutdown sequence. This may lead
403+
to unclosed resources, stderr spew about "coroutine ignored
404+
GeneratorExit", etc. Trio-flavored tasks do not have this hazard.
405+
383406
Example usage::
384407
385408
async def async_main(*args):
@@ -406,12 +429,11 @@ async def async_main(*args):
406429
await loop._main_loop_init(tasks_nursery)
407430
await loop_nursery.start(loop._main_loop)
408431
yield loop
409-
tasks_nursery.cancel_scope.cancel()
410432

411-
# Allow all submitted run_trio() tasks calls a chance
412-
# to start before the tasks_nursery closes, unless the
413-
# loop stops (due to someone else calling stop())
414-
# before that:
433+
# Allow all already-submitted tasks a chance to start
434+
# (and then immediately be cancelled), unless the loop
435+
# stops (due to someone else calling stop()) before
436+
# that.
415437
async with trio.open_nursery() as sync_nursery:
416438
sync_nursery.cancel_scope.shield = True
417439

@@ -423,6 +445,24 @@ async def wait_for_sync():
423445

424446
await loop.wait_stopped()
425447
sync_nursery.cancel_scope.cancel()
448+
449+
# Cancel and wait on all currently-running tasks.
450+
# Exiting the tasks_nursery will wait for the Trio tasks
451+
# automatically; we mix in the asyncio tasks by scheduling
452+
# a call to run_aio_future() for each one. It's important
453+
# not to wait on one kind of task before the other, so that
454+
# we support Trio tasks that need to run some asyncio
455+
# code during teardown as well as the opposite.
456+
# Like asyncio.run(), we don't bother cancelling and waiting
457+
# on any additional asyncio tasks that these tasks start as they
458+
# unwind.
459+
if sys.version_info >= (3, 7):
460+
aio_tasks = asyncio.all_tasks(loop)
461+
else:
462+
aio_tasks = {t for t in asyncio.Task.all_tasks(loop) if not t.done()}
463+
for task in aio_tasks:
464+
tasks_nursery.start_soon(run_aio_future, task)
465+
tasks_nursery.cancel_scope.cancel()
426466
finally:
427467
try:
428468
await loop._main_loop_exit()

0 commit comments

Comments
 (0)