Commit f96b297

Author: Mauricio Cassola

Run format
1 parent 394851a commit f96b297

File tree: 5 files changed, +61 -50 lines changed

5 files changed

+61
-50
lines changed

src/db.rs

Lines changed: 18 additions & 9 deletions
@@ -1,14 +1,14 @@
+use crate::db::jobs::*;
+use crate::handlers::jobs::handle_job;
 use anyhow::Context as _;
 use native_tls::{Certificate, TlsConnector};
 use postgres_native_tls::MakeTlsConnector;
 use std::sync::{Arc, Mutex};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio_postgres::Client as DbClient;
-use crate::db::jobs::*;
-use crate::handlers::jobs::handle_job;
 
-pub mod jobs;
 pub mod issue_data;
+pub mod jobs;
 pub mod notifications;
 pub mod rustc_commits;
 
@@ -196,22 +196,31 @@ pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
                 tracing::trace!("job succesfully executed (id={})", job.id);
 
                 if let Some(frequency) = job.frequency {
-                    let duration = get_duration_from_cron(frequency, job.frequency_unit.as_ref().unwrap());
+                    let duration =
+                        get_duration_from_cron(frequency, job.frequency_unit.as_ref().unwrap());
                     let new_expected_time = job.expected_time.checked_add_signed(duration).unwrap();
 
-                    insert_job(&db, &job.name, &new_expected_time, &Some(frequency), &job.frequency_unit, &job.metadata).await?;
+                    insert_job(
+                        &db,
+                        &job.name,
+                        &new_expected_time,
+                        &Some(frequency),
+                        &job.frequency_unit,
+                        &job.metadata,
+                    )
+                    .await?;
                     println!("job succesfully reinserted (name={})", job.name);
                     tracing::trace!("job succesfully reinserted (name={})", job.name);
                 }
 
                 delete_job(&db, &job.id).await?;
-            },
+            }
             Err(e) => {
                 println!("job failed on execution (id={:?}, error={:?})", job.id, e);
                 tracing::trace!("job failed on execution (id={:?}, error={:?})", job.id, e);
 
                 update_job_error_message(&db, &job.id, &e.to_string()).await?;
-            },
+            }
         }
     }
 
@@ -255,7 +264,7 @@ CREATE TABLE issue_data (
     PRIMARY KEY (repo, issue_number, key)
 );
 ",
-    "
+    "
 CREATE TYPE frequency_unit AS ENUM ('days', 'hours', 'minutes', 'seconds');
 ",
     "
@@ -275,5 +284,5 @@ CREATE UNIQUE INDEX jobs_name_expected_time_unique_index
 ON jobs (
     name, expected_time
 );
-"
+",
 ];
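For context on the reinsertion logic in run_scheduled_jobs above: a recurring job's next expected_time is the current expected_time plus a chrono::Duration derived from frequency and frequency_unit. A minimal sketch of that conversion, assuming the FrequencyUnit enum from src/db/jobs.rs (this is an illustration, not the crate's actual get_duration_from_cron):

use chrono::Duration;

// Hypothetical stand-in for the FrequencyUnit enum defined in src/db/jobs.rs.
enum FrequencyUnit {
    Days,
    Hours,
    Minutes,
    Seconds,
}

// Sketch of turning (frequency, unit) into the Duration that is added to the
// job's expected_time before the job is reinserted.
fn duration_for(frequency: i32, unit: &FrequencyUnit) -> Duration {
    let n = i64::from(frequency);
    match unit {
        FrequencyUnit::Days => Duration::days(n),
        FrequencyUnit::Hours => Duration::hours(n),
        FrequencyUnit::Minutes => Duration::minutes(n),
        FrequencyUnit::Seconds => Duration::seconds(n),
    }
}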

src/db/jobs.rs

Lines changed: 20 additions & 26 deletions
@@ -1,10 +1,10 @@
 //! The `jobs` table provides a way to have scheduled jobs
-use anyhow::{Result, Context as _};
-use chrono::{DateTime, FixedOffset, Duration};
-use tokio_postgres::{Client as DbClient};
-use uuid::Uuid;
+use anyhow::{Context as _, Result};
+use chrono::{DateTime, Duration, FixedOffset};
+use postgres_types::{FromSql, ToSql};
 use serde::{Deserialize, Serialize};
-use postgres_types::{ToSql, FromSql};
+use tokio_postgres::Client as DbClient;
+use uuid::Uuid;
 
 #[derive(Serialize, Deserialize, Debug)]
 pub struct Job {
@@ -32,15 +32,15 @@ pub enum FrequencyUnit {
 }
 
 pub async fn insert_job(
-    db: &DbClient, 
+    db: &DbClient,
     name: &String,
     expected_time: &DateTime<FixedOffset>,
     frequency: &Option<i32>,
     frequency_unit: &Option<FrequencyUnit>,
-    metadata: &serde_json::Value
+    metadata: &serde_json::Value,
 ) -> Result<()> {
     tracing::trace!("insert_job(name={})", name);
-    
+
     db.execute(
         "INSERT INTO jobs (name, expected_time, frequency, frequency_unit, metadata) VALUES ($1, $2, $3, $4, $5)
             ON CONFLICT (name, expected_time) DO UPDATE SET metadata = EXCLUDED.metadata",
@@ -54,20 +54,17 @@ pub async fn insert_job(
 
 pub async fn delete_job(db: &DbClient, id: &Uuid) -> Result<()> {
     tracing::trace!("delete_job(id={})", id);
-    
-    db.execute(
-        "DELETE FROM jobs WHERE id = $1",
-        &[&id],
-    )
-    .await
-    .context("Deleting job")?;
+
+    db.execute("DELETE FROM jobs WHERE id = $1", &[&id])
+        .await
+        .context("Deleting job")?;
 
     Ok(())
 }
 
 pub async fn update_job_error_message(db: &DbClient, id: &Uuid, message: &String) -> Result<()> {
     tracing::trace!("update_job_error_message(id={})", id);
-    
+
     db.execute(
         "UPDATE jobs SET error_message = $2 WHERE id = $1",
         &[&id, &message],
@@ -80,21 +77,18 @@ pub async fn update_job_error_message(db: &DbClient, id: &Uuid, message: &String
 
 pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {
     tracing::trace!("update_job_executed_at(id={})", id);
-    
-    db.execute(
-        "UPDATE jobs SET executed_at = now() WHERE id = $1",
-        &[&id],
-    )
-    .await
-    .context("Updating job executed at")?;
+
+    db.execute("UPDATE jobs SET executed_at = now() WHERE id = $1", &[&id])
+        .await
+        .context("Updating job executed at")?;
 
     Ok(())
 }
 
 // Selects all jobs with:
-//  - expected_time in the past 
+//  - expected_time in the past
 //  - error_message is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
-pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> { 
+pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
     let jobs = db
         .query(
             "
@@ -123,7 +117,7 @@ pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
             frequency_unit,
             metadata,
             executed_at,
-            error_message
+            error_message,
         });
     }
 
src/handlers.rs

Lines changed: 1 addition & 1 deletion
@@ -28,6 +28,7 @@ mod autolabel;
 mod close;
 mod github_releases;
 mod glacier;
+pub mod jobs;
 mod major_change;
 mod mentions;
 mod milestone_prs;
@@ -43,7 +44,6 @@ mod review_submitted;
 mod rfc_helper;
 mod rustc_commits;
 mod shortcut;
-pub mod jobs;
 
 pub async fn handle(ctx: &Context, event: &Event) -> Vec<HandlerError> {
     let config = config::get(&ctx.github, event.repo()).await;

src/handlers/jobs.rs

Lines changed: 17 additions & 10 deletions
@@ -1,42 +1,49 @@
 // Function to match the scheduled job function with its corresponding handler.
-// In case you want to add a new one, just add a new clause to the match with 
+// In case you want to add a new one, just add a new clause to the match with
 // the job name and the corresponding function.
 
 // The metadata is a serde_json::Value
 // Please refer to https://docs.rs/serde_json/latest/serde_json/value/fn.from_value.html
 // on how to interpret it as an instance of type T, implementing Serialize/Deserialize.
 
-// For example, if we want to sends a Zulip message every Friday at 11:30am ET into #t-release 
+// For example, if we want to sends a Zulip message every Friday at 11:30am ET into #t-release
 // with a @T-release meeting! content, we should create some Job like:
-// 
+//
 //    #[derive(Serialize, Deserialize)]
 //    struct ZulipMetadata {
 //        pub message: String
 //    }
-// 
+//
 //    let metadata = serde_json::value::to_value(ZulipMetadata {
 //        message: "@T-release meeting!".to_string()
 //    }).unwrap();
-// 
+//
 //    Job {
 //        name: "send_zulip_message",
 //        expected_time: "2022-09-30T11:30:00+10:00",
 //        frequency: Some(7),
 //        frequency_unit: Some(FrequencyUnit::Days),
 //        metadata: metadata
 //    }
-// 
+//
 // ... and add the corresponding "send_zulip_message" handler.
 
 pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
     match name {
-        _ => default(&name, &metadata)
+        _ => default(&name, &metadata),
     }
 }
 
 fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
-    println!("handle_job fell into default case: (name={:?}, metadata={:?})", name, metadata);
-    tracing::trace!("handle_job fell into default case: (name={:?}, metadata={:?})", name, metadata);
+    println!(
+        "handle_job fell into default case: (name={:?}, metadata={:?})",
+        name, metadata
+    );
+    tracing::trace!(
+        "handle_job fell into default case: (name={:?}, metadata={:?})",
+        name,
+        metadata
+    );
 
-  Ok(())
+    Ok(())
 }
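Following the file's comment through: a handler arm for a hypothetical "send_zulip_message" job would recover the typed payload with serde_json::from_value and pass it to a job-specific function. A sketch of that pattern, not part of this commit (the struct and handler names are illustrative, and the real match would keep falling back to default):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct ZulipMetadata {
    pub message: String,
}

// Hypothetical job-specific handler.
async fn send_zulip_message(metadata: ZulipMetadata) -> anyhow::Result<()> {
    // A real implementation would post metadata.message to Zulip.
    tracing::trace!("send_zulip_message: {}", metadata.message);
    Ok(())
}

async fn handle_job_sketch(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
    match name.as_str() {
        // Deserialize the stored serde_json::Value back into the typed struct.
        "send_zulip_message" => {
            let data: ZulipMetadata = serde_json::from_value(metadata.clone())?;
            send_zulip_message(data).await
        }
        // The real handle_job falls back to default(&name, &metadata) here.
        _ => Ok(()),
    }
}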

src/main.rs

Lines changed: 5 additions & 4 deletions
@@ -7,11 +7,11 @@ use hyper::{header, Body, Request, Response, Server, StatusCode};
 use reqwest::Client;
 use route_recognizer::Router;
 use std::{env, net::SocketAddr, sync::Arc, time::Duration};
+use tokio::{task, time::sleep};
 use tower::{Service, ServiceExt};
 use tracing as log;
 use tracing::Instrument;
 use triagebot::{db, github, handlers::Context, notification_listing, payload, EventName};
-use tokio::{task, time::sleep};
 
 const JOB_PROCESSING_CADENCE_IN_SECS: u64 = 60;
 
@@ -238,17 +238,18 @@ async fn run_server(addr: SocketAddr) -> anyhow::Result<()> {
     let pool = db::ClientPool::new();
     db::run_migrations(&*pool.get().await)
         .await
-    .context("database migrations")?;
+        .context("database migrations")?;
 
     // spawning a background task that will run the scheduled jobs
     // every JOB_PROCESSING_CADENCE_IN_SECS
     task::spawn(async move {
         let pool = db::ClientPool::new();
 
-       loop {
+        loop {
             db::run_scheduled_jobs(&*pool.get().await)
                 .await
-                .context("run database scheduled jobs").unwrap();
+                .context("run database scheduled jobs")
+                .unwrap();
 
             sleep(Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS)).await;
         }
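One note on the loop above: the .unwrap() means a single failing pass of run_scheduled_jobs aborts the whole background task. A variant that logs the error and keeps the cadence, sketched under the same assumptions as this file (db::ClientPool, db::run_scheduled_jobs, JOB_PROCESSING_CADENCE_IN_SECS, and the tokio imports shown above), could look like:

task::spawn(async move {
    let pool = db::ClientPool::new();

    loop {
        // Log the failure and keep looping instead of unwrapping.
        if let Err(e) = db::run_scheduled_jobs(&*pool.get().await).await {
            tracing::error!("run database scheduled jobs failed: {:?}", e);
        }

        sleep(Duration::from_secs(JOB_PROCESSING_CADENCE_IN_SECS)).await;
    }
});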
