Skip to content

Commit 4aac3cd

Browse files
aykevldeadprogram
authored andcommitted
sync: implement WaitGroup using a futex
This prepares sync.WaitGroup for multithreading. Code size for the cooperative scheduler is nearly unchanged.
1 parent 2588bf7 commit 4aac3cd

File tree

1 file changed

+61
-30
lines changed

1 file changed

+61
-30
lines changed

src/sync/waitgroup.go

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,65 @@ package sync
33
import "internal/task"
44

55
type WaitGroup struct {
6-
counter uint
7-
waiters task.Stack
6+
futex task.Futex
87
}
98

109
func (wg *WaitGroup) Add(delta int) {
11-
if delta > 0 {
12-
// Check for overflow.
13-
if uint(delta) > (^uint(0))-wg.counter {
14-
panic("sync: WaitGroup counter overflowed")
15-
}
10+
switch {
11+
case delta > 0:
12+
// Delta is positive.
13+
for {
14+
// Check for overflow.
15+
counter := wg.futex.Load()
16+
if uint32(delta) > (^uint32(0))-counter {
17+
panic("sync: WaitGroup counter overflowed")
18+
}
1619

17-
// Add to the counter.
18-
wg.counter += uint(delta)
19-
} else {
20-
// Check for underflow.
21-
if uint(-delta) > wg.counter {
22-
panic("sync: negative WaitGroup counter")
20+
// Add to the counter.
21+
if wg.futex.CompareAndSwap(counter, counter+uint32(delta)) {
22+
// Successfully added.
23+
return
24+
}
2325
}
26+
default:
27+
// Delta is negative (or zero).
28+
for {
29+
counter := wg.futex.Load()
2430

25-
// Subtract from the counter.
26-
wg.counter -= uint(-delta)
31+
// Check for underflow.
32+
if uint32(-delta) > counter {
33+
panic("sync: negative WaitGroup counter")
34+
}
35+
36+
// Subtract from the counter.
37+
if !wg.futex.CompareAndSwap(counter, counter-uint32(-delta)) {
38+
// Could not swap, trying again.
39+
continue
40+
}
2741

28-
// If the counter is zero, everything is done and the waiters should be resumed.
29-
// This code assumes that the waiters cannot wake up until after this function returns.
30-
// In the current implementation, this is always correct.
31-
if wg.counter == 0 {
32-
for t := wg.waiters.Pop(); t != nil; t = wg.waiters.Pop() {
33-
scheduleTask(t)
42+
// If the counter is zero, everything is done and the waiters should
43+
// be resumed.
44+
// When there are multiple thread, there is a chance for the counter
45+
// to go to zero, WakeAll to be called, and then the counter to be
46+
// incremented again before a waiting goroutine has a chance to
47+
// check the new (zero) value. However the last increment is
48+
// explicitly given in the docs as something that should not be
49+
// done:
50+
//
51+
// > Note that calls with a positive delta that occur when the
52+
// > counter is zero must happen before a Wait.
53+
//
54+
// So we're fine here.
55+
if counter-uint32(-delta) == 0 {
56+
// TODO: this is not the most efficient implementation possible
57+
// because we wake up all waiters unconditionally, even if there
58+
// might be none. Though since the common usage is for this to
59+
// be called with at least one waiter, it's probably fine.
60+
wg.futex.WakeAll()
3461
}
62+
63+
// Successfully swapped (and woken all waiting tasks if needed).
64+
return
3565
}
3666
}
3767
}
@@ -41,14 +71,15 @@ func (wg *WaitGroup) Done() {
4171
}
4272

4373
func (wg *WaitGroup) Wait() {
44-
if wg.counter == 0 {
45-
// Everything already finished.
46-
return
47-
}
48-
49-
// Push the current goroutine onto the waiter stack.
50-
wg.waiters.Push(task.Current())
74+
for {
75+
counter := wg.futex.Load()
76+
if counter == 0 {
77+
return // everything already finished
78+
}
5179

52-
// Pause until the waiters are awoken by Add/Done.
53-
task.Pause()
80+
if wg.futex.Wait(counter) {
81+
// Successfully woken by WakeAll (in wg.Add).
82+
break
83+
}
84+
}
5485
}

0 commit comments

Comments
 (0)