Skip to content

Commit f1c6f83

Browse files
Merge pull request #709 from Mark-Simulacrum/incremental-progress
Incremental progress
2 parents dfe0669 + 7ad3f28 commit f1c6f83

File tree

6 files changed

+192
-79
lines changed

6 files changed

+192
-79
lines changed

collector/src/main.rs

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -195,14 +195,6 @@ fn bench(
195195
self_profile: bool,
196196
) -> BenchmarkErrors {
197197
let mut conn = rt.block_on(pool.connection());
198-
let status_conn;
199-
let status_conn: Option<&dyn database::Connection> =
200-
if conn.separate_transaction_for_collector() {
201-
status_conn = rt.block_on(pool.connection());
202-
Some(&*status_conn)
203-
} else {
204-
None
205-
};
206198
let mut errors = BenchmarkErrors::new();
207199
eprintln!("Benchmarking {} for triple {}", cid, compiler.triple);
208200

@@ -215,38 +207,24 @@ fn bench(
215207
);
216208
}
217209

218-
let mut tx = rt.block_on(conn.transaction());
219-
let index = rt.block_on(database::Index::load(tx.conn()));
220-
let has_collected = match cid {
221-
ArtifactId::Commit(commit) => index.commits().iter().any(|c| c == commit),
222-
ArtifactId::Artifact(id) => index.artifacts().any(|a| a == id),
223-
};
224-
if has_collected {
225-
eprintln!("'{}' has previously been collected, aborting.", cid);
226-
eprintln!(
227-
"Note that this behavior is likely to change in the future \
228-
to collect and append the data instead."
229-
);
230-
return errors;
231-
}
232-
let interned_cid = rt.block_on(tx.conn().artifact_id(&cid));
210+
let interned_cid = rt.block_on(conn.artifact_id(&cid));
233211

234212
let start = Instant::now();
235213
let steps = benchmarks
236214
.iter()
237215
.map(|b| b.name.to_string())
238216
.collect::<Vec<_>>();
239-
rt.block_on(
240-
status_conn
241-
.unwrap_or_else(|| &*tx.conn())
242-
.collector_start(interned_cid, &steps),
243-
);
217+
rt.block_on(conn.collector_start(interned_cid, &steps));
218+
let mut skipped = false;
244219
for (nth_benchmark, benchmark) in benchmarks.iter().enumerate() {
245-
rt.block_on(
246-
status_conn
247-
.unwrap_or_else(|| &*tx.conn())
248-
.collector_start_step(interned_cid, &benchmark.name.to_string()),
249-
);
220+
let is_fresh =
221+
rt.block_on(conn.collector_start_step(interned_cid, &benchmark.name.to_string()));
222+
if !is_fresh {
223+
skipped = true;
224+
eprintln!("skipping {} -- already benchmarked", benchmark.name);
225+
continue;
226+
}
227+
let mut tx = rt.block_on(conn.transaction());
250228
rt.block_on(
251229
tx.conn()
252230
.record_benchmark(benchmark.name.0.as_str(), benchmark.supports_stable()),
@@ -278,19 +256,19 @@ fn bench(
278256
));
279257
};
280258
rt.block_on(
281-
status_conn
282-
.unwrap_or_else(|| &*tx.conn())
259+
tx.conn()
283260
.collector_end_step(interned_cid, &benchmark.name.to_string()),
284261
);
262+
rt.block_on(tx.commit()).expect("committed");
285263
}
286264
let end = start.elapsed();
287265

288266
eprintln!("collection took {:?}", end);
289267

290-
rt.block_on(tx.conn().record_duration(interned_cid, end));
291-
292-
// Publish results now that we've finished fully with this commit.
293-
rt.block_on(tx.commit()).unwrap();
268+
if !skipped {
269+
log::info!("skipping duration record -- skipped parts of run");
270+
rt.block_on(conn.record_duration(interned_cid, end));
271+
}
294272

295273
rt.block_on(async move {
296274
// This ensures that we're good to go with the just updated data.

database/src/pool.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,14 @@ pub trait Connection: Send + Sync {
6565
// Collector status API
6666

6767
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]);
68-
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str);
68+
69+
// Returns `true` if the step was started, i.e., it did not previously have
70+
// an end. Otherwise returns false, indicating that we can skip it.
71+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) -> bool;
6972
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str);
7073

71-
// This returns `true` if the collector commands can be placed in a separate
72-
// transaction.
73-
//
74-
// Currently, the sqlite backend does not support "regular" usage where they
75-
// are used for genuine progress reporting. sqlite does not support
76-
// concurrent writers -- it will return an error (or wait, if a busy timeout
77-
// is configured).
78-
//
79-
// For now we don't care much as sqlite is not used in production and in
80-
// local usage you can just look at the logs.
81-
fn separate_transaction_for_collector(&self) -> bool;
74+
// Returns an artifact that is in progress.
75+
async fn in_progress_artifact(&self) -> Option<ArtifactId>;
8276
}
8377

8478
#[async_trait::async_trait]

database/src/pool/postgres.rs

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
22
use crate::{
3-
ArtifactIdNumber, Cache, CollectionId, Commit, Crate, Date, Index, Profile, QueuedCommit,
3+
ArtifactId, ArtifactIdNumber, Cache, CollectionId, Commit, Crate, Date, Index, Profile,
4+
QueuedCommit,
45
};
56
use anyhow::Context as _;
67
use chrono::{DateTime, TimeZone, Utc};
@@ -167,6 +168,7 @@ static MIGRATIONS: &[&str] = &[
167168
end_time timestamptz
168169
);
169170
"#,
171+
r#"alter table collector_progress add unique (aid, step);"#,
170172
];
171173

172174
#[async_trait::async_trait]
@@ -659,9 +661,9 @@ where
659661
.unwrap();
660662
}
661663

662-
async fn artifact_id(&self, artifact: &crate::ArtifactId) -> ArtifactIdNumber {
664+
async fn artifact_id(&self, artifact: &ArtifactId) -> ArtifactIdNumber {
663665
let (name, date, ty) = match artifact {
664-
crate::ArtifactId::Commit(commit) => (
666+
ArtifactId::Commit(commit) => (
665667
commit.sha.to_string(),
666668
if commit.is_try() {
667669
None
@@ -670,7 +672,7 @@ where
670672
},
671673
if commit.is_try() { "try" } else { "master" },
672674
),
673-
crate::ArtifactId::Artifact(a) => (a.clone(), None, "release"),
675+
ArtifactId::Artifact(a) => (a.clone(), None, "release"),
674676
};
675677

676678
let aid = self.conn()
@@ -770,6 +772,19 @@ where
770772
.unwrap();
771773
}
772774
async fn record_benchmark(&self, krate: &str, supports_stable: bool) {
775+
if let Some(r) = self
776+
.conn()
777+
.query_opt(
778+
"select stabilized from benchmark where name = $1",
779+
&[&krate],
780+
)
781+
.await
782+
.unwrap()
783+
{
784+
if r.get::<_, bool>(0) == supports_stable {
785+
return;
786+
}
787+
}
773788
self.conn()
774789
.execute(
775790
"insert into benchmark (name, stabilized) VALUES ($1, $2)
@@ -781,6 +796,7 @@ where
781796
}
782797

783798
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]) {
799+
// Clean up -- we'll re-insert any missing things in the loop below.
784800
self.conn()
785801
.execute(
786802
"delete from collector_progress where start_time is null or end_time is null;",
@@ -792,34 +808,73 @@ where
792808
for step in steps {
793809
self.conn()
794810
.execute(
795-
"insert into collector_progress(aid, step) VALUES ($1, $2)",
811+
"insert into collector_progress(aid, step) VALUES ($1, $2)
812+
ON CONFLICT DO NOTHING",
796813
&[&(aid.0 as i16), &step],
797814
)
798815
.await
799816
.unwrap();
800817
}
801818
}
802-
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) {
819+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) -> bool {
820+
// If we modified a row, then we populated a start time, so we're good
821+
// to go. Otherwise we should just skip this step.
803822
self.conn()
804823
.execute(
805824
"update collector_progress set start_time = statement_timestamp() \
806-
where aid = $1 and step = $2 and start_time is null and end_time is null;",
825+
where aid = $1 and step = $2 and end_time is null;",
807826
&[&(aid.0 as i16), &step],
808827
)
809828
.await
810-
.unwrap();
829+
.unwrap()
830+
== 1
811831
}
812832
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str) {
813-
self.conn()
833+
let did_modify = self
834+
.conn()
814835
.execute(
815836
"update collector_progress set end_time = statement_timestamp() \
816837
where aid = $1 and step = $2 and start_time is not null and end_time is null;",
817838
&[&(aid.0 as i16), &step],
818839
)
819840
.await
820-
.unwrap();
841+
.unwrap()
842+
== 1;
843+
if !did_modify {
844+
log::error!("did not end {} for {:?}", step, aid);
845+
}
821846
}
822-
fn separate_transaction_for_collector(&self) -> bool {
823-
true
847+
async fn in_progress_artifact(&self) -> Option<ArtifactId> {
848+
let rows = self
849+
.conn()
850+
.query(
851+
"select distinct aid from collector_progress where end_time is null order by aid limit 1",
852+
&[],
853+
)
854+
.await
855+
.unwrap();
856+
let aid = rows.into_iter().next().map(|row| row.get::<_, i16>(0))?;
857+
858+
let row = self
859+
.conn()
860+
.query_one(
861+
"select name, date, type from artifact where id = $1",
862+
&[&aid],
863+
)
864+
.await
865+
.unwrap();
866+
867+
let ty = row.get::<_, String>(2);
868+
Some(match ty.as_str() {
869+
"try" | "master" => ArtifactId::Commit(Commit {
870+
sha: row.get(0),
871+
date: Date(row.get(1)),
872+
}),
873+
"release" => ArtifactId::Artifact(row.get(0)),
874+
_ => {
875+
log::error!("unknown ty {:?}", ty);
876+
return None;
877+
}
878+
})
824879
}
825880
}

database/src/pool/sqlite.rs

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
2+
use crate::{ArtifactId, CollectionId, Commit, Crate, Date, Profile};
23
use crate::{ArtifactIdNumber, Index, QueryDatum, QueuedCommit};
3-
use crate::{CollectionId, Commit, Crate, Date, Profile};
44
use chrono::{TimeZone, Utc};
55
use hashbrown::HashMap;
66
use rusqlite::params;
@@ -145,7 +145,8 @@ static MIGRATIONS: &[&str] = &[
145145
aid integer not null references artifact(id) on delete cascade on update cascade,
146146
step text not null,
147147
start integer,
148-
end integer
148+
end integer,
149+
UNIQUE(aid, step)
149150
);
150151
"#,
151152
];
@@ -227,7 +228,7 @@ impl Connection for SqliteConnection {
227228
async fn record_duration(&self, artifact: ArtifactIdNumber, duration: Duration) {
228229
self.raw_ref()
229230
.prepare_cached(
230-
"insert into artifact_collection_duration (aid, date_recorded, duration) VALUES (?, strftime('%s','now'), ?)",
231+
"insert or ignore into artifact_collection_duration (aid, date_recorded, duration) VALUES (?, strftime('%s','now'), ?)",
231232
)
232233
.unwrap()
233234
.execute(params![artifact.0, duration.as_secs() as i64])
@@ -644,31 +645,75 @@ impl Connection for SqliteConnection {
644645
for step in steps {
645646
self.raw_ref()
646647
.execute(
647-
"insert into collector_progress(aid, step) VALUES (?, ?)",
648+
"insert or ignore into collector_progress(aid, step) VALUES (?, ?)",
648649
params![&aid.0, step],
649650
)
650651
.unwrap();
651652
}
652653
}
653-
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) {
654+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str) -> bool {
654655
self.raw_ref()
655656
.execute(
656657
"update collector_progress set start = strftime('%s','now') \
657-
where aid = ? and step = ? and start is null;",
658+
where aid = ? and step = ? and end is null;",
658659
params![&aid.0, &step],
659660
)
660-
.unwrap();
661+
.unwrap()
662+
== 1
661663
}
662664
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str) {
663-
self.raw_ref()
665+
let did_modify = self
666+
.raw_ref()
664667
.execute(
665668
"update collector_progress set end = strftime('%s','now') \
666-
where aid = ? and step = ? and start is not null and end is null;",
669+
where aid = ? and step = ? and start is not null and end is null;",
667670
params![&aid.0, &step],
668671
)
669-
.unwrap();
672+
.unwrap()
673+
== 1;
674+
if !did_modify {
675+
log::error!("did not end {} for {:?}", step, aid);
676+
}
670677
}
671-
fn separate_transaction_for_collector(&self) -> bool {
672-
false
678+
async fn in_progress_artifact(&self) -> Option<ArtifactId> {
679+
let aid = self
680+
.raw_ref()
681+
.query_row(
682+
"select distinct aid from collector_progress where end is null order by aid limit 1",
683+
params![],
684+
|r| r.get::<_, i16>(0),
685+
)
686+
.optional()
687+
.unwrap()?;
688+
689+
let (name, date, ty) = self
690+
.raw_ref()
691+
.query_row(
692+
"select name, date, type from artifact where id = ?",
693+
params![&aid],
694+
|r| {
695+
Ok((
696+
r.get::<_, String>(0)?,
697+
r.get::<_, Option<i64>>(1)?,
698+
r.get::<_, String>(2)?,
699+
))
700+
},
701+
)
702+
.unwrap();
703+
704+
Some(match ty.as_str() {
705+
"try" | "master" => ArtifactId::Commit(Commit {
706+
sha: name,
707+
date: date
708+
.map(|d| Utc.timestamp(d, 0))
709+
.map(Date)
710+
.unwrap_or_else(|| Date::ymd_hms(2001, 01, 01, 0, 0, 0)),
711+
}),
712+
"release" => ArtifactId::Artifact(name),
713+
_ => {
714+
log::error!("unknown ty {:?}", ty);
715+
return None;
716+
}
717+
})
673718
}
674719
}

0 commit comments

Comments
 (0)