Skip to content

Commit 8105c4a

Browse files
committed
finalizing superstream_producer and tests
1 parent f6bffdb commit 8105c4a

File tree

6 files changed

+209
-25
lines changed

6 files changed

+209
-25
lines changed

src/environment.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use crate::{
1111
error::StreamDeleteError,
1212
producer::ProducerBuilder,
1313
stream_creator::StreamCreator,
14+
superstream::RoutingStrategy,
15+
superstream_producer::SuperStreamProducerBuilder,
1416
RabbitMQStreamResult,
1517
};
1618

@@ -49,6 +51,19 @@ impl Environment {
4951
}
5052
}
5153

54+
pub fn super_stream_producer(
55+
&self,
56+
routing_strategy: RoutingStrategy,
57+
) -> SuperStreamProducerBuilder<NoDedup> {
58+
SuperStreamProducerBuilder {
59+
environment: self.clone(),
60+
name: None,
61+
data: PhantomData,
62+
filter_value_extractor: None,
63+
routing_strategy: routing_strategy,
64+
}
65+
}
66+
5267
/// Returns a builder for creating a consumer
5368
pub fn consumer(&self) -> ConsumerBuilder {
5469
ConsumerBuilder {

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ pub mod types {
9696
pub use crate::consumer::Delivery;
9797
pub use crate::offset_specification::OffsetSpecification;
9898
pub use crate::stream_creator::StreamCreator;
99+
pub use crate::superstream::HashRoutingMurmurStrategy;
100+
pub use crate::superstream::RoutingKeyRoutingStrategy;
101+
pub use crate::superstream::RoutingStrategy;
99102
pub use rabbitmq_stream_protocol::message::Message;
100103
pub use rabbitmq_stream_protocol::{Response, ResponseCode, ResponseKind};
101104

src/superstream.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ use murmur3::murmur3_32;
44
use rabbitmq_stream_protocol::message::Message;
55
use std::io::Cursor;
66

7+
/*
78
pub enum RouteType {
89
Hash = 0,
910
Key = 1,
1011
}
12+
*/
1113

1214
#[derive(Clone)]
1315
pub struct DefaultSuperStreamMetadata {
@@ -43,7 +45,7 @@ impl DefaultSuperStreamMetadata {
4345

4446
#[derive(Clone)]
4547
pub struct RoutingKeyRoutingStrategy {
46-
routing_extractor: &'static dyn Fn(Message) -> String,
48+
pub routing_extractor: &'static dyn Fn(Message) -> String,
4749
}
4850

4951
impl RoutingKeyRoutingStrategy {
@@ -62,7 +64,7 @@ impl RoutingKeyRoutingStrategy {
6264

6365
#[derive(Clone)]
6466
pub struct HashRoutingMurmurStrategy {
65-
routing_extractor: &'static dyn Fn(Message) -> String,
67+
pub routing_extractor: &'static dyn Fn(Message) -> String,
6668
}
6769

6870
impl HashRoutingMurmurStrategy {

src/superstream_producer.rs

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{
22
client::Client,
33
environment::Environment,
44
error::{ProducerCreateError, ProducerPublishError},
5-
producer::{ConfirmationStatus, Producer, NoDedup},
5+
producer::{ConfirmationStatus, NoDedup, Producer},
66
superstream::{DefaultSuperStreamMetadata, RoutingStrategy},
77
};
88
use rabbitmq_stream_protocol::message::Message;
@@ -14,7 +14,11 @@ use std::sync::Arc;
1414
type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
1515

1616
#[derive(Clone)]
17-
pub struct SuperStreamProducer<T>(Arc<SuperStreamProducerInternal<T>>, PhantomData<T>);
17+
pub struct SuperStreamProducer<T>(
18+
Arc<SuperStreamProducerInternal>,
19+
HashMap<String, Producer<T>>,
20+
PhantomData<T>,
21+
);
1822

1923
/// Builder for [`SuperStreamProducer`]
2024
pub struct SuperStreamProducerBuilder<T> {
@@ -25,23 +29,27 @@ pub struct SuperStreamProducerBuilder<T> {
2529
pub(crate) data: PhantomData<T>,
2630
}
2731

28-
pub struct SuperStreamProducerInternal<T> {
32+
pub struct SuperStreamProducerInternal {
2933
pub(crate) environment: Environment,
3034
client: Client,
3135
super_stream: String,
3236
publish_version: u16,
33-
producers: HashMap<String, Producer<T>>,
3437
filter_value_extractor: Option<FilterValueExtractor>,
3538
super_stream_metadata: DefaultSuperStreamMetadata,
3639
routing_strategy: RoutingStrategy,
3740
}
3841

3942
impl SuperStreamProducer<NoDedup> {
4043
pub async fn send<Fut>(
41-
&self,
44+
&mut self,
4245
message: Message,
43-
cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut + Send + Sync + 'static + Clone,
44-
) where
46+
cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut
47+
+ Send
48+
+ Sync
49+
+ 'static
50+
+ Clone,
51+
) -> Result<(), ProducerPublishError>
52+
where
4553
Fut: Future<Output = ()> + Send + Sync + 'static,
4654
{
4755
let routes = match self.0.routing_strategy.clone() {
@@ -58,24 +66,29 @@ impl SuperStreamProducer<NoDedup> {
5866
};
5967

6068
for route in routes.into_iter() {
61-
if !self.0.producers.contains_key(route.as_str()) {
69+
if !self.1.contains_key(route.as_str()) {
6270
let producer = self.0.environment.producer().build(route.as_str()).await;
63-
self.0.producers.clone().insert(route.clone(), producer.unwrap().clone());
71+
72+
self.1.insert(route.clone(), producer.unwrap());
6473
}
6574

66-
let producer = self.0.producers.get(route.as_str()).unwrap();
67-
_ = producer
68-
.send(message.clone(), cb.clone())
69-
.await;
75+
println!("sending message to super_stream {}", route.clone());
76+
77+
let producer = self.1.get(route.as_str()).unwrap();
78+
let result = producer.send(message.clone(), cb.clone()).await;
79+
match result {
80+
Ok(()) => println!("Message correctly sent"),
81+
Err(e) => println!("Error {}", e),
82+
}
7083
}
84+
Ok(())
7185
}
7286
}
7387

7488
impl<T> SuperStreamProducerBuilder<T> {
7589
pub async fn build(
7690
self,
7791
super_stream: &str,
78-
route_type: RoutingStrategy,
7992
) -> Result<SuperStreamProducer<T>, ProducerCreateError> {
8093
// Connect to the user specified node first, then look for the stream leader.
8194
// The leader is the recommended node for writing, because writing to a replica will redundantly pass these messages
@@ -100,18 +113,18 @@ impl<T> SuperStreamProducerBuilder<T> {
100113
client,
101114
publish_version,
102115
filter_value_extractor: self.filter_value_extractor,
103-
routing_strategy: route_type,
116+
routing_strategy: self.routing_strategy,
104117
super_stream_metadata: DefaultSuperStreamMetadata {
105118
super_stream: super_stream.to_string(),
106119
client: self.environment.create_client().await?,
107120
partitions: Vec::new(),
108121
routes: Vec::new(),
109122
},
110-
producers,
111123
};
112124

113125
let internal_producer = Arc::new(super_stream_producer);
114-
let super_stream_producer = SuperStreamProducer(internal_producer.clone(), PhantomData);
126+
let super_stream_producer =
127+
SuperStreamProducer(internal_producer.clone(), producers, PhantomData);
115128

116129
Ok(super_stream_producer)
117130
}

tests/integration/common.rs

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub struct TestClient {
1414
pub struct TestEnvironment {
1515
pub env: Environment,
1616
pub stream: String,
17+
pub super_stream: String,
18+
pub partitions: Vec<String>,
1719
}
1820

1921
impl TestClient {
@@ -94,15 +96,67 @@ impl TestEnvironment {
9496
let env = Environment::builder().build().await.unwrap();
9597
env.stream_creator().create(&stream).await.unwrap();
9698

97-
TestEnvironment { env, stream }
99+
TestEnvironment {
100+
env,
101+
stream,
102+
super_stream: String::new(),
103+
partitions: Vec::new(),
104+
}
105+
}
106+
107+
pub async fn create_super_stream() -> TestEnvironment {
108+
let super_stream: String = Faker.fake();
109+
let client = Client::connect(ClientOptions::default()).await.unwrap();
110+
let env = Environment::builder().build().await.unwrap();
111+
112+
let partitions: Vec<String> = [
113+
super_stream.to_string() + "-0",
114+
super_stream.to_string() + "-1",
115+
super_stream.to_string() + "-2",
116+
]
117+
.iter()
118+
.map(|x| x.into())
119+
.collect();
120+
121+
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
122+
123+
let response = client
124+
.create_super_stream(
125+
&super_stream,
126+
partitions.clone(),
127+
binding_keys,
128+
HashMap::new(),
129+
)
130+
.await
131+
.unwrap();
132+
133+
assert_eq!(&ResponseCode::Ok, response.code());
134+
TestEnvironment {
135+
env,
136+
stream: String::new(),
137+
super_stream,
138+
partitions,
139+
}
98140
}
99141
}
100142

101143
impl Drop for TestEnvironment {
102144
fn drop(&mut self) {
103-
tokio::task::block_in_place(|| {
104-
tokio::runtime::Handle::current()
105-
.block_on(async { self.env.delete_stream(&self.stream).await.unwrap() })
106-
});
145+
if self.stream != "" {
146+
tokio::task::block_in_place(|| {
147+
tokio::runtime::Handle::current()
148+
.block_on(async { self.env.delete_stream(&self.stream).await.unwrap() })
149+
});
150+
}
151+
if self.super_stream != "" {
152+
tokio::task::block_in_place(|| {
153+
tokio::runtime::Handle::current().block_on(async {
154+
self.env
155+
.delete_super_stream(&self.super_stream)
156+
.await
157+
.unwrap()
158+
})
159+
});
160+
}
107161
}
108162
}

tests/integration/producer_test.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,18 @@ use fake::{Fake, Faker};
33
use futures::StreamExt;
44
use tokio::sync::mpsc::channel;
55

6-
use rabbitmq_stream_client::types::{Message, OffsetSpecification, SimpleValue};
6+
use rabbitmq_stream_client::types::{
7+
HashRoutingMurmurStrategy, Message, OffsetSpecification, RoutingKeyRoutingStrategy,
8+
RoutingStrategy, SimpleValue,
9+
};
710

811
use crate::common::TestEnvironment;
12+
use rabbitmq_stream_protocol::message::Value;
13+
use rabbitmq_stream_protocol::utils::TupleMapperSecond;
14+
use std::sync::atomic::{AtomicU32, Ordering};
15+
use std::sync::Arc;
16+
use tokio::sync::Notify;
17+
use tokio::time::{sleep, Duration};
918

1019
#[tokio::test(flavor = "multi_thread")]
1120
async fn producer_send_no_name_ok() {
@@ -385,6 +394,94 @@ async fn producer_send_after_close_error() {
385394
);
386395
}
387396

397+
fn routing_key_strategy_value_extractor(message: Message) -> String {
398+
return "0".to_string();
399+
}
400+
401+
fn hash_strategy_value_extractor(message: Message) -> String {
402+
let s = String::from_utf8(Vec::from(message.data().unwrap())).expect("Found invalid UTF-8");
403+
404+
return s;
405+
}
406+
407+
#[tokio::test(flavor = "multi_thread")]
408+
async fn key_super_steam_producer_test() {
409+
let env = TestEnvironment::create_super_stream().await;
410+
let mut confirmed_messages = Arc::new(AtomicU32::new(0));
411+
let notify_on_send = Arc::new(Notify::new());
412+
let message_count = 100;
413+
414+
let mut super_stream_producer = env
415+
.env
416+
.super_stream_producer(RoutingStrategy::RoutingKeyStrategy(
417+
RoutingKeyRoutingStrategy {
418+
routing_extractor: &routing_key_strategy_value_extractor,
419+
},
420+
))
421+
.build(&env.super_stream)
422+
.await
423+
.unwrap();
424+
425+
for i in 0..message_count {
426+
let counter = confirmed_messages.clone();
427+
let notifier = notify_on_send.clone();
428+
let msg = Message::builder().body(format!("message{}", i)).build();
429+
super_stream_producer
430+
.send(msg, move |_| {
431+
let inner_counter = counter.clone();
432+
let inner_notifier = notifier.clone();
433+
async move {
434+
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
435+
inner_notifier.notify_one();
436+
}
437+
}
438+
})
439+
.await
440+
.unwrap();
441+
}
442+
443+
notify_on_send.notified().await;
444+
}
445+
446+
#[tokio::test(flavor = "multi_thread")]
447+
async fn hash_super_steam_producer_test() {
448+
let env = TestEnvironment::create_super_stream().await;
449+
let mut confirmed_messages = Arc::new(AtomicU32::new(0));
450+
let notify_on_send = Arc::new(Notify::new());
451+
let message_count = 100;
452+
453+
let mut super_stream_producer = env
454+
.env
455+
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
456+
HashRoutingMurmurStrategy {
457+
routing_extractor: &hash_strategy_value_extractor,
458+
},
459+
))
460+
.build(&env.super_stream)
461+
.await
462+
.unwrap();
463+
464+
for i in 0..message_count {
465+
let counter = confirmed_messages.clone();
466+
let notifier = notify_on_send.clone();
467+
let msg = Message::builder().body(format!("message{}", i)).build();
468+
super_stream_producer
469+
.send(msg, move |_| {
470+
let inner_counter = counter.clone();
471+
let inner_notifier = notifier.clone();
472+
async move {
473+
if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
474+
inner_notifier.notify_one();
475+
}
476+
}
477+
})
478+
.await
479+
.unwrap();
480+
}
481+
482+
notify_on_send.notified().await;
483+
}
484+
388485
#[tokio::test(flavor = "multi_thread")]
389486
async fn producer_send_filtering_message() {
390487
let env = TestEnvironment::create().await;

0 commit comments

Comments
 (0)