Skip to content

Commit e969217

Browse files
committed
super_stream_consumer new approach
1 parent e590d11 commit e969217

File tree

4 files changed

+50
-43
lines changed

4 files changed

+50
-43
lines changed

src/consumer.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
AtomicBool,
77
Ordering::{Relaxed, SeqCst},
88
},
9-
Arc, Mutex,
9+
Arc,
1010
},
1111
task::{Context, Poll},
1212
};
@@ -32,11 +32,10 @@ use rand::{seq::SliceRandom, SeedableRng};
3232
type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
3333

3434
/// API for consuming RabbitMQ stream messages
35-
#[derive(Clone)]
3635
pub struct Consumer {
3736
// Mandatory in case of manual offset tracking
3837
name: Option<String>,
39-
receiver: Arc<Mutex<Receiver<Result<Delivery, ConsumerDeliveryError>>>>,
38+
receiver: Receiver<Result<Delivery, ConsumerDeliveryError>>,
4039
internal: Arc<ConsumerInternal>,
4140
}
4241

@@ -181,7 +180,7 @@ impl ConsumerBuilder {
181180
if response.is_ok() {
182181
Ok(Consumer {
183182
name: self.consumer_name,
184-
receiver: Arc::new(Mutex::new(rx)),
183+
receiver: rx,
185184
internal: consumer,
186185
})
187186
} else {
@@ -251,9 +250,9 @@ impl Consumer {
251250
impl Stream for Consumer {
252251
type Item = Result<Delivery, ConsumerDeliveryError>;
253252

254-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
253+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255254
self.internal.waker.register(cx.waker());
256-
let poll = Pin::new(&mut self.receiver.lock().unwrap()).poll_recv(cx);
255+
let poll = Pin::new(&mut self.receiver).poll_recv(cx);
257256
match (self.is_closed(), poll.is_ready()) {
258257
(true, false) => Poll::Ready(None),
259258
_ => poll,

src/superstream.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ pub struct DefaultSuperStreamMetadata {
1515
impl DefaultSuperStreamMetadata {
1616
pub async fn partitions(&mut self) -> Vec<String> {
1717
if self.partitions.is_empty() {
18-
println!("partition len is 0");
1918
let response = self.client.partitions(self.super_stream.clone()).await;
2019

2120
self.partitions = response.unwrap().streams;
@@ -64,7 +63,6 @@ impl HashRoutingMurmurStrategy {
6463
message: &Message,
6564
metadata: &mut DefaultSuperStreamMetadata,
6665
) -> Vec<String> {
67-
println!("im in routes");
6866
let mut streams: Vec<String> = Vec::new();
6967

7068
let key = (self.routing_extractor)(message);

src/superstream_consumer.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
1+
use crate::consumer::Delivery;
2+
use crate::error::ConsumerDeliveryError;
13
use crate::superstream::DefaultSuperStreamMetadata;
2-
use crate::{error::ConsumerCreateError, Consumer, Environment};
4+
use crate::{error::ConsumerCreateError, Environment};
5+
use futures::{Stream, StreamExt};
36
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
4-
use std::sync::Arc;
7+
use std::pin::Pin;
8+
use std::task::{Context, Poll};
9+
use tokio::sync::mpsc::{channel, Receiver};
10+
use tokio::task;
11+
512
//type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
613

714
/// API for consuming RabbitMQ stream messages
8-
#[derive(Clone)]
915
pub struct SuperStreamConsumer {
10-
internal: Arc<SuperStreamConsumerInternal>,
16+
internal: SuperStreamConsumerInternal,
1117
}
1218

1319
struct SuperStreamConsumerInternal {
14-
consumers: Vec<Consumer>,
20+
receiver: Receiver<Result<Delivery, ConsumerDeliveryError>>,
1521
}
1622

1723
/// Builder for [`Consumer`]
@@ -28,6 +34,7 @@ impl SuperStreamConsumerBuilder {
2834
// Connect to the user specified node first, then look for a random replica to connect to instead.
2935
// This is recommended for load balancing purposes.
3036
let client = self.environment.create_client().await?;
37+
let (tx, rx) = channel(10000);
3138

3239
let mut super_stream_metadata = DefaultSuperStreamMetadata {
3340
super_stream: super_stream.to_string(),
@@ -36,21 +43,25 @@ impl SuperStreamConsumerBuilder {
3643
routes: Vec::new(),
3744
};
3845
let partitions = super_stream_metadata.partitions().await;
39-
let mut consumers: Vec<Consumer> = Vec::new();
4046

4147
for partition in partitions.into_iter() {
42-
let consumer = self
48+
let tx_cloned = tx.clone();
49+
let mut consumer = self
4350
.environment
4451
.consumer()
4552
.offset(self.offset_specification.clone())
4653
.build(partition.as_str())
4754
.await
4855
.unwrap();
4956

50-
consumers.push(consumer);
57+
task::spawn(async move {
58+
while let Some(d) = consumer.next().await {
59+
_ = tx_cloned.send(d).await;
60+
}
61+
});
5162
}
5263

53-
let super_stream_consumer_internal = Arc::new(SuperStreamConsumerInternal { consumers });
64+
let super_stream_consumer_internal = SuperStreamConsumerInternal { receiver: rx };
5465

5566
Ok(SuperStreamConsumer {
5667
internal: super_stream_consumer_internal,
@@ -63,12 +74,10 @@ impl SuperStreamConsumerBuilder {
6374
}
6475
}
6576

66-
impl SuperStreamConsumer {
67-
pub async fn get_consumer(&self, i: usize) -> &Consumer {
68-
return self.internal.consumers.get(i).unwrap();
69-
}
77+
impl Stream for SuperStreamConsumer {
78+
type Item = Result<Delivery, ConsumerDeliveryError>;
7079

71-
pub async fn get_consumers(&mut self) -> Vec<Consumer> {
72-
self.internal.consumers.clone()
80+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81+
Pin::new(&mut self.internal.receiver).poll_recv(cx)
7382
}
7483
}

tests/integration/consumer_test.rs

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async fn super_stream_consumer_test() {
8282
let mut super_stream_consumer: SuperStreamConsumer = env
8383
.env
8484
.super_stream_consumer()
85-
//.offset(OffsetSpecification::Next)
85+
.offset(OffsetSpecification::First)
8686
.build(&env.super_stream)
8787
.await
8888
.unwrap();
@@ -97,27 +97,28 @@ async fn super_stream_consumer_test() {
9797
.unwrap();
9898
}
9999

100-
let received_messages = Arc::new(AtomicU32::new(0));
101-
102-
for mut consumer in super_stream_consumer.get_consumers().await.into_iter() {
103-
let received_messages_outer = received_messages.clone();
104-
105-
task::spawn(async move {
106-
let mut inner_received_messages = received_messages_outer.clone();
107-
while let _ = consumer.next().await.unwrap() {
108-
let value = inner_received_messages.fetch_add(1, Ordering::Relaxed);
109-
if value == message_count {
110-
let handle = consumer.handle();
111-
_ = handle.close().await;
112-
break;
113-
}
114-
}
115-
});
116-
}
100+
let mut received_messages = 0;
117101

118-
sleep(Duration::from_millis(1000)).await;
102+
println!("before looping");
103+
while let delivery = super_stream_consumer.next().await.unwrap() {
104+
println!("inside while delivery loop");
105+
let d = delivery.unwrap();
106+
println!(
107+
"Got message: {:#?} from stream: {} with offset: {}",
108+
d.message()
109+
.data()
110+
.map(|data| String::from_utf8(data.to_vec()).unwrap()),
111+
d.stream(),
112+
d.offset()
113+
);
114+
115+
received_messages = received_messages + 1;
116+
if received_messages == 10 {
117+
break;
118+
}
119+
}
119120

120-
assert!(received_messages.fetch_add(1, Ordering::Relaxed) == message_count);
121+
assert!(received_messages == message_count);
121122

122123
super_stream_producer.close().await.unwrap();
123124
}

0 commit comments

Comments
 (0)