Skip to content

Commit 116193f

Browse files
committed
few improvements and test for simple SAC
1 parent bb6ee6f commit 116193f

File tree

5 files changed

+183
-8
lines changed

5 files changed

+183
-8
lines changed

examples/single_active_consumer/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ This is an example to enable single active consumer functionality for superstrea
55
https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
66
https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
77

8-
This folder contains a super-stream consumer configured to enable it.
8+
This folder contains a consumer and a super-stream consumer configured to enable it.
99
You can use the example in the super-stream folder to produce messages for a super-stream.
1010

11-
You can then run the consumer in this folder.
11+
You can then run the single_active_consumer_super_stream.rs in this folder.
1212
Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream.
1313

1414
You can then run another consumer in parallel.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::error::StreamCreateError;
3+
use rabbitmq_stream_client::types::{
4+
ByteCapacity, OffsetSpecification, ResponseCode,
5+
};
6+
7+
8+
#[tokio::main]
9+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
10+
use rabbitmq_stream_client::Environment;
11+
let environment = Environment::builder().build().await?;
12+
let message_count = 1000000;
13+
let stream = "hello-rust-super-stream-0";
14+
15+
let create_response = environment
16+
.stream_creator()
17+
.max_length(ByteCapacity::GB(5))
18+
.create(stream)
19+
.await;
20+
21+
if let Err(e) = create_response {
22+
if let StreamCreateError::Create { stream, status } = e {
23+
match status {
24+
// we can ignore this error because the stream already exists
25+
ResponseCode::StreamAlreadyExists => {}
26+
err => {
27+
println!("Error creating stream: {:?} {:?}", stream, err);
28+
}
29+
}
30+
}
31+
}
32+
println!(
33+
"Super stream consumer example, consuming messages from the super stream {}",
34+
stream
35+
);
36+
37+
let mut consumer = environment
38+
.consumer()
39+
// Mandatory if sac is enabled
40+
.name("consumer-group-1")
41+
.offset(OffsetSpecification::First)
42+
.enable_single_active_consumer(true)
43+
.client_provided_name("my super stream consumer for hello rust")
44+
.consumer_update(move |active, message_context| async {
45+
let name = message_context.name();
46+
let stream = message_context.stream();
47+
let client = message_context.client();
48+
49+
let mut stored_offset = 0;
50+
println!("single active consumer: is active: {} on stream: {} with consumer_name: {}", active, stream, name);
51+
let my_async = async {
52+
println!("im hereXXXXXXXXXXX");
53+
stored_offset = client.query_offset(name, stream.as_str()).await.unwrap();
54+
println!("stored offset {}", stored_offset);
55+
//OffsetSpecification::Offset(stored_offset)
56+
57+
};
58+
println!("im hereXXXXXXXXXXX {}", stored_offset);
59+
my_async.await;
60+
OffsetSpecification::Offset(stored_offset)
61+
62+
})
63+
.build(stream)
64+
.await
65+
.unwrap();
66+
67+
for i in 0..message_count {
68+
let delivery = consumer.next().await.unwrap();
69+
{
70+
let delivery = delivery.unwrap();
71+
/*println!(
72+
"Got message: {:#?} from stream: {} with offset: {}",
73+
delivery
74+
.message()
75+
.data()
76+
.map(|data| String::from_utf8(data.to_vec()).unwrap())
77+
.unwrap(),
78+
delivery.stream(),
79+
delivery.offset()
80+
);*/
81+
82+
//store an offset
83+
if i == 10000 {
84+
let _ = consumer
85+
.store_offset(delivery.offset())
86+
.await
87+
.unwrap_or_else(|e| println!("Err: {}", e));
88+
}
89+
}
90+
}
91+
92+
println!("Stopping consumer...");
93+
let _ = consumer.handle().close().await;
94+
println!("consumer stopped");
95+
Ok(())
96+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::error::StreamCreateError;
3+
use rabbitmq_stream_client::types::{
4+
ByteCapacity, OffsetSpecification, ResponseCode,
5+
};
6+
7+
#[tokio::main]
8+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
9+
use rabbitmq_stream_client::Environment;
10+
let environment = Environment::builder().build().await?;
11+
let message_count = 1000000;
12+
let stream = "hello-rust-stream";
13+
14+
let create_response = environment
15+
.stream_creator()
16+
.max_length(ByteCapacity::GB(5))
17+
.create(stream)
18+
.await;
19+
20+
if let Err(e) = create_response {
21+
if let StreamCreateError::Create { stream, status } = e {
22+
match status {
23+
// we can ignore this error because the stream already exists
24+
ResponseCode::StreamAlreadyExists => {}
25+
err => {
26+
println!("Error creating stream: {:?} {:?}", stream, err);
27+
}
28+
}
29+
}
30+
}
31+
println!(
32+
"Super stream consumer example, consuming messages from the super stream {}",
33+
stream
34+
);
35+
36+
let mut consumer = environment
37+
.consumer()
38+
// Mandatory if sac is enabled
39+
.name("consumer-group-1")
40+
.offset(OffsetSpecification::First)
41+
.enable_single_active_consumer(true)
42+
.client_provided_name("my super stream consumer for hello rust")
43+
.consumer_update(move |active, message_context| {
44+
println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream());
45+
OffsetSpecification::First
46+
})
47+
.build(stream)
48+
.await
49+
.unwrap();
50+
51+
for _ in 0..message_count {
52+
let delivery = consumer.next().await.unwrap();
53+
{
54+
let delivery = delivery.unwrap();
55+
println!(
56+
"Got message: {:#?} from stream: {} with offset: {}",
57+
delivery
58+
.message()
59+
.data()
60+
.map(|data| String::from_utf8(data.to_vec()).unwrap())
61+
.unwrap(),
62+
delivery.stream(),
63+
delivery.offset()
64+
);
65+
}
66+
}
67+
68+
println!("Stopping consumer...");
69+
let _ = consumer.handle().close().await;
70+
println!("consumer stopped");
71+
Ok(())
72+
}

src/consumer.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,25 @@ impl FilterConfiguration {
8989
}
9090
}
9191

92+
#[derive(Clone)]
9293
pub struct MessageContext {
93-
consumer_name: Option<String>,
94+
name: String,
9495
stream: String,
96+
client: Client,
9597
}
9698

9799
impl MessageContext {
98-
pub fn get_name(&self) -> Option<String> {
99-
self.consumer_name.clone()
100+
pub fn name(&self) -> String {
101+
self.name.clone()
100102
}
101103

102-
pub fn get_stream(&self) -> String {
104+
pub fn stream(&self) -> String {
103105
self.stream.clone()
104106
}
107+
108+
pub fn client(&self) -> Client {
109+
self.client.clone()
110+
}
105111
}
106112

107113
/// Builder for [`Consumer`]
@@ -451,8 +457,9 @@ impl MessageHandler for ConsumerMessageHandler {
451457
// Otherwise the Offset specification is returned by the user callback
452458
let is_active = consumer_update.is_active();
453459
let message_context = MessageContext {
454-
consumer_name: self.0.name.clone(),
460+
name: self.0.name.clone().unwrap(),
455461
stream: self.0.stream.clone(),
462+
client: self.0.client.clone(),
456463
};
457464
let consumer_update_listener_callback =
458465
self.0.consumer_update_listener.clone().unwrap();

src/superstream_consumer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl SuperStreamConsumerBuilder {
131131
self
132132
}
133133

134-
pub fn consumer_update(
134+
pub fn consumer_update<Fut>(
135135
mut self,
136136
consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification
137137
+ Send

0 commit comments

Comments
 (0)