Skip to content

Commit 097c70a

Browse files
committed
Switch to crossbeam-queue for events
Use a lock-free queue to store the event list because the event callbacks might be called from inside a signal handler. Most system calls are not safe in that context, so we just spin if the queue is full.
1 parent 9511a4f commit 097c70a

File tree

4 files changed

+63
-13
lines changed

4 files changed

+63
-13
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

analysis/runtime/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ bincode = "1.0.1"
1717
once_cell = "1"
1818
enum_dispatch = "0.3"
1919
fs-err = "2"
20+
crossbeam-queue = "0.3"
21+
crossbeam-utils = "0.8"

analysis/runtime/src/runtime/backend.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
use crossbeam_queue::ArrayQueue;
2+
use crossbeam_utils::Backoff;
13
use enum_dispatch::enum_dispatch;
24
use fs_err::{File, OpenOptions};
35
use std::fmt::Debug;
46
use std::io::{stderr, BufWriter, Write};
7+
use std::sync::Arc;
58

69
use bincode;
710

@@ -80,18 +83,32 @@ pub enum Backend {
8083
}
8184

8285
impl Backend {
83-
fn write_all(&mut self, events: impl IntoIterator<Item = Event>) {
84-
for event in events {
86+
fn write_all(&mut self, events: Arc<ArrayQueue<Event>>) {
87+
let backoff = Backoff::new();
88+
loop {
89+
let event = match events.pop() {
90+
Some(event) => event,
91+
None => {
92+
// We can't use anything with a futex here since
93+
// the event sender might run inside a signal handler
94+
backoff.snooze();
95+
continue;
96+
}
97+
};
98+
8599
let done = matches!(event.kind, EventKind::Done);
86100
self.write(event);
87101
if done {
88102
break;
89103
}
104+
105+
// Reset the backoff timer since we got an event
106+
backoff.reset();
90107
}
91108
self.flush();
92109
}
93110

94-
pub fn run(&mut self, events: impl IntoIterator<Item = Event>) {
111+
pub fn run(&mut self, events: Arc<ArrayQueue<Event>>) {
95112
let (lock, cvar) = &*FINISHED;
96113
let mut finished = lock.lock().unwrap();
97114
self.write_all(events);

analysis/runtime/src/runtime/scoped_runtime.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use std::{
2-
sync::{
3-
mpsc::{self, SyncSender},
4-
Mutex,
5-
},
2+
sync::{Arc, Mutex},
63
thread,
74
};
85

6+
use crossbeam_queue::ArrayQueue;
7+
use crossbeam_utils::Backoff;
98
use enum_dispatch::enum_dispatch;
109
use once_cell::sync::OnceCell;
1110

@@ -117,16 +116,32 @@ impl Runtime for MainThreadRuntime {
117116
}
118117

119118
pub struct BackgroundThreadRuntime {
120-
tx: SyncSender<Event>,
119+
tx: Arc<ArrayQueue<Event>>,
121120
finalized: OnceCell<()>,
122121
}
123122

123+
impl BackgroundThreadRuntime {
124+
fn push_event(&self, mut event: Event, can_sleep: bool) {
125+
let backoff = Backoff::new();
126+
while let Err(event_back) = self.tx.push(event) {
127+
if can_sleep {
128+
backoff.snooze();
129+
} else {
130+
// We have no choice but to spin here because
131+
// we might be inside a signal handler
132+
backoff.spin();
133+
}
134+
event = event_back;
135+
}
136+
}
137+
}
138+
124139
impl ExistingRuntime for BackgroundThreadRuntime {
125140
fn finalize(&self) {
126141
// Only run the finalizer once.
127142
self.finalized.get_or_init(|| {
128143
// Notify the backend that we're done.
129-
self.tx.send(Event::done()).unwrap();
144+
self.push_event(Event::done(), true);
130145

131146
// Wait for the backend thread to finish.
132147
let (lock, cvar) = &*FINISHED;
@@ -147,9 +162,7 @@ impl ExistingRuntime for BackgroundThreadRuntime {
147162
fn send_event(&self, event: Event) {
148163
match self.finalized.get() {
149164
None => {
150-
// `.unwrap()` as we're in no place to handle an error here,
151-
// unless we should silently drop the [`Event`] instead.
152-
self.tx.send(event).unwrap();
165+
self.push_event(event, false);
153166
}
154167
Some(()) => {
155168
// Silently drop the [`Event`] as the [`BackgroundThreadRuntime`] has already been [`BackgroundThreadRuntime::finalize`]d.
@@ -172,7 +185,8 @@ impl Runtime for BackgroundThreadRuntime {
172185
/// Initialize the [`BackgroundThreadRuntime`], which includes [`thread::spawn`]ing,
173186
/// so it must be run post-`main`.
174187
fn try_init(mut backend: Backend) -> Result<Self, AnyError> {
175-
let (tx, rx) = mpsc::sync_channel(1024);
188+
let tx = Arc::new(ArrayQueue::new(1 << 20));
189+
let rx = Arc::clone(&tx);
176190
thread::spawn(move || backend.run(rx));
177191
Ok(Self {
178192
tx,

0 commit comments

Comments
 (0)