Skip to content

Commit c5d29f5

Browse files
committed
Update integration tests
1 parent db7723f commit c5d29f5

File tree

3 files changed

+9
-11
lines changed

3 files changed

+9
-11
lines changed

src/sinks/amqp/channel.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
use super::config::AmqpSinkConfig;
12
use super::service::AmqpError;
23
use crate::amqp::AmqpConfig;
34
use deadpool::managed::Pool;
45
use lapin::options::ConfirmSelectOptions;
56

67
pub type AmqpSinkChannels = Pool<AmqpSinkChannelManager>;
78

8-
pub(super) fn new_channel_pool(config: &AmqpConfig) -> crate::Result<AmqpSinkChannels> {
9-
let channel_manager = AmqpSinkChannelManager::new(config);
9+
pub(super) fn new_channel_pool(config: &AmqpSinkConfig) -> crate::Result<AmqpSinkChannels> {
10+
let channel_manager = AmqpSinkChannelManager::new(&config.connection);
1011
let channels = Pool::builder(channel_manager)
1112
.max_size(4)
1213
.runtime(deadpool::Runtime::Tokio1)

src/sinks/amqp/integration_tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
amqp::await_connection,
44
config::{SinkConfig, SinkContext},
55
shutdown::ShutdownSignal,
6-
sinks::amqp::channel::AmqpChannel,
6+
sinks::amqp::channel::new_channel_pool,
77
template::{Template, UnsignedIntTemplate},
88
test_util::{
99
components::{run_and_assert_sink_compliance, SINK_TAGS},
@@ -13,7 +13,7 @@ use crate::{
1313
};
1414
use config::AmqpPropertiesConfig;
1515
use futures::StreamExt;
16-
use std::{collections::HashSet, sync::Arc, time::Duration};
16+
use std::{collections::HashSet, time::Duration};
1717
use vector_lib::{config::LogNamespace, event::LogEvent};
1818

1919
pub fn make_config() -> AmqpSinkConfig {
@@ -38,8 +38,8 @@ async fn healthcheck() {
3838
let mut config = make_config();
3939
config.exchange = Template::try_from(exchange.as_str()).unwrap();
4040
await_connection(&config.connection).await;
41-
let channel = AmqpChannel::new(&config.connection).await.unwrap();
42-
super::config::healthcheck(Arc::new(channel)).await.unwrap();
41+
let channels = new_channel_pool(&config).unwrap();
42+
super::config::healthcheck(channels).await.unwrap();
4343
}
4444

4545
#[tokio::test]

src/sinks/amqp/sink.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,8 @@ pub(super) struct AmqpSink {
3737

3838
impl AmqpSink {
3939
pub(super) async fn new(config: AmqpSinkConfig) -> crate::Result<Self> {
40-
let channels = super::channel::new_channel_pool(&config.connection).map_err(|e| {
41-
BuildError::AmqpCreateFailed {
42-
source: e,
43-
}
44-
})?;
40+
let channels = super::channel::new_channel_pool(&config)
41+
.map_err(|e| BuildError::AmqpCreateFailed { source: e })?;
4542

4643
let transformer = config.encoding.transformer();
4744
let serializer = config.encoding.build()?;

0 commit comments

Comments
 (0)