Skip to content

Commit 989be13

Browse files
committed
adding, stream method to consumer delivery struct and adding examples
1 parent ffa07fb commit 989be13

File tree

5 files changed

+162
-7
lines changed

5 files changed

+162
-7
lines changed

examples/receive_super_stream.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::error::StreamCreateError;
3+
use rabbitmq_stream_client::types::{
4+
ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer,
5+
};
6+
use std::sync::atomic::{AtomicU32, Ordering};
7+
use std::sync::Arc;
8+
use std::time::Duration;
9+
use tokio::task;
10+
use tokio::time::sleep;
11+
12+
#[tokio::main]
13+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
14+
use rabbitmq_stream_client::Environment;
15+
let environment = Environment::builder().build().await?;
16+
let message_count = 10;
17+
let super_stream = "hello-rust-stream";
18+
19+
let create_response = environment
20+
.stream_creator()
21+
.max_length(ByteCapacity::GB(5))
22+
.create_super_stream(super_stream, 3, None)
23+
.await;
24+
25+
if let Err(e) = create_response {
26+
if let StreamCreateError::Create { stream, status } = e {
27+
match status {
28+
// we can ignore this error because the stream already exists
29+
ResponseCode::StreamAlreadyExists => {}
30+
err => {
31+
println!("Error creating stream: {:?} {:?}", stream, err);
32+
}
33+
}
34+
}
35+
}
36+
37+
let mut super_stream_consumer: SuperStreamConsumer = environment
38+
.super_stream_consumer()
39+
.offset(OffsetSpecification::First)
40+
.build(super_stream)
41+
.await
42+
.unwrap();
43+
44+
let received_messages = Arc::new(AtomicU32::new(0));
45+
46+
for mut consumer in super_stream_consumer.get_consumers().await.into_iter() {
47+
let received_messages_outer = received_messages.clone();
48+
49+
task::spawn(async move {
50+
let mut inner_received_messages = received_messages_outer.clone();
51+
while let Some(delivery) = consumer.next().await {
52+
let d = delivery.unwrap();
53+
println!(
54+
"Got message: {:#?} from stream: {} with offset: {}",
55+
d.message()
56+
.data()
57+
.map(|data| String::from_utf8(data.to_vec()).unwrap()),
58+
d.stream(),
59+
d.offset(),
60+
);
61+
let value = inner_received_messages.fetch_add(1, Ordering::Relaxed);
62+
if value == message_count {
63+
let handle = consumer.handle();
64+
_ = handle.close().await;
65+
break;
66+
}
67+
}
68+
});
69+
}
70+
71+
sleep(Duration::from_millis(20000)).await;
72+
73+
Ok(())
74+
}

examples/send_super_stream.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
use rabbitmq_stream_client::error::StreamCreateError;
2+
use rabbitmq_stream_client::types::{
3+
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
4+
};
5+
use std::sync::atomic::{AtomicU32, Ordering};
6+
use std::sync::Arc;
7+
use tokio::sync::Notify;
8+
9+
fn hash_strategy_value_extractor(message: &Message) -> String {
10+
String::from_utf8(Vec::from(message.data().unwrap())).expect("Found invalid UTF-8")
11+
}
12+
13+
#[tokio::main]
14+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
15+
use rabbitmq_stream_client::Environment;
16+
let environment = Environment::builder().build().await?;
17+
let message_count = 100;
18+
let stream = "hello-rust-stream";
19+
let confirmed_messages = Arc::new(AtomicU32::new(0));
20+
let notify_on_send = Arc::new(Notify::new());
21+
22+
let _ = environment
23+
.stream_creator()
24+
.max_length(ByteCapacity::GB(5))
25+
.create_super_stream(stream, 3, None)
26+
.await;
27+
28+
let create_response = environment
29+
.stream_creator()
30+
.max_length(ByteCapacity::GB(5))
31+
.create_super_stream(stream, 3, None)
32+
.await;
33+
34+
if let Err(e) = create_response {
35+
if let StreamCreateError::Create { stream, status } = e {
36+
match status {
37+
// we can ignore this error because the stream already exists
38+
ResponseCode::StreamAlreadyExists => {}
39+
err => {
40+
println!("Error creating stream: {:?} {:?}", stream, err);
41+
}
42+
}
43+
}
44+
}
45+
46+
let mut super_stream_producer = environment
47+
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
48+
HashRoutingMurmurStrategy {
49+
routing_extractor: &hash_strategy_value_extractor,
50+
},
51+
))
52+
.build(stream)
53+
.await
54+
.unwrap();
55+
56+
for i in 0..message_count {
57+
println!("sending message {}", i);
58+
let counter = confirmed_messages.clone();
59+
let notifier = notify_on_send.clone();
60+
let msg = Message::builder().body(format!("message{}", i)).build();
61+
super_stream_producer
62+
.send(msg, move |_| {
63+
let inner_counter = counter.clone();
64+
let inner_notifier = notifier.clone();
65+
async move {
66+
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
67+
inner_notifier.notify_one();
68+
}
69+
}
70+
})
71+
.await
72+
.unwrap();
73+
}
74+
75+
notify_on_send.notified().await;
76+
let _ = super_stream_producer.close().await;
77+
Ok(())
78+
}

src/consumer.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ impl MessageHandler for ConsumerMessageHandler {
330330
.0
331331
.sender
332332
.send(Ok(Delivery {
333+
stream: self.0.stream.clone(),
333334
subscription_id: self.0.subscription_id,
334335
message,
335336
offset,
@@ -358,6 +359,7 @@ impl MessageHandler for ConsumerMessageHandler {
358359
/// Envelope from incoming message
359360
#[derive(Debug, Clone)]
360361
pub struct Delivery {
362+
stream: String,
361363
subscription_id: u8,
362364
message: Message,
363365
offset: u64,
@@ -369,6 +371,11 @@ impl Delivery {
369371
self.subscription_id
370372
}
371373

374+
/// Get a reference to the delivery's stream name.
375+
pub fn stream(&self) -> &String {
376+
&self.stream
377+
}
378+
372379
/// Get a reference to the delivery's message.
373380
pub fn message(&self) -> &Message {
374381
&self.message

src/superstream_consumer.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::superstream::DefaultSuperStreamMetadata;
2-
use crate::{error::ConsumerCreateError, Client, Consumer, Environment};
2+
use crate::{error::ConsumerCreateError, Consumer, Environment};
33
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
44
use std::sync::Arc;
55
//type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
@@ -11,8 +11,6 @@ pub struct SuperStreamConsumer {
1111
}
1212

1313
struct SuperStreamConsumerInternal {
14-
client: Client,
15-
super_stream: String,
1614
offset_specification: OffsetSpecification,
1715
consumers: Vec<Consumer>,
1816
}
@@ -54,8 +52,6 @@ impl SuperStreamConsumerBuilder {
5452
}
5553

5654
let super_stream_consumer_internal = Arc::new(SuperStreamConsumerInternal {
57-
super_stream: super_stream.to_string(),
58-
client: client.clone(),
5955
offset_specification: self.offset_specification.clone(),
6056
consumers,
6157
});
@@ -65,7 +61,7 @@ impl SuperStreamConsumerBuilder {
6561
})
6662
}
6763

68-
pub async fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
64+
pub fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
6965
self.offset_specification = offset_specification;
7066
self
7167
}

tests/integration/consumer_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async fn super_stream_consumer_test() {
104104

105105
task::spawn(async move {
106106
let mut inner_received_messages = received_messages_outer.clone();
107-
while let d = consumer.next().await.unwrap() {
107+
while let _ = consumer.next().await.unwrap() {
108108
let value = inner_received_messages.fetch_add(1, Ordering::Relaxed);
109109
if value == message_count {
110110
let handle = consumer.handle();

0 commit comments

Comments
 (0)