Skip to content

Commit 4a7cb45

Browse files
committed
refactoring tests
1 parent 51590eb commit 4a7cb45

File tree

4 files changed

+108
-146
lines changed

4 files changed

+108
-146
lines changed

protocol/src/commands/superstream_partitions.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,13 @@ use std::io::Write;
33
#[cfg(test)]
44
use fake::Fake;
55

6+
use super::Command;
67
use crate::{
78
codec::{Decoder, Encoder},
89
error::{DecodeError, EncodeError},
910
protocol::commands::COMMAND_PARTITIONS,
1011
FromResponse, ResponseCode,
1112
};
12-
use crate::commands::exchange_command_versions::ExchangeCommandVersion;
13-
use super::Command;
1413

1514
#[cfg_attr(test, derive(fake::Dummy))]
1615
#[derive(PartialEq, Eq, Debug)]

src/client/mod.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@ use rabbitmq_stream_protocol::{
6060
sasl_handshake::{SaslHandshakeCommand, SaslHandshakeResponse},
6161
store_offset::StoreOffset,
6262
subscribe::{OffsetSpecification, SubscribeCommand},
63-
tune::TunesCommand,
64-
unsubscribe::UnSubscribeCommand, superstream_partitions::SuperStreamPartitionsResponse,
65-
superstream_partitions::SuperStreamPartitionsRequest, superstream_route::SuperStreamRouteRequest,
63+
superstream_partitions::SuperStreamPartitionsRequest,
64+
superstream_partitions::SuperStreamPartitionsResponse,
65+
superstream_route::SuperStreamRouteRequest,
6666
superstream_route::SuperStreamRouteResponse,
67+
tune::TunesCommand,
68+
unsubscribe::UnSubscribeCommand,
6769
},
6870
types::PublishedMessage,
6971
FromResponse, Request, Response, ResponseCode, ResponseKind,
@@ -299,18 +301,25 @@ impl Client {
299301
.await
300302
}
301303

302-
pub async fn partitions(&self, super_stream: String) -> RabbitMQStreamResult<SuperStreamPartitionsResponse> {
304+
pub async fn partitions(
305+
&self,
306+
super_stream: String,
307+
) -> RabbitMQStreamResult<SuperStreamPartitionsResponse> {
303308
self.send_and_receive(|correlation_id| {
304309
SuperStreamPartitionsRequest::new(correlation_id, super_stream)
305310
})
306-
.await
311+
.await
307312
}
308313

309-
pub async fn route(&self, routing_key: String, super_stream: String) -> RabbitMQStreamResult<SuperStreamRouteResponse> {
314+
pub async fn route(
315+
&self,
316+
routing_key: String,
317+
super_stream: String,
318+
) -> RabbitMQStreamResult<SuperStreamRouteResponse> {
310319
self.send_and_receive(|correlation_id| {
311320
SuperStreamRouteRequest::new(correlation_id, routing_key, super_stream)
312321
})
313-
.await
322+
.await
314323
}
315324

316325
pub async fn create_stream(

tests/integration/client_test.rs

Lines changed: 33 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -42,79 +42,26 @@ async fn client_create_stream_error_test() {
4242

4343
#[tokio::test(flavor = "multi_thread")]
4444
async fn client_create_and_delete_super_stream_test() {
45-
let super_stream_name = "test-super-stream";
46-
47-
let client = Client::connect(ClientOptions::default()).await.unwrap();
48-
49-
let partitions: Vec<String> = [
50-
"test-super-stream-0",
51-
"test-super-stream-1",
52-
"test-super-stream-2",
53-
]
54-
.iter()
55-
.map(|&x| x.into())
56-
.collect();
57-
58-
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
59-
60-
let response = client
61-
.create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new())
62-
.await
63-
.unwrap();
64-
65-
assert_eq!(&ResponseCode::Ok, response.code());
66-
67-
let response = client
68-
.delete_super_stream(&super_stream_name)
69-
.await
70-
.unwrap();
71-
72-
assert_eq!(&ResponseCode::Ok, response.code());
73-
74-
let _ = client.close().await;
45+
let test = TestClient::create_super_stream().await;
7546
}
47+
7648
#[tokio::test(flavor = "multi_thread")]
7749
async fn client_create_super_stream_error_test() {
78-
let super_stream_name = "test-super-stream-error";
79-
80-
let client = Client::connect(ClientOptions::default()).await.unwrap();
81-
82-
let partitions: Vec<String> = [
83-
"test-super-stream-error-0",
84-
"test-super-stream-error-1",
85-
"test-super-stream-error-2",
86-
]
87-
.iter()
88-
.map(|&x| x.into())
89-
.collect();
90-
50+
let test = TestClient::create_super_stream().await;
9151
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
9252

93-
let response = client
53+
let response = test
54+
.client
9455
.create_super_stream(
95-
&super_stream_name,
96-
partitions.clone(),
97-
binding_keys.clone(),
56+
&test.super_stream,
57+
test.partitions.clone(),
58+
binding_keys,
9859
HashMap::new(),
9960
)
10061
.await
10162
.unwrap();
10263

103-
assert_eq!(&ResponseCode::Ok, response.code());
104-
105-
let response = client
106-
.create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new())
107-
.await
108-
.unwrap();
109-
11064
assert_eq!(&ResponseCode::StreamAlreadyExists, response.code());
111-
112-
let response = client
113-
.delete_super_stream(&super_stream_name)
114-
.await
115-
.unwrap();
116-
117-
assert_eq!(&ResponseCode::Ok, response.code());
11865
}
11966
async fn client_delete_stream_test() {
12067
let test = TestClient::create().await;
@@ -463,86 +410,40 @@ async fn client_exchange_command_versions() {
463410

464411
#[tokio::test(flavor = "multi_thread")]
465412
async fn client_test_partitions_test() {
466-
let super_stream_name = "test-super-stream-partitions";
467-
468-
let client = Client::connect(ClientOptions::default()).await.unwrap();
469-
470-
let partitions: Vec<String> = [
471-
"test-super-stream-partitions-0",
472-
"test-super-stream-partitions-1",
473-
"test-super-stream-partitions-2",
474-
]
475-
.iter()
476-
.map(|&x| x.into())
477-
.collect();
478-
479-
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
480-
481-
let response = client
482-
.create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new())
483-
.await
484-
.unwrap();
485-
486-
assert_eq!(&ResponseCode::Ok, response.code());
487-
488-
let response = client
489-
.partitions(super_stream_name.to_string())
490-
.await
491-
.unwrap();
492-
493-
assert_eq!(response.streams.get(0).unwrap(), "test-super-stream-partitions-0");
494-
assert_eq!(response.streams.get(1).unwrap(), "test-super-stream-partitions-1");
495-
assert_eq!(response.streams.get(2).unwrap(), "test-super-stream-partitions-2");
413+
let test = TestClient::create_super_stream().await;
496414

497-
let response = client
498-
.delete_super_stream(&super_stream_name)
415+
let response = test
416+
.client
417+
.partitions(test.super_stream.to_string())
499418
.await
500419
.unwrap();
501420

502-
assert_eq!(&ResponseCode::Ok, response.code());
503-
504-
let _ = client.close().await;
421+
assert_eq!(
422+
response.streams.get(0).unwrap(),
423+
test.partitions.get(0).unwrap()
424+
);
425+
assert_eq!(
426+
response.streams.get(1).unwrap(),
427+
test.partitions.get(1).unwrap()
428+
);
429+
assert_eq!(
430+
response.streams.get(2).unwrap(),
431+
test.partitions.get(2).unwrap()
432+
);
505433
}
506434

507435
#[tokio::test(flavor = "multi_thread")]
508436
async fn client_test_route_test() {
509-
let super_stream_name = "test-super-stream-route";
510-
511-
let client = Client::connect(ClientOptions::default()).await.unwrap();
512-
513-
let partitions: Vec<String> = [
514-
"test-super-stream-route-0",
515-
"test-super-stream-route-1",
516-
"test-super-stream-route-2",
517-
]
518-
.iter()
519-
.map(|&x| x.into())
520-
.collect();
521-
522-
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
523-
524-
let response = client
525-
.create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new())
526-
.await
527-
.unwrap();
528-
529-
assert_eq!(&ResponseCode::Ok, response.code());
530-
531-
let response = client
532-
.route("0".to_string(), super_stream_name.to_string())
437+
let test = TestClient::create_super_stream().await;
438+
let response = test
439+
.client
440+
.route("0".to_string(), test.super_stream.to_string())
533441
.await
534442
.unwrap();
535443

536444
assert_eq!(response.streams.len(), 1);
537-
assert_eq!(response.streams.get(0).unwrap(), "test-super-stream-route-0");
538-
539-
540-
let response = client
541-
.delete_super_stream(&super_stream_name)
542-
.await
543-
.unwrap();
544-
545-
assert_eq!(&ResponseCode::Ok, response.code());
546-
547-
let _ = client.close().await;
548-
}
445+
assert_eq!(
446+
response.streams.get(0).unwrap(),
447+
test.partitions.get(0).unwrap()
448+
);
449+
}

tests/integration/common.rs

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use rabbitmq_stream_protocol::ResponseCode;
77
pub struct TestClient {
88
pub client: Client,
99
pub stream: String,
10+
pub super_stream: String,
11+
pub partitions: Vec<String>,
1012
}
1113

1214
pub struct TestEnvironment {
@@ -22,16 +24,67 @@ impl TestClient {
2224
let response = client.create_stream(&stream, HashMap::new()).await.unwrap();
2325

2426
assert_eq!(&ResponseCode::Ok, response.code());
25-
TestClient { client, stream }
27+
TestClient {
28+
client,
29+
stream,
30+
super_stream: String::new(),
31+
partitions: Vec::new(),
32+
}
33+
}
34+
35+
pub async fn create_super_stream() -> TestClient {
36+
let super_stream: String = Faker.fake();
37+
let client = Client::connect(ClientOptions::default()).await.unwrap();
38+
39+
let partitions: Vec<String> = [
40+
super_stream.to_string() + "-0",
41+
super_stream.to_string() + "-1",
42+
super_stream.to_string() + "-2",
43+
]
44+
.iter()
45+
.map(|x| x.into())
46+
.collect();
47+
48+
let binding_keys: Vec<String> = ["0", "1", "2"].iter().map(|&x| x.into()).collect();
49+
50+
let response = client
51+
.create_super_stream(
52+
&super_stream,
53+
partitions.clone(),
54+
binding_keys,
55+
HashMap::new(),
56+
)
57+
.await
58+
.unwrap();
59+
60+
assert_eq!(&ResponseCode::Ok, response.code());
61+
TestClient {
62+
client,
63+
stream: String::new(),
64+
super_stream,
65+
partitions,
66+
}
2667
}
2768
}
2869

2970
impl Drop for TestClient {
3071
fn drop(&mut self) {
31-
tokio::task::block_in_place(|| {
32-
tokio::runtime::Handle::current()
33-
.block_on(async { self.client.delete_stream(&self.stream).await.unwrap() })
34-
});
72+
if self.stream != "" {
73+
tokio::task::block_in_place(|| {
74+
tokio::runtime::Handle::current()
75+
.block_on(async { self.client.delete_stream(&self.stream).await.unwrap() })
76+
});
77+
}
78+
if self.super_stream != "" {
79+
tokio::task::block_in_place(|| {
80+
tokio::runtime::Handle::current().block_on(async {
81+
self.client
82+
.delete_super_stream(&self.super_stream)
83+
.await
84+
.unwrap()
85+
})
86+
});
87+
}
3588
}
3689
}
3790

0 commit comments

Comments
 (0)