|
1 | 1 | use std::collections::HashMap;
|
2 | 2 | use std::{future::ready, num::NonZeroUsize, panic, sync::Arc, sync::LazyLock};
|
| 3 | +use std::time::Duration; |
3 | 4 |
|
4 | 5 | use aws_sdk_s3::operation::get_object::GetObjectError;
|
5 | 6 | use aws_sdk_s3::Client as S3Client;
|
@@ -401,27 +402,37 @@ impl IngestorProcess {
|
401 | 402 | async fn run(mut self) {
|
402 | 403 | let shutdown = self.shutdown.clone().fuse();
|
403 | 404 | pin!(shutdown);
|
| 405 | + let delay = Duration::from_millis(500); |
404 | 406 |
|
405 | 407 | loop {
|
406 | 408 | select! {
|
407 | 409 | _ = &mut shutdown => break,
|
408 |
| - _ = self.run_once() => {}, |
| 410 | + result = self.run_once() => { |
| 411 | + match result { |
| 412 | + Ok(()) => {} |
| 413 | + Err(_) => { |
| 414 | + trace!("run_once failed, will retry after delay"); |
| 415 | + tokio::time::sleep(delay).await; |
| 416 | + } |
| 417 | + } |
| 418 | + }, |
409 | 419 | }
|
410 | 420 | }
|
411 | 421 | }
|
412 | 422 |
|
413 |
| - async fn run_once(&mut self) { |
414 |
| - let messages = self.receive_messages().await; |
415 |
| - let messages = messages |
416 |
| - .inspect(|messages| { |
| 423 | + async fn run_once(&mut self) -> Result<(), ()> { |
| 424 | + let messages = match self.receive_messages().await { |
| 425 | + Ok(messages) => { |
417 | 426 | emit!(SqsMessageReceiveSucceeded {
|
418 | 427 | count: messages.len(),
|
419 | 428 | });
|
420 |
| - }) |
421 |
| - .inspect_err(|err| { |
422 |
| - emit!(SqsMessageReceiveError { error: err }); |
423 |
| - }) |
424 |
| - .unwrap_or_default(); |
| 429 | + messages |
| 430 | + } |
| 431 | + Err(err) => { |
| 432 | + emit!(SqsMessageReceiveError { error: &err }); |
| 433 | + return Err(()); |
| 434 | + } |
| 435 | + }; |
425 | 436 |
|
426 | 437 | let mut delete_entries = Vec::new();
|
427 | 438 | let mut deferred_entries = Vec::new();
|
@@ -517,7 +528,7 @@ impl IngestorProcess {
|
517 | 528 | message = "Deferred queue not configured, but received deferred entries.",
|
518 | 529 | internal_log_rate_limit = true
|
519 | 530 | );
|
520 |
| - return; |
| 531 | + return Ok(()); |
521 | 532 | };
|
522 | 533 | let cloned_entries = deferred_entries.clone();
|
523 | 534 | match self
|
@@ -572,6 +583,7 @@ impl IngestorProcess {
|
572 | 583 | }
|
573 | 584 | }
|
574 | 585 | }
|
| 586 | + return Ok(()); |
575 | 587 | }
|
576 | 588 |
|
577 | 589 | async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
|
|
0 commit comments