diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 33a0efafe7fd8..2646d3e20e716 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::time::Duration; use std::{future::ready, num::NonZeroUsize, panic, sync::Arc, sync::LazyLock}; use aws_sdk_s3::operation::get_object::GetObjectError; @@ -401,27 +402,37 @@ impl IngestorProcess { async fn run(mut self) { let shutdown = self.shutdown.clone().fuse(); pin!(shutdown); + let delay = Duration::from_millis(500); loop { select! { _ = &mut shutdown => break, - _ = self.run_once() => {}, + result = self.run_once() => { + match result { + Ok(()) => {} + Err(_) => { + trace!("run_once failed, will retry after delay"); + tokio::time::sleep(delay).await; + } + } + }, } } } - async fn run_once(&mut self) { - let messages = self.receive_messages().await; - let messages = messages - .inspect(|messages| { + async fn run_once(&mut self) -> Result<(), ()> { + let messages = match self.receive_messages().await { + Ok(messages) => { emit!(SqsMessageReceiveSucceeded { count: messages.len(), }); - }) - .inspect_err(|err| { - emit!(SqsMessageReceiveError { error: err }); - }) - .unwrap_or_default(); + messages + } + Err(err) => { + emit!(SqsMessageReceiveError { error: &err }); + return Err(()); + } + }; let mut delete_entries = Vec::new(); let mut deferred_entries = Vec::new(); @@ -517,7 +528,7 @@ impl IngestorProcess { message = "Deferred queue not configured, but received deferred entries.", internal_log_rate_limit = true ); - return; + return Ok(()); }; let cloned_entries = deferred_entries.clone(); match self @@ -572,6 +583,7 @@ impl IngestorProcess { } } } + Ok(()) } async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {