Skip to content

Commit fd153f2

Browse files
authored
Merge pull request #1115 from njsmith/rework-channels
Refactor channel interfaces
2 parents c8e5bf5 + d2eb09c commit fd153f2

File tree

8 files changed

+185
-157
lines changed

8 files changed

+185
-157
lines changed

docs/source/reference-core.rst

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,7 +1109,7 @@ Using channels to pass values between tasks
11091109
different tasks. They're particularly useful for implementing
11101110
producer/consumer patterns.
11111111

1112-
The channel API is defined by the abstract base classes
1112+
The core channel API is defined by the abstract base classes
11131113
:class:`trio.abc.SendChannel` and :class:`trio.abc.ReceiveChannel`.
11141114
You can use these to implement your own custom channels, that do
11151115
things like pass objects between processes or over the network. But in
@@ -1125,14 +1125,23 @@ inside a single process, and for that you can use
11251125
what you use when you're looking for a queue. The main difference
11261126
is that Trio splits the classic queue interface up into two
11271127
objects. The advantage of this is that it makes it possible to put
1128-
the two ends in different processes, and that we can close the two
1129-
sides separately.
1128+
the two ends in different processes without rewriting your code,
1129+
and that we can close the two sides separately.
1130+
1131+
`MemorySendChannel` and `MemoryReceiveChannel` also expose several
1132+
more features beyond the core channel interface:
1133+
1134+
.. autoclass:: MemorySendChannel
1135+
:members:
1136+
1137+
.. autoclass:: MemoryReceiveChannel
1138+
:members:
11301139

11311140

11321141
A simple channel example
11331142
++++++++++++++++++++++++
11341143

1135-
Here's a simple example of how to use channels:
1144+
Here's a simple example of how to use memory channels:
11361145

11371146
.. literalinclude:: reference-core/channels-simple.py
11381147

@@ -1244,14 +1253,13 @@ program above:
12441253
.. literalinclude:: reference-core/channels-mpmc-fixed.py
12451254
:emphasize-lines: 7, 9, 10, 12, 13
12461255

1247-
This example demonstrates using the :meth:`SendChannel.clone
1248-
<trio.abc.SendChannel.clone>` and :meth:`ReceiveChannel.clone
1249-
<trio.abc.ReceiveChannel.clone>` methods. What these do is create
1250-
copies of our endpoints, that act just like the original – except that
1251-
they can be closed independently. And the underlying channel is only
1252-
closed after *all* the clones have been closed. So this completely
1253-
solves our problem with shutdown, and if you run this program, you'll
1254-
see it print its six lines of output and then exits cleanly.
1256+
This example demonstrates using the `MemorySendChannel.clone` and
1257+
`MemoryReceiveChannel.clone` methods. What these do is create copies
1258+
of our endpoints, that act just like the original – except that they
1259+
can be closed independently. And the underlying channel is only closed
1260+
after *all* the clones have been closed. So this completely solves our
1261+
problem with shutdown, and if you run this program, you'll see it
1262+
print its six lines of output and then exits cleanly.
12551263

12561264
Notice a small trick we use: the code in ``main`` creates clone
12571265
objects to pass into all the child tasks, and then closes the original

docs/source/reference-io.rst

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,19 @@ Abstract base classes
117117
- :class:`~trio.SocketListener`, :class:`~trio.SSLListener`
118118
* - :class:`SendChannel`
119119
- :class:`AsyncResource`
120-
- :meth:`~SendChannel.send`, :meth:`~SendChannel.send_nowait`
120+
- :meth:`~SendChannel.send`
121121
-
122-
- :func:`~trio.open_memory_channel`
122+
- `~trio.MemorySendChannel`
123123
* - :class:`ReceiveChannel`
124124
- :class:`AsyncResource`
125-
- :meth:`~ReceiveChannel.receive`, :meth:`~ReceiveChannel.receive_nowait`
125+
- :meth:`~ReceiveChannel.receive`
126126
- ``__aiter__``, ``__anext__``
127-
- :func:`~trio.open_memory_channel`
127+
- `~trio.MemoryReceiveChannel`
128+
* - `Channel`
129+
- `SendChannel`, `ReceiveChannel`
130+
-
131+
-
132+
-
128133

129134
.. autoclass:: trio.abc.AsyncResource
130135
:members:
@@ -165,6 +170,10 @@ Abstract base classes
165170
:members:
166171
:show-inheritance:
167172

173+
.. autoclass:: trio.abc.Channel
174+
:members:
175+
:show-inheritance:
176+
168177
.. currentmodule:: trio
169178

170179

newsfragments/719.feature.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
We cleaned up the distinction between the "abstract channel interface"
2+
and the "memory channel" concrete implementation.
3+
`trio.abc.SendChannel` and `trio.abc.ReceiveChannel` have been slimmed
4+
down, `trio.MemorySendChannel` and `trio.MemoryReceiveChannel` are now
5+
public types that can be used in type hints, and there's a new
6+
`trio.abc.Channel` interface for future bidirectional channels.

trio/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838

3939
from ._highlevel_generic import aclose_forcefully, StapledStream
4040

41-
from ._channel import open_memory_channel
41+
from ._channel import (
42+
open_memory_channel, MemorySendChannel, MemoryReceiveChannel
43+
)
4244

4345
from ._signals import open_signal_receiver
4446

trio/_abc.py

Lines changed: 36 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -484,15 +484,18 @@ async def send_eof(self):
484484
"""
485485

486486

487+
# A regular invariant generic type
488+
T = TypeVar("T")
489+
487490
# The type of object produced by a ReceiveChannel (covariant because
488491
# ReceiveChannel[Derived] can be passed to someone expecting
489492
# ReceiveChannel[Base])
490-
T_co = TypeVar("T_co", covariant=True)
493+
ReceiveType = TypeVar("ReceiveType", covariant=True)
491494

492495
# The type of object accepted by a SendChannel (contravariant because
493496
# SendChannel[Base] can be passed to someone expecting
494497
# SendChannel[Derived])
495-
T_contra = TypeVar("T_contra", contravariant=True)
498+
SendType = TypeVar("SendType", contravariant=True)
496499

497500
# The type of object produced by a Listener (covariant plus must be
498501
# an AsyncResource)
@@ -537,39 +540,21 @@ async def accept(self):
537540
"""
538541

539542

540-
class SendChannel(AsyncResource, Generic[T_contra]):
543+
class SendChannel(AsyncResource, Generic[SendType]):
541544
"""A standard interface for sending Python objects to some receiver.
542545
543-
:class:`SendChannel` objects also implement the :class:`AsyncResource`
544-
interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
545-
or using an ``async with`` block.
546+
`SendChannel` objects also implement the `AsyncResource` interface, so
547+
they can be closed by calling `~AsyncResource.aclose` or using an ``async
548+
with`` block.
546549
547550
If you want to send raw bytes rather than Python objects, see
548-
:class:`ReceiveStream`.
551+
`ReceiveStream`.
549552
550553
"""
551554
__slots__ = ()
552555

553556
@abstractmethod
554-
def send_nowait(self, value):
555-
"""Attempt to send an object through the channel, without blocking.
556-
557-
Args:
558-
value (object): The object to send.
559-
560-
Raises:
561-
trio.WouldBlock: if the operation cannot be completed immediately
562-
(for example, because the channel's internal buffer is full).
563-
trio.BrokenResourceError: if something has gone wrong, and the
564-
channel is broken. For example, you may get this if the receiver
565-
has already been closed.
566-
trio.ClosedResourceError: if you previously closed this
567-
:class:`SendChannel` object.
568-
569-
"""
570-
571-
@abstractmethod
572-
async def send(self, value):
557+
async def send(self, value: SendType) -> None:
573558
"""Attempt to send an object through the channel, blocking if necessary.
574559
575560
Args:
@@ -582,36 +567,15 @@ async def send(self, value):
582567
trio.ClosedResourceError: if you previously closed this
583568
:class:`SendChannel` object, or if another task closes it while
584569
:meth:`send` is running.
585-
586-
"""
587-
588-
@abstractmethod
589-
def clone(self):
590-
"""Clone this send channel object.
591-
592-
This returns a new :class:`SendChannel` object, which acts as a
593-
duplicate of the original: sending on the new object does exactly the
594-
same thing as sending on the old object.
595-
596-
However, closing one of the objects does not close the other, and
597-
receivers don't get :exc:`~trio.EndOfChannel` until *all* clones have
598-
been closed.
599-
600-
This is useful for communication patterns that involve multiple
601-
producers all sending objects to the same destination. If you give
602-
each producer its own clone of the :class:`SendChannel`, and then make
603-
sure to close each :class:`SendChannel` when it's finished, receivers
604-
will automatically get notified when all producers are finished. See
605-
:ref:`channel-mpmc` for examples.
606-
607-
Raises:
608-
trio.ClosedResourceError: if you already closed this
609-
:class:`SendChannel` object.
570+
trio.BusyResourceError: some channels allow multiple tasks to call
571+
`send` at the same time, but others don't. If you try to call
572+
`send` simultaneously from multiple tasks on a channel that
573+
doesn't support it, then you can get `~trio.BusyResourceError`.
610574
611575
"""
612576

613577

614-
class ReceiveChannel(AsyncResource, Generic[T_co]):
578+
class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
615579
"""A standard interface for receiving Python objects from some sender.
616580
617581
You can iterate over a :class:`ReceiveChannel` using an ``async for``
@@ -621,45 +585,22 @@ class ReceiveChannel(AsyncResource, Generic[T_co]):
621585
...
622586
623587
This is equivalent to calling :meth:`receive` repeatedly. The loop exits
624-
without error when :meth:`receive` raises :exc:`~trio.EndOfChannel`.
588+
without error when `receive` raises `~trio.EndOfChannel`.
625589
626-
:class:`ReceiveChannel` objects also implement the :class:`AsyncResource`
627-
interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
628-
or using an ``async with`` block.
590+
`ReceiveChannel` objects also implement the `AsyncResource` interface, so
591+
they can be closed by calling `~AsyncResource.aclose` or using an ``async
592+
with`` block.
629593
630594
If you want to receive raw bytes rather than Python objects, see
631-
:class:`ReceiveStream`.
595+
`ReceiveStream`.
632596
633597
"""
634598
__slots__ = ()
635599

636600
@abstractmethod
637-
def receive_nowait(self):
638-
"""Attempt to receive an incoming object, without blocking.
639-
640-
Returns:
641-
object: Whatever object was received.
642-
643-
Raises:
644-
trio.WouldBlock: if the operation cannot be completed immediately
645-
(for example, because no object has been sent yet).
646-
trio.EndOfChannel: if the sender has been closed cleanly, and no
647-
more objects are coming. This is not an error condition.
648-
trio.ClosedResourceError: if you previously closed this
649-
:class:`ReceiveChannel` object.
650-
trio.BrokenResourceError: if something has gone wrong, and the
651-
channel is broken.
652-
653-
"""
654-
655-
@abstractmethod
656-
async def receive(self):
601+
async def receive(self) -> ReceiveType:
657602
"""Attempt to receive an incoming object, blocking if necessary.
658603
659-
It's legal for multiple tasks to call :meth:`receive` at the same
660-
time. If this happens, then one task receives the first value sent,
661-
another task receives the next value sent, and so on.
662-
663604
Returns:
664605
object: Whatever object was received.
665606
@@ -670,43 +611,28 @@ async def receive(self):
670611
:class:`ReceiveChannel` object.
671612
trio.BrokenResourceError: if something has gone wrong, and the
672613
channel is broken.
673-
674-
"""
675-
676-
@abstractmethod
677-
def clone(self):
678-
"""Clone this receive channel object.
679-
680-
This returns a new :class:`ReceiveChannel` object, which acts as a
681-
duplicate of the original: receiving on the new object does exactly
682-
the same thing as receiving on the old object.
683-
684-
However, closing one of the objects does not close the other, and the
685-
underlying channel is not closed until all clones are closed.
686-
687-
This is useful for communication patterns involving multiple consumers
688-
all receiving objects from the same underlying channel. See
689-
:ref:`channel-mpmc` for examples.
690-
691-
.. warning:: The clones all share the same underlying channel.
692-
Whenever a clone :meth:`receive`\\s a value, it is removed from the
693-
channel and the other clones do *not* receive that value. If you
694-
want to send multiple copies of the same stream of values to
695-
multiple destinations, like :func:`itertools.tee`, then you need to
696-
find some other solution; this method does *not* do that.
697-
698-
Raises:
699-
trio.ClosedResourceError: if you already closed this
700-
:class:`SendChannel` object.
614+
trio.BusyResourceError: some channels allow multiple tasks to call
615+
`receive` at the same time, but others don't. If you try to call
616+
`receive` simultaneously from multiple tasks on a channel that
617+
doesn't support it, then you can get `~trio.BusyResourceError`.
701618
702619
"""
703620

704621
@aiter_compat
705622
def __aiter__(self):
706623
return self
707624

708-
async def __anext__(self):
625+
async def __anext__(self) -> ReceiveType:
709626
try:
710627
return await self.receive()
711628
except trio.EndOfChannel:
712629
raise StopAsyncIteration
630+
631+
632+
class Channel(SendChannel[T], ReceiveChannel[T]):
633+
"""A standard interface for interacting with bidirectional channels.
634+
635+
A `Channel` is an object that implements both the `SendChannel` and
636+
`ReceiveChannel` interfaces, so you can both send and receive objects.
637+
638+
"""

0 commit comments

Comments
 (0)