Skip to content

Commit aa085c3

Browse files
committed
making Messages as references to allow borrowing
1 parent e409ac5 commit aa085c3

File tree

3 files changed

+10
-10
lines changed

3 files changed

+10
-10
lines changed

src/superstream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ impl DefaultSuperStreamMetadata {
4545

4646
#[derive(Clone)]
4747
pub struct RoutingKeyRoutingStrategy {
48-
pub routing_extractor: &'static dyn Fn(Message) -> String,
48+
pub routing_extractor: &'static dyn Fn(&Message) -> String,
4949
}
5050

5151
impl RoutingKeyRoutingStrategy {
5252
pub async fn routes(
5353
&self,
54-
message: Message,
54+
message: &Message,
5555
metadata: &mut DefaultSuperStreamMetadata,
5656
) -> Vec<String> {
5757
let key = (self.routing_extractor)(message);
@@ -64,19 +64,19 @@ impl RoutingKeyRoutingStrategy {
6464

6565
#[derive(Clone)]
6666
pub struct HashRoutingMurmurStrategy {
67-
pub routing_extractor: &'static dyn Fn(Message) -> String,
67+
pub routing_extractor: &'static dyn Fn(&Message) -> String,
6868
}
6969

7070
impl HashRoutingMurmurStrategy {
7171
pub async fn routes(
7272
&self,
73-
message: Message,
73+
message: &Message,
7474
metadata: &mut DefaultSuperStreamMetadata,
7575
) -> Vec<String> {
7676
println!("im in routes");
7777
let mut streams: Vec<String> = Vec::new();
7878

79-
let key = (self.routing_extractor)(message.clone());
79+
let key = (self.routing_extractor)(message);
8080
let hash_result = murmur3_32(&mut Cursor::new(key), 104729);
8181

8282
let number_of_partitions = metadata.partitions().await.len();

src/superstream_producer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ impl SuperStreamProducer<NoDedup> {
5353
{
5454
let routes = match self.0.routing_strategy.clone() {
5555
RoutingStrategy::HashRoutingStrategy(routing_strategy) => {
56-
routing_strategy.routes(message.clone(), &mut self.2).await
56+
routing_strategy.routes(&message, &mut self.2).await
5757
}
5858
RoutingStrategy::RoutingKeyStrategy(routing_strategy) => {
59-
routing_strategy.routes(message.clone(), &mut self.2).await
59+
routing_strategy.routes(&message, &mut self.2).await
6060
}
6161
};
6262

@@ -67,7 +67,7 @@ impl SuperStreamProducer<NoDedup> {
6767
}
6868

6969
let producer = self.1.get(route.as_str()).unwrap();
70-
let result = producer.send(message.clone(), cb.clone()).await?;
70+
let _ = producer.send(message.clone(), cb.clone()).await?;
7171
}
7272
Ok(())
7373
}

tests/integration/producer_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,11 +391,11 @@ async fn producer_send_after_close_error() {
391391
);
392392
}
393393

394-
fn routing_key_strategy_value_extractor(message: Message) -> String {
394+
fn routing_key_strategy_value_extractor(message: &Message) -> String {
395395
return "0".to_string();
396396
}
397397

398-
fn hash_strategy_value_extractor(message: Message) -> String {
398+
fn hash_strategy_value_extractor(message: &Message) -> String {
399399
let s = String::from_utf8(Vec::from(message.data().unwrap())).expect("Found invalid UTF-8");
400400
return s;
401401
}

0 commit comments

Comments
 (0)