Skip to content

Commit e06146d

Browse files
committed
fix bug in super_stream_producer client created twice
super_stream_consumer super_stream locator not closed
1 parent e8f7c12 commit e06146d

File tree

2 files changed

+5
-1
lines changed

2 files changed

+5
-1
lines changed

src/superstream_consumer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::client::Client;
12
use crate::consumer::Delivery;
23
use crate::error::{ConsumerCloseError, ConsumerDeliveryError};
34
use crate::superstream::DefaultSuperStreamMetadata;
@@ -23,6 +24,7 @@ struct SuperStreamConsumerInternal {
2324
closed: Arc<AtomicBool>,
2425
handlers: Vec<ConsumerHandle>,
2526
waker: AtomicWaker,
27+
client: Client,
2628
}
2729

2830
/// Builder for [`Consumer`]
@@ -75,6 +77,7 @@ impl SuperStreamConsumerBuilder {
7577
closed: Arc::new(AtomicBool::new(false)),
7678
handlers,
7779
waker: AtomicWaker::new(),
80+
client,
7881
};
7982

8083
Ok(SuperStreamConsumer {
@@ -135,6 +138,7 @@ impl SuperStreamConsumerHandle {
135138
for handle in &self.0.handlers {
136139
handle.internal_close().await.unwrap();
137140
}
141+
self.0.client.close().await?;
138142
Ok(())
139143
}
140144
_ => Err(ConsumerCloseError::AlreadyClosed),

src/superstream_producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl<T> SuperStreamProducerBuilder<T> {
117117

118118
let super_stream_metadata = DefaultSuperStreamMetadata {
119119
super_stream: super_stream.to_string(),
120-
client: self.environment.create_client().await?,
120+
client: client.clone(),
121121
partitions: Vec::new(),
122122
routes: Vec::new(),
123123
};

0 commit comments

Comments
 (0)