Skip to content

Commit 62249a6

Browse files
committed
Move job task spawners to separate functions.
This helps keep the `run_server` function a little smaller, and makes the disabling code a little clearer.
1 parent 0184c17 commit 62249a6

File tree

1 file changed

+81
-57
lines changed

1 file changed

+81
-57
lines changed

src/main.rs

Lines changed: 81 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -239,34 +239,6 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
239239
.await
240240
.context("database migrations")?;
241241

242-
// spawning a background task that will schedule the jobs
243-
// every JOB_SCHEDULING_CADENCE_IN_SECS
244-
task::spawn(async move {
245-
loop {
246-
let res = task::spawn(async move {
247-
let pool = db::ClientPool::new();
248-
let mut interval =
249-
time::interval(time::Duration::from_secs(JOB_SCHEDULING_CADENCE_IN_SECS));
250-
251-
loop {
252-
interval.tick().await;
253-
db::schedule_jobs(&*pool.get().await, jobs())
254-
.await
255-
.context("database schedule jobs")
256-
.unwrap();
257-
}
258-
});
259-
260-
match res.await {
261-
Err(err) if err.is_panic() => {
262-
/* handle panic in above task, re-launching */
263-
tracing::trace!("schedule_jobs task died (error={})", err);
264-
}
265-
_ => unreachable!(),
266-
}
267-
}
268-
});
269-
270242
let client = Client::new();
271243
let gh = github::GithubClient::new_with_default_token(client.clone());
272244
let oc = octocrab::OctocrabBuilder::new()
@@ -280,35 +252,10 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
280252
octocrab: oc,
281253
});
282254

283-
// spawning a background task that will run the scheduled jobs
284-
// every JOB_PROCESSING_CADENCE_IN_SECS
285-
let ctx2 = ctx.clone();
286-
task::spawn(async move {
287-
loop {
288-
let ctx = ctx2.clone();
289-
let res = task::spawn(async move {
290-
let pool = db::ClientPool::new();
291-
let mut interval =
292-
time::interval(time::Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS));
293-
294-
loop {
295-
interval.tick().await;
296-
db::run_scheduled_jobs(&ctx, &*pool.get().await)
297-
.await
298-
.context("run database scheduled jobs")
299-
.unwrap();
300-
}
301-
});
302-
303-
match res.await {
304-
Err(err) if err.is_panic() => {
305-
/* handle panic in above task, re-launching */
306-
tracing::trace!("run_scheduled_jobs task died (error={})", err);
307-
}
308-
_ => unreachable!(),
309-
}
310-
}
311-
});
255+
if !is_scheduled_jobs_disabled() {
256+
spawn_job_scheduler();
257+
spawn_job_runner(ctx.clone());
258+
}
312259

313260
let agenda = tower::ServiceBuilder::new()
314261
.buffer(10)
@@ -351,6 +298,83 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
351298
Ok(())
352299
}
353300

301+
/// Spawns a background tokio task which runs continuously to queue up jobs
302+
/// to be run by the job runner.
303+
///
304+
/// The scheduler wakes up every `JOB_SCHEDULING_CADENCE_IN_SECS` seconds to
305+
/// check if there are any jobs ready to run. Jobs get inserted into the the
306+
/// database which acts as a queue.
307+
fn spawn_job_scheduler() {
308+
task::spawn(async move {
309+
loop {
310+
let res = task::spawn(async move {
311+
let pool = db::ClientPool::new();
312+
let mut interval =
313+
time::interval(time::Duration::from_secs(JOB_SCHEDULING_CADENCE_IN_SECS));
314+
315+
loop {
316+
interval.tick().await;
317+
db::schedule_jobs(&*pool.get().await, jobs())
318+
.await
319+
.context("database schedule jobs")
320+
.unwrap();
321+
}
322+
});
323+
324+
match res.await {
325+
Err(err) if err.is_panic() => {
326+
/* handle panic in above task, re-launching */
327+
tracing::trace!("schedule_jobs task died (error={})", err);
328+
}
329+
_ => unreachable!(),
330+
}
331+
}
332+
});
333+
}
334+
335+
/// Spawns a background tokio task which runs continuously to run scheduled
336+
/// jobs.
337+
///
338+
/// The runner wakes up every `JOB_PROCESSING_CADENCE_IN_SECS` seconds to
339+
/// check if any jobs have been put into the queue by the scheduler. They
340+
/// will get popped off the queue and run if any are found.
341+
fn spawn_job_runner(ctx: Arc<Context>) {
342+
task::spawn(async move {
343+
loop {
344+
let ctx = ctx.clone();
345+
let res = task::spawn(async move {
346+
let pool = db::ClientPool::new();
347+
let mut interval =
348+
time::interval(time::Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS));
349+
350+
loop {
351+
interval.tick().await;
352+
db::run_scheduled_jobs(&ctx, &*pool.get().await)
353+
.await
354+
.context("run database scheduled jobs")
355+
.unwrap();
356+
}
357+
});
358+
359+
match res.await {
360+
Err(err) if err.is_panic() => {
361+
/* handle panic in above task, re-launching */
362+
tracing::trace!("run_scheduled_jobs task died (error={})", err);
363+
}
364+
_ => unreachable!(),
365+
}
366+
}
367+
});
368+
}
369+
370+
/// Determines whether or not background scheduled jobs should be disabled for
371+
/// the purpose of testing.
372+
///
373+
/// This helps avoid having random jobs run while testing other things.
374+
fn is_scheduled_jobs_disabled() -> bool {
375+
env::var_os("TRIAGEBOT_TEST_DISABLE_JOBS").is_some()
376+
}
377+
354378
#[tokio::main(flavor = "current_thread")]
355379
async fn main() {
356380
dotenv::dotenv().ok();

0 commit comments

Comments
 (0)