diff --git a/Cargo.lock b/Cargo.lock index 9698a0f4e00fb..0b1e6d2fd327e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2975,11 +2975,25 @@ dependencies = [ "tokio", ] +[[package]] +name = "deadpool" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ed5957ff93768adf7a65ab167a17835c3d2c3c50d084fe305174c112f468e2f" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + [[package]] name = "deadpool-runtime" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" +dependencies = [ + "tokio", +] [[package]] name = "der" @@ -11475,6 +11489,7 @@ dependencies = [ "crossterm 0.29.0", "csv", "databend-client", + "deadpool 0.12.2", "derivative", "dirs-next", "dnsmsg-parser", @@ -12771,7 +12786,7 @@ dependencies = [ "assert-json-diff", "async-trait", "base64 0.22.1", - "deadpool", + "deadpool 0.10.0", "futures 0.3.31", "http 1.1.0", "http-body-util", diff --git a/Cargo.toml b/Cargo.toml index e705a9f37d613..089f436bcc8f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -290,6 +290,7 @@ smpl_jwt = { version = "0.8.0", default-features = false, optional = true } # AMQP lapin = { version = "2.5.3", default-features = false, features = ["native-tls"], optional = true } +deadpool = { version = "0.12.2", default-features = false, features = ["managed", "rt_tokio_1"], optional = true } # API async-graphql = { version = "7.0.17", default-features = false, optional = true, features = ["chrono", "playground"] } @@ -784,7 +785,7 @@ sinks-metrics = [ "sinks-splunk_hec" ] -sinks-amqp = ["lapin"] +sinks-amqp = ["deadpool", "lapin"] sinks-appsignal = [] sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws-sdk-kms"] sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index e107729c35ddc..aaaaac24bed82 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -180,6 +180,7 @@ data-encoding,https://github.com/ia0/data-encoding,MIT,Julien Cretin databend-client,https://github.com/databendlabs/bendsql,Apache-2.0,Databend Authors dbl,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers +deadpool,https://github.com/bikeshedder/deadpool,MIT OR Apache-2.0,Michael P. Jung der,https://github.com/RustCrypto/formats/tree/master/der,Apache-2.0 OR MIT,RustCrypto Developers deranged,https://github.com/jhpratt/deranged,MIT OR Apache-2.0,Jacob Pratt derivative,https://github.com/mcarton/rust-derivative,MIT OR Apache-2.0,mcarton @@ -423,6 +424,7 @@ num-integer,https://github.com/rust-num/num-integer,MIT OR Apache-2.0,The Rust P num-iter,https://github.com/rust-num/num-iter,MIT OR Apache-2.0,The Rust Project Developers num-rational,https://github.com/rust-num/num-rational,MIT OR Apache-2.0,The Rust Project Developers num-traits,https://github.com/rust-num/num-traits,MIT OR Apache-2.0,The Rust Project Developers +num_cpus,https://github.com/seanmonstar/num_cpus,MIT OR Apache-2.0,Sean McArthur num_enum,https://github.com/illicitonion/num_enum,BSD-3-Clause OR MIT OR Apache-2.0,"Daniel Wagner-Hall , Daniel Henry-Mantilla , Vincent Esche " num_threads,https://github.com/jhpratt/num_threads,MIT OR Apache-2.0,Jacob Pratt number_prefix,https://github.com/ogham/rust-number-prefix,MIT,Benjamin Sago diff --git a/changelog.d/22971_reconnect_broken_amqp_sink.fix.md b/changelog.d/22971_reconnect_broken_amqp_sink.fix.md new file mode 100644 index 0000000000000..acd01e4280f13 --- /dev/null +++ b/changelog.d/22971_reconnect_broken_amqp_sink.fix.md @@ -0,0 +1,3 @@ +The `amqp` sink now attempts to re-connect to the AMQP broker when the channel has been disconnected. It will also create up to 4 channels in a pool (configurable with the `max_channels` setting) to improve throughput. + +authors: aramperes diff --git a/docs/DEVELOPING.md b/docs/DEVELOPING.md index 7b9116a2a806b..356f780e54cb6 100644 --- a/docs/DEVELOPING.md +++ b/docs/DEVELOPING.md @@ -126,7 +126,7 @@ Loosely, you'll need the following: - **To run integration tests:** Have `docker` available, or a real live version of that service. (Use `AUTOSPAWN=false`) - **To run `make check-component-features`:** Have `remarshal` installed. - **To run `make check-licenses` or `cargo vdev build licenses`:** Have `dd-rust-license-tool` [installed](https://github.com/DataDog/rust-license-tool). -- **To run `cargo vdev build component-docs`:** Have `cue` [installed](https://cuelang.org/docs/install/). +- **To run `make generate-component-docs`:** Have `cue` [installed](https://cuelang.org/docs/install/). If you find yourself needing to run something inside the Docker environment described above, that's totally fine, they won't collide or hurt each other. In this case, you'd just run `make environment-generate`. @@ -161,7 +161,7 @@ cargo bench transforms::example make fmt cargo fmt # Build component documentation for the website -cargo vdev build component-docs +make generate-component-docs ``` If you run `make` you'll see a full list of all our tasks. Some of these will start Docker containers, sign commits, or even make releases. These are not common development commands and your mileage may vary. diff --git a/src/sinks/amqp/channel.rs b/src/sinks/amqp/channel.rs new file mode 100644 index 0000000000000..862ff06e5d0dc --- /dev/null +++ b/src/sinks/amqp/channel.rs @@ -0,0 +1,86 @@ +use super::config::AmqpSinkConfig; +use super::service::AmqpError; +use crate::amqp::AmqpConfig; +use deadpool::managed::Pool; +use lapin::options::ConfirmSelectOptions; + +pub type AmqpSinkChannels = Pool; + +pub(super) fn new_channel_pool(config: &AmqpSinkConfig) -> crate::Result { + let max_channels = config.max_channels.try_into().map_err(|_| { + Box::new(AmqpError::PoolError { + error: "max_channels must fit into usize".into(), + }) + })?; + if max_channels == 0 { + return Err(Box::new(AmqpError::PoolError { + error: "max_channels must be positive".into(), + })); + } + let channel_manager = AmqpSinkChannelManager::new(&config.connection); + let channels = Pool::builder(channel_manager) + .max_size(max_channels) + .runtime(deadpool::Runtime::Tokio1) + .build()?; + debug!("AMQP channel pool created with max size: {}", max_channels); + Ok(channels) +} + +/// A channel pool manager for the AMQP sink. +/// This manager is responsible for creating and recycling AMQP channels. +/// It uses the `deadpool` crate to manage the channels. +pub(crate) struct AmqpSinkChannelManager { + config: AmqpConfig, +} + +impl deadpool::managed::Manager for AmqpSinkChannelManager { + type Type = lapin::Channel; + type Error = AmqpError; + + async fn create(&self) -> Result { + let channel = Self::new_channel(&self.config).await?; + info!( + message = "Created a new channel to the AMQP broker.", + id = channel.id() + ); + Ok(channel) + } + + async fn recycle( + &self, + channel: &mut Self::Type, + _: &deadpool::managed::Metrics, + ) -> deadpool::managed::RecycleResult { + let state = channel.status().state(); + if state == lapin::ChannelState::Connected { + Ok(()) + } else { + Err((AmqpError::ChannelClosed { state }).into()) + } + } +} + +impl AmqpSinkChannelManager { + /// Creates a new channel pool manager for the AMQP sink. + pub fn new(config: &AmqpConfig) -> Self { + Self { + config: config.clone(), + } + } + + /// Creates a new AMQP channel using the configuration of this sink. + async fn new_channel(config: &AmqpConfig) -> Result { + let (_, channel) = config + .connect() + .await + .map_err(|e| AmqpError::ConnectFailed { error: e })?; + + // Enable confirmations on the channel. + channel + .confirm_select(ConfirmSelectOptions::default()) + .await + .map_err(|e| AmqpError::ConnectFailed { error: Box::new(e) })?; + + Ok(channel) + } +} diff --git a/src/sinks/amqp/config.rs b/src/sinks/amqp/config.rs index 50ea63104f836..afca8b47ccae5 100644 --- a/src/sinks/amqp/config.rs +++ b/src/sinks/amqp/config.rs @@ -1,7 +1,7 @@ //! Configuration functionality for the `AMQP` sink. +use super::channel::AmqpSinkChannels; use crate::{amqp::AmqpConfig, sinks::prelude::*}; use lapin::{types::ShortString, BasicProperties}; -use std::sync::Arc; use vector_lib::{ codecs::TextSerializerConfig, internal_event::{error_stage, error_type}, @@ -90,6 +90,14 @@ pub struct AmqpSinkConfig { skip_serializing_if = "crate::serde::is_default" )] pub(crate) acknowledgements: AcknowledgementsConfig, + + /// Maximum number of AMQP channels to keep active (channels are created as needed). + #[serde(default = "default_max_channels")] + pub(crate) max_channels: u32, +} + +const fn default_max_channels() -> u32 { + 4 } impl Default for AmqpSinkConfig { @@ -101,6 +109,7 @@ impl Default for AmqpSinkConfig { encoding: TextSerializerConfig::default().into(), connection: AmqpConfig::default(), acknowledgements: AcknowledgementsConfig::default(), + max_channels: default_max_channels(), } } } @@ -111,7 +120,8 @@ impl GenerateConfig for AmqpSinkConfig { r#"connection_string = "amqp://localhost:5672/%2f" routing_key = "user_id" exchange = "test" - encoding.codec = "json""#, + encoding.codec = "json" + max_channels = 4"#, ) .unwrap() } @@ -122,7 +132,7 @@ impl GenerateConfig for AmqpSinkConfig { impl SinkConfig for AmqpSinkConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let sink = AmqpSink::new(self.clone()).await?; - let hc = healthcheck(Arc::clone(&sink.channel)).boxed(); + let hc = healthcheck(sink.channels.clone()).boxed(); Ok((VectorSink::from_event_streamsink(sink), hc)) } @@ -135,9 +145,11 @@ impl SinkConfig for AmqpSinkConfig { } } -pub(super) async fn healthcheck(channel: Arc) -> crate::Result<()> { +pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> { trace!("Healthcheck started."); + let channel = channels.get().await?; + if !channel.status().connected() { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::BrokenPipe, diff --git a/src/sinks/amqp/integration_tests.rs b/src/sinks/amqp/integration_tests.rs index e919c5b88e92e..c974dbccb87cc 100644 --- a/src/sinks/amqp/integration_tests.rs +++ b/src/sinks/amqp/integration_tests.rs @@ -3,6 +3,7 @@ use crate::{ amqp::await_connection, config::{SinkConfig, SinkContext}, shutdown::ShutdownSignal, + sinks::amqp::channel::new_channel_pool, template::{Template, UnsignedIntTemplate}, test_util::{ components::{run_and_assert_sink_compliance, SINK_TAGS}, @@ -12,7 +13,7 @@ use crate::{ }; use config::AmqpPropertiesConfig; use futures::StreamExt; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, time::Duration}; use vector_lib::{config::LogNamespace, event::LogEvent}; pub fn make_config() -> AmqpSinkConfig { @@ -37,8 +38,8 @@ async fn healthcheck() { let mut config = make_config(); config.exchange = Template::try_from(exchange.as_str()).unwrap(); await_connection(&config.connection).await; - let (_conn, channel) = config.connection.connect().await.unwrap(); - super::config::healthcheck(Arc::new(channel)).await.unwrap(); + let channels = new_channel_pool(&config).unwrap(); + super::config::healthcheck(channels).await.unwrap(); } #[tokio::test] diff --git a/src/sinks/amqp/mod.rs b/src/sinks/amqp/mod.rs index 749f892f1ccd2..6b478c4adc10d 100644 --- a/src/sinks/amqp/mod.rs +++ b/src/sinks/amqp/mod.rs @@ -1,5 +1,6 @@ //! `AMQP` sink. //! Handles version AMQP 0.9.1 which is used by RabbitMQ. +mod channel; mod config; mod encoder; mod request_builder; @@ -15,7 +16,5 @@ use snafu::Snafu; #[derive(Debug, Snafu)] enum BuildError { #[snafu(display("creating amqp producer failed: {}", source))] - AmqpCreateFailed { - source: Box, - }, + AmqpCreateFailed { source: vector_common::Error }, } diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 44c475f208cae..cee7d20344bd7 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -5,10 +5,9 @@ use bytes::Bytes; use futures::future::BoxFuture; use lapin::{options::BasicPublishOptions, BasicProperties}; use snafu::Snafu; -use std::{ - sync::Arc, - task::{Context, Poll}, -}; +use std::task::{Context, Poll}; + +use super::channel::AmqpSinkChannels; /// The request contains the data to send to `AMQP` together /// with the information need to route the message. @@ -79,11 +78,11 @@ impl DriverResponse for AmqpResponse { /// The tower service that handles the actual sending of data to `AMQP`. pub(super) struct AmqpService { - pub(super) channel: Arc, + pub(super) channels: AmqpSinkChannels, } #[derive(Debug, Snafu)] -pub(super) enum AmqpError { +pub enum AmqpError { #[snafu(display("Failed retrieving Acknowledgement: {}", error))] AcknowledgementFailed { error: lapin::Error }, @@ -92,6 +91,15 @@ pub(super) enum AmqpError { #[snafu(display("Received Negative Acknowledgement from AMQP broker."))] Nack, + + #[snafu(display("Failed to open AMQP channel: {}", error))] + ConnectFailed { error: vector_common::Error }, + + #[snafu(display("Channel is not writeable: {:?}", state))] + ChannelClosed { state: lapin::ChannelState }, + + #[snafu(display("Channel pool error: {}", error))] + PoolError { error: vector_common::Error }, } impl Service for AmqpService { @@ -106,9 +114,13 @@ impl Service for AmqpService { } fn call(&mut self, req: AmqpRequest) -> Self::Future { - let channel = Arc::clone(&self.channel); + let channel = self.channels.clone(); Box::pin(async move { + let channel = channel.get().await.map_err(|error| AmqpError::PoolError { + error: Box::new(error), + })?; + let byte_size = req.body.len(); let fut = channel .basic_publish( diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index 1537713be206c..ca40fe46b21e5 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -1,10 +1,10 @@ //! The sink for the `AMQP` sink that wires together the main stream that takes the //! event and sends it to `AMQP`. use crate::sinks::prelude::*; -use lapin::{options::ConfirmSelectOptions, BasicProperties}; +use lapin::BasicProperties; use serde::Serialize; -use std::sync::Arc; +use super::channel::AmqpSinkChannels; use super::{ config::{AmqpPropertiesConfig, AmqpSinkConfig}, encoder::AmqpEncoder, @@ -27,7 +27,7 @@ pub(super) struct AmqpEvent { } pub(super) struct AmqpSink { - pub(super) channel: Arc, + pub(super) channels: AmqpSinkChannels, exchange: Template, routing_key: Option