Skip to content

Commit 70e93b9

Browse files
committed
better implementation/refactoring
1 parent 8105c4a commit 70e93b9

File tree

5 files changed

+80
-79
lines changed

5 files changed

+80
-79
lines changed

src/environment.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ impl Environment {
5757
) -> SuperStreamProducerBuilder<NoDedup> {
5858
SuperStreamProducerBuilder {
5959
environment: self.clone(),
60-
name: None,
6160
data: PhantomData,
6261
filter_value_extractor: None,
6362
routing_strategy: routing_strategy,

src/superstream.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ pub struct DefaultSuperStreamMetadata {
2222
impl DefaultSuperStreamMetadata {
2323
pub async fn partitions(&mut self) -> Vec<String> {
2424
if self.partitions.len() == 0 {
25+
println!("partition len is 0");
2526
let response = self.client.partitions(self.super_stream.clone()).await;
2627

2728
self.partitions = response.unwrap().streams;
2829
}
29-
3030
return self.partitions.clone();
3131
}
3232
pub async fn routes(&mut self, routing_key: String) -> Vec<String> {
@@ -73,14 +73,15 @@ impl HashRoutingMurmurStrategy {
7373
message: Message,
7474
metadata: &mut DefaultSuperStreamMetadata,
7575
) -> Vec<String> {
76+
println!("im in routes");
7677
let mut streams: Vec<String> = Vec::new();
7778

78-
let key = (self.routing_extractor)(message);
79+
let key = (self.routing_extractor)(message.clone());
7980
let hash_result = murmur3_32(&mut Cursor::new(key), 104729);
8081

8182
let number_of_partitions = metadata.partitions().await.len();
8283
let route = hash_result.unwrap() % number_of_partitions as u32;
83-
let partitions: Vec<String> = metadata.partitions().await;
84+
let partitions = metadata.partitions().await;
8485
let stream = partitions.into_iter().nth(route as usize).unwrap();
8586
streams.push(stream);
8687

src/superstream_producer.rs

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::error::ProducerCloseError;
12
use crate::{
23
client::Client,
34
environment::Environment,
@@ -17,13 +18,13 @@ type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sy
1718
pub struct SuperStreamProducer<T>(
1819
Arc<SuperStreamProducerInternal>,
1920
HashMap<String, Producer<T>>,
21+
DefaultSuperStreamMetadata,
2022
PhantomData<T>,
2123
);
2224

2325
/// Builder for [`SuperStreamProducer`]
2426
pub struct SuperStreamProducerBuilder<T> {
2527
pub(crate) environment: Environment,
26-
pub(crate) name: Option<String>,
2728
pub filter_value_extractor: Option<FilterValueExtractor>,
2829
pub routing_strategy: RoutingStrategy,
2930
pub(crate) data: PhantomData<T>,
@@ -32,10 +33,8 @@ pub struct SuperStreamProducerBuilder<T> {
3233
pub struct SuperStreamProducerInternal {
3334
pub(crate) environment: Environment,
3435
client: Client,
35-
super_stream: String,
36-
publish_version: u16,
3736
filter_value_extractor: Option<FilterValueExtractor>,
38-
super_stream_metadata: DefaultSuperStreamMetadata,
37+
//super_stream_metadata: DefaultSuperStreamMetadata,
3938
routing_strategy: RoutingStrategy,
4039
}
4140

@@ -54,14 +53,10 @@ impl SuperStreamProducer<NoDedup> {
5453
{
5554
let routes = match self.0.routing_strategy.clone() {
5655
RoutingStrategy::HashRoutingStrategy(routing_strategy) => {
57-
routing_strategy
58-
.routes(message.clone(), &mut self.0.super_stream_metadata.clone())
59-
.await
56+
routing_strategy.routes(message.clone(), &mut self.2).await
6057
}
6158
RoutingStrategy::RoutingKeyStrategy(routing_strategy) => {
62-
routing_strategy
63-
.routes(message.clone(), &mut self.0.super_stream_metadata.clone())
64-
.await
59+
routing_strategy.routes(message.clone(), &mut self.2).await
6560
}
6661
};
6762

@@ -78,11 +73,34 @@ impl SuperStreamProducer<NoDedup> {
7873
let result = producer.send(message.clone(), cb.clone()).await;
7974
match result {
8075
Ok(()) => println!("Message correctly sent"),
81-
Err(e) => println!("Error {}", e),
76+
Err(e) => return Err(e),
8277
}
8378
}
8479
Ok(())
8580
}
81+
82+
pub async fn close(self) -> Result<(), ProducerCloseError> {
83+
self.0.client.close().await?;
84+
85+
let mut err: Option<ProducerCloseError> = None;
86+
let mut is_error = false;
87+
for (_, producer) in self.1.into_iter() {
88+
let close = producer.close().await;
89+
match close {
90+
Err(e) => {
91+
is_error = true;
92+
err = Some(e);
93+
}
94+
_ => (),
95+
}
96+
}
97+
98+
if is_error == false {
99+
return Ok(());
100+
} else {
101+
return Err(err.unwrap());
102+
}
103+
}
86104
}
87105

88106
impl<T> SuperStreamProducerBuilder<T> {
@@ -95,36 +113,29 @@ impl<T> SuperStreamProducerBuilder<T> {
95113
// to the leader anyway - it is the only one capable of writing.
96114
let client = self.environment.create_client().await?;
97115

98-
let mut publish_version = 1;
99-
100-
if self.filter_value_extractor.is_some() {
101-
if client.filtering_supported() {
102-
publish_version = 2
103-
} else {
104-
return Err(ProducerCreateError::FilteringNotSupport);
105-
}
106-
}
107-
108116
let producers = HashMap::new();
109117

118+
let super_stream_metadata = DefaultSuperStreamMetadata {
119+
super_stream: super_stream.to_string(),
120+
client: self.environment.create_client().await?,
121+
partitions: Vec::new(),
122+
routes: Vec::new(),
123+
};
124+
110125
let super_stream_producer = SuperStreamProducerInternal {
111126
environment: self.environment.clone(),
112-
super_stream: super_stream.to_string(),
113127
client,
114-
publish_version,
115128
filter_value_extractor: self.filter_value_extractor,
116129
routing_strategy: self.routing_strategy,
117-
super_stream_metadata: DefaultSuperStreamMetadata {
118-
super_stream: super_stream.to_string(),
119-
client: self.environment.create_client().await?,
120-
partitions: Vec::new(),
121-
routes: Vec::new(),
122-
},
123130
};
124131

125132
let internal_producer = Arc::new(super_stream_producer);
126-
let super_stream_producer =
127-
SuperStreamProducer(internal_producer.clone(), producers, PhantomData);
133+
let super_stream_producer = SuperStreamProducer(
134+
internal_producer.clone(),
135+
producers,
136+
super_stream_metadata,
137+
PhantomData,
138+
);
128139

129140
Ok(super_stream_producer)
130141
}

tests/integration/common.rs

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22

33
use fake::{Fake, Faker};
44
use rabbitmq_stream_client::{Client, ClientOptions, Environment};
5+
use rabbitmq_stream_protocol::commands::generic::GenericResponse;
56
use rabbitmq_stream_protocol::ResponseCode;
67

78
pub struct TestClient {
@@ -38,26 +39,7 @@ impl TestClient {
3839
let super_stream: String = Faker.fake();
3940
let client = Client::connect(ClientOptions::default()).await.unwrap();
4041

41-
let partitions: Vec<String> = [
42-
super_stream.to_string() + "-0",
43-
super_stream.to_string() + "-1",
44-
super_stream.to_string() + "-2",
45-
]
46-
.iter()
47-
.map(|x| x.into())
48-
.collect();
49-
50-
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
51-
52-
let response = client
53-
.create_super_stream(
54-
&super_stream,
55-
partitions.clone(),
56-
binding_keys,
57-
HashMap::new(),
58-
)
59-
.await
60-
.unwrap();
42+
let (response, partitions) = create_generic_super_stream(&super_stream, &client).await;
6143

6244
assert_eq!(&ResponseCode::Ok, response.code());
6345
TestClient {
@@ -109,26 +91,7 @@ impl TestEnvironment {
10991
let client = Client::connect(ClientOptions::default()).await.unwrap();
11092
let env = Environment::builder().build().await.unwrap();
11193

112-
let partitions: Vec<String> = [
113-
super_stream.to_string() + "-0",
114-
super_stream.to_string() + "-1",
115-
super_stream.to_string() + "-2",
116-
]
117-
.iter()
118-
.map(|x| x.into())
119-
.collect();
120-
121-
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
122-
123-
let response = client
124-
.create_super_stream(
125-
&super_stream,
126-
partitions.clone(),
127-
binding_keys,
128-
HashMap::new(),
129-
)
130-
.await
131-
.unwrap();
94+
let (response, partitions) = create_generic_super_stream(&super_stream, &client).await;
13295

13396
assert_eq!(&ResponseCode::Ok, response.code());
13497
TestEnvironment {
@@ -160,3 +123,31 @@ impl Drop for TestEnvironment {
160123
}
161124
}
162125
}
126+
127+
pub async fn create_generic_super_stream(
128+
super_stream: &String,
129+
client: &Client,
130+
) -> (GenericResponse, Vec<String>) {
131+
let partitions: Vec<String> = [
132+
super_stream.to_string() + "-0",
133+
super_stream.to_string() + "-1",
134+
super_stream.to_string() + "-2",
135+
]
136+
.iter()
137+
.map(|x| x.into())
138+
.collect();
139+
140+
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
141+
142+
let response = client
143+
.create_super_stream(
144+
&super_stream,
145+
partitions.clone(),
146+
binding_keys,
147+
HashMap::new(),
148+
)
149+
.await
150+
.unwrap();
151+
152+
return (response, partitions);
153+
}

tests/integration/producer_test.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,9 @@ use rabbitmq_stream_client::types::{
99
};
1010

1111
use crate::common::TestEnvironment;
12-
use rabbitmq_stream_protocol::message::Value;
13-
use rabbitmq_stream_protocol::utils::TupleMapperSecond;
1412
use std::sync::atomic::{AtomicU32, Ordering};
1513
use std::sync::Arc;
1614
use tokio::sync::Notify;
17-
use tokio::time::{sleep, Duration};
1815

1916
#[tokio::test(flavor = "multi_thread")]
2017
async fn producer_send_no_name_ok() {
@@ -400,7 +397,6 @@ fn routing_key_strategy_value_extractor(message: Message) -> String {
400397

401398
fn hash_strategy_value_extractor(message: Message) -> String {
402399
let s = String::from_utf8(Vec::from(message.data().unwrap())).expect("Found invalid UTF-8");
403-
404400
return s;
405401
}
406402

@@ -441,6 +437,7 @@ async fn key_super_steam_producer_test() {
441437
}
442438

443439
notify_on_send.notified().await;
440+
_ = super_stream_producer.close();
444441
}
445442

446443
#[tokio::test(flavor = "multi_thread")]
@@ -462,6 +459,7 @@ async fn hash_super_steam_producer_test() {
462459
.unwrap();
463460

464461
for i in 0..message_count {
462+
println!("sending message {}", i);
465463
let counter = confirmed_messages.clone();
466464
let notifier = notify_on_send.clone();
467465
let msg = Message::builder().body(format!("message{}", i)).build();
@@ -480,6 +478,7 @@ async fn hash_super_steam_producer_test() {
480478
}
481479

482480
notify_on_send.notified().await;
481+
_ = super_stream_producer.close();
483482
}
484483

485484
#[tokio::test(flavor = "multi_thread")]

0 commit comments

Comments
 (0)