Skip to content

Commit c2f25db

Browse files
committed
proc_macro: use crossbeam channels for the proc_macro cross-thread bridge
This is done by having the crossbeam dependency inserted into the proc_macro server code from the server side, to avoid adding a dependency to proc_macro. In addition, this introduces a -Z command-line option which will switch rustc to run proc-macros using this cross-thread executor. With the changes to the bridge in #98186, #98187, #98188 and #98189, the performance of the executor should be much closer to same-thread execution. In local testing, the crossbeam executor was substantially more performant than either of the two existing CrossThread strategies, so they have been removed to keep things simple.
1 parent 6341bad commit c2f25db

File tree

1 file changed

+64
-72
lines changed

1 file changed

+64
-72
lines changed

proc_macro/src/bridge/server.rs

Lines changed: 64 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
33
use super::*;
44

5+
use std::marker::PhantomData;
6+
57
// FIXME(eddyb) generate the definition of `HandleStore` in `server.rs`.
68
use super::client::HandleStore;
79

@@ -143,6 +145,41 @@ pub trait ExecutionStrategy {
143145
) -> Buffer;
144146
}
145147

148+
pub struct MaybeCrossThread<P> {
149+
cross_thread: bool,
150+
marker: PhantomData<P>,
151+
}
152+
153+
impl<P> MaybeCrossThread<P> {
154+
pub const fn new(cross_thread: bool) -> Self {
155+
MaybeCrossThread { cross_thread, marker: PhantomData }
156+
}
157+
}
158+
159+
impl<P> ExecutionStrategy for MaybeCrossThread<P>
160+
where
161+
P: MessagePipe<Buffer> + Send + 'static,
162+
{
163+
fn run_bridge_and_client(
164+
&self,
165+
dispatcher: &mut impl DispatcherTrait,
166+
input: Buffer,
167+
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
168+
force_show_panics: bool,
169+
) -> Buffer {
170+
if self.cross_thread {
171+
<CrossThread<P>>::new().run_bridge_and_client(
172+
dispatcher,
173+
input,
174+
run_client,
175+
force_show_panics,
176+
)
177+
} else {
178+
SameThread.run_bridge_and_client(dispatcher, input, run_client, force_show_panics)
179+
}
180+
}
181+
}
182+
146183
pub struct SameThread;
147184

148185
impl ExecutionStrategy for SameThread {
@@ -164,28 +201,31 @@ impl ExecutionStrategy for SameThread {
164201
}
165202
}
166203

167-
// NOTE(eddyb) Two implementations are provided, the second one is a bit
168-
// faster but neither is anywhere near as fast as same-thread execution.
204+
pub struct CrossThread<P>(PhantomData<P>);
169205

170-
pub struct CrossThread1;
206+
impl<P> CrossThread<P> {
207+
pub const fn new() -> Self {
208+
CrossThread(PhantomData)
209+
}
210+
}
171211

172-
impl ExecutionStrategy for CrossThread1 {
212+
impl<P> ExecutionStrategy for CrossThread<P>
213+
where
214+
P: MessagePipe<Buffer> + Send + 'static,
215+
{
173216
fn run_bridge_and_client(
174217
&self,
175218
dispatcher: &mut impl DispatcherTrait,
176219
input: Buffer,
177220
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
178221
force_show_panics: bool,
179222
) -> Buffer {
180-
use std::sync::mpsc::channel;
181-
182-
let (req_tx, req_rx) = channel();
183-
let (res_tx, res_rx) = channel();
223+
let (mut server, mut client) = P::new();
184224

185225
let join_handle = thread::spawn(move || {
186-
let mut dispatch = |buf| {
187-
req_tx.send(buf).unwrap();
188-
res_rx.recv().unwrap()
226+
let mut dispatch = |b: Buffer| -> Buffer {
227+
client.send(b);
228+
client.recv().expect("server died while client waiting for reply")
189229
};
190230

191231
run_client(BridgeConfig {
@@ -196,75 +236,27 @@ impl ExecutionStrategy for CrossThread1 {
196236
})
197237
});
198238

199-
for b in req_rx {
200-
res_tx.send(dispatcher.dispatch(b)).unwrap();
239+
while let Some(b) = server.recv() {
240+
server.send(dispatcher.dispatch(b));
201241
}
202242

203243
join_handle.join().unwrap()
204244
}
205245
}
206246

207-
pub struct CrossThread2;
247+
/// A message pipe used for communicating between server and client threads.
248+
pub trait MessagePipe<T>: Sized {
249+
/// Create a new pair of endpoints for the message pipe.
250+
fn new() -> (Self, Self);
208251

209-
impl ExecutionStrategy for CrossThread2 {
210-
fn run_bridge_and_client(
211-
&self,
212-
dispatcher: &mut impl DispatcherTrait,
213-
input: Buffer,
214-
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
215-
force_show_panics: bool,
216-
) -> Buffer {
217-
use std::sync::{Arc, Mutex};
218-
219-
enum State<T> {
220-
Req(T),
221-
Res(T),
222-
}
223-
224-
let mut state = Arc::new(Mutex::new(State::Res(Buffer::new())));
225-
226-
let server_thread = thread::current();
227-
let state2 = state.clone();
228-
let join_handle = thread::spawn(move || {
229-
let mut dispatch = |b| {
230-
*state2.lock().unwrap() = State::Req(b);
231-
server_thread.unpark();
232-
loop {
233-
thread::park();
234-
if let State::Res(b) = &mut *state2.lock().unwrap() {
235-
break b.take();
236-
}
237-
}
238-
};
239-
240-
let r = run_client(BridgeConfig {
241-
input,
242-
dispatch: (&mut dispatch).into(),
243-
force_show_panics,
244-
_marker: marker::PhantomData,
245-
});
246-
247-
// Wake up the server so it can exit the dispatch loop.
248-
drop(state2);
249-
server_thread.unpark();
250-
251-
r
252-
});
253-
254-
// Check whether `state2` was dropped, to know when to stop.
255-
while Arc::get_mut(&mut state).is_none() {
256-
thread::park();
257-
let mut b = match &mut *state.lock().unwrap() {
258-
State::Req(b) => b.take(),
259-
_ => continue,
260-
};
261-
b = dispatcher.dispatch(b.take());
262-
*state.lock().unwrap() = State::Res(b);
263-
join_handle.thread().unpark();
264-
}
252+
/// Send a message to the other endpoint of this pipe.
253+
fn send(&mut self, value: T);
265254

266-
join_handle.join().unwrap()
267-
}
255+
/// Receive a message from the other endpoint of this pipe.
256+
///
257+
/// Returns `None` if the other end of the pipe has been destroyed, and no
258+
/// message was received.
259+
fn recv(&mut self) -> Option<T>;
268260
}
269261

270262
fn run_server<

0 commit comments

Comments
 (0)