Skip to content

Commit ffa07fb

Browse files
committed
fix clippy issue
1 parent 5a557a7 commit ffa07fb

File tree

5 files changed

+19
-31
lines changed

5 files changed

+19
-31
lines changed

src/consumer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ impl Consumer {
251251
impl Stream for Consumer {
252252
type Item = Result<Delivery, ConsumerDeliveryError>;
253253

254-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255255
self.internal.waker.register(cx.waker());
256256
let poll = Pin::new(&mut self.receiver.lock().unwrap()).poll_recv(cx);
257257
match (self.is_closed(), poll.is_ready()) {

src/environment.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Environment {
6060
environment: self.clone(),
6161
data: PhantomData,
6262
//filter_value_extractor: None,
63-
routing_strategy: routing_strategy,
63+
route_strategy: routing_strategy,
6464
}
6565
}
6666

src/superstream.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@ pub struct DefaultSuperStreamMetadata {
1414

1515
impl DefaultSuperStreamMetadata {
1616
pub async fn partitions(&mut self) -> Vec<String> {
17-
if self.partitions.len() == 0 {
17+
if self.partitions.is_empty() {
1818
println!("partition len is 0");
1919
let response = self.client.partitions(self.super_stream.clone()).await;
2020

2121
self.partitions = response.unwrap().streams;
2222
}
23-
return self.partitions.clone();
23+
self.partitions.clone()
2424
}
2525
pub async fn routes(&mut self, routing_key: String) -> Vec<String> {
26-
if self.routes.len() == 0 {
26+
if self.routes.is_empty() {
2727
let response = self
2828
.client
2929
.route(routing_key, self.super_stream.clone())
@@ -32,7 +32,7 @@ impl DefaultSuperStreamMetadata {
3232
self.routes = response.unwrap().streams;
3333
}
3434

35-
return self.routes.clone();
35+
self.routes.clone()
3636
}
3737
}
3838

@@ -49,9 +49,7 @@ impl RoutingKeyRoutingStrategy {
4949
) -> Vec<String> {
5050
let key = (self.routing_extractor)(message);
5151

52-
let routes = metadata.routes(key).await;
53-
54-
return routes;
52+
metadata.routes(key).await
5553
}
5654
}
5755

@@ -78,7 +76,7 @@ impl HashRoutingMurmurStrategy {
7876
let stream = partitions.into_iter().nth(route as usize).unwrap();
7977
streams.push(stream);
8078

81-
return streams;
79+
streams
8280
}
8381
}
8482

src/superstream_consumer.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,7 @@
1-
use futures::Stream;
2-
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
3-
use std::pin::Pin;
4-
use std::sync::Arc;
5-
use std::sync::Mutex;
6-
use std::task::{Context, Poll};
7-
8-
use crate::consumer::Delivery;
9-
use crate::error::ConsumerDeliveryError;
101
use crate::superstream::DefaultSuperStreamMetadata;
112
use crate::{error::ConsumerCreateError, Client, Consumer, Environment};
3+
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
4+
use std::sync::Arc;
125
//type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
136

147
/// API for consuming RabbitMQ stream messages

src/superstream_producer.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub struct SuperStreamProducer<T>(
2626
pub struct SuperStreamProducerBuilder<T> {
2727
pub(crate) environment: Environment,
2828
//pub filter_value_extractor: Option<FilterValueExtractor>,
29-
pub routing_strategy: RoutingStrategy,
29+
pub route_strategy: RoutingStrategy,
3030
pub(crate) data: PhantomData<T>,
3131
}
3232

@@ -67,7 +67,7 @@ impl SuperStreamProducer<NoDedup> {
6767
}
6868

6969
let producer = self.1.get(route.as_str()).unwrap();
70-
let _ = producer.send(message.clone(), cb.clone()).await?;
70+
producer.send(message.clone(), cb.clone()).await?;
7171
}
7272
Ok(())
7373
}
@@ -79,19 +79,16 @@ impl SuperStreamProducer<NoDedup> {
7979
let mut is_error = false;
8080
for (_, producer) in self.1.into_iter() {
8181
let close = producer.close().await;
82-
match close {
83-
Err(e) => {
84-
is_error = true;
85-
err = Some(e);
86-
}
87-
_ => (),
82+
if let Err(e) = close {
83+
is_error = true;
84+
err = Some(e);
8885
}
8986
}
9087

91-
if is_error == false {
92-
return Ok(());
88+
if !is_error {
89+
Ok(())
9390
} else {
94-
return Err(err.unwrap());
91+
Err(err.unwrap())
9592
}
9693
}
9794
}
@@ -119,7 +116,7 @@ impl<T> SuperStreamProducerBuilder<T> {
119116
environment: self.environment.clone(),
120117
client,
121118
//filter_value_extractor: self.filter_value_extractor,
122-
routing_strategy: self.routing_strategy,
119+
routing_strategy: self.route_strategy,
123120
};
124121

125122
let internal_producer = Arc::new(super_stream_producer);

0 commit comments

Comments
 (0)