Skip to content

Commit 6dc32c0

Browse files
committed
example work in progress
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 4dc096a commit 6dc32c0

File tree

7 files changed

+268
-29
lines changed

7 files changed

+268
-29
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ readme = "README.md"
1616
members = [
1717
".",
1818
"protocol",
19-
"benchmark"
19+
"benchmark",
20+
"examples/super_stream_single_active_consumer", "examples/test_ss",
2021
]
2122

2223

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "single_active_consumer"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
futures = "0.3.31"
8+
tokio = "1.41.1"
9+
rabbitmq-stream-client = { path = "../../" }
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
Super stream example
2+
---
3+
4+
[Super Streams Documentation](https://www.rabbitmq.com/streams.html#super-streams) for more details.
5+
[Super Streams blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams)
6+
7+
8+
This example shows how to use the Super Stream feature in RabbitMQ 3.11.0.
9+
10+
Then run the producer in one terminal:
11+
12+
$ cargo run -- --producer
13+
14+
15+
And the consumer in another terminal:
16+
17+
$ cargo run -- --consumer
18+
19+
You should see the consumer receiving the messages from the producer.
20+
21+
It would be something like:
22+
```bash
23+
$ cargo run -- --producer
24+
Starting SuperStream Producer
25+
Super Stream Producer connected to RabbitMQ
26+
Super Stream Producer sent 0 messages to invoices
27+
Super Stream Producer sent 1 messages to invoices
28+
Super Stream Producer sent 2 messages to invoices
29+
Super Stream Producer sent 3 messages to invoices
30+
```
31+
32+
```bash
33+
$ dotnet run --consumer my_first_consumer
34+
Starting SuperStream Consumer my_first_consumer
35+
Super Stream Consumer connected to RabbitMQ. ConsumerName my_first_consumer
36+
Consumer Name my_first_consumer -Received message id: hello0 body: hello0, Stream invoices-2
37+
Consumer Name my_first_consumer -Received message id: hello1 body: hello1, Stream invoices-1
38+
Consumer Name my_first_consumer -Received message id: hello2 body: hello2, Stream invoices-0
39+
Consumer Name my_first_consumer -Received message id: hello3 body: hello3, Stream invoices-1
40+
Consumer Name my_first_consumer -Received message id: hello4 body: hello4, Stream invoices-2
41+
Consumer Name my_first_consumer -Received message id: hello5 body: hello5, Stream invoices-0
42+
Consumer Name my_first_consumer -Received message id: hello6 body: hello6, Stream invoices-2
43+
Consumer Name my_first_consumer -Received message id: hello7 body: hello7, Stream invoices-0
44+
Consumer Name my_first_consumer -Received message id: hello8 body: hello8, Stream invoices-1
45+
Consumer Name my_first_consumer -Received message id: hello9 body: hello9, Stream invoices-0
46+
```
47+
48+
49+
To see the Single active consumer in action, run another consumer:
50+
51+
$ dotnet run --consumer my_second_consumer
52+
53+
You should see the second consumer receiving the part of the messages from the producer. In thi case only the messages coming from the `invoices-1`.
54+
55+
It should be something like:
56+
```bash
57+
$ dotnet run --consumer my_second_consumer
58+
Starting SuperStream Consumer my_second_consumer
59+
Super Stream Consumer connected to RabbitMQ. ConsumerName my_second_consumer
60+
Consumer Name my_second_consumer -Received message id: hello1 body: hello1, Stream invoices-1
61+
Consumer Name my_second_consumer -Received message id: hello3 body: hello3, Stream invoices-1
62+
Consumer Name my_second_consumer -Received message id: hello8 body: hello8, Stream invoices-1
63+
Consumer Name my_second_consumer -Received message id: hello14 body: hello14, Stream invoices-1
64+
Consumer Name my_second_consumer -Received message id: hello15 body: hello15, Stream invoices-1
65+
Consumer Name my_second_consumer -Received message id: hello16 body: hello16, Stream invoices-1
66+
Consumer Name my_second_consumer -Received message id: hello19 body: hello19, Stream invoices-1
67+
```
68+
and the first consumer should be receiving the rest of the messages:
69+
```bash
70+
Consumer Name my_first_consumer -Received message id: hello0 body: hello0, Stream invoices-2
71+
Consumer Name my_first_consumer -Received message id: hello2 body: hello2, Stream invoices-0
72+
Consumer Name my_first_consumer -Received message id: hello4 body: hello4, Stream invoices-2
73+
Consumer Name my_first_consumer -Received message id: hello5 body: hello5, Stream invoices-0
74+
Consumer Name my_first_consumer -Received message id: hello6 body: hello6, Stream invoices-2
75+
Consumer Name my_first_consumer -Received message id: hello7 body: hello7, Stream invoices-0
76+
Consumer Name my_first_consumer -Received message id: hello9 body: hello9, Stream invoices-0
77+
Consumer Name my_first_consumer -Received message id: hello10 body: hello10, Stream invoices-2
78+
Consumer Name my_first_consumer -Received message id: hello11 body: hello11, Stream invoices-0
79+
```
80+
81+
82+
83+
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
mod send_super_stream;
2+
mod single_active_consumer_super_stream;
3+
4+
use std::env;
5+
6+
static SUPER_STREAM: &str = "invoices";
7+
8+
#[tokio::main]
9+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
let mut args = env::args().skip(1);
11+
while let Some(arg) = args.next() {
12+
match &arg[..] {
13+
"-h" | "--help" => help(),
14+
"--consumer" => {
15+
single_active_consumer_super_stream::start_consumer().await?;
16+
}
17+
18+
"--producer" => send_super_stream::start_producer().await?,
19+
20+
arg if arg.starts_with("-") => {
21+
eprintln!("Unknown argument: {}", arg);
22+
}
23+
24+
_ => {
25+
eprintln!("Unknown argument: {}", arg);
26+
help();
27+
}
28+
}
29+
}
30+
Ok(())
31+
}
32+
33+
fn help() {
34+
println!("--consumer or --producer")
35+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use rabbitmq_stream_client::error::StreamCreateError;
2+
use rabbitmq_stream_client::types::{
3+
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
4+
};
5+
use std::convert::TryInto;
6+
use std::sync::atomic::{AtomicU32, Ordering};
7+
use std::sync::Arc;
8+
use tokio::sync::Notify;
9+
use tokio::time;
10+
11+
fn hash_strategy_value_extractor(message: &Message) -> String {
12+
message
13+
.application_properties()
14+
.unwrap()
15+
.get("id")
16+
.unwrap()
17+
.clone()
18+
.try_into()
19+
.unwrap()
20+
}
21+
22+
pub async fn start_producer() -> Result<(), Box<dyn std::error::Error>> {
23+
use rabbitmq_stream_client::Environment;
24+
let environment = Environment::builder().build().await?;
25+
let confirmed_messages = Arc::new(AtomicU32::new(0));
26+
let notify_on_send = Arc::new(Notify::new());
27+
let _ = environment
28+
.stream_creator()
29+
.max_length(ByteCapacity::GB(5))
30+
.create_super_stream(crate::SUPER_STREAM, 3, None)
31+
.await;
32+
33+
let create_response = environment
34+
.stream_creator()
35+
.max_length(ByteCapacity::GB(5))
36+
.create_super_stream(crate::SUPER_STREAM, 3, None)
37+
.await;
38+
39+
if let Err(e) = create_response {
40+
if let StreamCreateError::Create { stream, status } = e {
41+
match status {
42+
// we can ignore this error because the stream already exists
43+
ResponseCode::StreamAlreadyExists => {}
44+
err => {
45+
println!(
46+
"[Super Stream Producer] Error creating stream: {:?} {:?}",
47+
stream, err
48+
);
49+
}
50+
}
51+
}
52+
}
53+
println!(
54+
"[Super Stream Producer] Sending messages to the super stream: {}",
55+
crate::SUPER_STREAM
56+
);
57+
let super_stream_producer = environment
58+
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
59+
HashRoutingMurmurStrategy {
60+
routing_extractor: &hash_strategy_value_extractor,
61+
},
62+
))
63+
.client_provided_name("rust stream producer - sac example")
64+
.build(crate::SUPER_STREAM)
65+
.await;
66+
67+
match super_stream_producer {
68+
Ok(mut producer) => {
69+
println!("[Super Stream Producer] Successfully created super stream producer");
70+
let mut idx = 0;
71+
loop {
72+
let counter = confirmed_messages.clone();
73+
let notifier = notify_on_send.clone();
74+
let msg = Message::builder()
75+
.body(format!("super stream message_{}", idx))
76+
.application_properties()
77+
.insert("id", idx.to_string())
78+
.message_builder()
79+
.build();
80+
println!("[Super Stream Producer] Sending message {}", idx);
81+
let send_result = producer
82+
.send(msg, move |_| {
83+
let inner_counter = counter.clone();
84+
let inner_notifier = notifier.clone();
85+
async move {
86+
if inner_counter.fetch_add(1, Ordering::Relaxed) == idx - 1 {
87+
inner_notifier.notify_one();
88+
}
89+
}
90+
})
91+
.await;
92+
93+
match send_result {
94+
Ok(_) => {
95+
idx += 1;
96+
println!("[Super Stream Producer] Message {} sent", idx);
97+
}
98+
Err(err) => {
99+
println!(
100+
"[Super Stream Producer] Failed to send message. error: {}",
101+
err
102+
);
103+
}
104+
}
105+
106+
time::sleep(time::Duration::from_millis(1_000)).await;
107+
}
108+
}
109+
Err(err) => {
110+
println!("Failed to create super stream producer. error {}", err);
111+
Ok(())
112+
}
113+
}
114+
}

examples/single_active_consumer/single_active_consumer_super_stream.rs renamed to examples/super_stream_single_active_consumer/src/single_active_consumer_super_stream.rs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,15 @@ use rabbitmq_stream_client::error::StreamCreateError;
33
use rabbitmq_stream_client::types::{
44
ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer,
55
};
6-
use std::collections::HashMap;
76

8-
#[tokio::main]
9-
async fn main() -> Result<(), Box<dyn std::error::Error>> {
7+
pub async fn start_consumer() -> Result<(), Box<dyn std::error::Error>> {
108
use rabbitmq_stream_client::Environment;
119
let environment = Environment::builder().build().await?;
12-
let message_count = 1000000;
13-
let super_stream = "hello-rust-super-stream";
1410

1511
let create_response = environment
1612
.stream_creator()
1713
.max_length(ByteCapacity::GB(5))
18-
.create_super_stream(super_stream, 3, None)
14+
.create_super_stream(crate::SUPER_STREAM, 3, None)
1915
.await;
2016

2117
if let Err(e) = create_response {
@@ -31,7 +27,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3127
}
3228
println!(
3329
"Super stream consumer example, consuming messages from the super stream {}",
34-
super_stream
30+
crate::SUPER_STREAM
3531
);
3632

3733
let mut super_stream_consumer: SuperStreamConsumer = environment
@@ -41,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4137
.offset(OffsetSpecification::First)
4238
.enable_single_active_consumer(true)
4339
.client_provided_name("my super stream consumer for hello rust")
44-
.consumer_update(move |active, message_context| async move {
40+
.consumer_update(move |active, message_context| async move {
4541
let name = message_context.name();
4642
let stream = message_context.stream();
4743
let client = message_context.client();
@@ -52,19 +48,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5248
);
5349
let stored_offset = client.query_offset(name, stream.as_str()).await;
5450

55-
if let Err(e) = stored_offset {
51+
if let Err(_) = stored_offset {
5652
return OffsetSpecification::First;
5753
}
5854
let stored_offset_u = stored_offset.unwrap();
59-
println!("stored_offset_u {}", stored_offset_u.clone());
55+
println!("offset: {} stored", stored_offset_u.clone());
6056
OffsetSpecification::Offset(stored_offset_u)
61-
6257
})
63-
.build(super_stream)
58+
.build(crate::SUPER_STREAM)
6459
.await
6560
.unwrap();
6661

67-
for _ in 0..message_count {
62+
loop {
6863
let delivery = super_stream_consumer.next().await.unwrap();
6964
{
7065
let delivery = delivery.unwrap();
@@ -80,15 +75,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
8075
);
8176

8277
// Store an offset for every consumer
83-
if delivery.consumer_name().is_some() && delivery.offset() == 1000 {
84-
super_stream_consumer.client().store_offset(delivery.consumer_name().unwrap().as_str(), delivery.stream().as_str(), delivery.offset()).await;
85-
}
86-
78+
// store the offset each time a message is consumed
79+
// that is not a best practice, but it is done here for demonstration purposes
80+
super_stream_consumer
81+
.client()
82+
.store_offset(
83+
delivery.consumer_name().unwrap().as_str(),
84+
delivery.stream().as_str(),
85+
delivery.offset(),
86+
)
87+
.await?;
8788
}
8889
}
89-
90-
println!("Stopping super stream consumer...");
91-
let _ = super_stream_consumer.handle().close().await;
92-
println!("Super stream consumer stopped");
93-
Ok(())
94-
}
90+
}

src/superstream_producer.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ impl SuperStreamProducer<NoDedup> {
4545
&mut self,
4646
message: Message,
4747
cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut
48-
+ Send
49-
+ Sync
50-
+ 'static
51-
+ Clone,
48+
+ Send
49+
+ Sync
50+
+ 'static
51+
+ Clone,
5252
) -> Result<(), SuperStreamProducerPublishError>
5353
where
54-
Fut: Future<Output = ()> + Send + Sync + 'static,
54+
Fut: Future<Output=()> + Send + Sync + 'static,
5555
{
5656
let routes = match &self.4 {
5757
RoutingStrategy::HashRoutingStrategy(routing_strategy) => {
@@ -68,6 +68,7 @@ impl SuperStreamProducer<NoDedup> {
6868

6969
for route in routes.into_iter() {
7070
if !self.1.contains_key(route.as_str()) {
71+
println!("Sending message to route: {}", route);
7172
let producer = self
7273
.0
7374
.environment

0 commit comments

Comments
 (0)