Skip to content

Commit 54a817c

Browse files
committed
some refactoring
1 parent df6a3e9 commit 54a817c

File tree

7 files changed

+62
-69
lines changed

7 files changed

+62
-69
lines changed

protocol/src/commands/superstream_partitions.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ use std::io::Write;
22

33
#[cfg(test)]
44
use fake::Fake;
5-
<<<<<<< HEAD
6-
=======
75

8-
>>>>>>> 4a7cb45 (refactoring tests)
96
use super::Command;
107
use crate::{
118
codec::{Decoder, Encoder},

src/client/mod.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -291,14 +291,14 @@ impl Client {
291291
properties,
292292
)
293293
})
294-
.await
294+
.await
295295
}
296296

297297
pub async fn unsubscribe(&self, subscription_id: u8) -> RabbitMQStreamResult<GenericResponse> {
298298
self.send_and_receive(|correlation_id| {
299299
UnSubscribeCommand::new(correlation_id, subscription_id)
300300
})
301-
.await
301+
.await
302302
}
303303

304304
pub async fn partitions(
@@ -330,7 +330,7 @@ impl Client {
330330
self.send_and_receive(|correlation_id| {
331331
CreateStreamCommand::new(correlation_id, stream.to_owned(), options)
332332
})
333-
.await
333+
.await
334334
}
335335

336336
pub async fn create_super_stream(
@@ -349,7 +349,7 @@ impl Client {
349349
options,
350350
)
351351
})
352-
.await
352+
.await
353353
}
354354

355355
pub async fn delete_stream(&self, stream: &str) -> RabbitMQStreamResult<GenericResponse> {
@@ -364,7 +364,7 @@ impl Client {
364364
self.send_and_receive(|correlation_id| {
365365
DeleteSuperStreamCommand::new(correlation_id, super_stream.to_owned())
366366
})
367-
.await
367+
.await
368368
}
369369

370370
pub async fn credit(&self, subscription_id: u8, credit: u16) -> RabbitMQStreamResult<()> {
@@ -391,7 +391,7 @@ impl Client {
391391
stream.to_owned(),
392392
offset,
393393
))
394-
.await
394+
.await
395395
}
396396

397397
pub async fn query_offset(&self, reference: String, stream: &str) -> Result<u64, ClientError> {
@@ -422,7 +422,7 @@ impl Client {
422422
stream.to_owned(),
423423
)
424424
})
425-
.await
425+
.await
426426
}
427427

428428
pub async fn delete_publisher(
@@ -432,7 +432,7 @@ impl Client {
432432
self.send_and_receive(|correlation_id| {
433433
DeletePublisherCommand::new(correlation_id, publisher_id)
434434
})
435-
.await
435+
.await
436436
}
437437

438438
pub async fn publish<T: BaseMessage>(
@@ -475,8 +475,8 @@ impl Client {
475475
self.send_and_receive::<QueryPublisherResponse, _, _>(|correlation_id| {
476476
QueryPublisherRequest::new(correlation_id, reference.to_owned(), stream.to_owned())
477477
})
478-
.await
479-
.map(|sequence| sequence.from_response())
478+
.await
479+
.map(|sequence| sequence.from_response())
480480
}
481481

482482
pub async fn exchange_command_versions(
@@ -485,7 +485,7 @@ impl Client {
485485
self.send_and_receive::<ExchangeCommandVersionsResponse, _, _>(|correlation_id| {
486486
ExchangeCommandVersionsRequest::new(correlation_id, vec![])
487487
})
488-
.await
488+
.await
489489
}
490490

491491
pub fn filtering_supported(&self) -> bool {
@@ -509,13 +509,13 @@ impl Client {
509509
let roots = Self::build_root_store(Some(Path::new(
510510
&broker.tls.get_root_certificates_path(),
511511
)))
512-
.await?;
512+
.await?;
513513
config = Self::build_tls_client_configuration(
514514
broker.tls.get_client_certificates_path(),
515515
&roots,
516516
broker,
517517
)
518-
.await?;
518+
.await?;
519519
} else {
520520
config = Self::build_tls_client_configuration_untrusted().await?;
521521
}
@@ -547,15 +547,15 @@ impl Client {
547547
self.with_state_lock(self.peer_properties(), move |state, server_properties| {
548548
state.server_properties = server_properties;
549549
})
550-
.await?;
550+
.await?;
551551
self.authenticate().await?;
552552

553553
self.wait_for_tune_data().await?;
554554

555555
self.with_state_lock(self.open(), |state, connection_properties| {
556556
state.connection_properties = connection_properties;
557557
})
558-
.await?;
558+
.await?;
559559

560560
// Start heartbeat task after connection is established
561561
self.start_hearbeat_task(self.state.write().await.deref_mut());
@@ -619,8 +619,8 @@ impl Client {
619619
self.send_and_receive::<SaslHandshakeResponse, _, _>(|correlation_id| {
620620
SaslHandshakeCommand::new(correlation_id)
621621
})
622-
.await
623-
.map(|handshake| handshake.mechanisms)
622+
.await
623+
.map(|handshake| handshake.mechanisms)
624624
}
625625

626626
async fn send_and_receive<T, R, M>(&self, msg_factory: M) -> Result<T, ClientError>
@@ -664,22 +664,22 @@ impl Client {
664664
self.send_and_receive::<OpenResponse, _, _>(|correlation_id| {
665665
OpenCommand::new(correlation_id, self.opts.v_host.clone())
666666
})
667-
.await
668-
.and_then(|open| {
669-
if open.is_ok() {
670-
Ok(open.connection_properties)
671-
} else {
672-
Err(ClientError::RequestError(open.code().clone()))
673-
}
674-
})
667+
.await
668+
.and_then(|open| {
669+
if open.is_ok() {
670+
Ok(open.connection_properties)
671+
} else {
672+
Err(ClientError::RequestError(open.code().clone()))
673+
}
674+
})
675675
}
676676

677677
async fn peer_properties(&self) -> Result<HashMap<String, String>, ClientError> {
678678
self.send_and_receive::<PeerPropertiesResponse, _, _>(|correlation_id| {
679679
PeerPropertiesCommand::new(correlation_id, HashMap::new())
680680
})
681-
.await
682-
.map(|peer_properties| peer_properties.server_properties)
681+
.await
682+
.map(|peer_properties| peer_properties.server_properties)
683683
}
684684

685685
async fn handle_tune_command(&self, tunes: &TunesCommand) {
@@ -723,7 +723,7 @@ impl Client {
723723
tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
724724
}
725725
})
726-
.into();
726+
.into();
727727
state.heartbeat_task = Some(heartbeat_task);
728728
}
729729

@@ -781,7 +781,7 @@ impl Client {
781781
let client_certs = Self::build_client_certificates(Path::new(
782782
&broker.tls.get_client_certificates_path(),
783783
))
784-
.await?;
784+
.await?;
785785
let client_keys =
786786
Self::build_client_private_keys(Path::new(&broker.tls.get_client_keys_path()))
787787
.await?;
@@ -826,4 +826,4 @@ impl Client {
826826

827827
Ok(config)
828828
}
829-
}
829+
}

src/environment.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use crate::{
1212
producer::ProducerBuilder,
1313
stream_creator::StreamCreator,
1414
superstream::RoutingStrategy,
15-
superstream_producer::SuperStreamProducerBuilder,
1615
superstream_consumer::SuperStreamConsumerBuilder,
16+
superstream_producer::SuperStreamProducerBuilder,
1717
RabbitMQStreamResult,
1818
};
1919

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ pub mod types {
9999
pub use crate::stream_creator::StreamCreator;
100100
pub use crate::superstream::HashRoutingMurmurStrategy;
101101
pub use crate::superstream::RoutingKeyRoutingStrategy;
102-
pub use crate::superstream_consumer::SuperStreamConsumer;
103102
pub use crate::superstream::RoutingStrategy;
103+
pub use crate::superstream_consumer::SuperStreamConsumer;
104104
pub use rabbitmq_stream_protocol::message::Message;
105105
pub use rabbitmq_stream_protocol::{Response, ResponseCode, ResponseKind};
106106

src/superstream_consumer.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
use std::sync::Arc;
22

3-
use rabbitmq_stream_protocol::{commands::subscribe::OffsetSpecification, message::Message};
3+
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
44

55
use crate::superstream::DefaultSuperStreamMetadata;
66
use crate::{error::ConsumerCreateError, Client, Consumer, Environment};
77

8-
type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
8+
//type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
99

1010
/// API for consuming RabbitMQ stream messages
1111
#[derive(Clone)]
1212
pub struct SuperStreamConsumer {
13-
pub internal: Arc<SuperStreamConsumerInternal>,
13+
internal: Arc<SuperStreamConsumerInternal>,
1414
}
1515

1616
struct SuperStreamConsumerInternal {
1717
client: Client,
1818
super_stream: String,
1919
offset_specification: OffsetSpecification,
20-
pub consumers: Vec<Consumer>,
20+
consumers: Vec<Consumer>,
2121
}
2222

2323
/// Builder for [`Consumer`]
@@ -68,15 +68,14 @@ impl SuperStreamConsumerBuilder {
6868
})
6969
}
7070

71-
pub fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
71+
pub async fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
7272
self.offset_specification = offset_specification;
7373
self
7474
}
7575
}
7676

77-
impl SuperStreamConsumer {
78-
79-
pub async fn get_consumers(&self) -> &Vec<Consumer> {
80-
return &self.internal.consumers
77+
impl SuperStreamConsumer {
78+
pub async fn get_consumers(&self) -> &Vec<Consumer> {
79+
return &self.internal.consumers;
8180
}
8281
}

tests/integration/client_test.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,4 +447,3 @@ async fn client_test_route_test() {
447447
test.partitions.get(0).unwrap()
448448
);
449449
}
450-

tests/integration/consumer_test.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::time::Duration;
33
use crate::common::TestEnvironment;
44
use fake::{Fake, Faker};
55
use futures::StreamExt;
6-
use tokio::task;
76
use rabbitmq_stream_client::{
87
error::{
98
ClientError, ConsumerCloseError, ConsumerDeliveryError, ConsumerStoreOffsetError,
@@ -12,11 +11,12 @@ use rabbitmq_stream_client::{
1211
types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer},
1312
Consumer, FilterConfiguration, NoDedup, Producer,
1413
};
14+
use tokio::task;
1515

16+
use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, RoutingStrategy};
1617
use rabbitmq_stream_protocol::ResponseCode;
17-
use std::sync::Arc;
1818
use std::sync::atomic::{AtomicU32, Ordering};
19-
use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, RoutingStrategy};
19+
use std::sync::Arc;
2020

2121
#[tokio::test(flavor = "multi_thread")]
2222
async fn consumer_test() {
@@ -79,13 +79,14 @@ async fn super_stream_consumer_test() {
7979
.await
8080
.unwrap();
8181

82-
static super_stream_consumer: SuperStreamConsumer = env
83-
.env
84-
.super_stream_consumer()
85-
.offset(OffsetSpecification::Next)
86-
.build(&env.stream)
87-
.await
88-
.unwrap();
82+
let super_stream_consumer: Arc<SuperStreamConsumer> = Arc::new(
83+
env.env
84+
.super_stream_consumer()
85+
//.offset(OffsetSpecification::Next)
86+
.build(&env.stream)
87+
.await
88+
.unwrap(),
89+
);
8990

9091
for n in 0..message_count {
9192
let msg = Message::builder().body(format!("message{}", n)).build();
@@ -97,22 +98,19 @@ async fn super_stream_consumer_test() {
9798
.unwrap();
9899
}
99100

100-
101101
let received_messages = Arc::new(AtomicU32::new(0));
102-
let consumers = super_stream_consumer.get_consumers().await;
103102

104103
let mut tasks = Vec::new();
105-
for mut consumer in consumers.into_iter() {
106-
let received_messages_outer = received_messages.clone();
107-
tasks.push(task::spawn(async move {
108-
let inner_received_messages = received_messages_outer.clone();
109-
let delivery = consumer.next().await.unwrap();
110-
let _ = String::from_utf8(delivery.unwrap().message().data().unwrap().to_vec()).unwrap();
111-
inner_received_messages.fetch_add(1, Ordering::Relaxed);
112-
113-
}));
114-
}
115-
104+
for mut consumer in super_stream_consumer.get_consumers().await.into_iter() {
105+
let received_messages_outer = received_messages.clone();
106+
tasks.push(task::spawn(async move {
107+
let inner_received_messages = received_messages_outer.clone();
108+
let delivery = consumer.next().await.unwrap();
109+
let _ =
110+
String::from_utf8(delivery.unwrap().message().data().unwrap().to_vec()).unwrap();
111+
inner_received_messages.fetch_add(1, Ordering::Relaxed);
112+
}));
113+
}
116114
futures::future::join_all(tasks).await;
117115

118116
assert!(received_messages.fetch_add(1, Ordering::Relaxed) == message_count);
@@ -398,7 +396,7 @@ async fn consumer_test_with_store_offset() {
398396
// Store an offset
399397
if i == offset_to_store {
400398
//Store the 5th element produced
401-
let result = consumer_store
399+
let _ = consumer_store
402400
.store_offset(delivery.unwrap().offset())
403401
.await;
404402
}

0 commit comments

Comments
 (0)