@@ -24,6 +24,7 @@ being specified here.
24
24
* [ ` canon resource.new ` ] ( #canon-resourcenew )
25
25
* [ ` canon resource.drop ` ] ( #canon-resourcedrop )
26
26
* [ ` canon resource.rep ` ] ( #canon-resourcerep )
27
+ * [ ` canon task.backpressure ` ] ( #-canon-taskbackpressure ) 🔀
27
28
* [ ` canon task.start ` ] ( #-canon-taskstart ) 🔀
28
29
* [ ` canon task.return ` ] ( #-canon-taskreturn ) 🔀
29
30
* [ ` canon task.wait ` ] ( #-canon-taskwait ) 🔀
@@ -255,36 +256,44 @@ Canonical ABI:
255
256
class ComponentInstance :
256
257
# core module instance state
257
258
may_leave: bool
258
- may_enter_sync: bool
259
- may_enter_async: bool
260
- pending_sync_tasks: list[asyncio.Future]
261
- pending_async_tasks: list[asyncio.Future]
262
259
handles: HandleTables
260
+ num_tasks: int
261
+ backpressure: bool
262
+ pending_tasks: list[asyncio.Future]
263
+ active_sync_task: bool
264
+ pending_sync_tasks: list[asyncio.Future]
263
265
async_subtasks: Table[AsyncSubtask]
264
266
fiber: asyncio.Lock
265
267
266
268
def __init__ (self ):
267
269
self .may_leave = True
268
- self .may_enter_sync = True
269
- self .may_enter_async = True
270
- self .pending_sync_tasks = []
271
- self .pending_async_tasks = []
272
270
self .handles = HandleTables()
271
+ self .num_tasks = 0
272
+ self .backpressure = False
273
+ self .pending_tasks = []
274
+ self .active_sync_task = False
275
+ self .pending_sync_tasks = []
273
276
self .async_subtasks = Table[AsyncSubtask]()
274
277
self .fiber = asyncio.Lock()
275
278
```
276
279
The ` may_leave ` field is used below to track whether the instance may call a
277
280
lowered import to prevent optimization-breaking cases of reentrance during
278
281
lowering.
279
282
280
- The ` may_enter_(sync|async) ` and ` pending_(sync|async)_tasks ` fields
281
- are used below to implement backpressure that is applied when new
282
- sync|async-lifted export calls try to enter this ` ComponentInstance ` .
283
-
284
283
The ` handles ` field contains a mapping from ` ResourceType ` to ` Table ` s of
285
284
` HandleElem ` s (defined next), establishing a separate ` i32 ` -indexed array per
286
285
resource type.
287
286
287
+ The ` backpressure ` and ` pending_tasks ` fields are used below to implement
288
+ backpressure that is applied when new export calls create new ` Task ` s in this
289
+ ` ComponentInstance ` . The ` num_tasks ` field tracks the number of live ` Task ` s in
290
+ this ` ComponentInstance ` and is primarily used to guard that a component
291
+ doesn't enter an invalid state where ` backpressure ` enabled but there are no
292
+ live tasks to disable it.
293
+
294
+ The ` active_sync_task ` and ` pending_sync_tasks ` fields are similarly used to
295
+ serialize synchronously-lifted calls into this component instance.
296
+
288
297
The ` async_subtasks ` field is used below to track and assign an ` i32 ` index to
289
298
each active async-lowered call in progress that has been made by this
290
299
` ComponentInstance ` .
@@ -492,17 +501,30 @@ the particular feature (`borrow` handles, `async` imports) is not used.
492
501
493
502
The ` caller ` field is immutable and is either ` None ` , when a ` Task ` is created
494
503
for a component export called directly by the host, or else the current task
495
- when the calling component called into this component. The ` caller ` field is
496
- used by the following two methods to prevent a component from being reentered
497
- (enforcing the [ component invariant] ) in a way that is well-defined even in the
498
- presence of async calls). (The ` fiber.acquire() ` call in ` enter() ` is
504
+ when the calling component called into this component.
505
+
506
+ The ` enter() ` method is called immediately after constructing the ` Task ` and is
507
+ responsible for implementing backpressure that has been signalled by guest code
508
+ via the ` task.backpressure ` built-in. (The ` fiber.acquire() ` call in ` enter() ` is
499
509
described above and here ensures that concurrent export calls do not
500
510
arbitrarily interleave.)
501
511
``` python
502
512
async def enter (self ):
503
513
await self .inst.fiber.acquire()
504
514
self .trap_if_on_the_stack(self .inst)
505
-
515
+ self .inst.num_tasks += 1
516
+ if self .inst.backpressure or self .inst.pending_tasks:
517
+ f = asyncio.Future()
518
+ self .inst.pending_tasks.append(f)
519
+ await f
520
+ assert (not self .inst.backpressure)
521
+ ```
522
+ The ` caller ` field mentioned above is used by ` trap_if_on_the_stack ` (called by
523
+ ` enter ` above) to prevent a component from being unexpectedly reentered
524
+ (enforcing the [ component invariant] ) in a way that is well-defined even in the
525
+ presence of async calls). This definition depends on having an async call tree
526
+ which in turn depends on maintaining [ structured concurrency] .
527
+ ``` python
506
528
def trap_if_on_the_stack (self , inst ):
507
529
c = self .caller
508
530
while c is not None :
@@ -522,6 +544,16 @@ O(n) loop in `trap_if_on_the_stack`:
522
544
a packed bit-vector (assigning each potentially-reenterable async component
523
545
instance a static bit position) that is passed by copy from caller to callee.
524
546
547
+ The ` pending_tasks ` queue (appended to by ` enter ` above) is emptied one at a
548
+ time when backpressure is disabled, ensuring that each popped tasks gets a
549
+ chance to start and possibly re-enable backpressure before the next pending
550
+ task is started:
551
+ ``` python
552
+ def maybe_start_pending_task (self ):
553
+ if self .inst.pending_tasks and not self .inst.backpressure:
554
+ self .inst.pending_tasks.pop(0 ).set_result(None )
555
+ ```
556
+
525
557
The ` borrow_count ` field is used by the following methods to track the number
526
558
of borrowed handles that were passed as parameters to the export that have not
527
559
yet been dropped (and thus might dangle if the caller destroys the resource
@@ -559,6 +591,7 @@ guarded to be `0` in `Task.exit` (below) to ensure [structured concurrency].
559
591
560
592
async def wait (self ):
561
593
self .inst.fiber.release()
594
+ self .maybe_start_pending_task()
562
595
subtask = await self .events.get()
563
596
await self .inst.fiber.acquire()
564
597
return self .process_event(subtask)
@@ -599,6 +632,7 @@ emulated in the Python code here by awaiting a `sleep(0)`).
599
632
``` python
600
633
async def yield_ (self ):
601
634
self .inst.fiber.release()
635
+ self .maybe_start_pending_task()
602
636
await asyncio.sleep(0 )
603
637
await self .inst.fiber.acquire()
604
638
```
@@ -609,9 +643,13 @@ progress.
609
643
``` python
610
644
def exit (self ):
611
645
assert (self .events.empty())
646
+ assert (self .inst.num_tasks >= 1 )
647
+ trap_if(self .inst.backpressure and self .inst.num_tasks == 1 )
612
648
trap_if(self .borrow_count != 0 )
613
649
trap_if(self .num_async_subtasks != 0 )
650
+ self .inst.num_tasks -= 1
614
651
self .inst.fiber.release()
652
+ self .maybe_start_pending_task()
615
653
```
616
654
617
655
While ` canon_lift ` creates ` Task ` s, ` canon_lower ` creates ` Subtask ` objects:
@@ -649,20 +687,23 @@ given component instance at a given time.
649
687
``` python
650
688
class SyncTask (Task ):
651
689
async def enter (self ):
652
- if not self .inst.may_enter_sync:
690
+ await super ().enter()
691
+ if self .inst.active_sync_task:
653
692
f = asyncio.Future()
654
693
self .inst.pending_sync_tasks.append(f)
694
+ self .inst.fiber.release()
695
+ self .maybe_start_pending_task()
655
696
await f
656
- assert ( self .inst.may_enter_sync )
657
- self .inst.may_enter_sync = False
658
- await super ().enter()
697
+ await self .inst.fiber.acquire( )
698
+ assert ( not self .inst.active_sync_task)
699
+ self .inst.active_sync_task = True
659
700
660
701
def exit (self ):
661
- super ().exit()
662
- assert (not self .inst.may_enter_sync)
663
- self .inst.may_enter_sync = True
702
+ assert (self .inst.active_sync_task)
703
+ self .inst.active_sync_task = False
664
704
if self .inst.pending_sync_tasks:
665
705
self .inst.pending_sync_tasks.pop(0 ).set_result(None )
706
+ super ().exit()
666
707
```
667
708
Thus, after one sync task starts running, any subsequent attempts to call into
668
709
the same component instance before the first sync task finishes will wait in a
@@ -672,74 +713,37 @@ implementation should be able to avoid separately allocating
672
713
` Subtask ` table element of the caller.
673
714
674
715
The first 3 fields of ` AsyncTask ` are simply immutable copies of
675
- arguments/immediates passed to ` canon_lift ` that are used later on. The last 2
676
- fields are used to check the above-mentioned state machine transitions and also
677
- specify an async version of backpressure. In particular, the rules apply
678
- backpressure if a task blocks (calling ` wait ` ) while still in the ` STARTING `
679
- state, which signals that the component instance isn't ready to take on any new
680
- async calls (until some active calls finish):
716
+ arguments/immediates passed to ` canon_lift ` and are used by the ` task.start `
717
+ and ` task.return ` built-ins below. The last field is used to check the
718
+ above-mentioned state machine transitions from methods that are called by
719
+ ` task.start ` , ` task.return ` and ` canon_lift ` below.
681
720
``` python
682
721
class AsyncTask (Task ):
683
722
ft: FuncType
684
723
on_start: Callable
685
724
on_return: Callable
686
725
state: AsyncCallState
687
- unblock_next_pending: bool
688
726
689
727
def __init__ (self , opts , inst , caller , ft , on_start , on_return ):
690
728
super ().__init__ (opts, inst, caller)
691
729
self .ft = ft
692
730
self .on_start = on_start
693
731
self .on_return = on_return
694
732
self .state = AsyncCallState.STARTING
695
- self .unblock_next_pending = False
696
-
697
- async def enter (self ):
698
- if not self .inst.may_enter_async or self .inst.pending_async_tasks:
699
- f = asyncio.Future()
700
- self .inst.pending_async_tasks.append(f)
701
- await f
702
- assert (self .inst.may_enter_async)
703
- self .unblock_next_pending = len (self .inst.pending_async_tasks) > 0
704
- await super ().enter()
705
-
706
- async def wait (self ):
707
- if self .state == AsyncCallState.STARTING :
708
- self .inst.may_enter_async = False
709
- else :
710
- self .maybe_unblock_next_pending()
711
- return await super ().wait()
712
-
713
- def maybe_unblock_next_pending (self ):
714
- if self .unblock_next_pending:
715
- self .unblock_next_pending = False
716
- assert (self .inst.may_enter_async)
717
- self .inst.pending_async_tasks.pop(0 ).set_result(None )
718
733
719
734
def start (self ):
720
735
trap_if(self .state != AsyncCallState.STARTING )
721
736
self .state = AsyncCallState.STARTED
722
- if not self .inst.may_enter_async:
723
- self .inst.may_enter_async = True
724
737
725
738
def return_ (self ):
726
739
trap_if(self .state != AsyncCallState.STARTED )
727
740
self .state = AsyncCallState.RETURNED
728
741
729
742
def exit (self ):
730
- super ().exit()
731
743
trap_if(self .state != AsyncCallState.RETURNED )
732
744
self .state = AsyncCallState.DONE
733
- self .maybe_unblock_next_pending ()
745
+ super ().exit ()
734
746
```
735
- The above rules are careful to release pending async calls from the queue one
736
- at a time (rather than unblocking all of them at once). This ensures that, in
737
- all cases, every new task has a chance to apply backpressure before the next
738
- new task starts.
739
-
740
- Note that the backpressure rules described above apply independently to sync
741
- and async tasks and thus if a component exports both sync- * and* async-lifted
742
- functions, async functions may execute concurrently with sync functions.
743
747
744
748
Finally, the ` AsyncSubtask ` class extends ` Subtask ` with fields that are used
745
749
by the methods of ` Task ` , as shown above. ` AsyncSubtask ` s have the same linear
@@ -2222,6 +2226,27 @@ async def canon_resource_rep(rt, task, i):
2222
2226
Note that the "locally-defined" requirement above ensures that only the
2223
2227
component instance defining a resource can access its representation.
2224
2228
2229
+ ### 🔀 ` canon task.backpressure `
2230
+
2231
+ For a canonical definition:
2232
+ ``` wasm
2233
+ (canon task.backpressure (core func $f))
2234
+ ```
2235
+ validation specifies:
2236
+ * ` $f ` is given type ` [i32] -> [] `
2237
+
2238
+ Calling ` $f ` invokes the following function, which sets the ` backpressure `
2239
+ flag on the current ` ComponentInstance ` :
2240
+ ``` python
2241
+ async def canon_task_backpressure (task , flat_args ):
2242
+ trap_if(task.opts.sync)
2243
+ task.inst.backpressure = bool (flat_args[0 ])
2244
+ return []
2245
+ ```
2246
+ The ` backpressure ` flag is read by ` Task.enter ` (defined above) to prevent new
2247
+ tasks from entering the component instance and forcing the guest code to
2248
+ consume resources.
2249
+
2225
2250
### 🔀 ` canon task.start `
2226
2251
2227
2252
For a canonical definition:
0 commit comments