Skip to content

Commit c533357

Browse files
author
Mauricio Cassola
committed
Rename expected_time to scheduled_at
1 parent f96b297 commit c533357

File tree

3 files changed

+29
-38
lines changed

3 files changed

+29
-38
lines changed

src/db.rs

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -184,39 +184,34 @@ pub async fn run_migrations(client: &DbClient) -> anyhow::Result<()> {
184184

185185
pub async fn run_scheduled_jobs(db: &DbClient) -> anyhow::Result<()> {
186186
let jobs = get_jobs_to_execute(&db).await.unwrap();
187-
println!("jobs to execute: {:#?}", jobs);
188187
tracing::trace!("jobs to execute: {:#?}", jobs);
189188

190189
for job in jobs.iter() {
191190
update_job_executed_at(&db, &job.id).await?;
192191

192+
if let Some(frequency) = job.frequency {
193+
let duration = get_duration_from_cron(frequency, job.frequency_unit.as_ref().unwrap());
194+
let new_scheduled_at = job.scheduled_at.checked_add_signed(duration).unwrap();
195+
196+
insert_job(
197+
&db,
198+
&job.name,
199+
&new_scheduled_at,
200+
&Some(frequency),
201+
&job.frequency_unit,
202+
&job.metadata,
203+
)
204+
.await?;
205+
tracing::trace!("job succesfully reinserted (name={})", job.name);
206+
}
207+
193208
match handle_job(&job.name, &job.metadata).await {
194209
Ok(_) => {
195-
println!("job succesfully executed (id={})", job.id);
196210
tracing::trace!("job succesfully executed (id={})", job.id);
197211

198-
if let Some(frequency) = job.frequency {
199-
let duration =
200-
get_duration_from_cron(frequency, job.frequency_unit.as_ref().unwrap());
201-
let new_expected_time = job.expected_time.checked_add_signed(duration).unwrap();
202-
203-
insert_job(
204-
&db,
205-
&job.name,
206-
&new_expected_time,
207-
&Some(frequency),
208-
&job.frequency_unit,
209-
&job.metadata,
210-
)
211-
.await?;
212-
println!("job succesfully reinserted (name={})", job.name);
213-
tracing::trace!("job succesfully reinserted (name={})", job.name);
214-
}
215-
216212
delete_job(&db, &job.id).await?;
217213
}
218214
Err(e) => {
219-
println!("job failed on execution (id={:?}, error={:?})", job.id, e);
220215
tracing::trace!("job failed on execution (id={:?}, error={:?})", job.id, e);
221216

222217
update_job_error_message(&db, &job.id, &e.to_string()).await?;
@@ -271,7 +266,7 @@ CREATE TYPE frequency_unit AS ENUM ('days', 'hours', 'minutes', 'seconds');
271266
CREATE TABLE jobs (
272267
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
273268
name TEXT NOT NULL,
274-
expected_time TIMESTAMP WITH TIME ZONE NOT NULL,
269+
scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
275270
frequency INTEGER,
276271
frequency_unit frequency_unit,
277272
metadata JSONB,
@@ -280,9 +275,9 @@ CREATE TABLE jobs (
280275
);
281276
",
282277
"
283-
CREATE UNIQUE INDEX jobs_name_expected_time_unique_index
278+
CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index
284279
ON jobs (
285-
name, expected_time
280+
name, scheduled_at
286281
);
287282
",
288283
];

src/db/jobs.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use uuid::Uuid;
1010
pub struct Job {
1111
pub id: Uuid,
1212
pub name: String,
13-
pub expected_time: DateTime<FixedOffset>,
13+
pub scheduled_at: DateTime<FixedOffset>,
1414
pub frequency: Option<i32>,
1515
pub frequency_unit: Option<FrequencyUnit>,
1616
pub metadata: serde_json::Value,
@@ -34,17 +34,17 @@ pub enum FrequencyUnit {
3434
pub async fn insert_job(
3535
db: &DbClient,
3636
name: &String,
37-
expected_time: &DateTime<FixedOffset>,
37+
scheduled_at: &DateTime<FixedOffset>,
3838
frequency: &Option<i32>,
3939
frequency_unit: &Option<FrequencyUnit>,
4040
metadata: &serde_json::Value,
4141
) -> Result<()> {
4242
tracing::trace!("insert_job(name={})", name);
4343

4444
db.execute(
45-
"INSERT INTO jobs (name, expected_time, frequency, frequency_unit, metadata) VALUES ($1, $2, $3, $4, $5)
46-
ON CONFLICT (name, expected_time) DO UPDATE SET metadata = EXCLUDED.metadata",
47-
&[&name, &expected_time, &frequency, &frequency_unit, &metadata],
45+
"INSERT INTO jobs (name, scheduled_at, frequency, frequency_unit, metadata) VALUES ($1, $2, $3, $4, $5)
46+
ON CONFLICT (name, scheduled_at) DO UPDATE SET metadata = EXCLUDED.metadata",
47+
&[&name, &scheduled_at, &frequency, &frequency_unit, &metadata],
4848
)
4949
.await
5050
.context("Inserting job")?;
@@ -86,13 +86,13 @@ pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {
8686
}
8787

8888
// Selects all jobs with:
89-
// - expected_time in the past
89+
// - scheduled_at in the past
9090
// - error_message is null or executed_at is at least 60 minutes ago (intended to make repeat executions rare enough)
9191
pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
9292
let jobs = db
9393
.query(
9494
"
95-
SELECT * FROM jobs WHERE expected_time <= now() AND (error_message IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
95+
SELECT * FROM jobs WHERE scheduled_at <= now() AND (error_message IS NULL OR executed_at <= now() - INTERVAL '60 minutes')",
9696
&[],
9797
)
9898
.await
@@ -102,7 +102,7 @@ pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
102102
for job in jobs {
103103
let id: Uuid = job.get(0);
104104
let name: String = job.get(1);
105-
let expected_time: DateTime<FixedOffset> = job.get(2);
105+
let scheduled_at: DateTime<FixedOffset> = job.get(2);
106106
let frequency: Option<i32> = job.get(3);
107107
let frequency_unit: Option<FrequencyUnit> = job.get(4);
108108
let metadata: serde_json::Value = job.get(5);
@@ -112,7 +112,7 @@ pub async fn get_jobs_to_execute(db: &DbClient) -> Result<Vec<Job>> {
112112
data.push(Job {
113113
id,
114114
name,
115-
expected_time,
115+
scheduled_at,
116116
frequency,
117117
frequency_unit,
118118
metadata,

src/handlers/jobs.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
//
2121
// Job {
2222
// name: "send_zulip_message",
23-
// expected_time: "2022-09-30T11:30:00+10:00",
23+
// scheduled_at: "2022-09-30T11:30:00+10:00",
2424
// frequency: Some(7),
2525
// frequency_unit: Some(FrequencyUnit::Days),
2626
// metadata: metadata
@@ -35,10 +35,6 @@ pub async fn handle_job(name: &String, metadata: &serde_json::Value) -> anyhow::
3535
}
3636

3737
fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
38-
println!(
39-
"handle_job fell into default case: (name={:?}, metadata={:?})",
40-
name, metadata
41-
);
4238
tracing::trace!(
4339
"handle_job fell into default case: (name={:?}, metadata={:?})",
4440
name,

0 commit comments

Comments
 (0)