Skip to content

Commit ae18182

Browse files
committed
fix bug in super_stream_producer client created twice
super_stream_consumer super_stream locator not closed
1 parent 4abc07a commit ae18182

File tree

2 files changed

+5
-1
lines changed

2 files changed

+5
-1
lines changed

src/superstream_consumer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::sync::Arc;
1212
use std::task::{Context, Poll};
1313
use tokio::sync::mpsc::{channel, Receiver};
1414
use tokio::task;
15+
use crate::{client::Client};
1516

1617
/// API for consuming RabbitMQ stream messages
1718
pub struct SuperStreamConsumer {
@@ -23,6 +24,7 @@ struct SuperStreamConsumerInternal {
2324
closed: Arc<AtomicBool>,
2425
handlers: Vec<ConsumerHandle>,
2526
waker: AtomicWaker,
27+
client: Client,
2628
}
2729

2830
/// Builder for [`Consumer`]
@@ -75,6 +77,7 @@ impl SuperStreamConsumerBuilder {
7577
closed: Arc::new(AtomicBool::new(false)),
7678
handlers,
7779
waker: AtomicWaker::new(),
80+
client,
7881
};
7982

8083
Ok(SuperStreamConsumer {
@@ -135,6 +138,7 @@ impl SuperStreamConsumerHandle {
135138
for handle in &self.0.handlers {
136139
handle.internal_close().await.unwrap();
137140
}
141+
self.0.client.close().await?;
138142
Ok(())
139143
}
140144
_ => Err(ConsumerCloseError::AlreadyClosed),

src/superstream_producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl<T> SuperStreamProducerBuilder<T> {
117117

118118
let super_stream_metadata = DefaultSuperStreamMetadata {
119119
super_stream: super_stream.to_string(),
120-
client: self.environment.create_client().await?,
120+
client: client.clone(),
121121
partitions: Vec::new(),
122122
routes: Vec::new(),
123123
};

0 commit comments

Comments
 (0)