Skip to content

Commit ba6e8c0

Browse files
committed
fix(aws_s3 source): add retry delay in sqs::Ingestor
This commit adds logic to wait 500ms between subsequent calls of run_once if it returns an error. This is required to prevent spamming the SQS API if it fails to receive messages, e.g. due to invalid IAM permissions or an SQS outage.
1 parent 96bc594 commit ba6e8c0

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

src/sources/aws_s3/sqs.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::HashMap;
2+
use std::time::Duration;
23
use std::{future::ready, num::NonZeroUsize, panic, sync::Arc, sync::LazyLock};
34

45
use aws_sdk_s3::operation::get_object::GetObjectError;
@@ -401,27 +402,37 @@ impl IngestorProcess {
401402
async fn run(mut self) {
402403
let shutdown = self.shutdown.clone().fuse();
403404
pin!(shutdown);
405+
let delay = Duration::from_millis(500);
404406

405407
loop {
406408
select! {
407409
_ = &mut shutdown => break,
408-
_ = self.run_once() => {},
410+
result = self.run_once() => {
411+
match result {
412+
Ok(()) => {}
413+
Err(_) => {
414+
trace!("run_once failed, will retry after delay");
415+
tokio::time::sleep(delay).await;
416+
}
417+
}
418+
},
409419
}
410420
}
411421
}
412422

413-
async fn run_once(&mut self) {
414-
let messages = self.receive_messages().await;
415-
let messages = messages
416-
.inspect(|messages| {
423+
async fn run_once(&mut self) -> Result<(), ()> {
424+
let messages = match self.receive_messages().await {
425+
Ok(messages) => {
417426
emit!(SqsMessageReceiveSucceeded {
418427
count: messages.len(),
419428
});
420-
})
421-
.inspect_err(|err| {
422-
emit!(SqsMessageReceiveError { error: err });
423-
})
424-
.unwrap_or_default();
429+
messages
430+
}
431+
Err(err) => {
432+
emit!(SqsMessageReceiveError { error: &err });
433+
return Err(());
434+
}
435+
};
425436

426437
let mut delete_entries = Vec::new();
427438
let mut deferred_entries = Vec::new();
@@ -517,7 +528,7 @@ impl IngestorProcess {
517528
message = "Deferred queue not configured, but received deferred entries.",
518529
internal_log_rate_limit = true
519530
);
520-
return;
531+
return Ok(());
521532
};
522533
let cloned_entries = deferred_entries.clone();
523534
match self
@@ -572,6 +583,7 @@ impl IngestorProcess {
572583
}
573584
}
574585
}
586+
Ok(())
575587
}
576588

577589
async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {

0 commit comments

Comments
 (0)