
Commit d2f364e

Merge pull request #1123 from njsmith/no-more-mandatory-buffer-size
Streams are iterable + receive_some doesn't require an explicit size
2 parents 3623535 + 8de6171 commit d2f364e

19 files changed (+189 / -119 lines)

docs/source/reference-io.rst

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ Abstract base classes
    * - :class:`ReceiveStream`
      - :class:`AsyncResource`
      - :meth:`~ReceiveStream.receive_some`
-      -
+      - ``__aiter__``, ``__anext__``
      - :class:`~trio.testing.MemoryReceiveStream`
    * - :class:`Stream`
      - :class:`SendStream`, :class:`ReceiveStream`

docs/source/tutorial.rst

Lines changed: 21 additions & 18 deletions
@@ -908,12 +908,10 @@ And the second task's job is to process the data the server sends back:
    :lineno-match:
    :pyobject: receiver

-It repeatedly calls ``await client_stream.receive_some(...)`` to get
-more data from the server (again, all Trio streams provide this
-method), and then checks to see if the server has closed the
-connection. ``receive_some`` only returns an empty bytestring if the
-connection has been closed; otherwise, it waits until some data has
-arrived, up to a maximum of ``BUFSIZE`` bytes.
+It uses an ``async for`` loop to fetch data from the server.
+Alternatively, it could use `~trio.abc.ReceiveStream.receive_some`,
+which is the opposite of `~trio.abc.SendStream.send_all`, but using
+``async for`` saves some boilerplate.

 And now we're ready to look at the server.

@@ -974,11 +972,11 @@ functions we saw in the last section:

 The argument ``server_stream`` is provided by :func:`serve_tcp`, and
 is the other end of the connection we made in the client: so the data
-that the client passes to ``send_all`` will come out of
-``receive_some`` here, and vice-versa. Then we have a ``try`` block
-discussed below, and finally the server loop which alternates between
-reading some data from the socket and then sending it back out again
-(unless the socket was closed, in which case we quit).
+that the client passes to ``send_all`` will come out here. Then we
+have a ``try`` block discussed below, and finally the server loop
+which alternates between reading some data from the socket and then
+sending it back out again (unless the socket was closed, in which case
+we quit).

 So what's that ``try`` block for? Remember that in Trio, like Python
 in general, exceptions keep propagating until they're caught. Here we
@@ -1029,7 +1027,7 @@ our client could use a single task like::
     while True:
         data = ...
         await client_stream.send_all(data)
-        received = await client_stream.receive_some(BUFSIZE)
+        received = await client_stream.receive_some()
         if not received:
             sys.exit()
         await trio.sleep(1)
@@ -1046,18 +1044,23 @@ line, any time we're expecting more than one byte of data, we have to
 be prepared to call ``receive_some`` multiple times.

 And where this would go especially wrong is if we find ourselves in
-the situation where ``len(data) > BUFSIZE``. On each pass through the
-loop, we send ``len(data)`` bytes, but only read *at most* ``BUFSIZE``
-bytes. The result is something like a memory leak: we'll end up with
-more and more data backed up in the network, until eventually
-something breaks.
+the situation where ``data`` is big enough that it passes some
+internal threshold, and the operating system or network decide to
+always break it up into multiple pieces. Now on each pass through the
+loop, we send ``len(data)`` bytes, but read less than that. The result
+is something like a memory leak: we'll end up with more and more data
+backed up in the network, until eventually something breaks.
+
+.. note:: If you're curious *how* things break, then you can use
+   `~trio.abc.ReceiveStream.receive_some`\'s optional argument to put
+   a limit on how many bytes you read each time, and see what happens.

 We could fix this by keeping track of how much data we're expecting at
 each moment, and then keep calling ``receive_some`` until we get it all::

     expected = len(data)
     while expected > 0:
-        received = await client_stream.receive_some(BUFSIZE)
+        received = await client_stream.receive_some(expected)
         if not received:
             sys.exit(1)
         expected -= len(received)
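
The pattern the tutorial settles on (keep calling ``receive_some`` until the
expected number of bytes has arrived) can be packaged as a small helper. The
following is only a sketch to make the reasoning concrete; ``receive_exactly``
is an illustrative name, not part of Trio's API::

    async def receive_exactly(stream, count):
        # Keep reading until we have exactly `count` bytes, or the peer
        # closes the connection first.
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = await stream.receive_some(remaining)
            if not chunk:
                raise EOFError(
                    "connection closed after {} of {} bytes".format(
                        count - remaining, count
                    )
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)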

docs/source/tutorial/echo-client.py

Lines changed: 3 additions & 8 deletions
@@ -8,9 +8,6 @@
 # - can't be in use by some other program on your computer
 # - must match what we set in our echo server
 PORT = 12345
-# How much memory to spend (at most) on each call to recv. Pretty arbitrary,
-# but shouldn't be too big or too small.
-BUFSIZE = 16384

 async def sender(client_stream):
     print("sender: started!")
@@ -22,12 +19,10 @@ async def sender(client_stream):

 async def receiver(client_stream):
     print("receiver: started!")
-    while True:
-        data = await client_stream.receive_some(BUFSIZE)
+    async for data in client_stream:
         print("receiver: got data {!r}".format(data))
-        if not data:
-            print("receiver: connection closed")
-            sys.exit()
+    print("receiver: connection closed")
+    sys.exit()

 async def parent():
     print("parent: connecting to 127.0.0.1:{}".format(PORT))

docs/source/tutorial/echo-server.py

Lines changed: 2 additions & 9 deletions
@@ -8,9 +8,6 @@
 # - can't be in use by some other program on your computer
 # - must match what we set in our echo client
 PORT = 12345
-# How much memory to spend (at most) on each call to recv. Pretty arbitrary,
-# but shouldn't be too big or too small.
-BUFSIZE = 16384

 CONNECTION_COUNTER = count()

@@ -20,14 +17,10 @@ async def echo_server(server_stream):
     ident = next(CONNECTION_COUNTER)
     print("echo_server {}: started".format(ident))
     try:
-        while True:
-            data = await server_stream.receive_some(BUFSIZE)
+        async for data in server_stream:
             print("echo_server {}: received data {!r}".format(ident, data))
-            if not data:
-                print("echo_server {}: connection closed".format(ident))
-                return
-            print("echo_server {}: sending data {!r}".format(ident, data))
             await server_stream.send_all(data)
+        print("echo_server {}: connection closed".format(ident))
     # FIXME: add discussion of MultiErrors to the tutorial, and use
     # MultiError.catch here. (Not important in this case, but important if the
     # server code uses nurseries internally.)
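
For orientation, the tutorial wires a handler like ``echo_server`` up with
:func:`trio.serve_tcp`, which calls it once per incoming connection and
supplies the ``server_stream`` argument. A minimal, self-contained sketch of
that shape (the port number is arbitrary)::

    import trio

    async def handler(server_stream):
        # Echo each chunk back until the client closes the connection.
        async for data in server_stream:
            await server_stream.send_all(data)
        print("connection closed")

    async def main():
        # serve_tcp accepts connections forever and runs `handler` for each one.
        await trio.serve_tcp(handler, 12345)

    trio.run(main)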

newsfragments/959.feature.rst

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+If you have a `~trio.abc.ReceiveStream` object, you can now use
+``async for data in stream: ...`` instead of calling
+`~trio.abc.ReceiveStream.receive_some`. Each iteration gives an
+arbitrary sized chunk of bytes. And the best part is, the loop
+automatically exits when you reach EOF, so you don't have to check for
+it yourself anymore. Relatedly, you no longer need to pick a magic
+buffer size value before calling
+`~trio.abc.ReceiveStream.receive_some`; you can ``await
+stream.receive_some()`` with no arguments, and the stream will
+automatically pick a reasonable size for you.
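
A quick before/after sketch of what this newsfragment describes, assuming
``stream`` is any `~trio.abc.ReceiveStream`::

    async def consume_old_style(stream):
        # Before: pick a buffer size and check for EOF by hand.
        while True:
            data = await stream.receive_some(16384)
            if not data:
                break
            print("got", len(data), "bytes")

    async def consume_new_style(stream):
        # After: the loop exits at EOF, and the stream picks the chunk size.
        async for data in stream:
            print("got", len(data), "bytes")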

notes-to-self/graceful-shutdown-idea.py

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ def shutting_down(self):
 async def stream_handler(stream):
     while True:
         with gsm.cancel_on_graceful_shutdown():
-            data = await stream.receive_some(...)
+            data = await stream.receive_some()
         if gsm.shutting_down:
             break

trio/_abc.py

Lines changed: 19 additions & 7 deletions
@@ -378,26 +378,28 @@ class ReceiveStream(AsyncResource):
     If you want to receive Python objects rather than raw bytes, see
     :class:`ReceiveChannel`.

+    `ReceiveStream` objects can be used in ``async for`` loops. Each iteration
+    will produce an arbitrary sized chunk of bytes, like calling
+    `receive_some` with no arguments. Every chunk will contain at least one
+    byte, and the loop automatically exits when reaching end-of-file.
+
     """
     __slots__ = ()

     @abstractmethod
-    async def receive_some(self, max_bytes):
+    async def receive_some(self, max_bytes=None):
         """Wait until there is data available on this stream, and then return
-        at most ``max_bytes`` of it.
+        some of it.

         A return value of ``b""`` (an empty bytestring) indicates that the
         stream has reached end-of-file. Implementations should be careful that
         they return ``b""`` if, and only if, the stream has reached
         end-of-file!

-        This method will return as soon as any data is available, so it may
-        return fewer than ``max_bytes`` of data. But it will never return
-        more.
-
         Args:
             max_bytes (int): The maximum number of bytes to return. Must be
-                greater than zero.
+                greater than zero. Optional; if omitted, then the stream object
+                is free to pick a reasonable default.

         Returns:
             bytes or bytearray: The data received.
@@ -413,6 +415,16 @@ async def receive_some(self, max_bytes):

         """

+    @aiter_compat
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        data = await self.receive_some()
+        if not data:
+            raise StopAsyncIteration
+        return data
+

 class Stream(SendStream, ReceiveStream):
     """A standard interface for interacting with bidirectional byte streams.

trio/_highlevel_generic.py

Lines changed: 2 additions & 2 deletions
@@ -52,7 +52,7 @@ class StapledStream(HalfCloseableStream):
        left, right = trio.testing.memory_stream_pair()
        echo_stream = StapledStream(SocketStream(left), SocketStream(right))
        await echo_stream.send_all(b"x")
-       assert await echo_stream.receive_some(1) == b"x"
+       assert await echo_stream.receive_some() == b"x"

     :class:`StapledStream` objects implement the methods in the
     :class:`~trio.abc.HalfCloseableStream` interface. They also have two
@@ -96,7 +96,7 @@ async def send_eof(self):
         else:
             return await self.send_stream.aclose()

-    async def receive_some(self, max_bytes):
+    async def receive_some(self, max_bytes=None):
         """Calls ``self.receive_stream.receive_some``.

         """

trio/_highlevel_socket.py

Lines changed: 9 additions & 1 deletion
@@ -10,6 +10,12 @@

 __all__ = ["SocketStream", "SocketListener"]

+# XX TODO: this number was picked arbitrarily. We should do experiments to
+# tune it. (Or make it dynamic -- one idea is to start small and increase it
+# if we observe single reads filling up the whole buffer, at least within some
+# limits.)
+DEFAULT_RECEIVE_SIZE = 65536
+
 _closed_stream_errnos = {
     # Unix
     errno.EBADF,
@@ -129,7 +135,9 @@ async def send_eof(self):
         with _translate_socket_errors_to_stream_errors():
             self.socket.shutdown(tsocket.SHUT_WR)

-    async def receive_some(self, max_bytes):
+    async def receive_some(self, max_bytes=None):
+        if max_bytes is None:
+            max_bytes = DEFAULT_RECEIVE_SIZE
         if max_bytes < 1:
             raise ValueError("max_bytes must be >= 1")
         with _translate_socket_errors_to_stream_errors():
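
With this change, callers of ``SocketStream.receive_some`` can omit the size
argument entirely and get chunks capped at ``DEFAULT_RECEIVE_SIZE``. A hedged
usage sketch (the host and port are placeholders, and something must actually
be listening there for the connection to succeed)::

    import trio

    async def main():
        stream = await trio.open_tcp_stream("127.0.0.1", 12345)
        # No max_bytes argument: the stream falls back to its default size.
        chunk = await stream.receive_some()
        print("received", len(chunk), "bytes")
        await stream.aclose()

    trio.run(main)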
