diff --git a/README.md b/README.md index 7ddc14bd..a9cb4a6d 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,7 @@ See also a simple example here on how to consume from a stream: [Consuming messages from a stream example](./examples/simple-consumer.rs) + ## Super Stream The client supports the super-stream functionality. @@ -156,9 +157,17 @@ See the [blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-featur You can use SuperStreamProducer and SuperStreamConsumer classes which internally uses producers and consumers to operate on the componsing streams. +SuperstreamProducers can act in Hashing and Routing Key mode. + +See the Java documentation for more details (Same concepts apply here): + +[Super Stream Producer - Java doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-stream-producer) + Have a look to the examples to see on how to work with super streams. -See the [Super Stream Producer Example](./examples/superstreams/send_super_stream.rs) +See the [Super Stream Producer Example using Hashing mmh3 mode](./examples/superstreams/send_super_stream_hash.rs) + +See the [Super Stream Producer Example using Routing key mode](./examples/superstreams/send_super_stream_routing_key.rs) See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs) diff --git a/examples/superstreams/send_super_stream_hash.rs b/examples/superstreams/send_super_stream_hash.rs new file mode 100644 index 00000000..a0318b90 --- /dev/null +++ b/examples/superstreams/send_super_stream_hash.rs @@ -0,0 +1,111 @@ +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ + ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy, +}; +use std::convert::TryInto; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; + +fn hash_strategy_value_extractor(message: &Message) -> String { + message + .application_properties() + .unwrap() + .get("id") + .unwrap() + .clone() + .try_into() + .unwrap() +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 100_000; + let super_stream = "hello-rust-super-stream"; + let confirmed_messages = Arc::new(AtomicU32::new(0)); + let notify_on_send = Arc::new(Notify::new()); + let _ = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, None) + .await; + + let delete_stream = environment.delete_super_stream(super_stream).await; + + match delete_stream { + Ok(_) => { + println!("Successfully deleted super stream {}", super_stream); + } + Err(err) => { + println!( + "Failed to delete super stream {}. error {}", + super_stream, err + ); + } + } + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, None) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Super stream example. Sending {} messages to the super stream: {}", + message_count, super_stream + ); + let mut super_stream_producer = environment + .super_stream_producer(RoutingStrategy::HashRoutingStrategy( + HashRoutingMurmurStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .client_provided_name("my super stream producer for hello rust") + .build(super_stream) + .await + .unwrap(); + + for i in 0..message_count { + let counter = confirmed_messages.clone(); + let notifier = notify_on_send.clone(); + let msg = Message::builder() + .body(format!("super stream message_{}", i)) + .application_properties() + .insert("id", i.to_string()) + .message_builder() + .build(); + super_stream_producer + .send(msg, move |_| { + let inner_counter = counter.clone(); + let inner_notifier = notifier.clone(); + async move { + if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 { + inner_notifier.notify_one(); + } + } + }) + .await + .unwrap(); + } + + notify_on_send.notified().await; + println!( + "Successfully sent {} messages to the super stream {}", + message_count, super_stream + ); + let _ = super_stream_producer.close().await; + Ok(()) +} diff --git a/examples/superstreams/send_super_stream_routing_key.rs b/examples/superstreams/send_super_stream_routing_key.rs new file mode 100644 index 00000000..47141f70 --- /dev/null +++ b/examples/superstreams/send_super_stream_routing_key.rs @@ -0,0 +1,122 @@ +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ + ByteCapacity, RoutingKeyRoutingStrategy, Message, ResponseCode, RoutingStrategy, +}; +use std::convert::TryInto; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; + +fn hash_strategy_value_extractor(message: &Message) -> String { + message + .application_properties() + .unwrap() + .get("region") + .unwrap() + .clone() + .try_into() + .unwrap() +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 100_000; + let super_stream = "hello-rust-super-stream"; + let confirmed_messages = Arc::new(AtomicU32::new(0)); + let notify_on_send = Arc::new(Notify::new()); + // With this example we need to specify binding_keys for the superstream + let routing_key = vec!["EMEA".to_string(), "APAC".to_string(), "AMER".to_string()]; + let _ = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, None) + .await; + + let delete_stream = environment.delete_super_stream(super_stream).await; + + match delete_stream { + Ok(_) => { + println!("Successfully deleted super stream {}", super_stream); + } + Err(err) => { + println!( + "Failed to delete super stream {}. error {}", + super_stream, err + ); + } + } + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, Some(routing_key)) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Super stream example. Sending {} messages to the super stream: {}", + message_count, super_stream + ); + let mut super_stream_producer = environment + .super_stream_producer(RoutingStrategy::RoutingKeyStrategy( + RoutingKeyRoutingStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .build(super_stream) + .await + .unwrap(); + + for i in 0..message_count { + let counter = confirmed_messages.clone(); + let notifier = notify_on_send.clone(); + // Region names are the same as the binding_keys we specified during superstream creation + let mut region = ""; + if i % 3 == 0 { + region = "EMEA" + } else if i % 3 == 1 { + region = "APAC" + } else { + region = "AMER" + } + + let msg = Message::builder() + .body(format!("super stream message_{}", i)) + .application_properties() + .insert("region", region) + .message_builder() + .build(); + super_stream_producer + .send(msg, move |_| { + let inner_counter = counter.clone(); + let inner_notifier = notifier.clone(); + async move { + if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 { + inner_notifier.notify_one(); + } + } + }) + .await + .unwrap(); + } + + notify_on_send.notified().await; + println!( + "Successfully sent {} messages to the super stream {}", + message_count, super_stream + ); + let _ = super_stream_producer.close().await; + Ok(()) +} diff --git a/src/superstream.rs b/src/superstream.rs index f68604a1..3c8c71f9 100644 --- a/src/superstream.rs +++ b/src/superstream.rs @@ -2,6 +2,7 @@ use crate::client::Client; use murmur3::murmur3_32; use rabbitmq_stream_protocol::message::Message; +use std::collections::HashMap; use std::io::Cursor; #[derive(Clone)] @@ -9,7 +10,7 @@ pub struct DefaultSuperStreamMetadata { pub super_stream: String, pub client: Client, pub partitions: Vec, - pub routes: Vec, + pub routes: HashMap>, } impl DefaultSuperStreamMetadata { @@ -22,16 +23,17 @@ impl DefaultSuperStreamMetadata { self.partitions.clone() } pub async fn routes(&mut self, routing_key: String) -> Vec { - if self.routes.is_empty() { + if !self.routes.contains_key(&routing_key) { let response = self .client - .route(routing_key, self.super_stream.clone()) + .route(routing_key.clone(), self.super_stream.clone()) .await; - self.routes = response.unwrap().streams; + self.routes + .insert(routing_key.clone(), response.unwrap().streams); } - self.routes.clone() + self.routes.get(routing_key.as_str()).unwrap().clone() } } diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index 23857f54..baf2c5b7 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -6,6 +6,7 @@ use crate::{error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfi use futures::task::AtomicWaker; use futures::{Stream, StreamExt}; use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification; +use std::collections::HashMap; use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; @@ -49,7 +50,7 @@ impl SuperStreamConsumerBuilder { super_stream: super_stream.to_string(), client: client.clone(), partitions: Vec::new(), - routes: Vec::new(), + routes: HashMap::new(), }; let partitions = super_stream_metadata.partitions().await; diff --git a/src/superstream_producer.rs b/src/superstream_producer.rs index b0678d69..37871d1f 100644 --- a/src/superstream_producer.rs +++ b/src/superstream_producer.rs @@ -79,7 +79,7 @@ impl SuperStreamProducer { self.1.insert(route.clone(), producer); } - let producer = self.1.get(route.as_str()).unwrap(); + let producer = self.1.get(&route).unwrap(); producer.send(message.clone(), cb.clone()).await?; } Ok(()) @@ -122,7 +122,7 @@ impl SuperStreamProducerBuilder { super_stream: super_stream.to_string(), client: client.clone(), partitions: Vec::new(), - routes: Vec::new(), + routes: HashMap::new(), }; let super_stream_producer = SuperStreamProducerInternal {