
Commit 9f5c488

Implement super_stream (#232)
* WIP: implementing create and delete superstream commands
* implementing route and partition commands
* refactoring tests
* super_stream implementation
* superstream producer implementation
* super_stream_producer first basic version
* finalizing superstream_producer and tests
* better implementation/refactoring
* making Messages as references to allow borrowing
* implementing super_stream consumer
* some refactoring
* fixing super_stream_consumer test
* fix clippy issue
* adding stream method to consumer delivery struct and adding examples
* fixing offset in SuperstreamConsumer
* super_stream_consumer new approach
* implementing close()
* implement filtering on super_stream
* adding super_stream_filtering tests
* error handling improvement in super_stream send
* improve super_stream send example
* add information to the example
* small performance improvement in HashRoutingMurmurStrategy
* add information to the example
* close super stream consumer
* fix bug in super_stream_producer (client created twice) and super_stream_consumer (super_stream locator not closed)
* merging
* updating README.md

---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent f2aca24 commit 9f5c488

15 files changed, +1030 -27 lines

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -36,6 +36,7 @@ thiserror = "1.0"
 async-trait = "0.1.51"
 rand = "0.8"
 dashmap = "5.3.4"
+murmur3 = "0.5.2"

 [dev-dependencies]
 tracing-subscriber = "0.3.1"
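The new `murmur3` dependency backs the `HashRoutingMurmurStrategy` added in this commit: the routing value extracted from each message is hashed and the result selects one of the partition streams. A minimal sketch of that idea, assuming the `murmur3_32` function from the `murmur3` crate; the `SEED` constant and the modulo lookup are illustrative only, the real seed and partition resolution live inside the crate's routing code.

```rust
use std::io::Cursor;

// Illustrative seed value; the actual seed is defined by the crate's routing strategy.
const SEED: u32 = 104729;

/// Map a routing value to a partition index in [0, partition_count).
fn partition_for(routing_value: &str, partition_count: u32) -> u32 {
    let hash = murmur3::murmur3_32(&mut Cursor::new(routing_value.as_bytes()), SEED)
        .expect("hashing an in-memory buffer cannot fail");
    hash % partition_count
}

fn main() {
    // e.g. decide which of 3 partition streams a message with id "42" would go to
    println!("partition: {}", partition_for("42", 3));
}
```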

README.md

Lines changed: 16 additions & 0 deletions
@@ -136,6 +136,22 @@ sleep(Duration::from_secs(10)).await;
 handle.close().await?;
 ```

+### Superstreams
+
+The client supports the super stream functionality.
+
+A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.
+
+See the [blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams/) for more info.
+
+You can use the SuperStreamProducer and SuperStreamConsumer classes, which internally use regular producers and consumers to operate on the composing streams.
+
+Have a look at the examples to see how to work with super streams.
+
+See the [Super Stream Producer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/examples/send_super_stream.rs).
+
+See the [Super Stream Consumer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/examples/receive_super_stream.rs).
+
 ### Development

 #### Compiling

examples/receive_super_stream.rs

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
use futures::StreamExt;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
    ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use rabbitmq_stream_client::Environment;
    let environment = Environment::builder().build().await?;
    let message_count = 100_000;
    let super_stream = "hello-rust-super-stream";

    let create_response = environment
        .stream_creator()
        .max_length(ByteCapacity::GB(5))
        .create_super_stream(super_stream, 3, None)
        .await;

    if let Err(e) = create_response {
        if let StreamCreateError::Create { stream, status } = e {
            match status {
                // we can ignore this error because the stream already exists
                ResponseCode::StreamAlreadyExists => {}
                err => {
                    println!("Error creating stream: {:?} {:?}", stream, err);
                }
            }
        }
    }
    println!(
        "Super stream consumer example, consuming messages from the super stream {}",
        super_stream
    );
    let mut super_stream_consumer: SuperStreamConsumer = environment
        .super_stream_consumer()
        .offset(OffsetSpecification::First)
        .build(super_stream)
        .await
        .unwrap();

    for _ in 0..message_count {
        let delivery = super_stream_consumer.next().await.unwrap();
        {
            let delivery = delivery.unwrap();
            println!(
                "Got message: {:#?} from stream: {} with offset: {}",
                delivery
                    .message()
                    .data()
                    .map(|data| String::from_utf8(data.to_vec()).unwrap())
                    .unwrap(),
                delivery.stream(),
                delivery.offset()
            );
        }
    }

    println!("Stopping super stream consumer...");
    let _ = super_stream_consumer.handle().close().await;
    println!("Super stream consumer stopped");
    Ok(())
}

examples/send_super_stream.rs

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
    ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
};
use std::convert::TryInto;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

fn hash_strategy_value_extractor(message: &Message) -> String {
    message
        .application_properties()
        .unwrap()
        .get("id")
        .unwrap()
        .clone()
        .try_into()
        .unwrap()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use rabbitmq_stream_client::Environment;
    let environment = Environment::builder().build().await?;
    let message_count = 100_000;
    let super_stream = "hello-rust-super-stream";
    let confirmed_messages = Arc::new(AtomicU32::new(0));
    let notify_on_send = Arc::new(Notify::new());
    let _ = environment
        .stream_creator()
        .max_length(ByteCapacity::GB(5))
        .create_super_stream(super_stream, 3, None)
        .await;

    let delete_stream = environment.delete_super_stream(super_stream).await;

    match delete_stream {
        Ok(_) => {
            println!("Successfully deleted super stream {}", super_stream);
        }
        Err(err) => {
            println!(
                "Failed to delete super stream {}. error {}",
                super_stream, err
            );
        }
    }

    let create_response = environment
        .stream_creator()
        .max_length(ByteCapacity::GB(5))
        .create_super_stream(super_stream, 3, None)
        .await;

    if let Err(e) = create_response {
        if let StreamCreateError::Create { stream, status } = e {
            match status {
                // we can ignore this error because the stream already exists
                ResponseCode::StreamAlreadyExists => {}
                err => {
                    println!("Error creating stream: {:?} {:?}", stream, err);
                }
            }
        }
    }
    println!(
        "Super stream example. Sending {} messages to the super stream: {}",
        message_count, super_stream
    );
    let mut super_stream_producer = environment
        .super_stream_producer(RoutingStrategy::HashRoutingStrategy(
            HashRoutingMurmurStrategy {
                routing_extractor: &hash_strategy_value_extractor,
            },
        ))
        .build(super_stream)
        .await
        .unwrap();

    for i in 0..message_count {
        let counter = confirmed_messages.clone();
        let notifier = notify_on_send.clone();
        let msg = Message::builder()
            .body(format!("super stream message_{}", i))
            .application_properties()
            .insert("id", i.to_string())
            .message_builder()
            .build();
        super_stream_producer
            .send(msg, move |_| {
                let inner_counter = counter.clone();
                let inner_notifier = notifier.clone();
                async move {
                    if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
                        inner_notifier.notify_one();
                    }
                }
            })
            .await
            .unwrap();
    }

    notify_on_send.notified().await;
    println!(
        "Successfully sent {} messages to the super stream {}",
        message_count, super_stream
    );
    let _ = super_stream_producer.close().await;
    Ok(())
}

src/consumer.rs

Lines changed: 11 additions & 0 deletions
@@ -263,6 +263,10 @@ pub struct ConsumerHandle(Arc<ConsumerInternal>);
 impl ConsumerHandle {
     /// Close the [`Consumer`] associated to this handle
     pub async fn close(self) -> Result<(), ConsumerCloseError> {
+        self.internal_close().await
+    }
+
+    pub(crate) async fn internal_close(&self) -> Result<(), ConsumerCloseError> {
         match self.0.closed.compare_exchange(false, true, SeqCst, SeqCst) {
             Ok(false) => {
                 let response = self.0.client.unsubscribe(self.0.subscription_id).await?;
@@ -327,6 +331,7 @@ impl MessageHandler for ConsumerMessageHandler {
                     .0
                     .sender
                     .send(Ok(Delivery {
+                        stream: self.0.stream.clone(),
                         subscription_id: self.0.subscription_id,
                         message,
                         offset,
@@ -355,6 +360,7 @@ impl MessageHandler for ConsumerMessageHandler {
 /// Envelope from incoming message
 #[derive(Debug)]
 pub struct Delivery {
+    stream: String,
     subscription_id: u8,
     message: Message,
     offset: u64,
@@ -366,6 +372,11 @@ impl Delivery {
         self.subscription_id
     }

+    /// Get a reference to the delivery's stream name.
+    pub fn stream(&self) -> &String {
+        &self.stream
+    }
+
     /// Get a reference to the delivery's message.
     pub fn message(&self) -> &Message {
         &self.message
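With the new `stream` field, every `Delivery` carries the name of the partition stream it was read from, which is what lets a super stream consumer tell its partitions apart. A small sketch using only the `Delivery::stream()` accessor added above; `count_by_partition` is an illustrative helper, not part of the crate.

```rust
use std::collections::HashMap;

use rabbitmq_stream_client::types::Delivery;

/// Tally how many deliveries were read from each partition stream.
fn count_by_partition<'a>(
    deliveries: impl IntoIterator<Item = &'a Delivery>,
) -> HashMap<String, u64> {
    let mut counts = HashMap::new();
    for delivery in deliveries {
        // stream() returns the name of the partition stream this message came from
        *counts.entry(delivery.stream().clone()).or_insert(0) += 1;
    }
    counts
}
```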

src/environment.rs

Lines changed: 24 additions & 0 deletions
@@ -11,6 +11,9 @@ use crate::{
     error::StreamDeleteError,
     producer::ProducerBuilder,
     stream_creator::StreamCreator,
+    superstream::RoutingStrategy,
+    superstream_consumer::SuperStreamConsumerBuilder,
+    superstream_producer::SuperStreamProducerBuilder,
     RabbitMQStreamResult,
 };

@@ -49,6 +52,18 @@ impl Environment {
         }
     }

+    pub fn super_stream_producer(
+        &self,
+        routing_strategy: RoutingStrategy,
+    ) -> SuperStreamProducerBuilder<NoDedup> {
+        SuperStreamProducerBuilder {
+            environment: self.clone(),
+            data: PhantomData,
+            filter_value_extractor: None,
+            route_strategy: routing_strategy,
+        }
+    }
+
     /// Returns a builder for creating a consumer
     pub fn consumer(&self) -> ConsumerBuilder {
         ConsumerBuilder {
@@ -58,6 +73,15 @@ impl Environment {
             filter_configuration: None,
         }
     }
+
+    pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
+        SuperStreamConsumerBuilder {
+            environment: self.clone(),
+            offset_specification: OffsetSpecification::Next,
+            filter_configuration: None,
+        }
+    }
+
     pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
         Client::connect(self.options.client_options.clone()).await
     }
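The two new `Environment` entry points mirror the existing `producer()`/`consumer()` builders. Note that `super_stream_consumer()` starts from `OffsetSpecification::Next` by default, so `.offset(...)` is only needed to override it. A minimal sketch under the assumption of a reachable broker and an existing super stream named "hello-rust-super-stream"; the helper function itself is illustrative, not crate code.

```rust
use rabbitmq_stream_client::types::{OffsetSpecification, SuperStreamConsumer};
use rabbitmq_stream_client::Environment;

async fn attach(env: &Environment) -> Result<SuperStreamConsumer, Box<dyn std::error::Error>> {
    let consumer = env
        .super_stream_consumer()
        // override the Next default to replay the partitions from the beginning
        .offset(OffsetSpecification::First)
        .build("hello-rust-super-stream")
        .await?;
    Ok(consumer)
}
```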

src/error.rs

Lines changed: 20 additions & 0 deletions
@@ -110,6 +110,26 @@ pub enum ProducerPublishError {
     #[error(transparent)]
     Client(#[from] ClientError),
 }
+
+#[derive(Error, Debug)]
+pub enum SuperStreamProducerPublishError {
+    #[error("Failed to send message to stream")]
+    ProducerPublishError(),
+    #[error("Failed to create a producer")]
+    ProducerCreateError(),
+}
+
+impl From<ProducerPublishError> for SuperStreamProducerPublishError {
+    fn from(_err: ProducerPublishError) -> Self {
+        SuperStreamProducerPublishError::ProducerPublishError()
+    }
+}
+impl From<ProducerCreateError> for SuperStreamProducerPublishError {
+    fn from(_err: ProducerCreateError) -> Self {
+        SuperStreamProducerPublishError::ProducerCreateError()
+    }
+}
+
 #[derive(Error, Debug)]
 pub enum ProducerCloseError {
     #[error("Failed to close producer for stream {stream} status {status:?}")]

src/lib.rs

Lines changed: 7 additions & 0 deletions
@@ -79,6 +79,9 @@ pub mod error;
 mod offset_specification;
 mod producer;
 mod stream_creator;
+mod superstream;
+mod superstream_consumer;
+mod superstream_producer;

 pub type RabbitMQStreamResult<T> = Result<T, error::ClientError>;

@@ -94,6 +97,10 @@ pub mod types {
     pub use crate::consumer::Delivery;
     pub use crate::offset_specification::OffsetSpecification;
     pub use crate::stream_creator::StreamCreator;
+    pub use crate::superstream::HashRoutingMurmurStrategy;
+    pub use crate::superstream::RoutingKeyRoutingStrategy;
+    pub use crate::superstream::RoutingStrategy;
+    pub use crate::superstream_consumer::SuperStreamConsumer;
     pub use rabbitmq_stream_protocol::message::Message;
     pub use rabbitmq_stream_protocol::{Response, ResponseCode, ResponseKind};

src/producer.rs

Lines changed: 8 additions & 0 deletions
@@ -245,6 +245,14 @@ impl<T> ProducerBuilder<T> {
         self.filter_value_extractor = Some(f);
         self
     }
+
+    pub fn filter_value_extractor_arc(
+        mut self,
+        filter_value_extractor: Option<FilterValueExtractor>,
+    ) -> Self {
+        self.filter_value_extractor = filter_value_extractor;
+        self
+    }
 }

 pub struct MessageAccumulator {
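Unlike the closure-based setter above it, `filter_value_extractor_arc` accepts an `Option` holding an already-shared extractor, presumably so internal callers such as the super stream producer can forward one extractor to every per-partition producer they create, or disable filtering by passing `None`. A minimal sketch of the `None` case, assuming only the builder methods visible in this diff and the existing `Environment::producer()` API; the helper function is illustrative.

```rust
use rabbitmq_stream_client::Environment;

async fn build_without_filtering(
    env: &Environment,
    stream: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let producer = env
        .producer()
        // the Arc-based variant takes an Option, so `None` explicitly disables filtering
        .filter_value_extractor_arc(None)
        .build(stream)
        .await?;
    producer.close().await?;
    Ok(())
}
```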
