Skip to content

Commit 47bea3f

Browse files
Record incremental progress into the database when collecting
1 parent fe17f8e commit 47bea3f

File tree

4 files changed

+110
-8
lines changed

4 files changed

+110
-8
lines changed

collector/src/main.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
extern crate clap;
55

66
use anyhow::{bail, Context};
7-
use database::{pool::Connection, ArtifactId, Commit};
7+
use database::{ArtifactId, Commit};
88
use log::debug;
99
use std::collections::HashSet;
1010
use std::fs;
@@ -185,7 +185,7 @@ impl BenchmarkErrors {
185185

186186
fn bench(
187187
rt: &mut Runtime,
188-
mut conn: Box<dyn Connection>,
188+
pool: database::Pool,
189189
cid: &ArtifactId,
190190
build_kinds: &[BuildKind],
191191
run_kinds: &[RunKind],
@@ -194,6 +194,8 @@ fn bench(
194194
iterations: usize,
195195
self_profile: bool,
196196
) -> BenchmarkErrors {
197+
let status_conn = rt.block_on(pool.connection());
198+
let mut conn = rt.block_on(pool.connection());
197199
let mut errors = BenchmarkErrors::new();
198200
eprintln!("Benchmarking {} for triple {}", cid, compiler.triple);
199201

@@ -223,7 +225,13 @@ fn bench(
223225
let interned_cid = rt.block_on(tx.conn().artifact_id(&cid));
224226

225227
let start = Instant::now();
228+
let steps = benchmarks
229+
.iter()
230+
.map(|b| b.name.to_string())
231+
.collect::<Vec<_>>();
232+
rt.block_on(status_conn.collector_start(interned_cid, &steps));
226233
for (nth_benchmark, benchmark) in benchmarks.iter().enumerate() {
234+
rt.block_on(status_conn.collector_start_step(interned_cid, &benchmark.name.to_string()));
227235
rt.block_on(
228236
tx.conn()
229237
.record_benchmark(benchmark.name.0.as_str(), benchmark.supports_stable()),
@@ -254,6 +262,7 @@ fn bench(
254262
&format!("{:?}", s),
255263
));
256264
};
265+
rt.block_on(status_conn.collector_end_step(interned_cid, &benchmark.name.to_string()));
257266
}
258267
let end = start.elapsed();
259268

@@ -516,15 +525,14 @@ fn main_result() -> anyhow::Result<i32> {
516525
let self_profile = sub_m.is_present("SELF_PROFILE");
517526

518527
let pool = database::Pool::open(db);
519-
let conn = rt.block_on(pool.connection());
520528

521529
let (rustc, rustdoc, cargo) = get_local_toolchain(&build_kinds, rustc, rustdoc, cargo)?;
522530

523531
let benchmarks = get_benchmarks(&benchmark_dir, include, exclude)?;
524532

525533
let res = bench(
526534
&mut rt,
527-
conn,
535+
pool,
528536
&ArtifactId::Artifact(id.to_string()),
529537
&build_kinds,
530538
&run_kinds,
@@ -567,7 +575,6 @@ fn main_result() -> anyhow::Result<i32> {
567575
let commit = get_commit_or_fake_it(&commit)?;
568576

569577
let pool = database::Pool::open(db);
570-
let conn = rt.block_on(pool.connection());
571578

572579
let sysroot = Sysroot::install(commit.sha.to_string(), "x86_64-unknown-linux-gnu")
573580
.with_context(|| format!("failed to install sysroot for {:?}", commit))?;
@@ -576,7 +583,7 @@ fn main_result() -> anyhow::Result<i32> {
576583

577584
let res = bench(
578585
&mut rt,
579-
conn,
586+
pool,
580587
&ArtifactId::Commit(commit),
581588
&BuildKind::all(),
582589
&RunKind::all(),
@@ -608,7 +615,6 @@ fn main_result() -> anyhow::Result<i32> {
608615
}
609616

610617
let pool = database::Pool::open(db);
611-
let conn = rt.block_on(pool.connection());
612618

613619
let run_kinds = if collector::version_supports_incremental(toolchain) {
614620
RunKind::all()
@@ -647,7 +653,7 @@ fn main_result() -> anyhow::Result<i32> {
647653

648654
let res = bench(
649655
&mut rt,
650-
conn,
656+
pool,
651657
&ArtifactId::Artifact(toolchain.to_string()),
652658
&build_kinds,
653659
&run_kinds,

database/src/pool.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ pub trait Connection: Send + Sync {
6161
async fn pr_attach_commit(&self, pr: u32, sha: &str, parent_sha: &str) -> bool;
6262
async fn queued_commits(&self) -> Vec<QueuedCommit>;
6363
async fn mark_complete(&self, sha: &str) -> Option<QueuedCommit>;
64+
65+
// Collector status API
66+
67+
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]);
68+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str);
69+
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str);
6470
}
6571

6672
#[async_trait::async_trait]

database/src/pool/postgres.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,14 @@ static MIGRATIONS: &[&str] = &[
159159
duration integer not null
160160
);
161161
"#,
162+
r#"
163+
create table collector_progress(
164+
aid smallint not null references artifact(id) on delete cascade on update cascade,
165+
step text not null,
166+
start timestamptz,
167+
end timestamptz
168+
);
169+
"#,
162170
];
163171

164172
#[async_trait::async_trait]
@@ -771,4 +779,44 @@ where
771779
.await
772780
.unwrap();
773781
}
782+
783+
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]) {
784+
self.conn()
785+
.execute(
786+
"delete from collector_progress where start is null or end is null;",
787+
&[],
788+
)
789+
.await
790+
.unwrap();
791+
792+
for step in steps {
793+
self.conn()
794+
.execute(
795+
"insert into collector_progress(aid, step) VALUES ($1, $2)",
796+
&[&(aid.0 as i16), &step],
797+
)
798+
.await
799+
.unwrap();
800+
}
801+
}
802+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) {
803+
self.conn()
804+
.execute(
805+
"update collector_progress set start = CURRENT_TIMESTAMP \
806+
where aid = $1 and step = $2 and start is null and end is null;",
807+
&[&(aid.0 as i16), &step],
808+
)
809+
.await
810+
.unwrap();
811+
}
812+
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str) {
813+
self.conn()
814+
.execute(
815+
"update collector_progress set end = CURRENT_TIMESTAMP \
816+
where aid = $1 and step = $2 and start is not null and end is null;",
817+
&[&(aid.0 as i16), &step],
818+
)
819+
.await
820+
.unwrap();
821+
}
774822
}

database/src/pool/sqlite.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,14 @@ static MIGRATIONS: &[&str] = &[
140140
duration integer not null
141141
);
142142
"#,
143+
r#"
144+
create table collector_progress(
145+
aid integer not null references artifact(id) on delete cascade on update cascade,
146+
step text not null,
147+
start timestamp without time zone,
148+
end timestamp without time zone
149+
);
150+
"#,
143151
];
144152

145153
#[async_trait::async_trait]
@@ -626,4 +634,38 @@ impl Connection for SqliteConnection {
626634
)
627635
.unwrap();
628636
}
637+
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]) {
638+
// Clean out any leftover unterminated steps.
639+
self.raw_ref()
640+
.execute_batch("delete from collector_progress where start is null or end is null;")
641+
.unwrap();
642+
643+
// Populate unstarted and unfinished steps into collector_progress.
644+
for step in steps {
645+
self.raw_ref()
646+
.execute(
647+
"insert into collector_progress(aid, step) VALUES (?, ?)",
648+
params![&aid.0, step],
649+
)
650+
.unwrap();
651+
}
652+
}
653+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) {
654+
self.raw_ref()
655+
.execute(
656+
"update collector_progress set start = now \
657+
where aid = ? and step = ? and start is null;",
658+
params![&aid.0, &step],
659+
)
660+
.unwrap();
661+
}
662+
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str) {
663+
self.raw_ref()
664+
.execute(
665+
"update collector_progress set end = now \
666+
where aid = ? and step = ? and start is not null and end is null;",
667+
params![&aid.0, &step],
668+
)
669+
.unwrap();
670+
}
629671
}

0 commit comments

Comments
 (0)