Skip to content

Commit 2a40d9b

Browse files
committed
More aggressive vector index re-use, and added some tests.
1 parent c70bbea commit 2a40d9b

File tree

5 files changed

+254
-62
lines changed

5 files changed

+254
-62
lines changed

src/data_race.rs

Lines changed: 129 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{
2020
use rustc_index::vec::{Idx, IndexVec};
2121
use rustc_target::abi::Size;
2222
use rustc_middle::ty::layout::TyAndLayout;
23-
use rustc_data_structures::fx::FxHashSet;
23+
use rustc_data_structures::fx::{FxHashSet, FxHashMap};
2424

2525
use crate::{
2626
MiriEvalContext, MiriEvalContextExt,
@@ -662,7 +662,7 @@ impl VClockAlloc {
662662
let (index, clocks) = self.global.current_thread_state();
663663
let mut alloc_ranges = self.alloc_ranges.borrow_mut();
664664
for (_,range) in alloc_ranges.iter_mut(pointer.offset, len) {
665-
if range.read_race_detect(&*clocks, index) == Err(DataRace) {
665+
if let Err(DataRace) = range.read_race_detect(&*clocks, index) {
666666
// Report data-race
667667
return Self::report_data_race(
668668
&self.global,range, "READ", false, pointer, len
@@ -674,18 +674,17 @@ impl VClockAlloc {
674674
Ok(())
675675
}
676676
}
677-
/// Detect data-races for an unsychronized write operation, will not perform
678-
/// data-race threads if `multi-threaded` is false, either due to no threads
679-
/// being created or if it is temporarily disabled during a racy read or write
680-
/// operation
681-
pub fn write<'tcx>(&mut self, pointer: Pointer<Tag>, len: Size) -> InterpResult<'tcx> {
677+
678+
679+
// Shared code for detecting data-races on unique access to a section of memory
680+
fn unique_access<'tcx>(&mut self, pointer: Pointer<Tag>, len: Size, action: &str) -> InterpResult<'tcx> {
682681
if self.global.multi_threaded.get() {
683682
let (index, clocks) = self.global.current_thread_state();
684683
for (_,range) in self.alloc_ranges.get_mut().iter_mut(pointer.offset, len) {
685-
if range.write_race_detect(&*clocks, index) == Err(DataRace) {
684+
if let Err(DataRace) = range.write_race_detect(&*clocks, index) {
686685
// Report data-race
687686
return Self::report_data_race(
688-
&self.global, range, "WRITE", false, pointer, len
687+
&self.global, range, action, false, pointer, len
689688
);
690689
}
691690
}
@@ -694,25 +693,20 @@ impl VClockAlloc {
694693
Ok(())
695694
}
696695
}
696+
697+
/// Detect data-races for an unsychronized write operation, will not perform
698+
/// data-race threads if `multi-threaded` is false, either due to no threads
699+
/// being created or if it is temporarily disabled during a racy read or write
700+
/// operation
701+
pub fn write<'tcx>(&mut self, pointer: Pointer<Tag>, len: Size) -> InterpResult<'tcx> {
702+
self.unique_access(pointer, len, "Write")
703+
}
697704
/// Detect data-races for an unsychronized deallocate operation, will not perform
698705
/// data-race threads if `multi-threaded` is false, either due to no threads
699706
/// being created or if it is temporarily disabled during a racy read or write
700707
/// operation
701708
pub fn deallocate<'tcx>(&mut self, pointer: Pointer<Tag>, len: Size) -> InterpResult<'tcx> {
702-
if self.global.multi_threaded.get() {
703-
let (index, clocks) = self.global.current_thread_state();
704-
for (_,range) in self.alloc_ranges.get_mut().iter_mut(pointer.offset, len) {
705-
if range.write_race_detect(&*clocks, index) == Err(DataRace) {
706-
// Report data-race
707-
return Self::report_data_race(
708-
&self.global, range, "DEALLOCATE", false, pointer, len
709-
);
710-
}
711-
}
712-
Ok(())
713-
}else{
714-
Ok(())
715-
}
709+
self.unique_access(pointer, len, "Deallocate")
716710
}
717711
}
718712

@@ -773,6 +767,8 @@ struct ThreadExtraState {
773767
/// The current vector index in use by the
774768
/// thread currently, this is set to None
775769
/// after the vector index has been re-used
770+
/// and hence the value will never need to be
771+
/// read during data-race reporting
776772
vector_index: Option<VectorIdx>,
777773

778774
/// The name of the thread, updated for better
@@ -782,10 +778,8 @@ struct ThreadExtraState {
782778

783779
/// Thread termination vector clock, this
784780
/// is set on thread termination and is used
785-
/// for joining on threads that have already
786-
/// terminated. This should be used first
787-
/// on joining as there is the possibility
788-
/// that `vector_index` is None in some cases
781+
/// for joining on threads since the vector_index
782+
/// may be re-used when the join operation occurs
789783
termination_vector_clock: Option<VClock>,
790784
}
791785

@@ -820,10 +814,26 @@ pub struct GlobalState {
820814
current_index: Cell<VectorIdx>,
821815

822816
/// Potential vector indices that could be re-used on thread creation
823-
/// values are inserted here on thread termination, vector index values
824-
/// are then re-used once all the termination event happens-before all
825-
/// existing thread-clocks
817+
/// values are inserted here on after the thread has terminated and
818+
/// been joined with, and hence may potentially become free
819+
/// for use as the index for a new thread.
820+
/// Elements in this set may still require the vector index to
821+
/// report data-races, and can only be re-used after all
822+
/// active vector-clocks catch up with the threads timestamp.
826823
reuse_candidates: RefCell<FxHashSet<VectorIdx>>,
824+
825+
/// Counts the number of threads that are currently active
826+
/// if the number of active threads reduces to 1 and then
827+
/// a join operation occures with the remaining main thread
828+
/// then multi-threaded execution may be disabled
829+
active_thread_count: Cell<usize>,
830+
831+
/// This contains threads that have terminated, but not yet joined
832+
/// and so cannot become re-use candidates until a join operation
833+
/// occurs.
834+
/// The associated vector index will be moved into re-use candidates
835+
/// after the join operation occurs
836+
terminated_threads: RefCell<FxHashMap<ThreadId, VectorIdx>>,
827837
}
828838
impl GlobalState {
829839

@@ -836,7 +846,9 @@ impl GlobalState {
836846
vector_info: RefCell::new(IndexVec::new()),
837847
thread_info: RefCell::new(IndexVec::new()),
838848
current_index: Cell::new(VectorIdx::new(0)),
849+
active_thread_count: Cell::new(1),
839850
reuse_candidates: RefCell::new(FxHashSet::default()),
851+
terminated_threads: RefCell::new(FxHashMap::default())
840852
};
841853

842854
// Setup the main-thread since it is not explicitly created:
@@ -860,10 +872,24 @@ impl GlobalState {
860872
fn find_vector_index_reuse_candidate(&self) -> Option<VectorIdx> {
861873
let mut reuse = self.reuse_candidates.borrow_mut();
862874
let vector_clocks = self.vector_clocks.borrow();
875+
let vector_info = self.vector_info.borrow();
876+
let terminated_threads = self.terminated_threads.borrow();
863877
for &candidate in reuse.iter() {
864878
let target_timestamp = vector_clocks[candidate].clock[candidate];
865-
if vector_clocks.iter().all(|clock| {
866-
clock.clock[candidate] == target_timestamp
879+
if vector_clocks.iter_enumerated().all(|(clock_idx, clock)| {
880+
// The thread happens before the clock, and hence cannot report
881+
// a data-race with this the candidate index
882+
let no_data_race = clock.clock[candidate] >= target_timestamp;
883+
884+
// The vector represents a thread that has terminated and hence cannot
885+
// report a data-race with the candidate index
886+
let thread_id = vector_info[clock_idx];
887+
let vector_terminated = reuse.contains(&clock_idx)
888+
|| terminated_threads.contains_key(&thread_id);
889+
890+
// The vector index cannot report a race with the candidate index
891+
// and hence allows the candidate index to be re-used
892+
no_data_race || vector_terminated
867893
}) {
868894
// All vector clocks for each vector index are equal to
869895
// the target timestamp, and the thread is known to have
@@ -882,6 +908,10 @@ impl GlobalState {
882908
pub fn thread_created(&self, thread: ThreadId) {
883909
let current_index = self.current_index();
884910

911+
// Increment the number of active threads
912+
let active_threads = self.active_thread_count.get();
913+
self.active_thread_count.set(active_threads + 1);
914+
885915
// Enable multi-threaded execution, there are now two threads
886916
// so data-races are now possible.
887917
self.multi_threaded.set(true);
@@ -946,51 +976,90 @@ impl GlobalState {
946976
/// between the joined thead and the current thread.
947977
#[inline]
948978
pub fn thread_joined(&self, current_thread: ThreadId, join_thread: ThreadId) {
949-
let (current_index, join_index) = {
950-
let thread_info = self.thread_info.borrow();
951-
let current_index = thread_info[current_thread].vector_index
952-
.expect("Joining into thread with no assigned vector");
953-
let join_index = thread_info[join_thread].vector_index
954-
.expect("Joining thread with no assigned vector");
955-
(current_index, join_index)
956-
};
957979
let mut clocks_vec = self.vector_clocks.borrow_mut();
958-
let (current, join) = clocks_vec.pick2_mut(current_index, join_index);
980+
let thread_info = self.thread_info.borrow();
981+
982+
// Load the vector clock of the current thread
983+
let current_index = thread_info[current_thread].vector_index
984+
.expect("Performed thread join on thread with no assigned vector");
985+
let current = &mut clocks_vec[current_index];
986+
987+
// Load the associated vector clock for the terminated thread
988+
let join_clock = thread_info[join_thread].termination_vector_clock
989+
.as_ref().expect("Joined with thread but thread has not terminated");
959990

960991
// Pre increment clocks before atomic operation
961992
current.increment_clock(current_index);
962-
join.increment_clock(join_index);
963993

964994
// The join thread happens-before the current thread
965995
// so update the current vector clock
966-
current.join_with(join);
996+
current.clock.join(join_clock);
967997

968998
// Post increment clocks after atomic operation
969-
// the join clock is not incremented, since there will
970-
// be no future events, also if it was incremented
971-
// the thread re-use condition would never pass
972999
current.increment_clock(current_index);
1000+
1001+
// Check the number of active threads, if the value is 1
1002+
// then test for potentially disabling multi-threaded execution
1003+
let active_threads = self.active_thread_count.get();
1004+
if active_threads == 1 {
1005+
// May potentially be able to disable multi-threaded execution
1006+
let current_clock = &clocks_vec[current_index];
1007+
if clocks_vec.iter_enumerated().all(|(idx, clocks)| {
1008+
clocks.clock[idx] <= current_clock.clock[idx]
1009+
}) {
1010+
// The all thread termations happen-before the current clock
1011+
// therefore no data-races can be reported until a new thread
1012+
// is created, so disable multi-threaded execution
1013+
self.multi_threaded.set(false);
1014+
}
1015+
}
1016+
1017+
// If the thread is marked as terminated but not joined
1018+
// then move the thread to the re-use set
1019+
let mut termination = self.terminated_threads.borrow_mut();
1020+
if let Some(index) = termination.remove(&join_thread) {
1021+
let mut reuse = self.reuse_candidates.borrow_mut();
1022+
reuse.insert(index);
1023+
}
9731024
}
9741025

9751026
/// On thread termination, the vector-clock may re-used
9761027
/// in the future once all remaining thread-clocks catch
977-
/// up with the time index of the terminated thread
1028+
/// up with the time index of the terminated thread.
1029+
/// This assiges thread termination with a unique index
1030+
/// which will be used to join the thread
1031+
/// This should be called strictly before any calls to
1032+
/// `thread_joined`
9781033
#[inline]
979-
pub fn thread_terminated(&self, terminated_thread: ThreadId) {
980-
let mut thread_info = self.thread_info.borrow_mut();
981-
let termination_meta = &mut thread_info[terminated_thread];
1034+
pub fn thread_terminated(&self) {
1035+
let current_index = self.current_index();
1036+
1037+
// Increment the clock to a unique termination timestamp
1038+
let mut vector_clocks = self.vector_clocks.borrow_mut();
1039+
let current_clocks = &mut vector_clocks[current_index];
1040+
current_clocks.increment_clock(current_index);
9821041

983-
// Find the terminated index & setup the termination vector-clock
984-
// in case thread join is called in the future after the thread
985-
// has been re-used
986-
let terminated_index = termination_meta.vector_index
987-
.expect("Joining into thread with no assigned vector");
988-
let vector_clocks = self.vector_clocks.borrow();
989-
termination_meta.termination_vector_clock = Some(vector_clocks[terminated_index].clock.clone());
1042+
// Load the current thread id for the executing vector
1043+
let vector_info = self.vector_info.borrow();
1044+
let current_thread = vector_info[current_index];
9901045

991-
// Add this thread as a candidate for re-use
992-
let mut reuse = self.reuse_candidates.borrow_mut();
993-
reuse.insert(terminated_index);
1046+
// Load the current thread metadata, and move to a terminated
1047+
// vector state. Setting up the vector clock all join operations
1048+
// will use.
1049+
let mut thread_info = self.thread_info.borrow_mut();
1050+
let current = &mut thread_info[current_thread];
1051+
current.termination_vector_clock = Some(current_clocks.clock.clone());
1052+
1053+
// Add this thread as a candidate for re-use after a thread join
1054+
// occurs
1055+
let mut termination = self.terminated_threads.borrow_mut();
1056+
termination.insert(current_thread, current_index);
1057+
1058+
// Reduce the number of active threads, now that a thread has
1059+
// terminated
1060+
let mut active_threads = self.active_thread_count.get();
1061+
active_threads -= 1;
1062+
self.active_thread_count.set(active_threads);
9941063
}
9951064

9961065
/// Hook for updating the local tracker of the currently
@@ -1118,4 +1187,3 @@ impl GlobalState {
11181187
self.current_index.get()
11191188
}
11201189
}
1121-

src/thread.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,8 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
443443
return false;
444444
});
445445
}
446+
// Set the thread into a terminated state in the data-race detector
447+
data_race.thread_terminated();
446448
// Check if we need to unblock any threads.
447449
for (i, thread) in self.threads.iter_enumerated_mut() {
448450
if thread.state == ThreadState::BlockedOnJoin(self.active_thread) {
@@ -452,7 +454,6 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
452454
thread.state = ThreadState::Enabled;
453455
}
454456
}
455-
data_race.thread_terminated(self.active_thread);
456457
return free_tls_statics;
457458
}
458459

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// ignore-windows: Concurrency on Windows is not supported yet.
2+
// compile-flags: -Zmiri-disable-isolation
3+
4+
use std::thread::{spawn, sleep};
5+
use std::time::Duration;
6+
use std::mem;
7+
8+
9+
#[derive(Copy, Clone)]
10+
struct EvilSend<T>(pub T);
11+
12+
unsafe impl<T> Send for EvilSend<T> {}
13+
unsafe impl<T> Sync for EvilSend<T> {}
14+
15+
16+
fn main() {
17+
let mut a = 0u32;
18+
let b = &mut a as *mut u32;
19+
let c = EvilSend(b);
20+
21+
let join = unsafe {
22+
spawn(move || {
23+
*c.0 = 32;
24+
})
25+
};
26+
27+
// Detatch the thread and sleep until it terminates
28+
mem::drop(join);
29+
sleep(Duration::from_millis(100));
30+
31+
// Spawn and immediately join a thread
32+
// to execute the join code-path
33+
// and ensure that data-race detection
34+
// remains enabled
35+
spawn(|| ()).join().unwrap();
36+
37+
let join2 = unsafe {
38+
spawn(move || {
39+
*c.0 = 64; //~ ERROR Data race
40+
})
41+
};
42+
43+
join2.join().unwrap();
44+
}

0 commit comments

Comments
 (0)