Skip to content

Commit 7904411

Browse files
committed
Adding README
1 parent d1c21c5 commit 7904411

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
Single active consumer
2+
---
3+
4+
This is an example to enable single active consumer functionality for superstream:
5+
https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams
6+
https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams
7+
8+
This folder contains a super-stream consumer configured to enable it.
9+
You can use the example in the super-stream folder to produce messages for a super-stream.
10+
11+
You can then run the consumer in this folder.
12+
Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream.
13+
14+
You can then run another consumer in parallel.
15+
now you'll see that one of the two consumers will consume from 2 streams while the other on one stream.
16+
17+
If you run another you'll see that every Consumer will read from a single stream.
18+
19+
If you then stop one of the Consumer you'll notice that the related stream is now read from on the Consumer which is still running.
20+
21+
22+
23+

examples/single_active_consumer/single_active_consumer_super_stream.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4444
.super_stream_consumer()
4545
.offset(OffsetSpecification::First)
4646
.client_provided_name("my super stream consumer for hello rust")
47+
/*We can decide a strategy to manage Offset specification in single active consumer based on is_active flag
48+
By default if this clousure is not present the default strategy OffsetSpecification::NEXT will be set.*/
4749
.consumer_update(move |active, message_context| {
4850
println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream());
4951
OffsetSpecification::First

0 commit comments

Comments
 (0)