Skip to content

Commit 86e4ca4

Browse files
committed
Do not pass Runtime to BenchProcessor
1 parent 9e94ced commit 86e4ca4

File tree

7 files changed

+449
-421
lines changed

7 files changed

+449
-421
lines changed

collector/src/bin/collector.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use collector::runtime::{
3434
use collector::toolchain::{
3535
create_toolchain_from_published_version, get_local_toolchain, Sysroot, Toolchain,
3636
};
37+
use collector::utils::wait_for_future;
3738

3839
fn n_normal_benchmarks_remaining(n: usize) -> String {
3940
let suffix = if n == 1 { "" } else { "s" };
@@ -262,7 +263,13 @@ fn profile(
262263
let benchmark_id = format!("{} ({}/{})", benchmark.name, i + 1, benchmarks.len());
263264
eprintln!("Executing benchmark {benchmark_id}");
264265
let mut processor = ProfileProcessor::new(profiler, out_dir, &toolchain.id);
265-
let result = benchmark.measure(&mut processor, profiles, scenarios, toolchain, Some(1));
266+
let result = wait_for_future(benchmark.measure(
267+
&mut processor,
268+
profiles,
269+
scenarios,
270+
toolchain,
271+
Some(1),
272+
));
266273
eprintln!("Finished benchmark {benchmark_id}");
267274

268275
if let Err(ref s) = result {
@@ -1121,7 +1128,6 @@ fn bench_compile(
11211128
print_intro();
11221129

11231130
let mut processor = BenchProcessor::new(
1124-
rt,
11251131
tx.conn(),
11261132
benchmark_name,
11271133
&shared.artifact_id,
@@ -1157,13 +1163,13 @@ fn bench_compile(
11571163
)
11581164
},
11591165
&|processor| {
1160-
benchmark.measure(
1166+
rt.block_on(benchmark.measure(
11611167
processor,
11621168
&config.profiles,
11631169
&config.scenarios,
11641170
&shared.toolchain,
11651171
config.iterations,
1166-
)
1172+
))
11671173
},
11681174
)
11691175
}
@@ -1175,8 +1181,7 @@ fn bench_compile(
11751181
Category::Primary,
11761182
&|| eprintln!("Special benchmark commencing (due to `--bench-rustc`)"),
11771183
&|processor| {
1178-
processor
1179-
.measure_rustc(&shared.toolchain)
1184+
rt.block_on(processor.measure_rustc(&shared.toolchain))
11801185
.context("measure rustc")
11811186
},
11821187
);

collector/src/compile/benchmark/mod.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::compile::benchmark::profile::Profile;
44
use crate::compile::benchmark::scenario::Scenario;
55
use crate::compile::execute::{CargoProcess, Processor};
66
use crate::toolchain::Toolchain;
7+
use crate::utils::wait_for_future;
78
use anyhow::{bail, Context};
89
use log::debug;
910
use std::collections::{HashMap, HashSet};
@@ -189,7 +190,7 @@ impl Benchmark {
189190
}
190191

191192
/// Run a specific benchmark under a processor + profiler combination.
192-
pub fn measure(
193+
pub async fn measure(
193194
&self,
194195
processor: &mut dyn Processor,
195196
profiles: &[Profile],
@@ -261,9 +262,11 @@ impl Benchmark {
261262
for (profile, prep_dir) in &profile_dirs {
262263
let server = server.clone();
263264
let thread = s.spawn::<_, anyhow::Result<()>>(move || {
264-
self.mk_cargo_process(toolchain, prep_dir.path(), *profile)
265-
.jobserver(server)
266-
.run_rustc(false)?;
265+
wait_for_future(
266+
self.mk_cargo_process(toolchain, prep_dir.path(), *profile)
267+
.jobserver(server)
268+
.run_rustc(false),
269+
)?;
267270
Ok(())
268271
});
269272
threads.push(thread);
@@ -310,7 +313,8 @@ impl Benchmark {
310313
if scenarios.contains(&Scenario::Full) {
311314
self.mk_cargo_process(toolchain, cwd, profile)
312315
.processor(processor, Scenario::Full, "Full", None)
313-
.run_rustc(true)?;
316+
.run_rustc(true)
317+
.await?;
314318
}
315319

316320
// Rustdoc does not support incremental compilation
@@ -321,15 +325,17 @@ impl Benchmark {
321325
self.mk_cargo_process(toolchain, cwd, profile)
322326
.incremental(true)
323327
.processor(processor, Scenario::IncrFull, "IncrFull", None)
324-
.run_rustc(true)?;
328+
.run_rustc(true)
329+
.await?;
325330
}
326331

327332
// An incremental build with no changes (fastest incremental case).
328333
if scenarios.contains(&Scenario::IncrUnchanged) {
329334
self.mk_cargo_process(toolchain, cwd, profile)
330335
.incremental(true)
331336
.processor(processor, Scenario::IncrUnchanged, "IncrUnchanged", None)
332-
.run_rustc(true)?;
337+
.run_rustc(true)
338+
.await?;
333339
}
334340

335341
if scenarios.contains(&Scenario::IncrPatched) {
@@ -348,7 +354,8 @@ impl Benchmark {
348354
&scenario_str,
349355
Some(patch),
350356
)
351-
.run_rustc(true)?;
357+
.run_rustc(true)
358+
.await?;
352359
}
353360
}
354361
}

collector/src/compile/execute/bencher.rs

Lines changed: 71 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ use crate::toolchain::Toolchain;
1010
use crate::utils::git::get_rustc_perf_commit;
1111
use futures::stream::FuturesUnordered;
1212
use futures::StreamExt;
13+
use std::future::Future;
1314
use std::path::PathBuf;
15+
use std::pin::Pin;
1416
use std::process::Command;
1517
use std::{env, process};
16-
use tokio::runtime::Runtime;
1718

1819
// Tools usable with the benchmarking subcommands.
1920
#[derive(Clone, Copy, Debug, PartialEq)]
@@ -25,7 +26,6 @@ pub enum Bencher {
2526
}
2627

2728
pub struct BenchProcessor<'a> {
28-
rt: &'a mut Runtime,
2929
benchmark: &'a BenchmarkName,
3030
conn: &'a mut dyn database::Connection,
3131
artifact: &'a database::ArtifactId,
@@ -38,7 +38,6 @@ pub struct BenchProcessor<'a> {
3838

3939
impl<'a> BenchProcessor<'a> {
4040
pub fn new(
41-
rt: &'a mut Runtime,
4241
conn: &'a mut dyn database::Connection,
4342
benchmark: &'a BenchmarkName,
4443
artifact: &'a database::ArtifactId,
@@ -63,7 +62,6 @@ impl<'a> BenchProcessor<'a> {
6362
}
6463

6564
BenchProcessor {
66-
rt,
6765
upload: None,
6866
conn,
6967
benchmark,
@@ -75,15 +73,15 @@ impl<'a> BenchProcessor<'a> {
7573
}
7674
}
7775

78-
fn insert_stats(
76+
async fn insert_stats(
7977
&mut self,
8078
scenario: database::Scenario,
8179
profile: Profile,
8280
stats: (Stats, Option<SelfProfile>, Option<SelfProfileFiles>),
8381
) {
8482
let version = get_rustc_perf_commit();
8583

86-
let collection = self.rt.block_on(self.conn.collection_id(&version));
84+
let collection = self.conn.collection_id(&version).await;
8785
let profile = match profile {
8886
Profile::Check => database::Profile::Check,
8987
Profile::Debug => database::Profile::Debug,
@@ -110,13 +108,15 @@ impl<'a> BenchProcessor<'a> {
110108
.join(profile.to_string())
111109
.join(scenario.to_id());
112110
self.upload = Some(Upload::new(prefix, collection, files));
113-
self.rt.block_on(self.conn.record_raw_self_profile(
114-
collection,
115-
self.artifact_row_id,
116-
self.benchmark.0.as_str(),
117-
profile,
118-
scenario,
119-
));
111+
self.conn
112+
.record_raw_self_profile(
113+
collection,
114+
self.artifact_row_id,
115+
self.benchmark.0.as_str(),
116+
profile,
117+
scenario,
118+
)
119+
.await;
120120
}
121121
}
122122

@@ -156,18 +156,11 @@ impl<'a> BenchProcessor<'a> {
156156
}
157157
}
158158

159-
self.rt
160-
.block_on(async move { while let Some(()) = buf.next().await {} });
159+
while let Some(()) = buf.next().await {}
161160
}
162161

163-
pub fn measure_rustc(&mut self, toolchain: &Toolchain) -> anyhow::Result<()> {
164-
rustc::measure(
165-
self.rt,
166-
self.conn,
167-
toolchain,
168-
self.artifact,
169-
self.artifact_row_id,
170-
)
162+
pub async fn measure_rustc(&mut self, toolchain: &Toolchain) -> anyhow::Result<()> {
163+
rustc::measure(self.conn, toolchain, self.artifact, self.artifact_row_id).await
171164
}
172165
}
173166

@@ -197,63 +190,70 @@ impl<'a> Processor for BenchProcessor<'a> {
197190
self.perf_tool() != original
198191
}
199192

200-
fn process_output(
201-
&mut self,
202-
data: &ProcessOutputData<'_>,
193+
fn process_output<'b>(
194+
&'b mut self,
195+
data: &'b ProcessOutputData<'_>,
203196
output: process::Output,
204-
) -> anyhow::Result<Retry> {
205-
match execute::process_stat_output(output) {
206-
Ok(mut res) => {
207-
if let Some(ref profile) = res.1 {
208-
execute::store_artifact_sizes_into_stats(&mut res.0, profile);
209-
}
210-
if let Profile::Doc = data.profile {
211-
let doc_dir = data.cwd.join("target/doc");
212-
if doc_dir.is_dir() {
213-
execute::store_documentation_size_into_stats(&mut res.0, &doc_dir);
214-
}
215-
}
216-
217-
match data.scenario {
218-
Scenario::Full => {
219-
self.insert_stats(database::Scenario::Empty, data.profile, res);
197+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Retry>> + 'b>> {
198+
Box::pin(async move {
199+
match execute::process_stat_output(output) {
200+
Ok(mut res) => {
201+
if let Some(ref profile) = res.1 {
202+
execute::store_artifact_sizes_into_stats(&mut res.0, profile);
220203
}
221-
Scenario::IncrFull => {
222-
self.insert_stats(database::Scenario::IncrementalEmpty, data.profile, res);
204+
if let Profile::Doc = data.profile {
205+
let doc_dir = data.cwd.join("target/doc");
206+
if doc_dir.is_dir() {
207+
execute::store_documentation_size_into_stats(&mut res.0, &doc_dir);
208+
}
223209
}
224-
Scenario::IncrUnchanged => {
225-
self.insert_stats(database::Scenario::IncrementalFresh, data.profile, res);
226-
}
227-
Scenario::IncrPatched => {
228-
let patch = data.patch.unwrap();
229-
self.insert_stats(
230-
database::Scenario::IncrementalPatch(patch.name),
210+
211+
let fut = match data.scenario {
212+
Scenario::Full => {
213+
self.insert_stats(database::Scenario::Empty, data.profile, res)
214+
}
215+
Scenario::IncrFull => self.insert_stats(
216+
database::Scenario::IncrementalEmpty,
217+
data.profile,
218+
res,
219+
),
220+
Scenario::IncrUnchanged => self.insert_stats(
221+
database::Scenario::IncrementalFresh,
231222
data.profile,
232223
res,
224+
),
225+
Scenario::IncrPatched => {
226+
let patch = data.patch.unwrap();
227+
self.insert_stats(
228+
database::Scenario::IncrementalPatch(patch.name),
229+
data.profile,
230+
res,
231+
)
232+
}
233+
};
234+
fut.await;
235+
Ok(Retry::No)
236+
}
237+
Err(DeserializeStatError::NoOutput(output)) => {
238+
if self.tries < 5 {
239+
log::warn!(
240+
"failed to deserialize stats, retrying (try {}); output: {:?}",
241+
self.tries,
242+
output
233243
);
244+
self.tries += 1;
245+
Ok(Retry::Yes)
246+
} else {
247+
panic!("failed to collect statistics after 5 tries");
234248
}
235249
}
236-
Ok(Retry::No)
237-
}
238-
Err(DeserializeStatError::NoOutput(output)) => {
239-
if self.tries < 5 {
240-
log::warn!(
241-
"failed to deserialize stats, retrying (try {}); output: {:?}",
242-
self.tries,
243-
output
244-
);
245-
self.tries += 1;
246-
Ok(Retry::Yes)
247-
} else {
248-
panic!("failed to collect statistics after 5 tries");
250+
Err(
251+
e @ (DeserializeStatError::ParseError { .. }
252+
| DeserializeStatError::XperfError(..)),
253+
) => {
254+
panic!("process_perf_stat_output failed: {:?}", e);
249255
}
250256
}
251-
Err(
252-
e
253-
@ (DeserializeStatError::ParseError { .. } | DeserializeStatError::XperfError(..)),
254-
) => {
255-
panic!("process_perf_stat_output failed: {:?}", e);
256-
}
257-
}
257+
})
258258
}
259259
}

collector/src/compile/execute/mod.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ use database::QueryLabel;
1212
use std::collections::HashMap;
1313
use std::env;
1414
use std::fs;
15+
use std::future::Future;
1516
use std::io::Read;
1617
use std::path::{Path, PathBuf};
18+
use std::pin::Pin;
1719
use std::process::{self, Command};
1820
use std::str;
1921
use std::time::Duration;
@@ -187,7 +189,7 @@ impl<'a> CargoProcess<'a> {
187189
// FIXME: the needs_final and processor_etc interactions aren't ideal; we
188190
// would like to "auto know" when we need final but currently we don't
189191
// really.
190-
pub fn run_rustc(&mut self, needs_final: bool) -> anyhow::Result<()> {
192+
pub async fn run_rustc(&mut self, needs_final: bool) -> anyhow::Result<()> {
191193
log::info!(
192194
"run_rustc with incremental={}, profile={:?}, scenario={:?}, patch={:?}",
193195
self.incremental,
@@ -317,7 +319,7 @@ impl<'a> CargoProcess<'a> {
317319
scenario_str,
318320
patch,
319321
};
320-
match processor.process_output(&data, output) {
322+
match processor.process_output(&data, output).await {
321323
Ok(Retry::No) => return Ok(()),
322324
Ok(Retry::Yes) => {}
323325
Err(e) => return Err(e),
@@ -375,11 +377,11 @@ pub trait Processor {
375377
fn perf_tool(&self) -> PerfTool;
376378

377379
/// Process the output produced by the particular `Profiler` being used.
378-
fn process_output(
379-
&mut self,
380-
data: &ProcessOutputData<'_>,
380+
fn process_output<'a>(
381+
&'a mut self,
382+
data: &'a ProcessOutputData<'_>,
381383
output: process::Output,
382-
) -> anyhow::Result<Retry>;
384+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Retry>> + 'a>>;
383385

384386
/// Provided to permit switching on more expensive profiling if it's needed
385387
/// for the "first" run for any given benchmark (we reuse the processor),

0 commit comments

Comments
 (0)