From ca5f5f8e847e5247ea4131740a6fa977ce687aaf Mon Sep 17 00:00:00 2001 From: Aram Peres <6775216+aramperes@users.noreply.github.com> Date: Wed, 30 Apr 2025 22:41:31 -0400 Subject: [PATCH 01/12] fix(amqp sink): attempt one reconnect when channel has errored --- src/sinks/amqp/channel.rs | 67 +++++++++++++++++++++++++++++ src/sinks/amqp/config.rs | 5 ++- src/sinks/amqp/integration_tests.rs | 3 +- src/sinks/amqp/mod.rs | 1 + src/sinks/amqp/service.rs | 11 ++++- src/sinks/amqp/sink.rs | 21 +++------ 6 files changed, 91 insertions(+), 17 deletions(-) create mode 100644 src/sinks/amqp/channel.rs diff --git a/src/sinks/amqp/channel.rs b/src/sinks/amqp/channel.rs new file mode 100644 index 0000000000000..4ac1ecee72083 --- /dev/null +++ b/src/sinks/amqp/channel.rs @@ -0,0 +1,67 @@ +use crate::amqp::AmqpConfig; +use lapin::options::ConfirmSelectOptions; +use tokio::sync::{RwLock, RwLockReadGuard}; + +use super::service::AmqpError; + +/// A wrapper around the AMQP channel that handles reconnections. +pub(crate) struct AmqpChannel { + channel: RwLock, + config: AmqpConfig, +} + +impl AmqpChannel { + /// Creates a new AMQP channel. + pub async fn new(config: &AmqpConfig) -> Result { + let channel = Self::new_channel(config).await?; + + Ok(Self { + channel: RwLock::new(channel), + config: config.clone(), + }) + } + + /// Returns a read lock to the AMQP channel. If the current channel is in an error state, + /// it will attempt to reconnect and create a new channel. + pub async fn channel(&self) -> Result, AmqpError> { + let need_reconnect = + { self.channel.read().await.status().state() == lapin::ChannelState::Error }; + + if need_reconnect { + let mut channel = self.channel.write().await; + + // Check if we still need to reconnect after acquiring the write lock. + if channel.status().state() != lapin::ChannelState::Error { + return Ok(channel.downgrade()); + } + + info!( + message = "Recovering broken connection to the AMQP broker.", + internal_log_rate_limit = true, + ); + + *channel = Self::new_channel(&self.config).await?; + + info!( + message = "Recovered connection to the AMQP broker.", + internal_log_rate_limit = true, + ); + } + Ok(self.channel.read().await) + } + + async fn new_channel(config: &AmqpConfig) -> Result { + let (_, channel) = config + .connect() + .await + .map_err(|e| AmqpError::ConnectFailed { error: e })?; + + // Enable confirmations on the channel. + channel + .confirm_select(ConfirmSelectOptions::default()) + .await + .map_err(|e| AmqpError::ConnectFailed { error: Box::new(e) })?; + + Ok(channel) + } +} diff --git a/src/sinks/amqp/config.rs b/src/sinks/amqp/config.rs index 50ea63104f836..78bccbda78feb 100644 --- a/src/sinks/amqp/config.rs +++ b/src/sinks/amqp/config.rs @@ -1,4 +1,5 @@ //! Configuration functionality for the `AMQP` sink. +use super::channel::AmqpChannel; use crate::{amqp::AmqpConfig, sinks::prelude::*}; use lapin::{types::ShortString, BasicProperties}; use std::sync::Arc; @@ -135,9 +136,11 @@ impl SinkConfig for AmqpSinkConfig { } } -pub(super) async fn healthcheck(channel: Arc) -> crate::Result<()> { +pub(super) async fn healthcheck(channel: Arc) -> crate::Result<()> { trace!("Healthcheck started."); + let channel = channel.channel().await?; + if !channel.status().connected() { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::BrokenPipe, diff --git a/src/sinks/amqp/integration_tests.rs b/src/sinks/amqp/integration_tests.rs index e919c5b88e92e..1e877ab0a3547 100644 --- a/src/sinks/amqp/integration_tests.rs +++ b/src/sinks/amqp/integration_tests.rs @@ -3,6 +3,7 @@ use crate::{ amqp::await_connection, config::{SinkConfig, SinkContext}, shutdown::ShutdownSignal, + sinks::amqp::channel::AmqpChannel, template::{Template, UnsignedIntTemplate}, test_util::{ components::{run_and_assert_sink_compliance, SINK_TAGS}, @@ -37,7 +38,7 @@ async fn healthcheck() { let mut config = make_config(); config.exchange = Template::try_from(exchange.as_str()).unwrap(); await_connection(&config.connection).await; - let (_conn, channel) = config.connection.connect().await.unwrap(); + let channel = AmqpChannel::new(&config.connection).await.unwrap(); super::config::healthcheck(Arc::new(channel)).await.unwrap(); } diff --git a/src/sinks/amqp/mod.rs b/src/sinks/amqp/mod.rs index 749f892f1ccd2..ef53974c44c4f 100644 --- a/src/sinks/amqp/mod.rs +++ b/src/sinks/amqp/mod.rs @@ -1,5 +1,6 @@ //! `AMQP` sink. //! Handles version AMQP 0.9.1 which is used by RabbitMQ. +mod channel; mod config; mod encoder; mod request_builder; diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 44c475f208cae..1e0b956a99913 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -10,6 +10,8 @@ use std::{ task::{Context, Poll}, }; +use super::channel::AmqpChannel; + /// The request contains the data to send to `AMQP` together /// with the information need to route the message. pub(super) struct AmqpRequest { @@ -79,7 +81,7 @@ impl DriverResponse for AmqpResponse { /// The tower service that handles the actual sending of data to `AMQP`. pub(super) struct AmqpService { - pub(super) channel: Arc, + pub(super) channel: Arc, } #[derive(Debug, Snafu)] @@ -92,6 +94,11 @@ pub(super) enum AmqpError { #[snafu(display("Received Negative Acknowledgement from AMQP broker."))] Nack, + + #[snafu(display("Failed to open AMQP channel: {}", error))] + ConnectFailed { + error: Box, + }, } impl Service for AmqpService { @@ -109,6 +116,8 @@ impl Service for AmqpService { let channel = Arc::clone(&self.channel); Box::pin(async move { + let channel = channel.channel().await?; + let byte_size = req.body.len(); let fut = channel .basic_publish( diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index 1537713be206c..51528dc4489bf 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -1,10 +1,11 @@ //! The sink for the `AMQP` sink that wires together the main stream that takes the //! event and sends it to `AMQP`. use crate::sinks::prelude::*; -use lapin::{options::ConfirmSelectOptions, BasicProperties}; +use lapin::BasicProperties; use serde::Serialize; use std::sync::Arc; +use super::channel::AmqpChannel; use super::{ config::{AmqpPropertiesConfig, AmqpSinkConfig}, encoder::AmqpEncoder, @@ -27,7 +28,7 @@ pub(super) struct AmqpEvent { } pub(super) struct AmqpSink { - pub(super) channel: Arc, + pub(super) channel: Arc, exchange: Template, routing_key: Option