Skip to content
This repository was archived by the owner on May 28, 2025. It is now read-only.

Commit f71cdbb

Browse files
committed
Support blocking for epoll
1 parent dbfd066 commit f71cdbb

File tree

8 files changed

+239
-40
lines changed

8 files changed

+239
-40
lines changed

src/tools/miri/src/concurrency/thread.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ pub enum BlockReason {
172172
Futex { addr: u64 },
173173
/// Blocked on an InitOnce.
174174
InitOnce(InitOnceId),
175+
/// Blocked on epoll
176+
Epoll,
175177
}
176178

177179
/// The state of a thread.

src/tools/miri/src/shims/unix/fd.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,14 @@ impl WeakFileDescriptionRef {
278278
}
279279
}
280280

281+
impl VisitProvenance for WeakFileDescriptionRef {
282+
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
283+
// A weak reference can never be the only reference to some pointer or place.
284+
// Since the actual file description is tracked by strong ref somewhere,
285+
// it is ok to make this a NOP operation.
286+
}
287+
}
288+
281289
/// A unique id for file descriptions. While we could use the address, considering that
282290
/// is definitely unique, the address would expose interpreter internal state when used
283291
/// for sorting things. So instead we generate a unique id per file description that stays

src/tools/miri/src/shims/unix/linux/epoll.rs

Lines changed: 130 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use std::cell::RefCell;
22
use std::collections::BTreeMap;
33
use std::io;
44
use std::rc::{Rc, Weak};
5+
use std::time::Duration;
56

6-
use crate::shims::unix::fd::{FdId, FileDescriptionRef};
7+
use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef};
78
use crate::shims::unix::*;
89
use crate::*;
910

@@ -19,6 +20,8 @@ struct Epoll {
1920
// This is an Rc because EpollInterest need to hold a reference to update
2021
// it.
2122
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
23+
/// A list of thread ids blocked on this epoll instance.
24+
thread_id: RefCell<Vec<ThreadId>>,
2225
}
2326

2427
/// EpollEventInstance contains information that will be returned by epoll_wait.
@@ -58,6 +61,8 @@ pub struct EpollEventInterest {
5861
data: u64,
5962
/// Ready list of the epoll instance under which this EpollEventInterest is registered.
6063
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
64+
/// The file descriptor value that this EpollEventInterest is registered under.
65+
epfd: i32,
6166
}
6267

6368
/// EpollReadyEvents reflects the readiness of a file description.
@@ -338,6 +343,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
338343
events,
339344
data,
340345
ready_list: Rc::clone(ready_list),
346+
epfd: epfd_value,
341347
}));
342348

343349
if op == epoll_ctl_add {
@@ -395,7 +401,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
395401
396402
/// The `timeout` argument specifies the number of milliseconds that
397403
/// `epoll_wait()` will block. Time is measured against the
398-
/// CLOCK_MONOTONIC clock.
404+
/// CLOCK_MONOTONIC clock. If the timeout is zero, the function will not block,
405+
/// while if the timeout is -1, the function will block
406+
/// until at least one event has been retrieved (or an error
407+
/// occurred).
399408
400409
/// A call to `epoll_wait()` will block until either:
401410
/// • a file descriptor delivers an event;
@@ -421,43 +430,115 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
421430
events_op: &OpTy<'tcx>,
422431
maxevents: &OpTy<'tcx>,
423432
timeout: &OpTy<'tcx>,
424-
) -> InterpResult<'tcx, Scalar> {
433+
dest: MPlaceTy<'tcx>,
434+
) -> InterpResult<'tcx> {
425435
let this = self.eval_context_mut();
426436

427-
let epfd = this.read_scalar(epfd)?.to_i32()?;
437+
let epfd_value = this.read_scalar(epfd)?.to_i32()?;
428438
let events = this.read_immediate(events_op)?;
429439
let maxevents = this.read_scalar(maxevents)?.to_i32()?;
430440
let timeout = this.read_scalar(timeout)?.to_i32()?;
431441

432-
if epfd <= 0 || maxevents <= 0 {
442+
if epfd_value <= 0 || maxevents <= 0 {
433443
let einval = this.eval_libc("EINVAL");
434444
this.set_last_error(einval)?;
435-
return Ok(Scalar::from_i32(-1));
445+
this.write_int(-1, &dest)?;
446+
return Ok(());
436447
}
437448

438449
// This needs to come after the maxevents value check, or else maxevents.try_into().unwrap()
439450
// will fail.
440-
let events = this.deref_pointer_as(
451+
let event = this.deref_pointer_as(
441452
&events,
442453
this.libc_array_ty_layout("epoll_event", maxevents.try_into().unwrap()),
443454
)?;
444455

445-
// FIXME: Implement blocking support
446-
if timeout != 0 {
447-
throw_unsup_format!("epoll_wait: timeout value can only be 0");
456+
let Some(epfd) = this.machine.fds.get(epfd_value) else {
457+
let result_value: i32 = this.fd_not_found()?;
458+
this.write_int(result_value, &dest)?;
459+
return Ok(());
460+
};
461+
// Create a weak ref of epfd and pass it to callback so we will make sure that epfd
462+
// is not close after the thread unblocks.
463+
let weak_epfd = epfd.downgrade();
464+
465+
// We just need to know if the ready list is empty and borrow the thread_ids out.
466+
// The whole logic is wrapped inside a block so we don't need to manually drop epfd later.
467+
let ready_list_empty;
468+
let mut thread_ids;
469+
{
470+
let epoll_file_description = epfd
471+
.downcast::<Epoll>()
472+
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
473+
let binding = epoll_file_description.get_ready_list();
474+
ready_list_empty = binding.borrow_mut().is_empty();
475+
thread_ids = epoll_file_description.thread_id.borrow_mut();
476+
}
477+
if timeout == 0 || !ready_list_empty {
478+
// If the ready list is not empty, or the timeout is 0, we can return immediately.
479+
this.blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event)?;
480+
} else {
481+
// Blocking
482+
let timeout = match timeout {
483+
0.. => {
484+
let duration = Duration::from_millis(timeout.try_into().unwrap());
485+
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration))
486+
}
487+
-1 => None,
488+
..-1 => {
489+
throw_unsup_format!(
490+
"epoll_wait: Only timeout values greater than -1 are supported."
491+
);
492+
}
493+
};
494+
thread_ids.push(this.active_thread());
495+
this.block_thread(
496+
BlockReason::Epoll,
497+
timeout,
498+
callback!(
499+
@capture<'tcx> {
500+
epfd_value: i32,
501+
weak_epfd: WeakFileDescriptionRef,
502+
dest: MPlaceTy<'tcx>,
503+
event: MPlaceTy<'tcx>,
504+
}
505+
@unblock = |this| {
506+
this.blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event)?;
507+
Ok(())
508+
}
509+
@timeout = |this| {
510+
// No notification after blocking timeout.
511+
this.write_int(0, &dest)?;
512+
Ok(())
513+
}
514+
),
515+
);
448516
}
517+
Ok(())
518+
}
449519

450-
let Some(epfd) = this.machine.fds.get(epfd) else {
451-
return Ok(Scalar::from_i32(this.fd_not_found()?));
520+
/// Callback function after epoll_wait unblocks
521+
fn blocking_epoll_callback(
522+
&mut self,
523+
epfd_value: i32,
524+
weak_epfd: WeakFileDescriptionRef,
525+
dest: &MPlaceTy<'tcx>,
526+
event: &MPlaceTy<'tcx>,
527+
) -> InterpResult<'tcx> {
528+
let this = self.eval_context_mut();
529+
530+
let Some(epfd) = weak_epfd.upgrade() else {
531+
throw_unsup_format!("epoll FD {epfd_value} is closed while blocking.")
452532
};
533+
453534
let epoll_file_description = epfd
454535
.downcast::<Epoll>()
455536
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
456537

457538
let ready_list = epoll_file_description.get_ready_list();
458539
let mut ready_list = ready_list.borrow_mut();
459540
let mut num_of_events: i32 = 0;
460-
let mut array_iter = this.project_array_fields(&events)?;
541+
let mut array_iter = this.project_array_fields(event)?;
461542

462543
while let Some(des) = array_iter.next(this)? {
463544
if let Some(epoll_event_instance) = ready_list_next(this, &mut ready_list) {
@@ -473,7 +554,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
473554
break;
474555
}
475556
}
476-
Ok(Scalar::from_i32(num_of_events))
557+
this.write_int(num_of_events, dest)?;
558+
Ok(())
477559
}
478560

479561
/// For a specific file description, get its ready events and update the corresponding ready
@@ -483,17 +565,42 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
483565
///
484566
/// This *will* report an event if anyone is subscribed to it, without any further filtering, so
485567
/// do not call this function when an FD didn't have anything happen to it!
486-
fn check_and_update_readiness(&self, fd_ref: &FileDescriptionRef) -> InterpResult<'tcx, ()> {
487-
let this = self.eval_context_ref();
568+
fn check_and_update_readiness(
569+
&mut self,
570+
fd_ref: &FileDescriptionRef,
571+
) -> InterpResult<'tcx, ()> {
572+
let this = self.eval_context_mut();
488573
let id = fd_ref.get_id();
574+
let mut waiter = Vec::new();
489575
// Get a list of EpollEventInterest that is associated to a specific file description.
490576
if let Some(epoll_interests) = this.machine.epoll_interests.get_epoll_interest(id) {
491577
for weak_epoll_interest in epoll_interests {
492578
if let Some(epoll_interest) = weak_epoll_interest.upgrade() {
493-
check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?;
579+
let is_updated = check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?;
580+
if is_updated {
581+
// Edge-triggered notification only notify one thread even if there are
582+
// multiple threads block on the same epfd.
583+
let epfd = this.machine.fds.get(epoll_event_interest.epfd).unwrap();
584+
// FIXME: We can randomly pick a thread to unblock.
585+
586+
// This unwrap can never fail because if the current epoll instance were
587+
// closed and its epfd value reused, the upgrade of weak_epoll_interest
588+
// above would fail. This guarantee holds because only the epoll instance
589+
// holds a strong ref to epoll_interest.
590+
if let Some(thread_id) =
591+
epfd.downcast::<Epoll>().unwrap().thread_id.borrow_mut().pop()
592+
{
593+
waiter.push(thread_id);
594+
};
595+
}
494596
}
495597
}
496598
}
599+
waiter.sort();
600+
waiter.dedup();
601+
for thread_id in waiter {
602+
this.unblock_thread(thread_id, BlockReason::Epoll)?;
603+
}
497604
Ok(())
498605
}
499606
}
@@ -517,14 +624,15 @@ fn ready_list_next(
517624
}
518625

519626
/// This helper function checks whether an epoll notification should be triggered for a specific
520-
/// epoll_interest and, if necessary, triggers the notification. Unlike check_and_update_readiness,
521-
/// this function sends a notification to only one epoll instance.
627+
/// epoll_interest and, if necessary, triggers the notification, and returns whether the
628+
/// event interest was updated. Unlike check_and_update_readiness, this function sends a
629+
/// notification to only one epoll instance.
522630
fn check_and_update_one_event_interest<'tcx>(
523631
fd_ref: &FileDescriptionRef,
524632
interest: Rc<RefCell<EpollEventInterest>>,
525633
id: FdId,
526634
ecx: &MiriInterpCx<'tcx>,
527-
) -> InterpResult<'tcx> {
635+
) -> InterpResult<'tcx, bool> {
528636
// Get the bitmask of ready events for a file description.
529637
let ready_events_bitmask = fd_ref.get_epoll_ready_events()?.get_event_bitmask(ecx);
530638
let epoll_event_interest = interest.borrow();
@@ -539,6 +647,7 @@ fn check_and_update_one_event_interest<'tcx>(
539647
let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data);
540648
// Triggers the notification by inserting it to the ready list.
541649
ready_list.insert(epoll_key, event_instance);
650+
return Ok(true);
542651
}
543-
Ok(())
652+
return Ok(false);
544653
}

src/tools/miri/src/shims/unix/linux/foreign_items.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
6262
"epoll_wait" => {
6363
let [epfd, events, maxevents, timeout] =
6464
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
65-
let result = this.epoll_wait(epfd, events, maxevents, timeout)?;
66-
this.write_scalar(result, dest)?;
65+
this.epoll_wait(epfd, events, maxevents, timeout, dest.clone())?;
6766
}
6867
"eventfd" => {
6968
let [val, flag] =

src/tools/miri/tests/fail-dep/tokio/sleep.stderr

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)