Skip to content

Commit 8f1f294

Browse files
authored
Merge pull request #3197 from jakkdl/background_with_channel
add `@as_safe_channel`
2 parents 737d96a + 58e03c9 commit 8f1f294

File tree

8 files changed

+437
-51
lines changed

8 files changed

+437
-51
lines changed

docs/source/reference-core.rst

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1607,7 +1607,14 @@ the numbers 0 through 9 with a 1-second delay before each one:
16071607
16081608
trio.run(use_it)
16091609
1610-
Trio supports async generators, with some caveats described in this section.
1610+
Trio supports async generators, but there's several caveats and it's very
1611+
hard to handle them properly. Therefore Trio bundles a helper,
1612+
`trio.as_safe_channel` that does it for you.
1613+
1614+
1615+
.. autofunction:: trio.as_safe_channel
1616+
1617+
The details behind the problems are described in the following sections.
16111618

16121619
Finalization
16131620
~~~~~~~~~~~~
@@ -1737,7 +1744,8 @@ so sometimes you'll get an unhelpful `TrioInternalError`. (And
17371744
sometimes it will seem to work, which is probably the worst outcome of
17381745
all, since then you might not notice the issue until you perform some
17391746
minor refactoring of the generator or the code that's iterating it, or
1740-
just get unlucky. There is a `proposed Python enhancement
1747+
just get unlucky. There is a draft :pep:`789` with accompanying
1748+
`discussion thread
17411749
<https://discuss.python.org/t/preventing-yield-inside-certain-context-managers/1091>`__
17421750
that would at least make it fail consistently.)
17431751

@@ -1753,12 +1761,6 @@ the generator is suspended, what should the background tasks do?
17531761
There's no good way to suspend them, but if they keep running and throw
17541762
an exception, where can that exception be reraised?
17551763

1756-
If you have an async generator that wants to ``yield`` from within a nursery
1757-
or cancel scope, your best bet is to refactor it to be a separate task
1758-
that communicates over memory channels. The ``trio_util`` package offers a
1759-
`decorator that does this for you transparently
1760-
<https://trio-util.readthedocs.io/en/latest/#trio_util.trio_async_generator>`__.
1761-
17621764
For more discussion, see
17631765
Trio issues `264 <https://github.com/python-trio/trio/issues/264>`__
17641766
(especially `this comment

newsfragments/3197.feature.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Add :func:`@trio.as_safe_channel <trio.as_safe_channel>`, a wrapper that can be used to make async generators safe.
2+
This will be the suggested fix for the flake8-async lint rule `ASYNC900 <https://flake8-async.readthedocs.io/en/latest/rules.html#async900>`_.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ exclude_also = [
331331
"@overload",
332332
'class .*\bProtocol\b.*\):',
333333
"raise NotImplementedError",
334+
'.*if "sphinx" in sys.modules:',
334335
'TODO: test this line'
335336
]
336337
partial_branches = [

src/trio/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
MemoryChannelStatistics as MemoryChannelStatistics,
2828
MemoryReceiveChannel as MemoryReceiveChannel,
2929
MemorySendChannel as MemorySendChannel,
30+
as_safe_channel as as_safe_channel,
3031
open_memory_channel as open_memory_channel,
3132
)
3233
from ._core import (

src/trio/_channel.py

Lines changed: 146 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from __future__ import annotations
22

3+
import sys
34
from collections import OrderedDict, deque
5+
from collections.abc import AsyncGenerator, Callable # noqa: TC003 # Needed for Sphinx
6+
from contextlib import AbstractAsyncContextManager, asynccontextmanager
7+
from functools import wraps
48
from math import inf
59
from typing import (
610
TYPE_CHECKING,
@@ -14,12 +18,31 @@
1418

1519
from ._abc import ReceiveChannel, ReceiveType, SendChannel, SendType, T
1620
from ._core import Abort, RaiseCancelT, Task, enable_ki_protection
17-
from ._util import NoPublicConstructor, final, generic_function
21+
from ._util import (
22+
MultipleExceptionError,
23+
NoPublicConstructor,
24+
final,
25+
generic_function,
26+
raise_single_exception_from_group,
27+
)
28+
29+
if sys.version_info < (3, 11):
30+
from exceptiongroup import BaseExceptionGroup
1831

1932
if TYPE_CHECKING:
2033
from types import TracebackType
2134

22-
from typing_extensions import Self
35+
from typing_extensions import ParamSpec, Self
36+
37+
P = ParamSpec("P")
38+
elif "sphinx" in sys.modules:
39+
# P needs to exist for Sphinx to parse the type hints successfully.
40+
try:
41+
from typing_extensions import ParamSpec
42+
except ImportError:
43+
P = ... # This is valid in Callable, though not correct
44+
else:
45+
P = ParamSpec("P")
2346

2447

2548
def _open_memory_channel(
@@ -440,3 +463,124 @@ async def aclose(self) -> None:
440463
See `MemoryReceiveChannel.close`."""
441464
self.close()
442465
await trio.lowlevel.checkpoint()
466+
467+
468+
class RecvChanWrapper(ReceiveChannel[T]):
469+
def __init__(
470+
self, recv_chan: MemoryReceiveChannel[T], send_semaphore: trio.Semaphore
471+
) -> None:
472+
self._recv_chan = recv_chan
473+
self._send_semaphore = send_semaphore
474+
475+
async def receive(self) -> T:
476+
self._send_semaphore.release()
477+
return await self._recv_chan.receive()
478+
479+
async def aclose(self) -> None:
480+
await self._recv_chan.aclose()
481+
482+
def __enter__(self) -> Self:
483+
return self
484+
485+
def __exit__(
486+
self,
487+
exc_type: type[BaseException] | None,
488+
exc_value: BaseException | None,
489+
traceback: TracebackType | None,
490+
) -> None:
491+
self._recv_chan.close()
492+
493+
494+
def as_safe_channel(
495+
fn: Callable[P, AsyncGenerator[T, None]],
496+
) -> Callable[P, AbstractAsyncContextManager[ReceiveChannel[T]]]:
497+
"""Decorate an async generator function to make it cancellation-safe.
498+
499+
The ``yield`` keyword offers a very convenient way to write iterators...
500+
which makes it really unfortunate that async generators are so difficult
501+
to call correctly. Yielding from the inside of a cancel scope or a nursery
502+
to the outside `violates structured concurrency <https://xkcd.com/292/>`_
503+
with consequences explained in :pep:`789`. Even then, resource cleanup
504+
errors remain common (:pep:`533`) unless you wrap every call in
505+
:func:`~contextlib.aclosing`.
506+
507+
This decorator gives you the best of both worlds: with careful exception
508+
handling and a background task we preserve structured concurrency by
509+
offering only the safe interface, and you can still write your iterables
510+
with the convenience of ``yield``. For example::
511+
512+
@as_safe_channel
513+
async def my_async_iterable(arg, *, kwarg=True):
514+
while ...:
515+
item = await ...
516+
yield item
517+
518+
async with my_async_iterable(...) as recv_chan:
519+
async for item in recv_chan:
520+
...
521+
522+
While the combined async-with-async-for can be inconvenient at first,
523+
the context manager is indispensable for both correctness and for prompt
524+
cleanup of resources.
525+
"""
526+
# Perhaps a future PEP will adopt `async with for` syntax, like
527+
# https://coconut.readthedocs.io/en/master/DOCS.html#async-with-for
528+
529+
@asynccontextmanager
530+
@wraps(fn)
531+
async def context_manager(
532+
*args: P.args, **kwargs: P.kwargs
533+
) -> AsyncGenerator[trio._channel.RecvChanWrapper[T], None]:
534+
send_chan, recv_chan = trio.open_memory_channel[T](0)
535+
try:
536+
async with trio.open_nursery(strict_exception_groups=True) as nursery:
537+
agen = fn(*args, **kwargs)
538+
send_semaphore = trio.Semaphore(0)
539+
# `nursery.start` to make sure that we will clean up send_chan & agen
540+
# If this errors we don't close `recv_chan`, but the caller
541+
# never gets access to it, so that's not a problem.
542+
await nursery.start(
543+
_move_elems_to_channel, agen, send_chan, send_semaphore
544+
)
545+
# `async with recv_chan` could eat exceptions, so use sync cm
546+
with RecvChanWrapper(recv_chan, send_semaphore) as wrapped_recv_chan:
547+
yield wrapped_recv_chan
548+
# User has exited context manager, cancel to immediately close the
549+
# abandoned generator if it's still alive.
550+
nursery.cancel_scope.cancel()
551+
except BaseExceptionGroup as eg:
552+
try:
553+
raise_single_exception_from_group(eg)
554+
except MultipleExceptionError:
555+
# In case user has except* we make it possible for them to handle the
556+
# exceptions.
557+
raise BaseExceptionGroup(
558+
"Encountered exception during cleanup of generator object, as well as exception in the contextmanager body - unable to unwrap.",
559+
[eg],
560+
) from None
561+
562+
async def _move_elems_to_channel(
563+
agen: AsyncGenerator[T, None],
564+
send_chan: trio.MemorySendChannel[T],
565+
send_semaphore: trio.Semaphore,
566+
task_status: trio.TaskStatus,
567+
) -> None:
568+
# `async with send_chan` will eat exceptions,
569+
# see https://github.com/python-trio/trio/issues/1559
570+
with send_chan:
571+
try:
572+
task_status.started()
573+
while True:
574+
# wait for receiver to call next on the aiter
575+
await send_semaphore.acquire()
576+
try:
577+
value = await agen.__anext__()
578+
except StopAsyncIteration:
579+
return
580+
# Send the value to the channel
581+
await send_chan.send(value)
582+
finally:
583+
# replace try-finally with contextlib.aclosing once python39 is dropped
584+
await agen.aclose()
585+
586+
return context_manager

0 commit comments

Comments
 (0)