Skip to content

Commit 5d7ffee

Browse files
Incrementally make progress during collection
This permits restarting of the collection server without losing hour(s) of work. It also removes the extremely long-lived (2h30m as of now) transaction in favor of one wrapping each individual benchmark. Ideally, that would be by itself "sufficiently small." However, currently, even just limiting to one benchmark can mean a hour-long transaction (for script-servo-2). This is a bit unfortunate, but at least killing that transaction will be dealt with fairly smoothly.
1 parent 0680323 commit 5d7ffee

File tree

4 files changed

+54
-73
lines changed

4 files changed

+54
-73
lines changed

collector/src/main.rs

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,6 @@ fn bench(
195195
self_profile: bool,
196196
) -> BenchmarkErrors {
197197
let mut conn = rt.block_on(pool.connection());
198-
let status_conn;
199-
let status_conn: Option<&dyn database::Connection> =
200-
if conn.separate_transaction_for_collector() {
201-
status_conn = rt.block_on(pool.connection());
202-
Some(&*status_conn)
203-
} else {
204-
None
205-
};
206198
let mut errors = BenchmarkErrors::new();
207199
eprintln!("Benchmarking {} for triple {}", cid, compiler.triple);
208200

@@ -215,38 +207,24 @@ fn bench(
215207
);
216208
}
217209

218-
let mut tx = rt.block_on(conn.transaction());
219-
let index = rt.block_on(database::Index::load(tx.conn()));
220-
let has_collected = match cid {
221-
ArtifactId::Commit(commit) => index.commits().iter().any(|c| c == commit),
222-
ArtifactId::Artifact(id) => index.artifacts().any(|a| a == id),
223-
};
224-
if has_collected {
225-
eprintln!("'{}' has previously been collected, aborting.", cid);
226-
eprintln!(
227-
"Note that this behavior is likely to change in the future \
228-
to collect and append the data instead."
229-
);
230-
return errors;
231-
}
232-
let interned_cid = rt.block_on(tx.conn().artifact_id(&cid));
210+
let interned_cid = rt.block_on(conn.artifact_id(&cid));
233211

234212
let start = Instant::now();
235213
let steps = benchmarks
236214
.iter()
237215
.map(|b| b.name.to_string())
238216
.collect::<Vec<_>>();
239-
rt.block_on(
240-
status_conn
241-
.unwrap_or_else(|| &*tx.conn())
242-
.collector_start(interned_cid, &steps),
243-
);
217+
rt.block_on(conn.collector_start(interned_cid, &steps));
218+
let mut skipped = false;
244219
for (nth_benchmark, benchmark) in benchmarks.iter().enumerate() {
245-
rt.block_on(
246-
status_conn
247-
.unwrap_or_else(|| &*tx.conn())
248-
.collector_start_step(interned_cid, &benchmark.name.to_string()),
249-
);
220+
let is_fresh =
221+
rt.block_on(conn.collector_start_step(interned_cid, &benchmark.name.to_string()));
222+
if !is_fresh {
223+
skipped = true;
224+
eprintln!("skipping {} -- already benchmarked", benchmark.name);
225+
continue;
226+
}
227+
let mut tx = rt.block_on(conn.transaction());
250228
rt.block_on(
251229
tx.conn()
252230
.record_benchmark(benchmark.name.0.as_str(), benchmark.supports_stable()),
@@ -278,19 +256,19 @@ fn bench(
278256
));
279257
};
280258
rt.block_on(
281-
status_conn
282-
.unwrap_or_else(|| &*tx.conn())
259+
tx.conn()
283260
.collector_end_step(interned_cid, &benchmark.name.to_string()),
284261
);
262+
rt.block_on(tx.commit()).expect("committed");
285263
}
286264
let end = start.elapsed();
287265

288266
eprintln!("collection took {:?}", end);
289267

290-
rt.block_on(tx.conn().record_duration(interned_cid, end));
291-
292-
// Publish results now that we've finished fully with this commit.
293-
rt.block_on(tx.commit()).unwrap();
268+
if !skipped {
269+
log::info!("skipping duration record -- skipped parts of run");
270+
rt.block_on(conn.record_duration(interned_cid, end));
271+
}
294272

295273
rt.block_on(async move {
296274
// This ensures that we're good to go with the just updated data.

database/src/pool.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,11 @@ pub trait Connection: Send + Sync {
6565
// Collector status API
6666

6767
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]);
68-
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str);
69-
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str);
7068

71-
// This returns `true` if the collector commands can be placed in a separate
72-
// transaction.
73-
//
74-
// Currently, the sqlite backend does not support "regular" usage where they
75-
// are used for genuine progress reporting. sqlite does not support
76-
// concurrent writers -- it will return an error (or wait, if a busy timeout
77-
// is configured).
78-
//
79-
// For now we don't care much as sqlite is not used in production and in
80-
// local usage you can just look at the logs.
81-
fn separate_transaction_for_collector(&self) -> bool;
69+
// Returns `true` if the step was started, i.e., it did not previously have
70+
// an end. Otherwise returns false, indicating that we can skip it.
71+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) -> bool;
72+
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str);
8273
}
8374

8475
#[async_trait::async_trait]

database/src/pool/postgres.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ static MIGRATIONS: &[&str] = &[
167167
end_time timestamptz
168168
);
169169
"#,
170+
r#"alter table collector_progress add unique (aid, step);"#,
170171
];
171172

172173
#[async_trait::async_trait]
@@ -781,6 +782,7 @@ where
781782
}
782783

783784
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]) {
785+
// Clean up -- we'll re-insert any missing things in the loop below.
784786
self.conn()
785787
.execute(
786788
"delete from collector_progress where start_time is null or end_time is null;",
@@ -792,34 +794,40 @@ where
792794
for step in steps {
793795
self.conn()
794796
.execute(
795-
"insert into collector_progress(aid, step) VALUES ($1, $2)",
797+
"insert into collector_progress(aid, step) VALUES ($1, $2)
798+
ON CONFLICT DO NOTHING",
796799
&[&(aid.0 as i16), &step],
797800
)
798801
.await
799802
.unwrap();
800803
}
801804
}
802-
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) {
805+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) -> bool {
806+
// If we modified a row, then we populated a start time, so we're good
807+
// to go. Otherwise we should just skip this step.
803808
self.conn()
804809
.execute(
805810
"update collector_progress set start_time = statement_timestamp() \
806-
where aid = $1 and step = $2 and start_time is null and end_time is null;",
811+
where aid = $1 and step = $2 and end_time is null;",
807812
&[&(aid.0 as i16), &step],
808813
)
809814
.await
810-
.unwrap();
815+
.unwrap()
816+
== 1
811817
}
812818
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str) {
813-
self.conn()
819+
let did_modify = self
820+
.conn()
814821
.execute(
815822
"update collector_progress set end_time = statement_timestamp() \
816823
where aid = $1 and step = $2 and start_time is not null and end_time is null;",
817824
&[&(aid.0 as i16), &step],
818825
)
819826
.await
820-
.unwrap();
821-
}
822-
fn separate_transaction_for_collector(&self) -> bool {
823-
true
827+
.unwrap()
828+
== 1;
829+
if !did_modify {
830+
log::error!("did not end {} for {:?}", step, aid);
831+
}
824832
}
825833
}

database/src/pool/sqlite.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ static MIGRATIONS: &[&str] = &[
145145
aid integer not null references artifact(id) on delete cascade on update cascade,
146146
step text not null,
147147
start integer,
148-
end integer
148+
end integer,
149+
UNIQUE(aid, step)
149150
);
150151
"#,
151152
];
@@ -227,7 +228,7 @@ impl Connection for SqliteConnection {
227228
async fn record_duration(&self, artifact: ArtifactIdNumber, duration: Duration) {
228229
self.raw_ref()
229230
.prepare_cached(
230-
"insert into artifact_collection_duration (aid, date_recorded, duration) VALUES (?, strftime('%s','now'), ?)",
231+
"insert or ignore into artifact_collection_duration (aid, date_recorded, duration) VALUES (?, strftime('%s','now'), ?)",
231232
)
232233
.unwrap()
233234
.execute(params![artifact.0, duration.as_secs() as i64])
@@ -644,31 +645,34 @@ impl Connection for SqliteConnection {
644645
for step in steps {
645646
self.raw_ref()
646647
.execute(
647-
"insert into collector_progress(aid, step) VALUES (?, ?)",
648+
"insert or ignore into collector_progress(aid, step) VALUES (?, ?)",
648649
params![&aid.0, step],
649650
)
650651
.unwrap();
651652
}
652653
}
653-
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) {
654+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) -> bool {
654655
self.raw_ref()
655656
.execute(
656657
"update collector_progress set start = strftime('%s','now') \
657-
where aid = ? and step = ? and start is null;",
658+
where aid = ? and step = ? and end is null;",
658659
params![&aid.0, &step],
659660
)
660-
.unwrap();
661+
.unwrap()
662+
== 1
661663
}
662664
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str) {
663-
self.raw_ref()
665+
let did_modify = self
666+
.raw_ref()
664667
.execute(
665668
"update collector_progress set end = strftime('%s','now') \
666-
where aid = ? and step = ? and start is not null and end is null;",
669+
where aid = ? and step = ? and start is not null and end is null;",
667670
params![&aid.0, &step],
668671
)
669-
.unwrap();
670-
}
671-
fn separate_transaction_for_collector(&self) -> bool {
672-
false
672+
.unwrap()
673+
== 1;
674+
if !did_modify {
675+
log::error!("did not end {} for {:?}", step, aid);
676+
}
673677
}
674678
}

0 commit comments

Comments
 (0)