Commit a7379ab

Author: Mauricio Cassola
Use cron crate in separate loop for job scheduling
1 parent c32c486 commit a7379ab

File tree: 8 files changed, +185 -52 lines

Cargo.lock

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ github-graphql = { path = "github-graphql" }
 rand = "0.8.5"
 ignore = "0.4.18"
 postgres-types = { version = "0.2.4", features = ["derive"] }
+cron = { version = "0.12.0" }
 
 [dependencies.serde]
 version = "1"
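
For reference, the new cron dependency is what drives the scheduling below: a cron::Schedule parses a seven-field cron expression and yields upcoming fire times for a given time zone. A minimal standalone sketch (the expression is the example used later in src/jobs.rs):

use std::str::FromStr;

use chrono::Utc;
use cron::Schedule;

fn main() {
    // Fields are: sec min hour day-of-month month day-of-week year.
    let schedule = Schedule::from_str("0 30 11 * * FRI *").unwrap();

    // Print the next three occurrences in UTC; schedule_jobs in src/db.rs
    // takes only the first of these and persists it as a job row.
    for next in schedule.upcoming(Utc).take(3) {
        println!("next run at {}", next);
    }
}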

src/db.rs

Lines changed: 16 additions & 2 deletions
@@ -1,6 +1,7 @@
 use crate::db::jobs::*;
 use crate::handlers::jobs::handle_job;
 use anyhow::Context as _;
+use chrono::Utc;
 use native_tls::{Certificate, TlsConnector};
 use postgres_native_tls::MakeTlsConnector;
 use std::sync::{Arc, Mutex};
@@ -182,6 +183,21 @@ pub async fn run_migrations(client: &DbClient) -> anyhow::Result<()> {
     Ok(())
 }
 
+pub async fn schedule_jobs(db: &DbClient, jobs: Vec<JobSchedule>) -> anyhow::Result<()> {
+    for job in jobs {
+        let mut upcoming = job.schedule.upcoming(Utc).take(1);
+
+        if let Some(scheduled_at) = upcoming.next() {
+            if let Err(_) = get_job_by_name_and_scheduled_at(&db, &job.name, &scheduled_at).await {
+                // means there's no job already in the db with that name and scheduled_at
+                insert_job(&db, &job.name, &scheduled_at, &job.metadata).await?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
 pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
     let jobs = get_jobs_to_execute(&db).await.unwrap();
     tracing::trace!("jobs to execute: {:#?}", jobs);
@@ -192,12 +208,10 @@ pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
         match handle_job(&job.name, &job.metadata).await {
             Ok(_) => {
                 tracing::trace!("job successfully executed (id={})", job.id);
-
                 delete_job(&db, &job.id).await?;
             }
             Err(e) => {
                 tracing::trace!("job failed on execution (id={:?}, error={:?})", job.id, e);
-
                 update_job_error_message(&db, &job.id, &e.to_string()).await?;
             }
         }
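
Taken together, schedule_jobs writes the next occurrence of each registered JobSchedule into the jobs table, and run_scheduled_jobs drains whatever is due. A compact sketch of a single pass, reusing ClientPool and jobs() exactly as main.rs does below (no interval loops, error handling reduced to ?):

use anyhow::Context as _;
use triagebot::{db, jobs::jobs};

// One scheduling pass followed by one processing pass; in production these
// run in two independent background loops (see src/main.rs in this commit).
async fn schedule_and_run_once() -> anyhow::Result<()> {
    let pool = db::ClientPool::new();

    db::schedule_jobs(&*pool.get().await, jobs())
        .await
        .context("database schedule jobs")?;

    db::run_scheduled_jobs(&*pool.get().await)
        .await
        .context("run database scheduled jobs")?;

    Ok(())
}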

src/db/jobs.rs

Lines changed: 54 additions & 20 deletions
@@ -1,30 +1,37 @@
 //! The `jobs` table provides a way to have scheduled jobs
 use anyhow::{Context as _, Result};
-use chrono::{DateTime, FixedOffset};
+use chrono::{DateTime, Utc};
+use cron::Schedule;
 use serde::{Deserialize, Serialize};
 use tokio_postgres::Client as DbClient;
 use uuid::Uuid;
 
+pub struct JobSchedule {
+    pub name: String,
+    pub schedule: Schedule,
+    pub metadata: serde_json::Value,
+}
+
 #[derive(Serialize, Deserialize, Debug)]
 pub struct Job {
     pub id: Uuid,
     pub name: String,
-    pub scheduled_at: DateTime<FixedOffset>,
+    pub scheduled_at: DateTime<Utc>,
     pub metadata: serde_json::Value,
-    pub executed_at: Option<DateTime<FixedOffset>>,
+    pub executed_at: Option<DateTime<Utc>>,
     pub error_message: Option<String>,
 }
 
 pub async fn insert_job(
     db: &DbClient,
     name: &String,
-    scheduled_at: &DateTime<FixedOffset>,
+    scheduled_at: &DateTime<Utc>,
     metadata: &serde_json::Value,
 ) -> Result<()> {
     tracing::trace!("insert_job(name={})", name);
 
     db.execute(
-        "INSERT INTO jobs (name, scheduled_at, metadata) VALUES ($1, $2, $3, $4, $5)
+        "INSERT INTO jobs (name, scheduled_at, metadata) VALUES ($1, $2, $3)
         ON CONFLICT (name, scheduled_at) DO UPDATE SET metadata = EXCLUDED.metadata",
         &[&name, &scheduled_at, &metadata],
     )
@@ -67,6 +74,28 @@ pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {
     Ok(())
 }
 
+pub async fn get_job_by_name_and_scheduled_at(
+    db: &DbClient,
+    name: &String,
+    scheduled_at: &DateTime<Utc>,
+) -> Result<Job> {
+    tracing::trace!(
+        "get_job_by_name_and_scheduled_at(name={}, scheduled_at={})",
+        name,
+        scheduled_at
+    );
+
+    let job = db
+        .query_one(
+            "SELECT * FROM jobs WHERE name = $1 AND scheduled_at = $2",
+            &[&name, &scheduled_at],
+        )
+        .await
+        .context("Select job by name and scheduled at")?;
+
+    serialize_job(&job)
+}
+
 // Selects all jobs with:
 // - scheduled_at in the past
 // - error_message is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
@@ -82,22 +111,27 @@ pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
 
     let mut data = Vec::with_capacity(jobs.len());
     for job in jobs {
-        let id: Uuid = job.get(0);
-        let name: String = job.get(1);
-        let scheduled_at: DateTime<FixedOffset> = job.get(2);
-        let metadata: serde_json::Value = job.get(5);
-        let executed_at: Option<DateTime<FixedOffset>> = job.get(6);
-        let error_message: Option<String> = job.get(7);
-
-        data.push(Job {
-            id,
-            name,
-            scheduled_at,
-            metadata,
-            executed_at,
-            error_message,
-        });
+        let serialized_job = serialize_job(&job);
+        data.push(serialized_job.unwrap());
     }
 
     Ok(data)
 }
+
+fn serialize_job(row: &tokio_postgres::row::Row) -> Result<Job> {
+    let id: Uuid = row.try_get(0)?;
+    let name: String = row.try_get(1)?;
+    let scheduled_at: DateTime<Utc> = row.try_get(2)?;
+    let metadata: serde_json::Value = row.try_get(3)?;
+    let executed_at: Option<DateTime<Utc>> = row.try_get(4)?;
+    let error_message: Option<String> = row.try_get(5)?;
+
+    Ok(Job {
+        id,
+        name,
+        scheduled_at,
+        metadata,
+        executed_at,
+        error_message,
+    })
+}
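
For orientation, the queries above imply a jobs table with roughly the following shape. This is a hypothetical reconstruction inferred from the column order used by serialize_job and the (name, scheduled_at) conflict target in insert_job, not the actual migration from this repository:

// Hypothetical reconstruction of the `jobs` table, inferred from the column
// indexes read in serialize_job and the ON CONFLICT clause in insert_job.
// The real migration may differ.
pub const HYPOTHETICAL_JOBS_SCHEMA: &str = "
CREATE TABLE jobs (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name            TEXT NOT NULL,
    scheduled_at    TIMESTAMPTZ NOT NULL,
    metadata        JSONB,
    executed_at     TIMESTAMPTZ,
    error_message   TEXT,
    UNIQUE (name, scheduled_at)
);
";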

src/handlers/jobs.rs

Lines changed: 1 addition & 23 deletions
@@ -2,29 +2,7 @@
 // In case you want to add a new one, just add a new clause to the match with
 // the job name and the corresponding function.
 
-// The metadata is a serde_json::Value
-// Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html
-// on how to interpret it as an instance of type T, implementing Serialize/Deserialize.
-
-// For example, if we want to sends a Zulip message every Friday at 11:30am ET into #t-release
-// with a @T-release meeting! content, we should create some Job like:
-//
-// #[derive(Serialize, Deserialize)]
-// struct ZulipMetadata {
-//   pub message: String
-// }
-//
-// let metadata = serde_json::value::to_value(ZulipMetadata {
-//   message: "@T-release meeting!".to_string()
-// }).unwrap();
-//
-// Job {
-//   name: "send_zulip_message",
-//   scheduled_at: "2022-09-30T11:30:00+10:00",
-//   metadata: metadata
-// }
-//
-// ... and add the corresponding "send_zulip_message" handler.
+// Further info can be found in src/jobs.rs
 
 pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
     match name {
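
The hunk ends at the match. Following the pointer to src/jobs.rs, wiring up the hypothetical send_zulip_message job from that module's doc comment would mean adding an arm along these lines; the ZulipMetadata type, the as_str() dispatch, and the handler body are illustrative assumptions, not code from this commit:

use serde::{Deserialize, Serialize};

// Illustrative metadata type, mirroring the example in src/jobs.rs.
#[derive(Serialize, Deserialize)]
struct ZulipMetadata {
    pub message: String,
}

pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
    match name.as_str() {
        // Hypothetical handler for the job registered in src/jobs.rs.
        "send_zulip_message" => {
            let data: ZulipMetadata = serde_json::from_value(metadata.clone())?;
            // ... send `data.message` to the #t-release Zulip stream here ...
            tracing::trace!("would send zulip message: {}", data.message);
            Ok(())
        }
        _ => anyhow::bail!("no handler for job {}", name),
    }
}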

src/jobs.rs

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+//! SCHEDULED JOBS
+//!
+//! The metadata is a serde_json::Value
+//! Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html
+//! on how to interpret it as an instance of type T, implementing Serialize/Deserialize.
+//!
+//! The schedule is a cron::Schedule
+//! Please refer to https://docs.rs/cron/latest/cron/struct.Schedule.html for further info
+//!
+//! For example, if we want to send a Zulip message every Friday at 11:30am ET into #t-release
+//! with a @T-release meeting! content, we should create some JobSchedule like:
+//!
+//! #[derive(Serialize, Deserialize)]
+//! struct ZulipMetadata {
+//!   pub message: String
+//! }
+//!
+//! let metadata = serde_json::value::to_value(ZulipMetadata {
+//!   message: "@T-release meeting!".to_string()
+//! }).unwrap();
+//!
+//! let schedule = Schedule::from_str("0 30 11 * * FRI *").unwrap();
+//!
+//! let new_job = JobSchedule {
+//!   name: "send_zulip_message".to_owned(),
+//!   schedule: schedule,
+//!   metadata: metadata
+//! }
+//!
+//! and include it in the below vector in jobs():
+//!
+//! jobs.push(new_job);
+//!
+//! ... finally, add the corresponding "send_zulip_message" handler in src/handlers/jobs.rs
+
+use crate::db::jobs::JobSchedule;
+
+// Cadence in seconds with which the jobs will be scheduled
+pub const JOB_SCHEDULING_CADENCE_IN_SECS: u64 = 1800;
+
+// Cadence in seconds with which the jobs will be processed
+pub const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
+
+pub fn jobs() -> Vec<JobSchedule> {
+    // Add to this vector any new cron task you want (as explained above)
+    let jobs: Vec<JobSchedule> = Vec::new();
+
+    jobs
+}
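
Filled in with the module doc's example, a hypothetical jobs() registering that Zulip reminder would look roughly like this sketch; ZulipMetadata and the "send_zulip_message" name come from the doc comment above and are not added by this commit:

use std::str::FromStr;

use cron::Schedule;
use serde::{Deserialize, Serialize};

use crate::db::jobs::JobSchedule;

// Illustrative metadata type from the module doc above.
#[derive(Serialize, Deserialize)]
struct ZulipMetadata {
    pub message: String,
}

pub fn jobs() -> Vec<JobSchedule> {
    let mut jobs: Vec<JobSchedule> = Vec::new();

    // Fields are: sec min hour day-of-month month day-of-week year.
    // Note that schedule_jobs evaluates this with Utc, so the hour is in UTC.
    let schedule = Schedule::from_str("0 30 11 * * FRI *").unwrap();

    let metadata = serde_json::value::to_value(ZulipMetadata {
        message: "@T-release meeting!".to_string(),
    })
    .unwrap();

    jobs.push(JobSchedule {
        name: "send_zulip_message".to_owned(),
        schedule,
        metadata,
    });

    jobs
}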

src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ pub mod github;
 pub mod handlers;
 pub mod http_client;
 pub mod interactions;
+pub mod jobs;
 pub mod notification_listing;
 pub mod payload;
 pub mod rfcbot;

src/main.rs

Lines changed: 35 additions & 7 deletions
@@ -6,15 +6,14 @@ use futures::StreamExt;
 use hyper::{header, Body, Request, Response, Server, StatusCode};
 use reqwest::Client;
 use route_recognizer::Router;
-use std::{env, net::SocketAddr, sync::Arc, time::Duration};
-use tokio::{task, time::sleep};
+use std::{env, net::SocketAddr, sync::Arc};
+use tokio::{task, time};
 use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing::Instrument;
+use triagebot::jobs::{jobs, JOB_PROCESSING_CADENCE_IN_SECS, JOB_SCHEDULING_CADENCE_IN_SECS};
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
 
-const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
-
 async fn handle_agenda_request(req: String) -> anyhow::Result<String> {
     if req == "/agenda/lang/triage" {
         return triagebot::agenda::lang().call().await;
@@ -240,20 +239,49 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
         .await
         .context("database migrations")?;
 
+    // spawning a background task that will schedule the jobs
+    // every JOB_SCHEDULING_CADENCE_IN_SECS
+    task::spawn(async move {
+        loop {
+            let res = task::spawn(async move {
+                let pool = db::ClientPool::new();
+                let mut interval =
+                    time::interval(time::Duration::from_secs(JOB_SCHEDULING_CADENCE_IN_SECS));
+
+                loop {
+                    interval.tick().await;
+                    db::schedule_jobs(&*pool.get().await, jobs())
+                        .await
+                        .context("database schedule jobs")
+                        .unwrap();
+                }
+            });
+
+            match res.await {
+                Err(err) if err.is_panic() => {
+                    /* handle panic in above task, re-launching */
+                    tracing::trace!("schedule_jobs task died (error={})", err);
+                }
+                _ => unreachable!(),
+            }
+        }
+    });
+
     // spawning a background task that will run the scheduled jobs
     // every JOB_PROCESSING_CADENCE_IN_SECS
     task::spawn(async move {
        loop {
            let res = task::spawn(async move {
                let pool = db::ClientPool::new();
+               let mut interval =
+                   time::interval(time::Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS));
 
                loop {
+                   interval.tick().await;
                    db::run_scheduled_jobs(&*pool.get().await)
                        .await
                        .context("run database scheduled jobs")
                        .unwrap();
-
-                   sleep(Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS)).await;
                }
            });
 
@@ -262,7 +290,7 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
                    /* handle panic in above task, re-launching */
                    tracing::trace!("run_scheduled_jobs task died (error={})", err);
                }
-               _ => unreachable!()
+               _ => unreachable!(),
            }
        }
    });
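
Both background tasks use the same supervision shape: an outer loop spawns the real worker as its own task, awaits it, and respawns it if it died from a panic (the inner loop never exits normally, so anything other than a panic is treated as unreachable). Stripped of the triagebot specifics, the pattern is roughly this sketch, where do_work stands in for db::schedule_jobs or db::run_scheduled_jobs:

use tokio::{task, time};

// One unit of periodic work; placeholder for the real job functions.
async fn do_work() {
    /* ... */
}

fn spawn_supervised(cadence_secs: u64) {
    task::spawn(async move {
        loop {
            let res = task::spawn(async move {
                let mut interval = time::interval(time::Duration::from_secs(cadence_secs));
                loop {
                    interval.tick().await;
                    do_work().await;
                }
            });

            if let Err(err) = res.await {
                if err.is_panic() {
                    // the worker panicked; loop around and spawn a fresh one
                    tracing::trace!("worker task died (error={})", err);
                }
            }
        }
    });
}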
