Skip to content

Commit f2b24d2

Browse files
committed
Auto merge of #650 - Mark-Simulacrum:better-queue, r=Mark-Simulacrum
Rework getting next crate to be per-worker, rather than a global queue. The previous commit added the notion of timeouts on crates retrieved from the server, which meant that when an agent (which has multiple workers) requested its chunk of 1024 crates from the server, the tail of that chunk would likely have timed out by the time the agent started (or completed) working on those crates. This isn't technically a problem, but it is likely to lead to duplicated work for that tail, which is a performance issue, so this commit avoids it by retrieving crates one at a time.
2 parents d89aa84 + d99ee66 commit f2b24d2

File tree

11 files changed

+80
-67
lines changed

11 files changed

+80
-67
lines changed

config.toml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,6 @@ remove = "^S-"
1212
experiment-queued = "S-waiting-on-crater"
1313
experiment-completed = "S-waiting-on-review"
1414

15-
[server.distributed]
16-
# Number of crates in each chunk when running distributed experiments
17-
# A negative value selects all the available crates
18-
chunk-size = 1024
19-
2015
# This section contains the list of tested crates when defining an experiment
2116
# with `--crate-select demo`.
2217

src/agent/api.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ impl AgentApi {
120120
})
121121
}
122122

123-
pub fn next_experiment(&self) -> Fallible<(Experiment, Vec<Crate>)> {
123+
pub fn next_experiment(&self) -> Fallible<Experiment> {
124124
self.retry(|this| loop {
125125
let resp: Option<_> = this
126-
.build_request(Method::GET, "next-experiment")
126+
.build_request(Method::POST, "next-experiment")
127127
.send()?
128128
.to_api_response()?;
129129

130-
if let Some((experiment, crates)) = resp {
131-
return Ok((experiment, crates));
130+
if let Some(experiment) = resp {
131+
return Ok(experiment);
132132
}
133133

134134
// If we're just waiting for an experiment, we should be considered
@@ -139,6 +139,15 @@ impl AgentApi {
139139
})
140140
}
141141

142+
pub fn next_crate(&self, ex: &str) -> Fallible<Option<Crate>> {
143+
self.retry(|this| loop {
144+
this.build_request(Method::POST, "next-crate")
145+
.json(&json!(ex))
146+
.send()?
147+
.to_api_response()?
148+
})
149+
}
150+
142151
pub fn record_progress(
143152
&self,
144153
ex: &Experiment,

src/agent/mod.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ impl FromIterator<String> for Capabilities {
6969
}
7070
}
7171

72-
struct Agent {
72+
pub struct Agent {
7373
api: AgentApi,
74-
config: Config,
74+
pub config: Config,
7575
}
7676

7777
impl Agent {
@@ -90,10 +90,14 @@ impl Agent {
9090
})
9191
}
9292

93-
fn experiment(&self) -> Fallible<(Experiment, Vec<Crate>)> {
93+
fn experiment(&self) -> Fallible<Experiment> {
9494
info!("asking the server for a new experiment...");
9595
Ok(self.api.next_experiment()?)
9696
}
97+
98+
pub fn next_crate(&self, ex: &str) -> Fallible<Option<Crate>> {
99+
self.api.next_crate(ex)
100+
}
97101
}
98102

99103
static HEALTH_CHECK: AtomicBool = AtomicBool::new(false);
@@ -150,7 +154,7 @@ fn run_experiment(
150154
threads_count: usize,
151155
past_experiment: &mut Option<String>,
152156
) -> Result<(), (Option<Experiment>, Error)> {
153-
let (ex, crates) = agent.experiment().map_err(|e| (None, e))?;
157+
let ex = agent.experiment().map_err(|e| (None, e))?;
154158

155159
if Some(&ex.name) != past_experiment.as_ref() {
156160
debug!("purging build directories...");
@@ -170,8 +174,10 @@ fn run_experiment(
170174
}
171175
}
172176

173-
crate::runner::run_ex(&ex, workspace, &crates, db, threads_count, &agent.config)
174-
.map_err(|err| (Some(ex), err))?;
177+
crate::runner::run_ex(&ex, workspace, db, threads_count, &agent.config, &|| {
178+
agent.next_crate(&ex.name)
179+
})
180+
.map_err(|err| (Some(ex), err))?;
175181
Ok(())
176182
}
177183

src/cli.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,13 +468,16 @@ impl Crater {
468468
let workspace = self
469469
.workspace(docker_env.as_ref().map(|s| s.as_str()), fast_workspace_init)?;
470470
workspace.purge_all_build_dirs()?;
471+
472+
let crates =
473+
std::sync::Mutex::new(experiment.get_uncompleted_crates(&db, None)?);
471474
let res = runner::run_ex(
472475
&experiment,
473476
&workspace,
474-
&experiment.get_uncompleted_crates(&db, &config)?,
475477
&result_db,
476478
threads,
477479
&config,
480+
&|| Ok(crates.lock().unwrap().pop()),
478481
);
479482
workspace.purge_all_build_dirs()?;
480483
res?;

src/config.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ fn default_false() -> bool {
4343
pub struct ServerConfig {
4444
pub bot_acl: BotACL,
4545
pub labels: ServerLabels,
46-
pub distributed: ChunkConfig,
4746
}
4847

4948
#[derive(Clone, Serialize, Deserialize)]
@@ -78,12 +77,6 @@ pub struct SandboxConfig {
7877
pub build_log_max_lines: usize,
7978
}
8079

81-
#[derive(Clone, Serialize, Deserialize)]
82-
#[serde(rename_all = "kebab-case")]
83-
pub struct ChunkConfig {
84-
pub chunk_size: i32,
85-
}
86-
8780
#[derive(Clone, Serialize, Deserialize)]
8881
#[serde(rename_all = "kebab-case")]
8982
pub struct Config {
@@ -138,10 +131,6 @@ impl Config {
138131
&self.demo_crates
139132
}
140133

141-
pub fn chunk_size(&self) -> i32 {
142-
self.server.distributed.chunk_size
143-
}
144-
145134
pub fn check(file: &Option<String>) -> Fallible<()> {
146135
if let Some(file) = file {
147136
Self::check_all(file.into())
@@ -269,7 +258,6 @@ impl Default for Config {
269258
experiment_queued: "".into(),
270259
experiment_completed: "".into(),
271260
},
272-
distributed: ChunkConfig { chunk_size: 1 },
273261
},
274262
}
275263
}
@@ -300,8 +288,6 @@ mod tests {
300288
"remove = \"\"\n",
301289
"experiment-queued = \"\"\n",
302290
"experiment-completed = \"\"\n",
303-
"[server.distributed]\n",
304-
"chunk-size = 32\n",
305291
"[demo-crates]\n",
306292
"crates = []\n",
307293
"github-repos = []\n",
@@ -338,7 +324,5 @@ mod tests {
338324
name: "cargo".into(),
339325
sha: None,
340326
})));
341-
342-
assert_eq!(list.chunk_size(), 32);
343327
}
344328
}

src/experiments.rs

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::config::Config;
21
use crate::crates::Crate;
32
use crate::db::{Database, QueryUtils};
43
use crate::prelude::*;
@@ -13,7 +12,6 @@ use std::str::FromStr;
1312
use url::Url;
1413

1514
//sqlite limit is ignored if the expression evaluates to a negative value
16-
static FULL_LIST: i32 = -1;
1715
static SQL_VARIABLE_LIMIT: usize = 500;
1816

1917
string_enum!(pub enum Status {
@@ -595,18 +593,12 @@ impl Experiment {
595593
.collect::<Fallible<Vec<Crate>>>()
596594
}
597595

598-
fn crate_list_size(&self, config: &Config) -> i32 {
599-
match self.assigned_to {
600-
//if experiment is distributed return chunk
601-
Some(Assignee::Distributed) => config.chunk_size(),
602-
//if experiment is assigned to specific agent return all the crates
603-
_ => FULL_LIST,
604-
}
605-
}
606-
607-
pub fn get_uncompleted_crates(&self, db: &Database, config: &Config) -> Fallible<Vec<Crate>> {
608-
let limit = self.crate_list_size(config);
609-
596+
pub fn get_uncompleted_crates(
597+
&self,
598+
db: &Database,
599+
limit: Option<u32>,
600+
) -> Fallible<Vec<Crate>> {
601+
let limit = limit.map(|l| l as i32).unwrap_or(-1);
610602
#[cfg(not(test))]
611603
const RUN_TIMEOUT: u32 = 20;
612604
#[cfg(test)]
@@ -999,12 +991,12 @@ mod tests {
999991
// Create a dummy experiment
1000992
CreateExperiment::dummy("dummy").apply(&ctx).unwrap();
1001993
let ex = Experiment::get(&db, "dummy").unwrap().unwrap();
1002-
let crates = ex.get_uncompleted_crates(&db, &config).unwrap();
994+
let crates = ex.get_uncompleted_crates(&db, None).unwrap();
1003995
// Assert the whole list is returned
1004996
assert_eq!(crates.len(), ex.get_crates(&db).unwrap().len());
1005997

1006998
// Test that already completed crates do not show up again
1007-
let uncompleted_crates = ex.get_uncompleted_crates(&db, &config).unwrap();
999+
let uncompleted_crates = ex.get_uncompleted_crates(&db, None).unwrap();
10081000
assert_eq!(uncompleted_crates.len(), 0);
10091001
}
10101002

@@ -1022,10 +1014,10 @@ mod tests {
10221014
// Create a dummy experiment
10231015
CreateExperiment::dummy("dummy").apply(&ctx).unwrap();
10241016
let ex = Experiment::next(&db, &agent1).unwrap().unwrap().1;
1025-
assert!(!ex.get_uncompleted_crates(&db, &config).unwrap().is_empty());
1017+
assert!(!ex.get_uncompleted_crates(&db, None).unwrap().is_empty());
10261018
assert!(Experiment::next(&db, &agent1).unwrap().is_some());
10271019
std::thread::sleep(std::time::Duration::from_secs(80)); // need to wait for at least 60 seconds for timeout to fire
10281020
assert_eq!(ex.status, Status::Running);
1029-
assert!(!ex.get_uncompleted_crates(&db, &config).unwrap().is_empty());
1021+
assert!(!ex.get_uncompleted_crates(&db, None).unwrap().is_empty());
10301022
}
10311023
}

src/runner/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ impl RunnerState {
4848
pub fn run_ex<DB: WriteResults + Sync>(
4949
ex: &Experiment,
5050
workspace: &Workspace,
51-
crates: &[Crate],
5251
db: &DB,
5352
threads_count: usize,
5453
config: &Config,
54+
next_crate: &(dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
5555
) -> Fallible<()> {
5656
// Attempt to spin indefinitely until docker is up. Ideally, we would
5757
// decommission this agent until docker is up, instead of leaving the
@@ -105,17 +105,16 @@ pub fn run_ex<DB: WriteResults + Sync>(
105105

106106
let state = RunnerState::new();
107107

108-
let crates = Mutex::new(crates.to_vec());
109108
let workers = (0..threads_count)
110109
.map(|i| {
111110
Worker::new(
112111
format!("worker-{}", i),
113112
workspace,
114113
ex,
115114
config,
116-
&crates,
117115
&state,
118116
db,
117+
next_crate,
119118
)
120119
})
121120
.collect::<Vec<_>>();

src/runner/worker.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::config::Config;
21
use crate::crates::Crate;
32
use crate::experiments::{Experiment, Mode};
43
use crate::prelude::*;
@@ -19,30 +18,30 @@ pub(super) struct Worker<'a, DB: WriteResults + Sync> {
1918
workspace: &'a Workspace,
2019
build_dir: Mutex<BuildDirectory>,
2120
ex: &'a Experiment,
22-
config: &'a Config,
23-
crates: &'a Mutex<Vec<Crate>>,
21+
config: &'a crate::config::Config,
2422
state: &'a RunnerState,
2523
db: &'a DB,
2624
target_dir_cleanup: AtomicBool,
25+
next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
2726
}
2827

2928
impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
3029
pub(super) fn new(
3130
name: String,
3231
workspace: &'a Workspace,
3332
ex: &'a Experiment,
34-
config: &'a Config,
35-
crates: &'a Mutex<Vec<Crate>>,
33+
config: &'a crate::config::Config,
3634
state: &'a RunnerState,
3735
db: &'a DB,
36+
next_crate: &'a (dyn Fn() -> Fallible<Option<Crate>> + Send + Sync),
3837
) -> Self {
3938
Worker {
4039
build_dir: Mutex::new(workspace.build_dir(&name)),
4140
name,
4241
workspace,
4342
ex,
4443
config,
45-
crates,
44+
next_crate,
4645
state,
4746
db,
4847
target_dir_cleanup: AtomicBool::new(false),
@@ -92,7 +91,7 @@ impl<'a, DB: WriteResults + Sync> Worker<'a, DB> {
9291

9392
pub(super) fn run(&self) -> Fallible<()> {
9493
loop {
95-
let krate = if let Some(next) = self.crates.lock().unwrap().pop() {
94+
let krate = if let Some(next) = (self.next_crate)()? {
9695
next
9796
} else {
9897
// We're done if no more crates left.

src/server/agents.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ mod tests {
278278
let (_new, ex) = Experiment::next(&db, &Assignee::Agent("agent".to_string()))
279279
.unwrap()
280280
.unwrap();
281-
ex.get_uncompleted_crates(&db, &config).unwrap();
281+
ex.get_uncompleted_crates(&db, None).unwrap();
282282

283283
// After an experiment is assigned to the agent, the agent is working
284284
let agent = agents.get("agent").unwrap().unwrap();

src/server/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ mod tests {
245245
.apply(&ctx)
246246
.unwrap();
247247
let ex = Experiment::next(&db, &assignee).unwrap().unwrap().1;
248-
ex.get_uncompleted_crates(&db, &config).unwrap();
248+
ex.get_uncompleted_crates(&db, None).unwrap();
249249
METRICS.update_agent_status(&db, &agent_list_ref).unwrap();
250250

251251
// There are no experiments in the queue but agent1 is still executing the

0 commit comments

Comments
 (0)