Skip to content

Commit df11506

Browse files
Merge pull request #1742 from jackh726/scheduled_jobs
Some scheduling cleanup and automatic types planning stream opening
2 parents 0a68321 + b9d17bc commit df11506

File tree

11 files changed

+386
-89
lines changed

11 files changed

+386
-89
lines changed

src/db.rs

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use crate::handlers::jobs::handle_job;
2-
use crate::{db::jobs::*, handlers::Context};
1+
use crate::{db::jobs::*, handlers::Context, jobs::jobs};
32
use anyhow::Context as _;
43
use chrono::Utc;
54
use native_tls::{Certificate, TlsConnector};
@@ -188,16 +187,32 @@ pub async fn schedule_jobs(db: &DbClient, jobs: Vec<JobSchedule>) -> anyhow::Res
188187
let mut upcoming = job.schedule.upcoming(Utc).take(1);
189188

190189
if let Some(scheduled_at) = upcoming.next() {
191-
if let Err(_) = get_job_by_name_and_scheduled_at(&db, &job.name, &scheduled_at).await {
192-
// mean there's no job already in the db with that name and scheduled_at
193-
insert_job(&db, &job.name, &scheduled_at, &job.metadata).await?;
194-
}
190+
schedule_job(db, job.name, job.metadata, scheduled_at).await?;
195191
}
196192
}
197193

198194
Ok(())
199195
}
200196

197+
pub async fn schedule_job(
198+
db: &DbClient,
199+
job_name: &str,
200+
job_metadata: serde_json::Value,
201+
when: chrono::DateTime<Utc>,
202+
) -> anyhow::Result<()> {
203+
let all_jobs = jobs();
204+
if !all_jobs.iter().any(|j| j.name() == job_name) {
205+
anyhow::bail!("Job {} does not exist in the current job list.", job_name);
206+
}
207+
208+
if let Err(_) = get_job_by_name_and_scheduled_at(&db, job_name, &when).await {
209+
// mean there's no job already in the db with that name and scheduled_at
210+
insert_job(&db, job_name, &when, &job_metadata).await?;
211+
}
212+
213+
Ok(())
214+
}
215+
201216
pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<()> {
202217
let jobs = get_jobs_to_execute(&db).await.unwrap();
203218
tracing::trace!("jobs to execute: {:#?}", jobs);
@@ -220,6 +235,26 @@ pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<
220235
Ok(())
221236
}
222237

238+
// Try to handle a specific job
239+
async fn handle_job(
240+
ctx: &Context,
241+
name: &String,
242+
metadata: &serde_json::Value,
243+
) -> anyhow::Result<()> {
244+
for job in jobs() {
245+
if &job.name() == &name {
246+
return job.run(ctx, metadata).await;
247+
}
248+
}
249+
tracing::trace!(
250+
"handle_job fell into default case: (name={:?}, metadata={:?})",
251+
name,
252+
metadata
253+
);
254+
255+
Ok(())
256+
}
257+
223258
static MIGRATIONS: &[&str] = &[
224259
"
225260
CREATE TABLE notifications (

src/db/jobs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio_postgres::Client as DbClient;
77
use uuid::Uuid;
88

99
pub struct JobSchedule {
10-
pub name: String,
10+
pub name: &'static str,
1111
pub schedule: Schedule,
1212
pub metadata: serde_json::Value,
1313
}
@@ -24,7 +24,7 @@ pub struct Job {
2424

2525
pub async fn insert_job(
2626
db: &DbClient,
27-
name: &String,
27+
name: &str,
2828
scheduled_at: &DateTime<Utc>,
2929
metadata: &serde_json::Value,
3030
) -> Result<()> {
@@ -76,7 +76,7 @@ pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {
7676

7777
pub async fn get_job_by_name_and_scheduled_at(
7878
db: &DbClient,
79-
name: &String,
79+
name: &str,
8080
scheduled_at: &DateTime<Utc>,
8181
) -> Result<Job> {
8282
tracing::trace!(

src/github.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,21 @@ impl Issue {
515515
.await?)
516516
}
517517

518+
// returns an array of one element
519+
pub async fn get_first100_comments(
520+
&self,
521+
client: &GithubClient,
522+
) -> anyhow::Result<Vec<Comment>> {
523+
let comment_url = format!(
524+
"{}/issues/{}/comments?page=1&per_page=100",
525+
self.repository().url(),
526+
self.number,
527+
);
528+
Ok(client
529+
.json::<Vec<Comment>>(client.get(&comment_url))
530+
.await?)
531+
}
532+
518533
pub async fn edit_body(&self, client: &GithubClient, body: &str) -> anyhow::Result<()> {
519534
let edit_url = format!("{}/issues/{}", self.repository().url(), self.number);
520535
#[derive(serde::Serialize)]

src/handlers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ mod close;
2929
pub mod docs_update;
3030
mod github_releases;
3131
mod glacier;
32-
pub mod jobs;
3332
mod major_change;
3433
mod mentions;
3534
mod milestone_prs;
@@ -46,6 +45,7 @@ mod review_submitted;
4645
mod rfc_helper;
4746
pub mod rustc_commits;
4847
mod shortcut;
48+
pub mod types_planning_updates;
4949

5050
pub async fn handle(ctx: &Context, event: &Event) -> Vec<HandlerError> {
5151
let config = config::get(&ctx.github, event.repo()).await;

src/handlers/docs_update.rs

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
//! A scheduled job to post a PR to update the documentation on rust-lang/rust.
22
3-
use crate::db::jobs::JobSchedule;
43
use crate::github::{self, GitTreeEntry, GithubClient, Issue, Repository};
4+
use crate::jobs::Job;
55
use anyhow::Context;
66
use anyhow::Result;
7-
use cron::Schedule;
7+
use async_trait::async_trait;
88
use reqwest::Client;
99
use std::fmt::Write;
10-
use std::str::FromStr;
1110

1211
/// This is the repository where the commits will be created.
1312
const WORK_REPO: &str = "rustbot/rust";
@@ -28,38 +27,42 @@ const SUBMODULES: &[&str] = &[
2827

2928
const TITLE: &str = "Update books";
3029

31-
pub fn job() -> JobSchedule {
32-
JobSchedule {
33-
name: "docs_update".to_string(),
34-
// Around 9am Pacific time on every Monday.
35-
schedule: Schedule::from_str("0 00 17 * * Mon *").unwrap(),
36-
metadata: serde_json::Value::Null,
37-
}
38-
}
30+
pub struct DocsUpdateJob;
3931

40-
pub async fn handle_job() -> Result<()> {
41-
// Only run every other week. Doing it every week can be a bit noisy, and
42-
// (rarely) a PR can take longer than a week to merge (like if there are
43-
// CI issues). `Schedule` does not allow expressing this, so check it
44-
// manually.
45-
//
46-
// This is set to run the first week after a release, and the week just
47-
// before a release. That allows getting the latest changes in the next
48-
// release, accounting for possibly taking a few days for the PR to land.
49-
let today = chrono::Utc::today().naive_utc();
50-
let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
51-
let duration = today.signed_duration_since(base);
52-
let weeks = duration.num_weeks();
53-
if weeks % 2 != 0 {
54-
tracing::trace!("skipping job, this is an odd week");
55-
return Ok(());
32+
#[async_trait]
33+
impl Job for DocsUpdateJob {
34+
fn name(&self) -> &'static str {
35+
"docs_update"
5636
}
5737

58-
tracing::trace!("starting docs-update");
59-
docs_update()
60-
.await
61-
.context("failed to process docs update")?;
62-
Ok(())
38+
async fn run(
39+
&self,
40+
_ctx: &super::Context,
41+
_metadata: &serde_json::Value,
42+
) -> anyhow::Result<()> {
43+
// Only run every other week. Doing it every week can be a bit noisy, and
44+
// (rarely) a PR can take longer than a week to merge (like if there are
45+
// CI issues). `Schedule` does not allow expressing this, so check it
46+
// manually.
47+
//
48+
// This is set to run the first week after a release, and the week just
49+
// before a release. That allows getting the latest changes in the next
50+
// release, accounting for possibly taking a few days for the PR to land.
51+
let today = chrono::Utc::today().naive_utc();
52+
let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
53+
let duration = today.signed_duration_since(base);
54+
let weeks = duration.num_weeks();
55+
if weeks % 2 != 0 {
56+
tracing::trace!("skipping job, this is an odd week");
57+
return Ok(());
58+
}
59+
60+
tracing::trace!("starting docs-update");
61+
docs_update()
62+
.await
63+
.context("failed to process docs update")?;
64+
Ok(())
65+
}
6366
}
6467

6568
pub async fn docs_update() -> Result<Option<Issue>> {

src/handlers/jobs.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ use super::Context;
88

99
pub async fn handle_job(
1010
ctx: &Context,
11-
name: &String,
11+
name: &str,
1212
metadata: &serde_json::Value,
1313
) -> anyhow::Result<()> {
14-
match name.as_str() {
14+
match name {
1515
"docs_update" => super::docs_update::handle_job().await,
1616
"rustc_commits" => {
1717
super::rustc_commits::synchronize_commits_inner(ctx, None).await;
1818
Ok(())
1919
}
20-
_ => default(&name, &metadata),
20+
_ => default(name, &metadata),
2121
}
2222
}
2323

24-
fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
24+
fn default(name: &str, metadata: &serde_json::Value) -> anyhow::Result<()> {
2525
tracing::trace!(
2626
"handle_job fell into default case: (name={:?}, metadata={:?})",
2727
name,

src/handlers/rustc_commits.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1-
use crate::db::jobs::JobSchedule;
21
use crate::db::rustc_commits;
32
use crate::db::rustc_commits::get_missing_commits;
3+
use crate::jobs::Job;
44
use crate::{
55
github::{self, Event},
66
handlers::Context,
77
};
8-
use cron::Schedule;
8+
use async_trait::async_trait;
99
use std::collections::VecDeque;
1010
use std::convert::TryInto;
11-
use std::str::FromStr;
1211
use tracing as log;
1312

1413
const BORS_GH_ID: i64 = 3372342;
@@ -153,12 +152,17 @@ pub async fn synchronize_commits_inner(ctx: &Context, starter: Option<(String, u
153152
}
154153
}
155154

156-
pub fn job() -> JobSchedule {
157-
JobSchedule {
158-
name: "rustc_commits".to_string(),
159-
// Every 30 minutes...
160-
schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
161-
metadata: serde_json::Value::Null,
155+
pub struct RustcCommitsJob;
156+
157+
#[async_trait]
158+
impl Job for RustcCommitsJob {
159+
fn name(&self) -> &'static str {
160+
"rustc_commits"
161+
}
162+
163+
async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
164+
synchronize_commits_inner(ctx, None).await;
165+
Ok(())
162166
}
163167
}
164168

0 commit comments

Comments
 (0)