Skip to content

Commit 9a0e59b

Browse files
authored
Allow same-instance stream/future writes when element is empty (#508)
1 parent 3ffb650 commit 9a0e59b

File tree

4 files changed

+77
-6
lines changed

4 files changed

+77
-6
lines changed

design/mvp/Async.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,8 @@ signalled by performing a `0`-length read or write (see the [Stream State]
363363
section in the Canonical ABI explainer for details).
364364

365365
As a temporary limitation, if a `read` and `write` for a single stream or
366-
future occur from within the same component, there is a trap. In the future
367-
this limitation will be removed.
366+
future occur from within the same component and the element type is non-empty,
367+
there is a trap. In the future this limitation will be removed.
368368

369369
The `T` element type of streams and futures is optional, such that `future` and
370370
`stream` can be written in WIT without a trailing `<T>`. In this case, the

design/mvp/CanonicalABI.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1362,7 +1362,8 @@ but in the opposite direction. Both are implemented by a single underlying
13621362
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
13631363
return 'blocked'
13641364
else:
1365-
trap_if(inst is self.pending_inst) # temporary
1365+
assert(self.t == src.t == dst.t)
1366+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
13661367
if self.pending_buffer.remain() > 0:
13671368
if buffer.remain() > 0:
13681369
dst.write(src.read(min(src.remain(), dst.remain())))
@@ -1379,6 +1380,13 @@ but in the opposite direction. Both are implemented by a single underlying
13791380
else:
13801381
return 'done'
13811382
```
1383+
Currently, there is a trap when both the `read` and `write` come from the same
1384+
component instance and there is a non-empty element type. This trap will be
1385+
removed in a subsequent release; the reason for the trap is that when lifting
1386+
and lowering can alias the same memory, interleavings can be complex and must
1387+
be handled carefully. Future improvements to the Canonical ABI ([lazy lowering])
1388+
can greatly simplify this interleaving and be more practical to implement.
1389+
13821390
The meaning of a `read` or `write` when the length is `0` is that the caller is
13831391
querying the "readiness" of the other side. When a `0`-length read/write
13841392
rendezvous with a non-`0`-length read/write, only the `0`-length read/write

design/mvp/canonical-abi/definitions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,7 +810,8 @@ def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst):
810810
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
811811
return 'blocked'
812812
else:
813-
trap_if(inst is self.pending_inst) # temporary
813+
assert(self.t == src.t == dst.t)
814+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
814815
if self.pending_buffer.remain() > 0:
815816
if buffer.remain() > 0:
816817
dst.write(src.read(min(src.remain(), dst.remain())))

design/mvp/canonical-abi/run_tests.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,6 +1062,7 @@ def write(self, vs):
10621062

10631063
class HostSink:
10641064
stream: ReadableStream
1065+
t: ValType
10651066
received: list[int]
10661067
chunk: int
10671068
write_remain: int
@@ -1070,6 +1071,7 @@ class HostSink:
10701071

10711072
def __init__(self, stream, chunk, remain = 2**64):
10721073
self.stream = stream
1074+
self.t = stream.t
10731075
self.received = []
10741076
self.chunk = chunk
10751077
self.write_remain = remain
@@ -1781,10 +1783,12 @@ async def core_func(task, args):
17811783

17821784

17831785
class HostFutureSink:
1786+
t: ValType
17841787
v: Optional[any]
17851788
has_v: asyncio.Event
17861789

1787-
def __init__(self):
1790+
def __init__(self, t):
1791+
self.t = t
17881792
self.v = None
17891793
self.has_v = asyncio.Event()
17901794

@@ -1849,7 +1853,7 @@ async def host_func(task, on_start, on_resolve, on_block):
18491853
[future] = on_start()
18501854
outgoing = HostFutureSource(U8Type())
18511855
on_resolve([outgoing])
1852-
incoming = HostFutureSink()
1856+
incoming = HostFutureSink(U8Type())
18531857
future.read(None, incoming, lambda:(), lambda why:())
18541858
wait = asyncio.create_task(incoming.has_v.wait())
18551859
await on_block(wait)
@@ -2169,6 +2173,63 @@ def on_resolve(results):
21692173
assert(len(got) == 1)
21702174
assert(got[0] == 42)
21712175

2176+
async def test_self_empty():
2177+
inst = ComponentInstance()
2178+
mem = bytearray(24)
2179+
sync_opts = mk_opts(memory=mem, sync=True)
2180+
async_opts = mk_opts(memory=mem, sync=False)
2181+
2182+
ft = FuncType([],[])
2183+
async def core_func(task, args):
2184+
[seti] = await canon_waitable_set_new(task)
2185+
2186+
[packed] = await canon_future_new(None, task)
2187+
rfi,wfi = unpack_new_ends(packed)
2188+
2189+
[ret] = await canon_future_write(None, async_opts, task, wfi, 10000)
2190+
assert(ret == definitions.BLOCKED)
2191+
2192+
[ret] = await canon_future_read(None, async_opts, task, rfi, 20000)
2193+
result,n = unpack_result(ret)
2194+
assert(n == 1 and result == definitions.CLOSED)
2195+
[] = await canon_future_close_readable(None, task, rfi)
2196+
2197+
[] = await canon_waitable_join(task, wfi, seti)
2198+
[event] = await canon_waitable_set_wait(True, mem, task, seti, 0)
2199+
assert(event == EventCode.FUTURE_WRITE)
2200+
assert(mem[0] == wfi)
2201+
result,n = unpack_result(mem[4])
2202+
assert(result == definitions.CLOSED)
2203+
assert(n == 1)
2204+
[] = await canon_future_close_writable(None, task, wfi)
2205+
2206+
[packed] = await canon_stream_new(None, task)
2207+
rsi,wsi = unpack_new_ends(packed)
2208+
[ret] = await canon_stream_write(None, async_opts, task, wsi, 10000, 3)
2209+
assert(ret == definitions.BLOCKED)
2210+
2211+
[ret] = await canon_stream_read(None, async_opts, task, rsi, 2000, 1)
2212+
result,n = unpack_result(ret)
2213+
assert(n == 1 and result == definitions.COMPLETED)
2214+
[ret] = await canon_stream_read(None, async_opts, task, rsi, 2000, 4)
2215+
result,n = unpack_result(ret)
2216+
assert(n == 2 and result == definitions.COMPLETED)
2217+
[] = await canon_stream_close_readable(None, task, rsi)
2218+
2219+
[] = await canon_waitable_join(task, wsi, seti)
2220+
[event] = await canon_waitable_set_wait(True, mem, task, seti, 0)
2221+
assert(event == EventCode.STREAM_WRITE)
2222+
assert(mem[0] == wsi)
2223+
result,n = unpack_result(mem[4])
2224+
assert(result == definitions.CLOSED)
2225+
assert(n == 3)
2226+
[] = await canon_stream_close_writable(None, task, wsi)
2227+
2228+
[] = await canon_waitable_set_drop(task, seti)
2229+
return []
2230+
2231+
await canon_lift(sync_opts, inst, ft, core_func, None, lambda:[], lambda _:(), host_on_block)
2232+
21722233
async def run_async_tests():
21732234
await test_roundtrips()
21742235
await test_handles()
@@ -2187,6 +2248,7 @@ async def run_async_tests():
21872248
await test_cancel_copy()
21882249
await test_futures()
21892250
await test_cancel_subtask()
2251+
await test_self_empty()
21902252

21912253
asyncio.run(run_async_tests())
21922254

0 commit comments

Comments
 (0)