fix a bug happening during sending of super_stream with routing_key #243

Merged · 4 commits · Nov 5, 2024

11 changes: 10 additions & 1 deletion README.md
@@ -146,6 +146,7 @@ See also a simple example here on how to consume from a stream:

[Consuming messages from a stream example](./examples/simple-consumer.rs)


## Super Stream

The client supports the super-stream functionality.
@@ -156,9 +157,17 @@ See the [blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-featur

You can use the SuperStreamProducer and SuperStreamConsumer classes, which internally use producers and consumers to operate on the composing streams.

A SuperStreamProducer can operate in either Hashing or Routing Key mode.

See the Java documentation for more details (the same concepts apply here):

[Super Stream Producer - Java doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-stream-producer)

Have a look at the examples to see how to work with super streams; a condensed sketch of both producer modes follows the links below.

See the [Super Stream Producer Example](./examples/superstreams/send_super_stream.rs)

See the [Super Stream Producer Example using Hashing mmh3 mode](./examples/superstreams/send_super_stream_hash.rs)

See the [Super Stream Producer Example using Routing key mode](./examples/superstreams/send_super_stream_routing_key.rs)

See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs)
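
For a quick orientation, here is a condensed sketch of building a producer in each of the two modes. It is adapted from the examples above; the stream name (`my-super-stream`) and the `region_extractor` function are illustrative, and error handling is kept to `unwrap()` for brevity.

```rust
use rabbitmq_stream_client::types::{
    HashRoutingMurmurStrategy, Message, RoutingKeyRoutingStrategy, RoutingStrategy,
};
use rabbitmq_stream_client::Environment;
use std::convert::TryInto;

// Illustrative extractor: both strategies derive their routing input from the message.
fn region_extractor(message: &Message) -> String {
    message
        .application_properties()
        .unwrap()
        .get("region")
        .unwrap()
        .clone()
        .try_into()
        .unwrap()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let environment = Environment::builder().build().await?;

    // Hashing mode: the extracted value is murmur3-hashed to pick a partition.
    let _hash_producer = environment
        .super_stream_producer(RoutingStrategy::HashRoutingStrategy(
            HashRoutingMurmurStrategy {
                routing_extractor: &region_extractor,
            },
        ))
        .build("my-super-stream")
        .await
        .unwrap();

    // Routing key mode: the extracted value is matched against the super stream's binding keys.
    let _key_producer = environment
        .super_stream_producer(RoutingStrategy::RoutingKeyStrategy(
            RoutingKeyRoutingStrategy {
                routing_extractor: &region_extractor,
            },
        ))
        .build("my-super-stream")
        .await
        .unwrap();

    Ok(())
}
```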

111 changes: 111 additions & 0 deletions examples/superstreams/send_super_stream_hash.rs
@@ -0,0 +1,111 @@
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
ByteCapacity, HashRoutingMurmurStrategy, Message, ResponseCode, RoutingStrategy,
};
use std::convert::TryInto;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

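// Extracts the value the hashing strategy works on: the "id" application property is
// read from each message and murmur3-hashed by the client to pick the target partition.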
fn hash_strategy_value_extractor(message: &Message) -> String {
message
.application_properties()
.unwrap()
.get("id")
.unwrap()
.clone()
.try_into()
.unwrap()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let message_count = 100_000;
let super_stream = "hello-rust-super-stream";
let confirmed_messages = Arc::new(AtomicU32::new(0));
let notify_on_send = Arc::new(Notify::new());
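    // Recreate the super stream from scratch so the example always starts clean:
    // create it (ignoring the result), delete it, then create it again with 3 partitions.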
let _ = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, None)
.await;

let delete_stream = environment.delete_super_stream(super_stream).await;

match delete_stream {
Ok(_) => {
println!("Successfully deleted super stream {}", super_stream);
}
Err(err) => {
println!(
"Failed to delete super stream {}. error {}",
super_stream, err
);
}
}

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, None)
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}
println!(
"Super stream example. Sending {} messages to the super stream: {}",
message_count, super_stream
);
let mut super_stream_producer = environment
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
HashRoutingMurmurStrategy {
routing_extractor: &hash_strategy_value_extractor,
},
))
.client_provided_name("my super stream producer for hello rust")
.build(super_stream)
.await
.unwrap();

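    // send() confirms asynchronously: the callback below runs once per confirmed message,
    // and the Notify wakes the main task when all message_count confirmations have arrived.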
for i in 0..message_count {
let counter = confirmed_messages.clone();
let notifier = notify_on_send.clone();
let msg = Message::builder()
.body(format!("super stream message_{}", i))
.application_properties()
.insert("id", i.to_string())
.message_builder()
.build();
super_stream_producer
.send(msg, move |_| {
let inner_counter = counter.clone();
let inner_notifier = notifier.clone();
async move {
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
inner_notifier.notify_one();
}
}
})
.await
.unwrap();
}

notify_on_send.notified().await;
println!(
"Successfully sent {} messages to the super stream {}",
message_count, super_stream
);
let _ = super_stream_producer.close().await;
Ok(())
}
122 changes: 122 additions & 0 deletions examples/superstreams/send_super_stream_routing_key.rs
@@ -0,0 +1,122 @@
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{
    ByteCapacity, Message, ResponseCode, RoutingKeyRoutingStrategy, RoutingStrategy,
};
use std::convert::TryInto;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

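// Extracts the routing key for each message: the "region" application property is matched
// against the super stream's binding keys (EMEA, APAC, AMER).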
fn hash_strategy_value_extractor(message: &Message) -> String {
message
.application_properties()
.unwrap()
.get("region")
.unwrap()
.clone()
.try_into()
.unwrap()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let message_count = 100_000;
let super_stream = "hello-rust-super-stream";
let confirmed_messages = Arc::new(AtomicU32::new(0));
let notify_on_send = Arc::new(Notify::new());
    // For this example we need to specify the binding keys of the super stream
let routing_key = vec!["EMEA".to_string(), "APAC".to_string(), "AMER".to_string()];
let _ = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, None)
.await;

let delete_stream = environment.delete_super_stream(super_stream).await;

match delete_stream {
Ok(_) => {
println!("Successfully deleted super stream {}", super_stream);
}
Err(err) => {
println!(
"Failed to delete super stream {}. error {}",
super_stream, err
);
}
}

let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create_super_stream(super_stream, 3, Some(routing_key))
.await;

if let Err(e) = create_response {
if let StreamCreateError::Create { stream, status } = e {
match status {
// we can ignore this error because the stream already exists
ResponseCode::StreamAlreadyExists => {}
err => {
println!("Error creating stream: {:?} {:?}", stream, err);
}
}
}
}
println!(
"Super stream example. Sending {} messages to the super stream: {}",
message_count, super_stream
);
let mut super_stream_producer = environment
.super_stream_producer(RoutingStrategy::RoutingKeyStrategy(
RoutingKeyRoutingStrategy {
routing_extractor: &hash_strategy_value_extractor,
},
))
.build(super_stream)
.await
.unwrap();

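    // As in the hashing example, confirmations are counted in the async callback and the
    // Notify wakes the main task once every message has been confirmed.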
for i in 0..message_count {
let counter = confirmed_messages.clone();
let notifier = notify_on_send.clone();
        // Region names match the binding keys we specified when creating the super stream.
        let region = match i % 3 {
            0 => "EMEA",
            1 => "APAC",
            _ => "AMER",
        };

let msg = Message::builder()
.body(format!("super stream message_{}", i))
.application_properties()
.insert("region", region)
.message_builder()
.build();
super_stream_producer
.send(msg, move |_| {
let inner_counter = counter.clone();
let inner_notifier = notifier.clone();
async move {
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
inner_notifier.notify_one();
}
}
})
.await
.unwrap();
}

notify_on_send.notified().await;
println!(
"Successfully sent {} messages to the super stream {}",
message_count, super_stream
);
let _ = super_stream_producer.close().await;
Ok(())
}
12 changes: 7 additions & 5 deletions src/superstream.rs
@@ -2,14 +2,15 @@ use crate::client::Client;

use murmur3::murmur3_32;
use rabbitmq_stream_protocol::message::Message;
use std::collections::HashMap;
use std::io::Cursor;

#[derive(Clone)]
pub struct DefaultSuperStreamMetadata {
pub super_stream: String,
pub client: Client,
pub partitions: Vec<String>,
pub routes: Vec<String>,
pub routes: HashMap<String, Vec<String>>,
}

impl DefaultSuperStreamMetadata {
@@ -22,16 +23,17 @@ impl DefaultSuperStreamMetadata {
self.partitions.clone()
}
pub async fn routes(&mut self, routing_key: String) -> Vec<String> {
if self.routes.is_empty() {
if !self.routes.contains_key(&routing_key) {
let response = self
.client
.route(routing_key, self.super_stream.clone())
.route(routing_key.clone(), self.super_stream.clone())
.await;

self.routes = response.unwrap().streams;
self.routes
.insert(routing_key.clone(), response.unwrap().streams);
}

self.routes.clone()
self.routes.get(routing_key.as_str()).unwrap().clone()
}
}
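
The change above is the heart of the fix: `routes` is now cached per routing key in a `HashMap` instead of a single shared `Vec`, so the streams resolved for the first routing key are no longer returned for every subsequent key. The standalone sketch below illustrates that caching pattern in isolation; `RouteCache`, `routes_for`, and the `resolve` closure are hypothetical stand-ins for `DefaultSuperStreamMetadata`, `routes()`, and the `client.route(...)` call.

```rust
use std::collections::HashMap;

// Simplified illustration of the per-key route cache: each routing key gets,
// and keeps, its own list of target streams.
struct RouteCache {
    routes: HashMap<String, Vec<String>>,
}

impl RouteCache {
    fn routes_for(&mut self, routing_key: &str, resolve: impl Fn(&str) -> Vec<String>) -> Vec<String> {
        if !self.routes.contains_key(routing_key) {
            // Only query the broker (here: the hypothetical `resolve` closure) the first
            // time a routing key is seen; afterwards the cached streams are returned.
            let streams = resolve(routing_key);
            self.routes.insert(routing_key.to_string(), streams);
        }
        self.routes.get(routing_key).unwrap().clone()
    }
}

fn main() {
    let mut cache = RouteCache {
        routes: HashMap::new(),
    };
    let resolve = |key: &str| vec![format!("invoices-{}", key.to_lowercase())];

    // With the previous Vec-based cache, the second key would have reused the streams
    // resolved for the first one; with the HashMap each key resolves independently.
    assert_eq!(cache.routes_for("EMEA", &resolve), vec!["invoices-emea"]);
    assert_eq!(cache.routes_for("APAC", &resolve), vec!["invoices-apac"]);
    println!("per-key routing cache resolves each key independently");
}
```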

3 changes: 2 additions & 1 deletion src/superstream_consumer.rs
@@ -6,6 +6,7 @@ use crate::{error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfi
use futures::task::AtomicWaker;
use futures::{Stream, StreamExt};
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
@@ -49,7 +50,7 @@ impl SuperStreamConsumerBuilder {
super_stream: super_stream.to_string(),
client: client.clone(),
partitions: Vec::new(),
routes: Vec::new(),
routes: HashMap::new(),
};
let partitions = super_stream_metadata.partitions().await;

4 changes: 2 additions & 2 deletions src/superstream_producer.rs
@@ -79,7 +79,7 @@ impl SuperStreamProducer<NoDedup> {
self.1.insert(route.clone(), producer);
}

let producer = self.1.get(route.as_str()).unwrap();
let producer = self.1.get(&route).unwrap();
producer.send(message.clone(), cb.clone()).await?;
}
Ok(())
@@ -122,7 +122,7 @@ impl<T> SuperStreamProducerBuilder<T> {
super_stream: super_stream.to_string(),
client: client.clone(),
partitions: Vec::new(),
routes: Vec::new(),
routes: HashMap::new(),
};

let super_stream_producer = SuperStreamProducerInternal {