Skip to content

fix(amqp sink): attempt one reconnect when channel has errored #22971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ smpl_jwt = { version = "0.8.0", default-features = false, optional = true }

# AMQP
lapin = { version = "2.5.3", default-features = false, features = ["native-tls"], optional = true }
deadpool = { version = "0.12.2", default-features = false, features = ["managed", "rt_tokio_1"], optional = true }

# API
async-graphql = { version = "7.0.17", default-features = false, optional = true, features = ["chrono", "playground"] }
Expand Down Expand Up @@ -784,7 +785,7 @@ sinks-metrics = [
"sinks-splunk_hec"
]

sinks-amqp = ["lapin"]
sinks-amqp = ["deadpool", "lapin"]
sinks-appsignal = []
sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws-sdk-kms"]
sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"]
Expand Down
2 changes: 2 additions & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ data-encoding,https://github.com/ia0/data-encoding,MIT,Julien Cretin <git@ia0.eu
data-url,https://github.com/servo/rust-url,MIT OR Apache-2.0,Simon Sapin <simon.sapin@exyr.org>
databend-client,https://github.com/databendlabs/bendsql,Apache-2.0,Databend Authors <opensource@databend.com>
dbl,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers
deadpool,https://github.com/bikeshedder/deadpool,MIT OR Apache-2.0,Michael P. Jung <michael.jung@terreon.de>
der,https://github.com/RustCrypto/formats/tree/master/der,Apache-2.0 OR MIT,RustCrypto Developers
deranged,https://github.com/jhpratt/deranged,MIT OR Apache-2.0,Jacob Pratt <jacob@jhpratt.dev>
derivative,https://github.com/mcarton/rust-derivative,MIT OR Apache-2.0,mcarton <cartonmartin+git@gmail.com>
Expand Down Expand Up @@ -423,6 +424,7 @@ num-integer,https://github.com/rust-num/num-integer,MIT OR Apache-2.0,The Rust P
num-iter,https://github.com/rust-num/num-iter,MIT OR Apache-2.0,The Rust Project Developers
num-rational,https://github.com/rust-num/num-rational,MIT OR Apache-2.0,The Rust Project Developers
num-traits,https://github.com/rust-num/num-traits,MIT OR Apache-2.0,The Rust Project Developers
num_cpus,https://github.com/seanmonstar/num_cpus,MIT OR Apache-2.0,Sean McArthur <sean@seanmonstar.com>
num_enum,https://github.com/illicitonion/num_enum,BSD-3-Clause OR MIT OR Apache-2.0,"Daniel Wagner-Hall <dawagner@gmail.com>, Daniel Henry-Mantilla <daniel.henry.mantilla@gmail.com>, Vincent Esche <regexident@gmail.com>"
num_threads,https://github.com/jhpratt/num_threads,MIT OR Apache-2.0,Jacob Pratt <open-source@jhpratt.dev>
number_prefix,https://github.com/ogham/rust-number-prefix,MIT,Benjamin Sago <ogham@bsago.me>
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/22971_reconnect_broken_amqp_sink.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `amqp` sink now attempts to re-connect to the AMQP broker when the channel has been disconnected. It will also create up to 4 channels in a pool (configurable with the `max_channels` setting) to improve throughput.

authors: aramperes
4 changes: 2 additions & 2 deletions docs/DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Loosely, you'll need the following:
- **To run integration tests:** Have `docker` available, or a real live version of that service. (Use `AUTOSPAWN=false`)
- **To run `make check-component-features`:** Have `remarshal` installed.
- **To run `make check-licenses` or `cargo vdev build licenses`:** Have `dd-rust-license-tool` [installed](https://github.com/DataDog/rust-license-tool).
- **To run `cargo vdev build component-docs`:** Have `cue` [installed](https://cuelang.org/docs/install/).
- **To run `make generate-component-docs`:** Have `cue` [installed](https://cuelang.org/docs/install/).

If you find yourself needing to run something inside the Docker environment described above, that's totally fine, they won't collide or hurt each other. In this case, you'd just run `make environment-generate`.

Expand Down Expand Up @@ -161,7 +161,7 @@ cargo bench transforms::example
make fmt
cargo fmt
# Build component documentation for the website
cargo vdev build component-docs
make generate-component-docs
```

If you run `make` you'll see a full list of all our tasks. Some of these will start Docker containers, sign commits, or even make releases. These are not common development commands and your mileage may vary.
Expand Down
86 changes: 86 additions & 0 deletions src/sinks/amqp/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use super::config::AmqpSinkConfig;
use super::service::AmqpError;
use crate::amqp::AmqpConfig;
use deadpool::managed::Pool;
use lapin::options::ConfirmSelectOptions;

pub type AmqpSinkChannels = Pool<AmqpSinkChannelManager>;

pub(super) fn new_channel_pool(config: &AmqpSinkConfig) -> crate::Result<AmqpSinkChannels> {
let max_channels = config.max_channels.try_into().map_err(|_| {
Box::new(AmqpError::PoolError {
error: "max_channels must fit into usize".into(),
})
})?;
if max_channels == 0 {
return Err(Box::new(AmqpError::PoolError {
error: "max_channels must be positive".into(),
}));
}
let channel_manager = AmqpSinkChannelManager::new(&config.connection);
let channels = Pool::builder(channel_manager)
.max_size(max_channels)
.runtime(deadpool::Runtime::Tokio1)
.build()?;
debug!("AMQP channel pool created with max size: {}", max_channels);
Ok(channels)
}

/// A channel pool manager for the AMQP sink.
/// This manager is responsible for creating and recycling AMQP channels.
/// It uses the `deadpool` crate to manage the channels.
pub(crate) struct AmqpSinkChannelManager {
config: AmqpConfig,
}

impl deadpool::managed::Manager for AmqpSinkChannelManager {
type Type = lapin::Channel;
type Error = AmqpError;

async fn create(&self) -> Result<Self::Type, Self::Error> {
let channel = Self::new_channel(&self.config).await?;
info!(
message = "Created a new channel to the AMQP broker.",
id = channel.id()
);
Ok(channel)
}

async fn recycle(
&self,
channel: &mut Self::Type,
_: &deadpool::managed::Metrics,
) -> deadpool::managed::RecycleResult<Self::Error> {
let state = channel.status().state();
if state == lapin::ChannelState::Connected {
Ok(())
} else {
Err((AmqpError::ChannelClosed { state }).into())
}
}
}

impl AmqpSinkChannelManager {
/// Creates a new channel pool manager for the AMQP sink.
pub fn new(config: &AmqpConfig) -> Self {
Self {
config: config.clone(),
}
}

/// Creates a new AMQP channel using the configuration of this sink.
async fn new_channel(config: &AmqpConfig) -> Result<lapin::Channel, AmqpError> {
let (_, channel) = config
.connect()
.await
.map_err(|e| AmqpError::ConnectFailed { error: e })?;

// Enable confirmations on the channel.
channel
.confirm_select(ConfirmSelectOptions::default())
.await
.map_err(|e| AmqpError::ConnectFailed { error: Box::new(e) })?;

Ok(channel)
}
}
20 changes: 16 additions & 4 deletions src/sinks/amqp/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Configuration functionality for the `AMQP` sink.
use super::channel::AmqpSinkChannels;
use crate::{amqp::AmqpConfig, sinks::prelude::*};
use lapin::{types::ShortString, BasicProperties};
use std::sync::Arc;
use vector_lib::{
codecs::TextSerializerConfig,
internal_event::{error_stage, error_type},
Expand Down Expand Up @@ -90,6 +90,14 @@ pub struct AmqpSinkConfig {
skip_serializing_if = "crate::serde::is_default"
)]
pub(crate) acknowledgements: AcknowledgementsConfig,

/// Maximum number of AMQP channels to keep active (channels are created as needed).
#[serde(default = "default_max_channels")]
pub(crate) max_channels: u32,
}

const fn default_max_channels() -> u32 {
4
}

impl Default for AmqpSinkConfig {
Expand All @@ -101,6 +109,7 @@ impl Default for AmqpSinkConfig {
encoding: TextSerializerConfig::default().into(),
connection: AmqpConfig::default(),
acknowledgements: AcknowledgementsConfig::default(),
max_channels: default_max_channels(),
}
}
}
Expand All @@ -111,7 +120,8 @@ impl GenerateConfig for AmqpSinkConfig {
r#"connection_string = "amqp://localhost:5672/%2f"
routing_key = "user_id"
exchange = "test"
encoding.codec = "json""#,
encoding.codec = "json"
max_channels = 4"#,
)
.unwrap()
}
Expand All @@ -122,7 +132,7 @@ impl GenerateConfig for AmqpSinkConfig {
impl SinkConfig for AmqpSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = AmqpSink::new(self.clone()).await?;
let hc = healthcheck(Arc::clone(&sink.channel)).boxed();
let hc = healthcheck(sink.channels.clone()).boxed();
Ok((VectorSink::from_event_streamsink(sink), hc))
}

Expand All @@ -135,9 +145,11 @@ impl SinkConfig for AmqpSinkConfig {
}
}

pub(super) async fn healthcheck(channel: Arc<lapin::Channel>) -> crate::Result<()> {
pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> {
trace!("Healthcheck started.");

let channel = channels.get().await?;

if !channel.status().connected() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
Expand Down
7 changes: 4 additions & 3 deletions src/sinks/amqp/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
amqp::await_connection,
config::{SinkConfig, SinkContext},
shutdown::ShutdownSignal,
sinks::amqp::channel::new_channel_pool,
template::{Template, UnsignedIntTemplate},
test_util::{
components::{run_and_assert_sink_compliance, SINK_TAGS},
Expand All @@ -12,7 +13,7 @@ use crate::{
};
use config::AmqpPropertiesConfig;
use futures::StreamExt;
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{collections::HashSet, time::Duration};
use vector_lib::{config::LogNamespace, event::LogEvent};

pub fn make_config() -> AmqpSinkConfig {
Expand All @@ -37,8 +38,8 @@ async fn healthcheck() {
let mut config = make_config();
config.exchange = Template::try_from(exchange.as_str()).unwrap();
await_connection(&config.connection).await;
let (_conn, channel) = config.connection.connect().await.unwrap();
super::config::healthcheck(Arc::new(channel)).await.unwrap();
let channels = new_channel_pool(&config).unwrap();
super::config::healthcheck(channels).await.unwrap();
}

#[tokio::test]
Expand Down
5 changes: 2 additions & 3 deletions src/sinks/amqp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! `AMQP` sink.
//! Handles version AMQP 0.9.1 which is used by RabbitMQ.
mod channel;
mod config;
mod encoder;
mod request_builder;
Expand All @@ -15,7 +16,5 @@ use snafu::Snafu;
#[derive(Debug, Snafu)]
enum BuildError {
#[snafu(display("creating amqp producer failed: {}", source))]
AmqpCreateFailed {
source: Box<dyn std::error::Error + Send + Sync>,
},
AmqpCreateFailed { source: vector_common::Error },
}
26 changes: 19 additions & 7 deletions src/sinks/amqp/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use bytes::Bytes;
use futures::future::BoxFuture;
use lapin::{options::BasicPublishOptions, BasicProperties};
use snafu::Snafu;
use std::{
sync::Arc,
task::{Context, Poll},
};
use std::task::{Context, Poll};

use super::channel::AmqpSinkChannels;

/// The request contains the data to send to `AMQP` together
/// with the information need to route the message.
Expand Down Expand Up @@ -79,11 +78,11 @@ impl DriverResponse for AmqpResponse {

/// The tower service that handles the actual sending of data to `AMQP`.
pub(super) struct AmqpService {
pub(super) channel: Arc<lapin::Channel>,
pub(super) channels: AmqpSinkChannels,
}

#[derive(Debug, Snafu)]
pub(super) enum AmqpError {
pub enum AmqpError {
#[snafu(display("Failed retrieving Acknowledgement: {}", error))]
AcknowledgementFailed { error: lapin::Error },

Expand All @@ -92,6 +91,15 @@ pub(super) enum AmqpError {

#[snafu(display("Received Negative Acknowledgement from AMQP broker."))]
Nack,

#[snafu(display("Failed to open AMQP channel: {}", error))]
ConnectFailed { error: vector_common::Error },

#[snafu(display("Channel is not writeable: {:?}", state))]
ChannelClosed { state: lapin::ChannelState },

#[snafu(display("Channel pool error: {}", error))]
PoolError { error: vector_common::Error },
}

impl Service<AmqpRequest> for AmqpService {
Expand All @@ -106,9 +114,13 @@ impl Service<AmqpRequest> for AmqpService {
}

fn call(&mut self, req: AmqpRequest) -> Self::Future {
let channel = Arc::clone(&self.channel);
let channel = self.channels.clone();

Box::pin(async move {
let channel = channel.get().await.map_err(|error| AmqpError::PoolError {
error: Box::new(error),
})?;

let byte_size = req.body.len();
let fut = channel
.basic_publish(
Expand Down
Loading
Loading