From ba6e8c0b4dce625a801ca76dcfe8ced96e1f7b4e Mon Sep 17 00:00:00 2001 From: Adam Medzinski Date: Tue, 6 May 2025 23:17:40 +0200 Subject: [PATCH] fix(aws_s3 source): add retry delay in sqs::Ingestor This commit adds logic to wait 500ms between subsequent calls of run_once if it returns an error. This is required to prevent spamming the SQS API if it fails to receive messages, e.g. due to invalid IAM permissions or an SQS outage. --- src/sources/aws_s3/sqs.rs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 33a0efafe7fd8..2646d3e20e716 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::time::Duration; use std::{future::ready, num::NonZeroUsize, panic, sync::Arc, sync::LazyLock}; use aws_sdk_s3::operation::get_object::GetObjectError; @@ -401,27 +402,37 @@ impl IngestorProcess { async fn run(mut self) { let shutdown = self.shutdown.clone().fuse(); pin!(shutdown); + let delay = Duration::from_millis(500); loop { select! { _ = &mut shutdown => break, - _ = self.run_once() => {}, + result = self.run_once() => { + match result { + Ok(()) => {} + Err(_) => { + trace!("run_once failed, will retry after delay"); + tokio::time::sleep(delay).await; + } + } + }, } } } - async fn run_once(&mut self) { - let messages = self.receive_messages().await; - let messages = messages - .inspect(|messages| { + async fn run_once(&mut self) -> Result<(), ()> { + let messages = match self.receive_messages().await { + Ok(messages) => { emit!(SqsMessageReceiveSucceeded { count: messages.len(), }); - }) - .inspect_err(|err| { - emit!(SqsMessageReceiveError { error: err }); - }) - .unwrap_or_default(); + messages + } + Err(err) => { + emit!(SqsMessageReceiveError { error: &err }); + return Err(()); + } + }; let mut delete_entries = Vec::new(); let mut deferred_entries = Vec::new(); @@ -517,7 +528,7 @@ impl IngestorProcess { message = "Deferred queue not configured, but received deferred entries.", internal_log_rate_limit = true ); - return; + return Ok(()); }; let cloned_entries = deferred_entries.clone(); match self @@ -572,6 +583,7 @@ impl IngestorProcess { } } } + Ok(()) } async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {