Skip to content

Commit 1e7fda2

Browse files
Rework getting next crate to be per-worker, rather than a global queue
The previous commit added the notion of timeouts on retrieved crates from the server, which meant that when an agent (which has multiple workers) requested the 1024 crates from the server, the tail of that request would likely have timed out by the time the agent started/completed working on those crates. This isn't technically a problem, but it is likely to lead to duplicated work for that tail, so we avoid it by retrieving crates one at a time with the new commit.
1 parent d89aa84 commit 1e7fda2

File tree

10 files changed

+88
-46
lines changed

10 files changed

+88
-46
lines changed

src/agent/api.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ impl AgentApi {
120120
})
121121
}
122122

123-
pub fn next_experiment(&self) -> Fallible<(Experiment, Vec<Crate>)> {
123+
pub fn next_experiment(&self) -> Fallible<Experiment> {
124124
self.retry(|this| loop {
125125
let resp: Option<_> = this
126-
.build_request(Method::GET, "next-experiment")
126+
.build_request(Method::POST, "next-experiment")
127127
.send()?
128128
.to_api_response()?;
129129

130-
if let Some((experiment, crates)) = resp {
131-
return Ok((experiment, crates));
130+
if let Some(experiment) = resp {
131+
return Ok(experiment);
132132
}
133133

134134
// If we're just waiting for an experiment, we should be considered
@@ -139,6 +139,15 @@ impl AgentApi {
139139
})
140140
}
141141

142+
pub fn next_crate(&self, ex: &str) -> Fallible<Option<Crate>> {
143+
self.retry(|this| loop {
144+
this.build_request(Method::POST, "next-crate")
145+
.json(&json!(ex))
146+
.send()?
147+
.to_api_response()?
148+
})
149+
}
150+
142151
pub fn record_progress(
143152
&self,
144153
ex: &Experiment,

src/agent/mod.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ impl FromIterator<String> for Capabilities {
6969
}
7070
}
7171

72-
struct Agent {
72+
pub struct Agent {
7373
api: AgentApi,
74-
config: Config,
74+
pub config: Config,
7575
}
7676

7777
impl Agent {
@@ -90,10 +90,14 @@ impl Agent {
9090
})
9191
}
9292

93-
fn experiment(&self) -> Fallible<(Experiment, Vec<Crate>)> {
93+
fn experiment(&self) -> Fallible<Experiment> {
9494
info!("asking the server for a new experiment...");
9595
Ok(self.api.next_experiment()?)
9696
}
97+
98+
pub fn next_crate(&self, ex: &str) -> Fallible<Option<Crate>> {
99+
self.api.next_crate(ex)
100+
}
97101
}
98102

99103
static HEALTH_CHECK: AtomicBool = AtomicBool::new(false);
@@ -150,7 +154,7 @@ fn run_experiment(
150154
threads_count: usize,
151155
past_experiment: &mut Option<String>,
152156
) -> Result<(), (Option<Experiment>, Error)> {
153-
let (ex, crates) = agent.experiment().map_err(|e| (None, e))?;
157+
let ex = agent.experiment().map_err(|e| (None, e))?;
154158

155159
if Some(&ex.name) != past_experiment.as_ref() {
156160
debug!("purging build directories...");
@@ -170,8 +174,10 @@ fn run_experiment(
170174
}
171175
}
172176

173-
crate::runner::run_ex(&ex, workspace, &crates, db, threads_count, &agent.config)
174-
.map_err(|err| (Some(ex), err))?;
177+
crate::runner::run_ex(&ex, workspace, db, threads_count, &agent.config, &|| {
178+
agent.next_crate(&ex.name)
179+
})
180+
.map_err(|err| (Some(ex), err))?;
175181
Ok(())
176182
}
177183

src/cli.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,13 +468,16 @@ impl Crater {
468468
let workspace = self
469469
.workspace(docker_env.as_ref().map(|s| s.as_str()), fast_workspace_init)?;
470470
workspace.purge_all_build_dirs()?;
471+
472+
let crates =
473+
std::sync::Mutex::new(experiment.get_uncompleted_crates(&db, None)?);
471474
let res = runner::run_ex(
472475
&experiment,
473476
&workspace,
474-
&experiment.get_uncompleted_crates(&db, &config)?,
475477
&result_db,
476478
threads,
477479
&config,
480+
&|| Ok(crates.lock().unwrap().pop()),
478481
);
479482
workspace.purge_all_build_dirs()?;
480483
res?;

src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ impl Database {
5858
#[cfg(test)]
5959
pub fn temp() -> Fallible<Self> {
6060
let tempfile = NamedTempFile::new()?;
61+
dbg!(&tempfile.path());
6162
Database::new(
6263
SqliteConnectionManager::file(tempfile.path()),
6364
Some(tempfile),

src/experiments.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::config::Config;
21
use crate::crates::Crate;
32
use crate::db::{Database, QueryUtils};
43
use crate::prelude::*;
@@ -13,7 +12,6 @@ use std::str::FromStr;
1312
use url::Url;
1413

1514
//sqlite limit is ignored if the expression evaluates to a negative value
16-
static FULL_LIST: i32 = -1;
1715
static SQL_VARIABLE_LIMIT: usize = 500;
1816

1917
string_enum!(pub enum Status {
@@ -595,18 +593,12 @@ impl Experiment {
595593
.collect::<Fallible<Vec<Crate>>>()
596594
}
597595

598-
fn crate_list_size(&self, config: &Config) -> i32 {
599-
match self.assigned_to {
600-
//if experiment is distributed return chunk
601-
Some(Assignee::Distributed) => config.chunk_size(),
602-
//if experiment is assigned to specific agent return all the crates
603-
_ => FULL_LIST,
604-
}
605-
}
606-
607-
pub fn get_uncompleted_crates(&self, db: &Database, config: &Config) -> Fallible<Vec<Crate>> {
608-
let limit = self.crate_list_size(config);
609-
596+
pub fn get_uncompleted_crates(
597+
&self,
598+
db: &Database,
599+
limit: Option<u32>,
600+
) -> Fallible<Vec<Crate>> {
601+
let limit = limit.map(|l| l as i32).unwrap_or(-1);
610602
#[cfg(not(test))]
611603
const RUN_TIMEOUT: u32 = 20;
612604
#[cfg(test)]
@@ -999,12 +991,12 @@ mod tests {
999991
// Create a dummy experiment
1000992
CreateExperiment::dummy("dummy").apply(&ctx).unwrap();
1001993
let ex = Experiment::get(&db, "dummy").unwrap().unwrap();
1002-
let crates = ex.get_uncompleted_crates(&db, &config).unwrap();
994+
let crates = ex.get_uncompleted_crates(&db, None).unwrap();
1003995
// Assert the whole list is returned
1004996
assert_eq!(crates.len(), ex.get_crates(&db).unwrap().len());
1005997

1006998
// Test already completed crates does not show up again
1007-
let uncompleted_crates = ex.get_uncompleted_crates(&db, &config).unwrap();
999+
let uncompleted_crates = ex.get_uncompleted_crates(&db, None).unwrap();
10081000
assert_eq!(uncompleted_crates.len(), 0);
10091001
}
10101002

@@ -1022,10 +1014,10 @@ mod tests {
10221014
// Create a dummy experiment
10231015
CreateExperiment::dummy("dummy").apply(&ctx).unwrap();
10241016
let ex = Experiment::next(&db, &agent1).unwrap().unwrap().1;
1025-
assert!(!ex.get_uncompleted_crates(&db, &config).unwrap().is_empty());
1017+
assert!(!ex.get_uncompleted_crates(&db, None).unwrap().is_empty());
10261018
assert!(Experiment::next(&db, &agent1).unwrap().is_some());
10271019
std::thread::sleep(std::time::Duration::from_secs(80)); // need to wait for at least 60 seconds for timeout to fire
10281020
assert_eq!(ex.status, Status::Running);
1029-
assert!(!ex.get_uncompleted_crates(&db, &config).unwrap().is_empty());
1021+
assert!(!ex.get_uncompleted_crates(&db, None).unwrap().is_empty());
10301022
}
10311023
}

src/runner/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ impl RunnerState {
4848
pub fn run_ex<DB: WriteResults + Sync>(
4949
ex: &Experiment,
5050
workspace: &Workspace,
51-
crates: &[Crate],
5251
db: &DB,
5352
threads_count: usize,
5453
config: &Config,
54+
next_crate: &(dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
5555
) -> Fallible<()> {
5656
// Attempt to spin indefinitely until docker is up. Ideally, we would
5757
// decomission this agent until docker is up, instead of leaving the
@@ -105,17 +105,16 @@ pub fn run_ex<DB: WriteResults + Sync>(
105105

106106
let state = RunnerState::new();
107107

108-
let crates = Mutex::new(crates.to_vec());
109108
let workers = (0..threads_count)
110109
.map(|i| {
111110
Worker::new(
112111
format!("worker-{}", i),
113112
workspace,
114113
ex,
115114
config,
116-
&crates,
117115
&state,
118116
db,
117+
next_crate,
119118
)
120119
})
121120
.collect::<Vec<_>>();

src/runner/worker.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::config::Config;
21
use crate::crates::Crate;
32
use crate::experiments::{Experiment, Mode};
43
use crate::prelude::*;
@@ -19,30 +18,30 @@ pub(super) struct Worker<'a, DB: WriteResults + Sync> {
1918
workspace: &'a Workspace,
2019
build_dir: Mutex<BuildDirectory>,
2120
ex: &'a Experiment,
22-
config: &'a Config,
23-
crates: &'a Mutex<Vec<Crate>>,
21+
config: &'a crate::config::Config,
2422
state: &'a RunnerState,
2523
db: &'a DB,
2624
target_dir_cleanup: AtomicBool,
25+
next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
2726
}
2827

2928
impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
3029
pub(super) fn new(
3130
name: String,
3231
workspace: &'a Workspace,
3332
ex: &'a Experiment,
34-
config: &'a Config,
35-
crates: &'a Mutex<Vec<Crate>>,
33+
config: &'a crate::config::Config,
3634
state: &'a RunnerState,
3735
db: &'a DB,
36+
next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
3837
) -> Self {
3938
Worker {
4039
build_dir: Mutex::new(workspace.build_dir(&name)),
4140
name,
4241
workspace,
4342
ex,
4443
config,
45-
crates,
44+
next_crate,
4645
state,
4746
db,
4847
target_dir_cleanup: AtomicBool::new(false),
@@ -92,7 +91,7 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
9291

9392
pub(super) fn run(&self) -> Fallible<()> {
9493
loop {
95-
let krate = if let Some(next) = self.crates.lock().unwrap().pop() {
94+
let krate = if let Some(next) = (self.next_crate)()? {
9695
next
9796
} else {
9897
// We're done if no more crates left.

src/server/agents.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ pub struct Agent {
2828
impl Agent {
2929
fn with_experiment(mut self, db: &Database) -> Fallible<Self> {
3030
self.experiment = Experiment::run_by(db, &Assignee::Agent(self.name.clone()))?;
31+
eprintln!(
32+
"{} has experiment {:?}",
33+
self.name,
34+
self.experiment.as_ref().map(|e| &e.name)
35+
);
3136
Ok(self)
3237
}
3338

@@ -278,10 +283,12 @@ mod tests {
278283
let (_new, ex) = Experiment::next(&db, &Assignee::Agent("agent".to_string()))
279284
.unwrap()
280285
.unwrap();
281-
ex.get_uncompleted_crates(&db, &config).unwrap();
286+
ex.get_uncompleted_crates(&db, None).unwrap();
282287

283288
// After an experiment is assigned to the agent, the agent is working
284289
let agent = agents.get("agent").unwrap().unwrap();
290+
dbg!(agent.status());
291+
std::thread::sleep(std::time::Duration::from_secs(100000));
285292
assert_eq!(agent.status(), AgentStatus::Working);
286293
}
287294

src/server/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ mod tests {
245245
.apply(&ctx)
246246
.unwrap();
247247
let ex = Experiment::next(&db, &assignee).unwrap().unwrap().1;
248-
ex.get_uncompleted_crates(&db, &config).unwrap();
248+
ex.get_uncompleted_crates(&db, None).unwrap();
249249
METRICS.update_agent_status(&db, &agent_list_ref).unwrap();
250250

251251
// There are no experiments in the queue but agent1 is still executing the

src/server/routes/agent.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,22 @@ pub fn routes(
4040
.and(auth_filter(data.clone(), TokenType::Agent))
4141
.map(endpoint_config);
4242

43-
let next_experiment = warp::get2()
43+
let next_experiment = warp::post2()
4444
.and(warp::path("next-experiment"))
4545
.and(warp::path::end())
4646
.and(mutex_filter.clone())
4747
.and(github_data_filter)
4848
.and(auth_filter(data.clone(), TokenType::Agent))
4949
.map(endpoint_next_experiment);
5050

51+
let next_crate = warp::post2()
52+
.and(warp::path("next-crate"))
53+
.and(warp::path::end())
54+
.and(warp::body::json())
55+
.and(data_filter.clone())
56+
.and(auth_filter(data.clone(), TokenType::Agent))
57+
.map(endpoint_next_crate);
58+
5159
let record_progress = warp::post2()
5260
.and(warp::path("record-progress"))
5361
.and(warp::path::end())
@@ -76,6 +84,8 @@ pub fn routes(
7684
config
7785
.or(next_experiment)
7886
.unify()
87+
.or(next_crate)
88+
.unify()
7989
.or(record_progress)
8090
.unify()
8191
.or(heartbeat)
@@ -126,10 +136,26 @@ fn endpoint_next_experiment(
126136
}
127137
}
128138

129-
Some((
130-
ex.clone(),
131-
ex.get_uncompleted_crates(&data.db, &data.config)?,
132-
))
139+
Some(ex)
140+
} else {
141+
None
142+
};
143+
144+
Ok(ApiResponse::Success { result }.into_response()?)
145+
}
146+
147+
fn endpoint_next_crate(
148+
experiment: String,
149+
data: Arc<Data>,
150+
_auth: AuthDetails,
151+
) -> Fallible<Response<Body>> {
152+
let result = if let Some(ex) = Experiment::get(&data.db, &experiment)? {
153+
let mut crates = ex.get_uncompleted_crates(&data.db, Some(1))?;
154+
if crates.is_empty() {
155+
None
156+
} else {
157+
Some(crates.remove(0))
158+
}
133159
} else {
134160
None
135161
};

0 commit comments

Comments
 (0)