Skip to content

Commit 83df93d

Browse files
authored
Merge pull request #13 from ComputerScienceHouse/logging-update-worker
Kill Worker If Redis Connection/Job Fetch Fails Repeatedly
2 parents 5e80ff8 + 4d5a91a commit 83df93d

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

src/worker.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
use std::{collections::HashSet, env};
2-
31
use anyhow::{anyhow, Result};
42
use log::error;
53
use redis_work_queue::{Item, KeyPrefix, WorkQueue};
64
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
5+
use std::{collections::HashSet, env, thread, time};
76

87
use crate::{
98
app::{RedisJob, SimpleRiderChange},
@@ -46,8 +45,7 @@ pub async fn main() -> Result<()> {
4645
env::var("PINGS_REMOVE_ROUTE").expect("PINGS_REMOVE_ROUTE must be set"),
4746
)?;
4847

49-
work_loop(queue, db_pool, pings).await?;
50-
Ok(())
48+
work_loop(queue, db_pool, pings).await
5149
}
5250

5351
async fn get_simple_data(
@@ -263,27 +261,36 @@ pub async fn work_loop(
263261
db_pool: Pool<Postgres>,
264262
pings: PingClient,
265263
) -> Result<()> {
264+
let mut queue_connect_failure = 0;
265+
let three_sec = time::Duration::from_secs(3);
266266
loop {
267267
// Wait for a job with no timeout and a lease time of 5 seconds.
268268
let job: Item = match queue.get_job().await {
269269
Ok(job) => job,
270270
Err(err) => {
271-
error!("{}", err);
271+
error!("Failed to Get Job: {}", err);
272+
queue_connect_failure += 1;
273+
thread::sleep(three_sec);
274+
if queue_connect_failure >= 3 {
275+
error!("Failed to Fetch Job 3+ Times! Failing...");
276+
return Err(anyhow!("Fetch Job Failed 3 Times. Is Redis Running?"));
277+
}
272278
continue;
273279
}
274280
};
281+
queue_connect_failure = 0;
275282
match work(&job, &db_pool, &pings, &mut queue).await {
276283
// Mark successful jobs as complete
277284
Ok(()) => {
278285
queue.complete(&job).await?;
279286
}
280287
// Drop a job that should be retried - it will be returned to the work queue after
281288
// the (5 second) lease expires.
282-
Err(err) if err.should_retry => error!("{}", err.msg),
289+
Err(err) if err.should_retry => error!("Job Failed: {}, Retrying", err.msg),
283290
// Errors that shouldn't cause a retry should mark the job as complete so it isn't
284291
// tried again.
285292
Err(err) => {
286-
error!("{}", err.msg);
293+
error!("Job Failed: {}, Not Retrying", err.msg);
287294
queue.complete(&job).await?;
288295
}
289296
}

0 commit comments

Comments
 (0)