Skip to content

Commit 3ffb650

Browse files
authored
Allow stream.{read,write}s of length 0 to query/signal readiness (#444)
1 parent 41daa8d commit 3ffb650

File tree

4 files changed

+95
-42
lines changed

4 files changed

+95
-42
lines changed

design/mvp/Async.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,11 @@ These built-ins can either return immediately if >0 elements were able to be
356356
written or read immediately (without blocking) or return a sentinel "blocked"
357357
value indicating that the read or write will execute concurrently. The readable
358358
and writable ends of streams and futures can then be [waited](#waiting) on to
359-
make progress.
359+
make progress. Notification of progress signals *completion* of a read or write
360+
(i.e., the bytes have already been copied into the buffer). Additionally,
361+
*readiness* (to perform a read or write in the future) can be queried and
362+
signalled by performing a `0`-length read or write (see the [Stream State]
363+
section in the Canonical ABI explainer for details).
360364

361365
As a temporary limitation, if a `read` and `write` for a single stream or
362366
future occur from within the same component, there is a trap. In the future

design/mvp/CanonicalABI.md

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -415,8 +415,8 @@ class BufferGuestImpl(Buffer):
415415
length: int
416416

417417
def __init__(self, t, cx, ptr, length):
418-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
419-
if t:
418+
trap_if(length > Buffer.MAX_LENGTH)
419+
if t and length > 0:
420420
trap_if(ptr != align_to(ptr, alignment(t)))
421421
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
422422
self.cx = cx
@@ -1299,10 +1299,13 @@ class ReadableStreamGuestImpl(ReadableStream):
12991299
self.reset_pending()
13001300

13011301
def reset_pending(self):
1302-
self.pending_inst = None
1303-
self.pending_buffer = None
1304-
self.pending_on_partial_copy = None
1305-
self.pending_on_copy_done = None
1302+
self.set_pending(None, None, None, None)
1303+
1304+
def set_pending(self, inst, buffer, on_partial_copy, on_copy_done):
1305+
self.pending_inst = inst
1306+
self.pending_buffer = buffer
1307+
self.pending_on_partial_copy = on_partial_copy
1308+
self.pending_on_copy_done = on_copy_done
13061309
```
13071310
If set, the `pending_*` fields record the `Buffer` and `On*` callbacks of a
13081311
`read` or `write` that is waiting to rendezvous with a complementary `write` or
@@ -1356,27 +1359,45 @@ but in the opposite direction. Both are implemented by a single underlying
13561359
if self.closed_:
13571360
return 'done'
13581361
elif not self.pending_buffer:
1359-
self.pending_inst = inst
1360-
self.pending_buffer = buffer
1361-
self.pending_on_partial_copy = on_partial_copy
1362-
self.pending_on_copy_done = on_copy_done
1362+
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
13631363
return 'blocked'
13641364
else:
13651365
trap_if(inst is self.pending_inst) # temporary
1366-
ncopy = min(src.remain(), dst.remain())
1367-
assert(ncopy > 0)
1368-
dst.write(src.read(ncopy))
13691366
if self.pending_buffer.remain() > 0:
1370-
self.pending_on_partial_copy(self.reset_pending)
1367+
if buffer.remain() > 0:
1368+
dst.write(src.read(min(src.remain(), dst.remain())))
1369+
if self.pending_buffer.remain() > 0:
1370+
self.pending_on_partial_copy(self.reset_pending)
1371+
else:
1372+
self.reset_and_notify_pending('completed')
1373+
return 'done'
13711374
else:
1372-
self.reset_and_notify_pending('completed')
1373-
return 'done'
1374-
```
1375-
Currently, there is a trap when both the `read` and `write` come from the same
1376-
component instance, but this trapping condition will be removed in a subsequent
1377-
release. The reason for this trap is that when lifting and lowering can alias
1378-
the same memory, interleaving must be handled carefully. Future improvements to
1379-
the Canonical ABI ([lazy lowering]) can greatly simplify this interleaving.
1375+
if buffer.remain() > 0 or buffer is dst:
1376+
self.reset_and_notify_pending('completed')
1377+
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
1378+
return 'blocked'
1379+
else:
1380+
return 'done'
1381+
```
1382+
The meaning of a `read` or `write` when the length is `0` is that the caller is
1383+
querying the "readiness" of the other side. When a `0`-length read/write
1384+
rendezvous with a non-`0`-length read/write, only the `0`-length read/write
1385+
completes; the non-`0`-length read/write is kept pending (and ready for a
1386+
subsequent rendezvous).
1387+
1388+
In the corner case where a `0`-length read *and* write rendezvous, only the
1389+
*writer* is notified of readiness. To avoid livelock, the Canonical ABI
1390+
requires that a writer *must* (eventually) follow a completed `0`-length write
1391+
with a non-`0`-length write that is allowed to block (allowing the reader end
1392+
to run and rendezvous with its own non-`0`-length read). To implement a
1393+
traditional `O_NONBLOCK` `write()` or `sendmsg()` API, a writer can use a
1394+
buffering scheme in which, after `select()` (or a similar API) signals a file
1395+
descriptor is ready to write, the next `O_NONBLOCK` `write()`/`sendmsg()` on
1396+
that file descriptor copies to an internal buffer and suceeds, issuing an
1397+
`async` `stream.write` in the background and waiting for completion before
1398+
signalling readiness again. Note that buffering only occurs when streaming
1399+
between two components using non-blocking I/O; if either side is the host or a
1400+
component using blocking or completion-based I/O, no buffering is necessary.
13801401

13811402
Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
13821403
are actually stored in the `waitables` table. The classes are almost entirely

design/mvp/canonical-abi/definitions.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,8 @@ class BufferGuestImpl(Buffer):
327327
length: int
328328

329329
def __init__(self, t, cx, ptr, length):
330-
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
331-
if t:
330+
trap_if(length > Buffer.MAX_LENGTH)
331+
if t and length > 0:
332332
trap_if(ptr != align_to(ptr, alignment(t)))
333333
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
334334
self.cx = cx
@@ -772,10 +772,13 @@ def __init__(self, t):
772772
self.reset_pending()
773773

774774
def reset_pending(self):
775-
self.pending_inst = None
776-
self.pending_buffer = None
777-
self.pending_on_partial_copy = None
778-
self.pending_on_copy_done = None
775+
self.set_pending(None, None, None, None)
776+
777+
def set_pending(self, inst, buffer, on_partial_copy, on_copy_done):
778+
self.pending_inst = inst
779+
self.pending_buffer = buffer
780+
self.pending_on_partial_copy = on_partial_copy
781+
self.pending_on_copy_done = on_copy_done
779782

780783
def reset_and_notify_pending(self, why):
781784
pending_on_copy_done = self.pending_on_copy_done
@@ -804,21 +807,25 @@ def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst):
804807
if self.closed_:
805808
return 'done'
806809
elif not self.pending_buffer:
807-
self.pending_inst = inst
808-
self.pending_buffer = buffer
809-
self.pending_on_partial_copy = on_partial_copy
810-
self.pending_on_copy_done = on_copy_done
810+
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
811811
return 'blocked'
812812
else:
813813
trap_if(inst is self.pending_inst) # temporary
814-
ncopy = min(src.remain(), dst.remain())
815-
assert(ncopy > 0)
816-
dst.write(src.read(ncopy))
817814
if self.pending_buffer.remain() > 0:
818-
self.pending_on_partial_copy(self.reset_pending)
815+
if buffer.remain() > 0:
816+
dst.write(src.read(min(src.remain(), dst.remain())))
817+
if self.pending_buffer.remain() > 0:
818+
self.pending_on_partial_copy(self.reset_pending)
819+
else:
820+
self.reset_and_notify_pending('completed')
821+
return 'done'
819822
else:
820-
self.reset_and_notify_pending('completed')
821-
return 'done'
823+
if buffer.remain() > 0 or buffer is dst:
824+
self.reset_and_notify_pending('completed')
825+
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
826+
return 'blocked'
827+
else:
828+
return 'done'
822829

823830
class StreamEnd(Waitable):
824831
stream: ReadableStream

design/mvp/canonical-abi/run_tests.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,8 +1503,19 @@ async def core_func1(task, args):
15031503
result,n = unpack_result(mem1[retp+4])
15041504
assert(n == 4 and result == definitions.COMPLETED)
15051505

1506+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1507+
assert(ret == definitions.BLOCKED)
1508+
15061509
fut4.set_result(None)
15071510

1511+
[event] = await canon_waitable_set_wait(False, mem1, task, seti, retp)
1512+
assert(event == EventCode.STREAM_WRITE)
1513+
assert(mem1[retp+0] == wsi)
1514+
assert(mem1[retp+4] == 0)
1515+
1516+
[ret] = await canon_stream_write(U8Type(), opts1, task, wsi, 12345, 0)
1517+
assert(ret == 0)
1518+
15081519
[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
15091520
[] = await canon_stream_close_writable(U8Type(), task, wsi)
15101521
[] = await canon_waitable_set_drop(task, seti)
@@ -1545,6 +1556,9 @@ async def core_func2(task, args):
15451556
fut2.set_result(None)
15461557
await task.on_block(fut3)
15471558

1559+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1560+
assert(ret == 0)
1561+
15481562
mem2[0:8] = bytes(8)
15491563
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
15501564
result,n = unpack_result(ret)
@@ -1557,9 +1571,16 @@ async def core_func2(task, args):
15571571

15581572
await task.on_block(fut4)
15591573

1560-
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
1561-
result,n = unpack_result(ret)
1562-
assert(n == 0 and result == definitions.CLOSED)
1574+
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 12345, 0)
1575+
assert(ret == definitions.BLOCKED)
1576+
1577+
[event] = await canon_waitable_set_wait(False, mem2, task, seti, retp)
1578+
assert(event == EventCode.STREAM_READ)
1579+
assert(mem2[retp+0] == rsi)
1580+
p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False)
1581+
errctxi = 1
1582+
assert(p2 == (definitions.CLOSED | errctxi))
1583+
15631584
[] = await canon_stream_close_readable(U8Type(), task, rsi)
15641585
[] = await canon_waitable_set_drop(task, seti)
15651586
return []

0 commit comments

Comments
 (0)