Skip to content

Commit d89aa84

Browse files
committed
Auto merge of #649 - Mark-Simulacrum:better-queue, r=Mark-Simulacrum
Switch crate queue to retrieve crates based on timeout Previously, a given crate was assigned to a particular agent, and then that agent had to either complete or fail (e.g., panic) before it was 'released' for another agent to attempt. This is error prone, and makes it harder for agents to be ephemeral (including their name), as it means that it requires manual intervention if an agent just vanishes to put the assigned crates back into the general queue. We switch to a different assignment scheme. Now, when an agent requests crates from a given experiment, we will dequeue crates that are queued (i.e., incomplete) and that have not started yet **or** have started >20 minutes ago. The current agent timeout is blanket set at 15 minutes, and even if it was longer, we expect that most crates build quite quickly. Duplicate builds are also fine for our case (we'll just get a couple extra results but that'll be deduplicated already when inserting into that table).
2 parents d3cf04d + 5406236 commit d89aa84

File tree

8 files changed

+65
-150
lines changed

8 files changed

+65
-150
lines changed

src/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ impl Crater {
471471
let res = runner::run_ex(
472472
&experiment,
473473
&workspace,
474-
&experiment.get_uncompleted_crates(&db, &config, &Assignee::CLI)?,
474+
&experiment.get_uncompleted_crates(&db, &config)?,
475475
&result_db,
476476
threads,
477477
&config,

src/db/migrations.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,15 @@ fn migrations() -> Vec<(&'static str, MigrationKind)> {
371371
})),
372372
));
373373

374+
migrations.push((
375+
"create_experiment_time_column",
376+
MigrationKind::SQL("alter table experiment_crates add column started_at text;"),
377+
));
378+
migrations.push((
379+
"create_agent_assignment",
380+
MigrationKind::SQL("alter table agents add column latest_work_for text;"),
381+
));
382+
374383
migrations
375384
}
376385

src/experiments.rs

Lines changed: 47 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ string_enum!(pub enum Status {
2020
Queued => "queued",
2121
Running => "running",
2222
NeedsReport => "needs-report",
23-
Failed => "failed",
2423
GeneratingReport => "generating-report",
2524
ReportFailed => "report-failed",
2625
Completed => "completed",
@@ -282,10 +281,9 @@ impl Experiment {
282281

283282
pub fn run_by(db: &Database, assignee: &Assignee) -> Fallible<Option<Experiment>> {
284283
let record = db.get_row(
285-
"select * from experiments where name in ( \
286-
select experiment from experiment_crates \
287-
where status = ?2 and skipped = 0 and assigned_to = ?1 and \
288-
experiment in (select name from experiments where status = ?2)) \
284+
"select * from experiments where name = (
285+
select latest_work_for from agents where ('agent:' || agents.name) = ?1
286+
) and status = ?2 \
289287
limit 1",
290288
&[&assignee.to_string(), Status::Running.to_str()],
291289
|r| ExperimentDBRecord::from_row(r),
@@ -357,7 +355,7 @@ impl Experiment {
357355
}
358356

359357
pub fn next(db: &Database, assignee: &Assignee) -> Fallible<Option<(bool, Experiment)>> {
360-
Self::find_next(db, assignee).and_then(|ex| Self::assign_experiment(db, ex))
358+
Self::find_next(db, assignee).and_then(|ex| Self::assign_experiment(db, ex, assignee))
361359
}
362360
pub fn has_next(db: &Database, assignee: &Assignee) -> Fallible<bool> {
363361
Ok(Self::find_next(db, assignee)?.is_some())
@@ -366,8 +364,16 @@ impl Experiment {
366364
fn assign_experiment(
367365
db: &Database,
368366
ex: Option<Experiment>,
367+
agent: &Assignee,
369368
) -> Fallible<Option<(bool, Experiment)>> {
370369
if let Some(mut experiment) = ex {
370+
if let Assignee::Agent(name) = agent {
371+
db.execute(
372+
"update agents set latest_work_for = ?2 where agents.name = ?1;",
373+
rusqlite::params![&name, &experiment.name],
374+
)?;
375+
}
376+
371377
let new_ex = experiment.status != Status::Running;
372378
if new_ex {
373379
experiment.set_status(db, Status::Running)?;
@@ -405,19 +411,12 @@ impl Experiment {
405411
const AGENT_QUERY: &str = r#"
406412
SELECT *
407413
FROM experiments ex
408-
WHERE ( ex.status = "queued"
409-
OR ( status = "running"
410-
AND ( SELECT COUNT (*)
411-
FROM experiment_crates ex_crates
412-
WHERE ex_crates.experiment = ex.name
413-
AND ( status = "queued")
414-
AND ( skipped = 0)
415-
> 0 ) ) )
416-
AND ( ex.assigned_to = ?1 )
417-
AND ( ex.requirement IS NULL
418-
OR ex.requirement IN ( SELECT capability
419-
FROM agent_capabilities
420-
WHERE agent_name = ?2) )
414+
WHERE (ex.status = "queued" OR status = "running")
415+
AND ( ex.assigned_to = ?1 )
416+
AND ( ex.requirement IS NULL
417+
OR ex.requirement IN (SELECT capability
418+
FROM agent_capabilities
419+
WHERE agent_name = ?2) )
421420
ORDER BY ex.priority DESC,
422421
ex.created_at
423422
LIMIT 1;
@@ -434,15 +433,8 @@ impl Experiment {
434433
const CLI_QUERY: &str = r#"
435434
SELECT *
436435
FROM experiments ex
437-
WHERE ( ex.status = "queued"
438-
OR ( status = "running"
439-
AND ( SELECT COUNT (*)
440-
FROM experiment_crates ex_crates
441-
WHERE ex_crates.experiment = ex.name
442-
AND ( status = "queued")
443-
AND ( skipped = 0)
444-
> 0 ) ) )
445-
AND ( ex.assigned_to IS NULL OR ex.assigned_to = ?1 )
436+
WHERE (ex.status = "queued" OR status = "running")
437+
AND (ex.assigned_to IS NULL OR ex.assigned_to = ?1)
446438
ORDER BY ex.assigned_to IS NULL,
447439
ex.priority DESC,
448440
ex.created_at
@@ -456,14 +448,7 @@ impl Experiment {
456448
const AGENT_UNASSIGNED_QUERY: &str = r#"
457449
SELECT *
458450
FROM experiments ex
459-
WHERE ( ex.status = "queued"
460-
OR ( status = "running"
461-
AND ( SELECT COUNT (*)
462-
FROM experiment_crates ex_crates
463-
WHERE ex_crates.experiment = ex.name
464-
AND ( status = "queued")
465-
AND ( skipped = 0)
466-
> 0 ) ) )
451+
WHERE (ex.status = "queued" OR status = "running")
467452
AND ( ex.assigned_to IS NULL )
468453
AND ( ex.requirement IS NULL
469454
OR ex.requirement IN ( SELECT capability
@@ -500,26 +485,6 @@ impl Experiment {
500485
}
501486
}
502487

503-
pub fn clear_agent_progress(&mut self, db: &Database, agent: &str) -> Fallible<()> {
504-
// Mark all the running crates from this agent as queued (so that they
505-
// run again)
506-
db.execute(
507-
"
508-
UPDATE experiment_crates
509-
SET assigned_to = NULL, status = ?1 \
510-
WHERE experiment = ?2 AND status = ?3 \
511-
AND assigned_to = ?4
512-
",
513-
&[
514-
&Status::Queued.to_string(),
515-
&self.name,
516-
&Status::Running.to_string(),
517-
&Assignee::Agent(agent.to_string()).to_string(),
518-
],
519-
)?;
520-
Ok(())
521-
}
522-
523488
pub fn set_status(&mut self, db: &Database, status: Status) -> Fallible<()> {
524489
db.execute(
525490
"UPDATE experiments SET status = ?1 WHERE name = ?2;",
@@ -538,29 +503,13 @@ impl Experiment {
538503
self.started_at = Some(now);
539504
}
540505
// Check if the old status was "running" and there is no completed date
541-
(Status::Running, new_status)
542-
if self.completed_at.is_none() && new_status != Status::Failed =>
543-
{
506+
(Status::Running, _) if self.completed_at.is_none() => {
544507
db.execute(
545508
"UPDATE experiments SET completed_at = ?1 WHERE name = ?2;",
546509
&[&now, &self.name.as_str()],
547510
)?;
548511
self.completed_at = Some(now);
549512
}
550-
// Queue again failed crates
551-
(Status::Failed, Status::Queued) => {
552-
db.execute(
553-
"UPDATE experiment_crates
554-
SET status = ?1 \
555-
WHERE experiment = ?2 AND status = ?3
556-
",
557-
&[
558-
&Status::Queued.to_string(),
559-
&self.name,
560-
&Status::Failed.to_string(),
561-
],
562-
)?;
563-
}
564513
_ => (),
565514
}
566515

@@ -655,41 +604,46 @@ impl Experiment {
655604
}
656605
}
657606

658-
pub fn get_uncompleted_crates(
659-
&self,
660-
db: &Database,
661-
config: &Config,
662-
assigned_to: &Assignee,
663-
) -> Fallible<Vec<Crate>> {
607+
pub fn get_uncompleted_crates(&self, db: &Database, config: &Config) -> Fallible<Vec<Crate>> {
664608
let limit = self.crate_list_size(config);
665-
let assigned_to = assigned_to.to_string();
609+
610+
#[cfg(not(test))]
611+
const RUN_TIMEOUT: u32 = 20;
612+
#[cfg(test)]
613+
const RUN_TIMEOUT: u32 = 1;
666614

667615
db.transaction(|transaction| {
668616
//get the first 'limit' queued crates from the experiment crates list
669617
let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::new();
670618
let crates = transaction
671619
.query(
672-
"SELECT crate FROM experiment_crates WHERE experiment = ?1
673-
AND status = ?2 AND skipped = 0 LIMIT ?3;",
674-
rusqlite::params![self.name, Status::Queued.to_string(), limit],
620+
&format!(
621+
"SELECT crate FROM experiment_crates WHERE experiment = ?1
622+
AND skipped = 0
623+
AND status = 'queued'
624+
AND (started_at is null or started_at <= datetime('now', '-{} minutes'))
625+
LIMIT ?2;",
626+
RUN_TIMEOUT
627+
),
628+
rusqlite::params![self.name, limit],
675629
|r| r.get("crate"),
676630
)?
677631
.into_iter()
678632
.collect::<Vec<String>>();
679633

680634
crates.iter().for_each(|krate| params.push(krate));
681-
let params_header: &[&dyn rusqlite::types::ToSql] = &[&assigned_to, &self.name];
635+
let params_header: &[&dyn rusqlite::types::ToSql] = &[&self.name];
682636
//SQLite cannot handle queries with more than 999 variables
683637
for params in params.chunks(SQL_VARIABLE_LIMIT) {
684638
let params = [params_header, params].concat();
685639
let update_query = &[
686640
"
687641
UPDATE experiment_crates
688-
SET assigned_to = ?1, status = \"running\" \
689-
WHERE experiment = ?2
642+
SET started_at = datetime('now')
643+
WHERE experiment = ?1
690644
AND crate IN ("
691645
.to_string(),
692-
"?,".repeat(params.len() - 3),
646+
"?,".repeat(params.len() - 2),
693647
"?)".to_string(),
694648
]
695649
.join("");
@@ -703,26 +657,6 @@ impl Experiment {
703657
.collect::<Fallible<Vec<Crate>>>()
704658
})
705659
}
706-
707-
pub fn get_running_crates(
708-
&self,
709-
db: &Database,
710-
assigned_to: &Assignee,
711-
) -> Fallible<Vec<Crate>> {
712-
db.query(
713-
"SELECT crate FROM experiment_crates WHERE experiment = ?1 \
714-
AND status = ?2 AND assigned_to = ?3",
715-
&[
716-
&self.name,
717-
&Status::Running.to_string(),
718-
&assigned_to.to_string(),
719-
],
720-
|r| r.get(0),
721-
)?
722-
.into_iter()
723-
.map(|c: String| c.parse())
724-
.collect::<Fallible<Vec<Crate>>>()
725-
}
726660
}
727661

728662
struct ExperimentDBRecord {
@@ -1065,16 +999,12 @@ mod tests {
1065999
// Create a dummy experiment
10661000
CreateExperiment::dummy("dummy").apply(&ctx).unwrap();
10671001
let ex = Experiment::get(&db, "dummy").unwrap().unwrap();
1068-
let crates = ex
1069-
.get_uncompleted_crates(&db, &config, &Assignee::CLI)
1070-
.unwrap();
1002+
let crates = ex.get_uncompleted_crates(&db, &config).unwrap();
10711003
// Assert the whole list is returned
10721004
assert_eq!(crates.len(), ex.get_crates(&db).unwrap().len());
10731005

10741006
// Test already completed crates does not show up again
1075-
let uncompleted_crates = ex
1076-
.get_uncompleted_crates(&db, &config, &Assignee::CLI)
1077-
.unwrap();
1007+
let uncompleted_crates = ex.get_uncompleted_crates(&db, &config).unwrap();
10781008
assert_eq!(uncompleted_crates.len(), 0);
10791009
}
10801010

@@ -1091,17 +1021,11 @@ mod tests {
10911021

10921022
// Create a dummy experiment
10931023
CreateExperiment::dummy("dummy").apply(&ctx).unwrap();
1094-
let mut ex = Experiment::next(&db, &agent1).unwrap().unwrap().1;
1095-
assert!(!ex
1096-
.get_uncompleted_crates(&db, &config, &agent1)
1097-
.unwrap()
1098-
.is_empty());
1099-
ex.clear_agent_progress(&db, "agent-1").unwrap();
1024+
let ex = Experiment::next(&db, &agent1).unwrap().unwrap().1;
1025+
assert!(!ex.get_uncompleted_crates(&db, &config).unwrap().is_empty());
11001026
assert!(Experiment::next(&db, &agent1).unwrap().is_some());
1027+
std::thread::sleep(std::time::Duration::from_secs(80)); // need to wait for at least 60 seconds for timeout to fire
11011028
assert_eq!(ex.status, Status::Running);
1102-
assert!(!ex
1103-
.get_uncompleted_crates(&db, &config, &agent1)
1104-
.unwrap()
1105-
.is_empty());
1029+
assert!(!ex.get_uncompleted_crates(&db, &config).unwrap().is_empty());
11061030
}
11071031
}

src/server/agents.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,7 @@ mod tests {
278278
let (_new, ex) = Experiment::next(&db, &Assignee::Agent("agent".to_string()))
279279
.unwrap()
280280
.unwrap();
281-
ex.get_uncompleted_crates(&db, &config, &Assignee::Agent("agent".to_string()))
282-
.unwrap();
281+
ex.get_uncompleted_crates(&db, &config).unwrap();
283282

284283
// After an experiment is assigned to the agent, the agent is working
285284
let agent = agents.get("agent").unwrap().unwrap();

src/server/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ mod tests {
245245
.apply(&ctx)
246246
.unwrap();
247247
let ex = Experiment::next(&db, &assignee).unwrap().unwrap().1;
248-
ex.get_uncompleted_crates(&db, &config, &assignee).unwrap();
248+
ex.get_uncompleted_crates(&db, &config).unwrap();
249249
METRICS.update_agent_status(&db, &agent_list_ref).unwrap();
250250

251251
// There are no experiments in the queue but agent1 is still executing the

src/server/routes/agent.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ fn endpoint_next_experiment(
111111
) -> Fallible<Response<Body>> {
112112
//we need to make sure that Experiment::next executes uninterrupted
113113
let data = mutex.lock().unwrap();
114-
let next = Experiment::next(&data.db, &Assignee::Agent(auth.name.clone()))?;
114+
let next = Experiment::next(&data.db, &Assignee::Agent(auth.name))?;
115115
let result = if let Some((new, ex)) = next {
116116
if new {
117117
if let Some(github_data) = github_data.as_ref() {
@@ -126,18 +126,10 @@ fn endpoint_next_experiment(
126126
}
127127
}
128128

129-
let running_crates =
130-
ex.get_running_crates(&data.db, &Assignee::Agent(auth.name.clone()))?;
131-
132-
//if the agent crashed (i.e. there are already running crates) return those crates
133-
if !running_crates.is_empty() {
134-
Some((ex, running_crates))
135-
} else {
136-
Some((
137-
ex.clone(),
138-
ex.get_uncompleted_crates(&data.db, &data.config, &Assignee::Agent(auth.name))?,
139-
))
140-
}
129+
Some((
130+
ex.clone(),
131+
ex.get_uncompleted_crates(&data.db, &data.config)?,
132+
))
141133
} else {
142134
None
143135
};
@@ -319,11 +311,10 @@ fn endpoint_error(
319311
);
320312

321313
let data = mutex.lock().unwrap();
322-
let mut ex = Experiment::get(&data.db, &error.experiment_name)?
314+
let ex = Experiment::get(&data.db, &error.experiment_name)?
323315
.ok_or_else(|| err_msg("no experiment run by this agent"))?;
324316

325317
data.metrics.record_error(&auth.name, &ex.name);
326-
ex.clear_agent_progress(&data.db, &auth.name)?;
327318

328319
Ok(ApiResponse::Success { result: true }.into_response()?)
329320
}

0 commit comments

Comments
 (0)