Skip to content

Commit 0b76b73

Browse files
authored
Merge pull request #4408 from Patrick-6/rework-thread-join
Only write join return value once join succeeds
2 parents 18da864 + 1a9d0f4 commit 0b76b73

File tree

5 files changed

+111
-104
lines changed

5 files changed

+111
-104
lines changed

src/concurrency/thread.rs

Lines changed: 91 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -582,88 +582,6 @@ impl<'tcx> ThreadManager<'tcx> {
582582
interp_ok(())
583583
}
584584

585-
/// Mark that the active thread tries to join the thread with `joined_thread_id`.
586-
fn join_thread(
587-
&mut self,
588-
joined_thread_id: ThreadId,
589-
data_race_handler: &mut GlobalDataRaceHandler,
590-
) -> InterpResult<'tcx> {
591-
if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
592-
// On Windows this corresponds to joining on a closed handle.
593-
throw_ub_format!("trying to join a detached thread");
594-
}
595-
596-
fn after_join<'tcx>(
597-
threads: &mut ThreadManager<'_>,
598-
joined_thread_id: ThreadId,
599-
data_race_handler: &mut GlobalDataRaceHandler,
600-
) -> InterpResult<'tcx> {
601-
match data_race_handler {
602-
GlobalDataRaceHandler::None => {}
603-
GlobalDataRaceHandler::Vclocks(data_race) =>
604-
data_race.thread_joined(threads, joined_thread_id),
605-
GlobalDataRaceHandler::Genmc(genmc_ctx) =>
606-
genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
607-
}
608-
interp_ok(())
609-
}
610-
611-
// Mark the joined thread as being joined so that we detect if other
612-
// threads try to join it.
613-
self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
614-
if !self.threads[joined_thread_id].state.is_terminated() {
615-
trace!(
616-
"{:?} blocked on {:?} when trying to join",
617-
self.active_thread, joined_thread_id
618-
);
619-
// The joined thread is still running, we need to wait for it.
620-
// Unce we get unblocked, perform the appropriate synchronization.
621-
self.block_thread(
622-
BlockReason::Join(joined_thread_id),
623-
None,
624-
callback!(
625-
@capture<'tcx> {
626-
joined_thread_id: ThreadId,
627-
}
628-
|this, unblock: UnblockKind| {
629-
assert_eq!(unblock, UnblockKind::Ready);
630-
after_join(&mut this.machine.threads, joined_thread_id, &mut this.machine.data_race)
631-
}
632-
),
633-
);
634-
} else {
635-
// The thread has already terminated - establish happens-before
636-
after_join(self, joined_thread_id, data_race_handler)?;
637-
}
638-
interp_ok(())
639-
}
640-
641-
/// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
642-
/// If the thread is already joined by another thread, it will throw UB
643-
fn join_thread_exclusive(
644-
&mut self,
645-
joined_thread_id: ThreadId,
646-
data_race_handler: &mut GlobalDataRaceHandler,
647-
) -> InterpResult<'tcx> {
648-
if self.threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
649-
throw_ub_format!("trying to join an already joined thread");
650-
}
651-
652-
if joined_thread_id == self.active_thread {
653-
throw_ub_format!("trying to join itself");
654-
}
655-
656-
// Sanity check `join_status`.
657-
assert!(
658-
self.threads
659-
.iter()
660-
.all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
661-
"this thread already has threads waiting for its termination"
662-
);
663-
664-
self.join_thread(joined_thread_id, data_race_handler)
665-
}
666-
667585
/// Set the name of the given thread.
668586
pub fn set_thread_name(&mut self, thread: ThreadId, new_thread_name: Vec<u8>) {
669587
self.threads[thread].thread_name = Some(new_thread_name);
@@ -1114,20 +1032,102 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
11141032
this.machine.threads.detach_thread(thread_id, allow_terminated_joined)
11151033
}
11161034

1117-
#[inline]
1118-
fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
1035+
/// Mark that the active thread tries to join the thread with `joined_thread_id`.
1036+
///
1037+
/// When the join is successful (immediately, or as soon as the joined thread finishes), `success_retval` will be written to `return_dest`.
1038+
fn join_thread(
1039+
&mut self,
1040+
joined_thread_id: ThreadId,
1041+
success_retval: Scalar,
1042+
return_dest: &MPlaceTy<'tcx>,
1043+
) -> InterpResult<'tcx> {
11191044
let this = self.eval_context_mut();
1120-
this.machine.threads.join_thread(joined_thread_id, &mut this.machine.data_race)?;
1045+
let thread_mgr = &mut this.machine.threads;
1046+
if thread_mgr.threads[joined_thread_id].join_status == ThreadJoinStatus::Detached {
1047+
// On Windows this corresponds to joining on a closed handle.
1048+
throw_ub_format!("trying to join a detached thread");
1049+
}
1050+
1051+
fn after_join<'tcx>(
1052+
this: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1053+
joined_thread_id: ThreadId,
1054+
success_retval: Scalar,
1055+
return_dest: &MPlaceTy<'tcx>,
1056+
) -> InterpResult<'tcx> {
1057+
let threads = &this.machine.threads;
1058+
match &mut this.machine.data_race {
1059+
GlobalDataRaceHandler::None => {}
1060+
GlobalDataRaceHandler::Vclocks(data_race) =>
1061+
data_race.thread_joined(threads, joined_thread_id),
1062+
GlobalDataRaceHandler::Genmc(genmc_ctx) =>
1063+
genmc_ctx.handle_thread_join(threads.active_thread, joined_thread_id)?,
1064+
}
1065+
this.write_scalar(success_retval, return_dest)?;
1066+
interp_ok(())
1067+
}
1068+
1069+
// Mark the joined thread as being joined so that we detect if other
1070+
// threads try to join it.
1071+
thread_mgr.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined;
1072+
if !thread_mgr.threads[joined_thread_id].state.is_terminated() {
1073+
trace!(
1074+
"{:?} blocked on {:?} when trying to join",
1075+
thread_mgr.active_thread, joined_thread_id
1076+
);
1077+
// The joined thread is still running, we need to wait for it.
1078+
// Once we get unblocked, perform the appropriate synchronization and write the return value.
1079+
let dest = return_dest.clone();
1080+
thread_mgr.block_thread(
1081+
BlockReason::Join(joined_thread_id),
1082+
None,
1083+
callback!(
1084+
@capture<'tcx> {
1085+
joined_thread_id: ThreadId,
1086+
dest: MPlaceTy<'tcx>,
1087+
success_retval: Scalar,
1088+
}
1089+
|this, unblock: UnblockKind| {
1090+
assert_eq!(unblock, UnblockKind::Ready);
1091+
after_join(this, joined_thread_id, success_retval, &dest)
1092+
}
1093+
),
1094+
);
1095+
} else {
1096+
// The thread has already terminated - establish happens-before and write the return value.
1097+
after_join(this, joined_thread_id, success_retval, return_dest)?;
1098+
}
11211099
interp_ok(())
11221100
}
11231101

1124-
#[inline]
1125-
fn join_thread_exclusive(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
1102+
/// Mark that the active thread tries to exclusively join the thread with `joined_thread_id`.
1103+
/// If the thread is already joined by another thread, it will throw UB.
1104+
///
1105+
/// When the join is successful (immediately, or as soon as the joined thread finishes), `success_retval` will be written to `return_dest`.
1106+
fn join_thread_exclusive(
1107+
&mut self,
1108+
joined_thread_id: ThreadId,
1109+
success_retval: Scalar,
1110+
return_dest: &MPlaceTy<'tcx>,
1111+
) -> InterpResult<'tcx> {
11261112
let this = self.eval_context_mut();
1127-
this.machine
1128-
.threads
1129-
.join_thread_exclusive(joined_thread_id, &mut this.machine.data_race)?;
1130-
interp_ok(())
1113+
let threads = &this.machine.threads.threads;
1114+
if threads[joined_thread_id].join_status == ThreadJoinStatus::Joined {
1115+
throw_ub_format!("trying to join an already joined thread");
1116+
}
1117+
1118+
if joined_thread_id == this.machine.threads.active_thread {
1119+
throw_ub_format!("trying to join itself");
1120+
}
1121+
1122+
// Sanity check `join_status`.
1123+
assert!(
1124+
threads
1125+
.iter()
1126+
.all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }),
1127+
"this thread already has threads waiting for its termination"
1128+
);
1129+
1130+
this.join_thread(joined_thread_id, success_retval, return_dest)
11311131
}
11321132

11331133
#[inline]

src/shims/unix/foreign_items.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -946,8 +946,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
946946
}
947947
"pthread_join" => {
948948
let [thread, retval] = this.check_shim(abi, CanonAbi::C, link_name, args)?;
949-
let res = this.pthread_join(thread, retval)?;
950-
this.write_scalar(res, dest)?;
949+
this.pthread_join(thread, retval, dest)?;
951950
}
952951
"pthread_detach" => {
953952
let [thread] = this.check_shim(abi, CanonAbi::C, link_name, args)?;

src/shims/unix/thread.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
4141
&mut self,
4242
thread: &OpTy<'tcx>,
4343
retval: &OpTy<'tcx>,
44-
) -> InterpResult<'tcx, Scalar> {
44+
return_dest: &MPlaceTy<'tcx>,
45+
) -> InterpResult<'tcx> {
4546
let this = self.eval_context_mut();
4647

4748
if !this.ptr_is_null(this.read_pointer(retval)?)? {
@@ -51,12 +52,15 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
5152

5253
let thread = this.read_scalar(thread)?.to_int(this.libc_ty_layout("pthread_t").size)?;
5354
let Ok(thread) = this.thread_id_try_from(thread) else {
54-
return interp_ok(this.eval_libc("ESRCH"));
55+
this.write_scalar(this.eval_libc("ESRCH"), return_dest)?;
56+
return interp_ok(());
5557
};
5658

57-
this.join_thread_exclusive(thread)?;
58-
59-
interp_ok(Scalar::from_u32(0))
59+
this.join_thread_exclusive(
60+
thread,
61+
/* success_retval */ Scalar::from_u32(0),
62+
return_dest,
63+
)
6064
}
6165

6266
fn pthread_detach(&mut self, thread: &OpTy<'tcx>) -> InterpResult<'tcx, Scalar> {

src/shims/windows/foreign_items.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -573,8 +573,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
573573
"WaitForSingleObject" => {
574574
let [handle, timeout] = this.check_shim(abi, sys_conv, link_name, args)?;
575575

576-
let ret = this.WaitForSingleObject(handle, timeout)?;
577-
this.write_scalar(ret, dest)?;
576+
this.WaitForSingleObject(handle, timeout, dest)?;
578577
}
579578
"GetCurrentProcess" => {
580579
let [] = this.check_shim(abi, sys_conv, link_name, args)?;

src/shims/windows/thread.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,14 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
5959
&mut self,
6060
handle_op: &OpTy<'tcx>,
6161
timeout_op: &OpTy<'tcx>,
62-
) -> InterpResult<'tcx, Scalar> {
62+
return_dest: &MPlaceTy<'tcx>,
63+
) -> InterpResult<'tcx> {
6364
let this = self.eval_context_mut();
6465

6566
let handle = this.read_handle(handle_op, "WaitForSingleObject")?;
6667
let timeout = this.read_scalar(timeout_op)?.to_u32()?;
6768

68-
let thread = match handle {
69+
let joined_thread_id = match handle {
6970
Handle::Thread(thread) => thread,
7071
// Unlike on posix, the outcome of joining the current thread is not documented.
7172
// On current Windows, it just deadlocks.
@@ -77,8 +78,12 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
7778
throw_unsup_format!("`WaitForSingleObject` with non-infinite timeout");
7879
}
7980

80-
this.join_thread(thread)?;
81+
this.join_thread(
82+
joined_thread_id,
83+
/* success_retval */ this.eval_windows("c", "WAIT_OBJECT_0"),
84+
return_dest,
85+
)?;
8186

82-
interp_ok(this.eval_windows("c", "WAIT_OBJECT_0"))
87+
interp_ok(())
8388
}
8489
}

0 commit comments

Comments
 (0)