Skip to content

Commit fad3d1f

Browse files
Merge pull request #1658 from mcass19/feature/scheduled-jobs
[FEATURE] Scheduled Jobs
2 parents d7c405b + f4bf721 commit fad3d1f

File tree

9 files changed

+377
-6
lines changed

9 files changed

+377
-6
lines changed

Cargo.lock

Lines changed: 47 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ hyper = { version = "0.14.4", features = ["server", "stream"]}
2323
tokio = { version = "1.7.1", features = ["macros", "time", "rt"] }
2424
futures = { version = "0.3", default-features = false, features = ["std"] }
2525
async-trait = "0.1.31"
26-
uuid = { version = "0.8", features = ["v4"] }
26+
uuid = { version = "0.8", features = ["v4", "serde"] }
2727
tracing = "0.1"
2828
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2929
url = "2.1.0"
3030
once_cell = "1"
3131
chrono = { version = "0.4", features = ["serde"] }
32-
tokio-postgres = { version = "0.7.2", features = ["with-chrono-0_4", "with-serde_json-1"] }
32+
tokio-postgres = { version = "0.7.2", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-0_8"] }
3333
postgres-native-tls = "0.5.0"
3434
native-tls = "0.2"
3535
serde_path_to_error = "0.1.2"
@@ -42,6 +42,8 @@ tower = { version = "0.4.13", features = ["util", "limit", "buffer", "load-shed"
4242
github-graphql = { path = "github-graphql" }
4343
rand = "0.8.5"
4444
ignore = "0.4.18"
45+
postgres-types = { version = "0.2.4", features = ["derive"] }
46+
cron = { version = "0.12.0" }
4547

4648
[dependencies.serde]
4749
version = "1"

src/db.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
use crate::db::jobs::*;
2+
use crate::handlers::jobs::handle_job;
13
use anyhow::Context as _;
4+
use chrono::Utc;
25
use native_tls::{Certificate, TlsConnector};
36
use postgres_native_tls::MakeTlsConnector;
47
use std::sync::{Arc, Mutex};
58
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
69
use tokio_postgres::Client as DbClient;
710

811
pub mod issue_data;
12+
pub mod jobs;
913
pub mod notifications;
1014
pub mod rustc_commits;
1115

@@ -179,6 +183,43 @@ pub async fn run_migrations(client: &DbClient) -> anyhow::Result<()> {
179183
Ok(())
180184
}
181185

186+
pub async fn schedule_jobs(db: &DbClient, jobs: Vec<JobSchedule>) -> anyhow::Result<()> {
187+
for job in jobs {
188+
let mut upcoming = job.schedule.upcoming(Utc).take(1);
189+
190+
if let Some(scheduled_at) = upcoming.next() {
191+
if let Err(_) = get_job_by_name_and_scheduled_at(&db, &job.name, &scheduled_at).await {
192+
// mean there's no job already in the db with that name and scheduled_at
193+
insert_job(&db, &job.name, &scheduled_at, &job.metadata).await?;
194+
}
195+
}
196+
}
197+
198+
Ok(())
199+
}
200+
201+
pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
202+
let jobs = get_jobs_to_execute(&db).await.unwrap();
203+
tracing::trace!("jobs to execute: {:#?}", jobs);
204+
205+
for job in jobs.iter() {
206+
update_job_executed_at(&db, &job.id).await?;
207+
208+
match handle_job(&job.name, &job.metadata).await {
209+
Ok(_) => {
210+
tracing::trace!("job successfully executed (id={})", job.id);
211+
delete_job(&db, &job.id).await?;
212+
}
213+
Err(e) => {
214+
tracing::trace!("job failed on execution (id={:?}, error={:?})", job.id, e);
215+
update_job_error_message(&db, &job.id, &e.to_string()).await?;
216+
}
217+
}
218+
}
219+
220+
Ok(())
221+
}
222+
182223
static MIGRATIONS: &[&str] = &[
183224
"
184225
CREATE TABLE notifications (
@@ -215,5 +256,21 @@ CREATE TABLE issue_data (
215256
data JSONB,
216257
PRIMARY KEY (repo, issue_number, key)
217258
);
259+
",
260+
"
261+
CREATE TABLE jobs (
262+
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
263+
name TEXT NOT NULL,
264+
scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
265+
metadata JSONB,
266+
executed_at TIMESTAMP WITH TIME ZONE,
267+
error_message TEXT
268+
);
269+
",
270+
"
271+
CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index
272+
ON jobs (
273+
name, scheduled_at
274+
);
218275
",
219276
];

src/db/jobs.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
//! The `jobs` table provides a way to have scheduled jobs
2+
use anyhow::{Context as _, Result};
3+
use chrono::{DateTime, Utc};
4+
use cron::Schedule;
5+
use serde::{Deserialize, Serialize};
6+
use tokio_postgres::Client as DbClient;
7+
use uuid::Uuid;
8+
9+
pub struct JobSchedule {
10+
pub name: String,
11+
pub schedule: Schedule,
12+
pub metadata: serde_json::Value,
13+
}
14+
15+
#[derive(Serialize, Deserialize, Debug)]
16+
pub struct Job {
17+
pub id: Uuid,
18+
pub name: String,
19+
pub scheduled_at: DateTime<Utc>,
20+
pub metadata: serde_json::Value,
21+
pub executed_at: Option<DateTime<Utc>>,
22+
pub error_message: Option<String>,
23+
}
24+
25+
pub async fn insert_job(
26+
db: &DbClient,
27+
name: &String,
28+
scheduled_at: &DateTime<Utc>,
29+
metadata: &serde_json::Value,
30+
) -> Result<()> {
31+
tracing::trace!("insert_job(name={})", name);
32+
33+
db.execute(
34+
"INSERT INTO jobs (name, scheduled_at, metadata) VALUES ($1, $2, $3)
35+
ON CONFLICT (name, scheduled_at) DO UPDATE SET metadata = EXCLUDED.metadata",
36+
&[&name, &scheduled_at, &metadata],
37+
)
38+
.await
39+
.context("Inserting job")?;
40+
41+
Ok(())
42+
}
43+
44+
pub async fn delete_job(db: &DbClient, id: &Uuid) -> Result<()> {
45+
tracing::trace!("delete_job(id={})", id);
46+
47+
db.execute("DELETE FROM jobs WHERE id = $1", &[&id])
48+
.await
49+
.context("Deleting job")?;
50+
51+
Ok(())
52+
}
53+
54+
pub async fn update_job_error_message(db: &DbClient, id: &Uuid, message: &String) -> Result<()> {
55+
tracing::trace!("update_job_error_message(id={})", id);
56+
57+
db.execute(
58+
"UPDATE jobs SET error_message = $2 WHERE id = $1",
59+
&[&id, &message],
60+
)
61+
.await
62+
.context("Updating job error message")?;
63+
64+
Ok(())
65+
}
66+
67+
pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {
68+
tracing::trace!("update_job_executed_at(id={})", id);
69+
70+
db.execute("UPDATE jobs SET executed_at = now() WHERE id = $1", &[&id])
71+
.await
72+
.context("Updating job executed at")?;
73+
74+
Ok(())
75+
}
76+
77+
pub async fn get_job_by_name_and_scheduled_at(
78+
db: &DbClient,
79+
name: &String,
80+
scheduled_at: &DateTime<Utc>,
81+
) -> Result<Job> {
82+
tracing::trace!(
83+
"get_job_by_name_and_scheduled_at(name={}, scheduled_at={})",
84+
name,
85+
scheduled_at
86+
);
87+
88+
let job = db
89+
.query_one(
90+
"SELECT * FROM jobs WHERE name = $1 AND scheduled_at = $2",
91+
&[&name, &scheduled_at],
92+
)
93+
.await
94+
.context("Select job by name and scheduled at")?;
95+
96+
deserialize_job(&job)
97+
}
98+
99+
// Selects all jobs with:
100+
// - scheduled_at in the past
101+
// - error_message is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
102+
pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
103+
let jobs = db
104+
.query(
105+
"
106+
SELECT * FROM jobs WHERE scheduled_at <= now() AND (error_message IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
107+
&[],
108+
)
109+
.await
110+
.context("Getting jobs data")?;
111+
112+
let mut data = Vec::with_capacity(jobs.len());
113+
for job in jobs {
114+
let serialized_job = deserialize_job(&job);
115+
data.push(serialized_job.unwrap());
116+
}
117+
118+
Ok(data)
119+
}
120+
121+
fn deserialize_job(row: &tokio_postgres::row::Row) -> Result<Job> {
122+
let id: Uuid = row.try_get(0)?;
123+
let name: String = row.try_get(1)?;
124+
let scheduled_at: DateTime<Utc> = row.try_get(2)?;
125+
let metadata: serde_json::Value = row.try_get(3)?;
126+
let executed_at: Option<DateTime<Utc>> = row.try_get(4)?;
127+
let error_message: Option<String> = row.try_get(5)?;
128+
129+
Ok(Job {
130+
id,
131+
name,
132+
scheduled_at,
133+
metadata,
134+
executed_at,
135+
error_message,
136+
})
137+
}

src/handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mod autolabel;
2828
mod close;
2929
mod github_releases;
3030
mod glacier;
31+
pub mod jobs;
3132
mod major_change;
3233
mod mentions;
3334
mod milestone_prs;

src/handlers/jobs.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Function to match the scheduled job function with its corresponding handler.
2+
// In case you want to add a new one, just add a new clause to the match with
3+
// the job name and the corresponding function.
4+
5+
// Further info could be find in src/jobs.rs
6+
7+
pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
8+
match name {
9+
_ => default(&name, &metadata),
10+
}
11+
}
12+
13+
fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
14+
tracing::trace!(
15+
"handle_job fell into default case: (name={:?}, metadata={:?})",
16+
name,
17+
metadata
18+
);
19+
20+
Ok(())
21+
}

0 commit comments

Comments
 (0)