Skip to content

Commit e4e33f3

Browse files
committed
make amqp max_channels configurable; default to 4; added to docs and changelog
1 parent c9b2320 commit e4e33f3

File tree

4 files changed

+29
-3
lines changed

4 files changed

+29
-3
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
The `amqp` sink now attempts to re-connect to the AMQP broker when the channel has been disconnected.
1+
The `amqp` sink now attempts to re-connect to the AMQP broker when the channel has been disconnected. It will also create up to 4 channels in a pool (configurable with the `max_channels` configuration) to improve throughput.
22

33
authors: aramperes

src/sinks/amqp/channel.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,22 @@ use lapin::options::ConfirmSelectOptions;
77
pub type AmqpSinkChannels = Pool<AmqpSinkChannelManager>;
88

99
pub(super) fn new_channel_pool(config: &AmqpSinkConfig) -> crate::Result<AmqpSinkChannels> {
10+
let max_channels = config.max_channels.try_into().map_err(|_| {
11+
Box::new(AmqpError::PoolError {
12+
error: "max_channels must fit into usize".into(),
13+
})
14+
})?;
15+
if max_channels == 0 {
16+
return Err(Box::new(AmqpError::PoolError {
17+
error: "max_channels must be positive".into(),
18+
}));
19+
}
1020
let channel_manager = AmqpSinkChannelManager::new(&config.connection);
1121
let channels = Pool::builder(channel_manager)
12-
.max_size(4)
22+
.max_size(max_channels)
1323
.runtime(deadpool::Runtime::Tokio1)
1424
.build()?;
25+
debug!("AMQP channel pool created with max size: {}", max_channels);
1526
Ok(channels)
1627
}
1728

src/sinks/amqp/config.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,14 @@ pub struct AmqpSinkConfig {
9090
skip_serializing_if = "crate::serde::is_default"
9191
)]
9292
pub(crate) acknowledgements: AcknowledgementsConfig,
93+
94+
/// Maximum number of AMQP channels to keep active (channels are created as needed).
95+
#[serde(default = "default_max_channels")]
96+
pub(crate) max_channels: u32,
97+
}
98+
99+
const fn default_max_channels() -> u32 {
100+
4
93101
}
94102

95103
impl Default for AmqpSinkConfig {
@@ -101,6 +109,7 @@ impl Default for AmqpSinkConfig {
101109
encoding: TextSerializerConfig::default().into(),
102110
connection: AmqpConfig::default(),
103111
acknowledgements: AcknowledgementsConfig::default(),
112+
max_channels: default_max_channels(),
104113
}
105114
}
106115
}
@@ -111,7 +120,8 @@ impl GenerateConfig for AmqpSinkConfig {
111120
r#"connection_string = "amqp://localhost:5672/%2f"
112121
routing_key = "user_id"
113122
exchange = "test"
114-
encoding.codec = "json""#,
123+
encoding.codec = "json"
124+
max_channels = 4"#,
115125
)
116126
.unwrap()
117127
}

website/cue/reference/components/sinks/base/amqp.cue

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,11 @@ base: components: sinks: amqp: configuration: {
392392
required: true
393393
type: string: syntax: "template"
394394
}
395+
max_channels: {
396+
description: "Maximum number of AMQP channels to keep active (channels are created as needed)."
397+
required: false
398+
type: uint: default: 4
399+
}
395400
properties: {
396401
description: """
397402
Configure the AMQP message properties.

0 commit comments

Comments
 (0)