Skip to content

Commit dfa8d2f

Browse files
committed
wake up all futex threads at the same time
On macOS, the futex operations return the number of outstanding waiters. For this count to be correct, we have to remove all eligible threads from the internal futex list before any of the wakeup callbacks are run. This has the nice side-effect of making the runtime removal operation linear in the number of threads instead of quadratic.
1 parent a93a671 commit dfa8d2f

File tree

6 files changed

+87
-31
lines changed

6 files changed

+87
-31
lines changed

src/concurrency/sync.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ struct Condvar {
128128
/// The futex state.
129129
#[derive(Default, Debug)]
130130
struct Futex {
131-
waiters: VecDeque<FutexWaiter>,
131+
waiters: Vec<FutexWaiter>,
132132
/// Tracks the happens-before relationship
133133
/// between a futex-wake and a futex-wait
134134
/// during a non-spurious wake event.
@@ -748,7 +748,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
748748
let mut futex = futex_ref.0.borrow_mut();
749749
let waiters = &mut futex.waiters;
750750
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
751-
waiters.push_back(FutexWaiter { thread, bitset });
751+
waiters.push(FutexWaiter { thread, bitset });
752752
drop(futex);
753753

754754
this.block_thread(
@@ -782,9 +782,14 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
782782
);
783783
}
784784

785-
/// Wake up the first thread in the queue that matches any of the bits in the bitset.
786-
/// Returns whether anything was woken.
787-
fn futex_wake(&mut self, futex_ref: &FutexRef, bitset: u32) -> InterpResult<'tcx, bool> {
785+
/// Wake up `count` of the threads in the queue that match any of the bits
786+
/// in the bitset. Returns how many threads were woken.
787+
fn futex_wake(
788+
&mut self,
789+
futex_ref: &FutexRef,
790+
bitset: u32,
791+
count: usize,
792+
) -> InterpResult<'tcx, usize> {
788793
let this = self.eval_context_mut();
789794
let mut futex = futex_ref.0.borrow_mut();
790795
let data_race = &this.machine.data_race;
@@ -794,13 +799,18 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
794799
data_race.release_clock(&this.machine.threads, |clock| futex.clock.clone_from(clock));
795800
}
796801

797-
// Wake up the first thread in the queue that matches any of the bits in the bitset.
798-
let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else {
799-
return interp_ok(false);
800-
};
801-
let waiter = futex.waiters.remove(i).unwrap();
802+
// Remove `count` of the threads in the queue that match any of the bits in the bitset.
803+
// We collect all of them before unblocking because the unblock callback may access the
804+
// futex state to retrieve the remaining number of waiters on macOS.
805+
let waiters: Vec<_> =
806+
futex.waiters.extract_if(.., |w| w.bitset & bitset != 0).take(count).collect();
802807
drop(futex);
803-
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
804-
interp_ok(true)
808+
809+
let woken = waiters.len();
810+
for waiter in waiters {
811+
this.unblock_thread(waiter.thread, BlockReason::Futex)?;
812+
}
813+
814+
interp_ok(woken)
805815
}
806816
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#![feature(derive_coerce_pointee)]
1717
#![feature(arbitrary_self_types)]
1818
#![feature(unsigned_is_multiple_of)]
19+
#![feature(extract_if)]
1920
// Configure clippy and other lints
2021
#![allow(
2122
clippy::collapsible_else_if,

src/shims/unix/linux_like/sync.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,16 +215,8 @@ pub fn futex<'tcx>(
215215
// will see the latest value on addr which could be changed by our caller
216216
// before doing the syscall.
217217
ecx.atomic_fence(AtomicFenceOrd::SeqCst)?;
218-
let mut n = 0;
219-
#[expect(clippy::arithmetic_side_effects)]
220-
for _ in 0..val {
221-
if ecx.futex_wake(&futex_ref, bitset)? {
222-
n += 1;
223-
} else {
224-
break;
225-
}
226-
}
227-
ecx.write_scalar(Scalar::from_target_isize(n, ecx), dest)?;
218+
let woken = ecx.futex_wake(&futex_ref, bitset, val.try_into().unwrap())?;
219+
ecx.write_scalar(Scalar::from_target_isize(woken.try_into().unwrap(), ecx), dest)?;
228220
}
229221
op => throw_unsup_format!("Miri does not support `futex` syscall with op={}", op),
230222
}

src/shims/unix/macos/sync.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
257257

258258
// See the Linux futex implementation for why this fence exists.
259259
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
260-
261-
if all {
262-
while this.futex_wake(&futex_ref, u32::MAX)? {}
263-
} else {
264-
this.futex_wake(&futex_ref, u32::MAX)?;
265-
}
266-
260+
this.futex_wake(&futex_ref, u32::MAX, if all { usize::MAX } else { 1 })?;
267261
this.write_scalar(Scalar::from_i32(0), dest)?;
268262
interp_ok(())
269263
}

src/shims/windows/sync.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
255255
};
256256
let futex_ref = futex_ref.futex.clone();
257257

258-
this.futex_wake(&futex_ref, u32::MAX)?;
258+
this.futex_wake(&futex_ref, u32::MAX, 1)?;
259259

260260
interp_ok(())
261261
}
@@ -275,7 +275,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
275275
};
276276
let futex_ref = futex_ref.futex.clone();
277277

278-
while this.futex_wake(&futex_ref, u32::MAX)? {}
278+
this.futex_wake(&futex_ref, u32::MAX, usize::MAX)?;
279279

280280
interp_ok(())
281281
}

tests/pass-dep/concurrency/apple-futex.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,64 @@ fn wait_wake() {
149149
t.join().unwrap();
150150
}
151151

152+
fn wait_wake_multiple() {
153+
let val = 0i32;
154+
let futex = &val;
155+
156+
thread::scope(|s| {
157+
// Spawn some threads and make them wait on the futex.
158+
for i in 0..4 {
159+
s.spawn(move || unsafe {
160+
assert_eq!(
161+
libc::os_sync_wait_on_address(
162+
ptr::from_ref(futex).cast_mut().cast(),
163+
0,
164+
size_of::<i32>(),
165+
libc::OS_SYNC_WAIT_ON_ADDRESS_NONE,
166+
),
167+
// The last two threads will be woken at the same time,
168+
// but for the first two threads the remaining number
169+
// of waiters should be strictly decreasing.
170+
if i < 2 { 3 - i } else { 0 },
171+
);
172+
});
173+
174+
thread::sleep(Duration::from_millis(200));
175+
}
176+
177+
// Wake the threads up again.
178+
unsafe {
179+
assert_eq!(
180+
libc::os_sync_wake_by_address_any(
181+
ptr::from_ref(futex).cast_mut().cast(),
182+
size_of::<i32>(),
183+
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE,
184+
),
185+
0
186+
);
187+
188+
assert_eq!(
189+
libc::os_sync_wake_by_address_any(
190+
ptr::from_ref(futex).cast_mut().cast(),
191+
size_of::<i32>(),
192+
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE,
193+
),
194+
0
195+
);
196+
197+
// Wake both remaining threads at the same time.
198+
assert_eq!(
199+
libc::os_sync_wake_by_address_all(
200+
ptr::from_ref(futex).cast_mut().cast(),
201+
size_of::<i32>(),
202+
libc::OS_SYNC_WAKE_BY_ADDRESS_NONE,
203+
),
204+
0
205+
);
206+
}
207+
})
208+
}
209+
152210
fn param_mismatch() {
153211
let futex = 0;
154212
thread::scope(|s| {
@@ -209,5 +267,6 @@ fn main() {
209267
wait_timeout();
210268
wait_absolute_timeout();
211269
wait_wake();
270+
wait_wake_multiple();
212271
param_mismatch();
213272
}

0 commit comments

Comments
 (0)