Skip to content

Commit b99b2cf

Browse files
Merge branch 'main' into implement_super_stream
2 parents a3aa351 + f2aca24 commit b99b2cf

File tree

6 files changed

+87
-4
lines changed

6 files changed

+87
-4
lines changed

src/client/dispatcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ mod tests {
344344

345345
#[tokio::test]
346346
async fn should_reject_requests_after_closing() {
347-
let mock_source = MockIO::push();
347+
let _mock_source = MockIO::push();
348348

349349
let dispatcher = Dispatcher::with_handler(|_| async { Ok(()) });
350350

src/consumer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ impl ConsumerBuilder {
120120
}
121121
}
122122
} else {
123+
client.close().await?;
123124
client = Client::connect(ClientOptions {
124125
host: replica.host.clone(),
125126
port: replica.port as u16,
@@ -271,6 +272,7 @@ impl ConsumerHandle {
271272
let response = self.0.client.unsubscribe(self.0.subscription_id).await?;
272273
if response.is_ok() {
273274
self.0.waker.wake();
275+
self.0.client.close().await?;
274276
Ok(())
275277
} else {
276278
Err(ConsumerCloseError::Close {

src/producer.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,10 @@ impl<T> ProducerBuilder<T> {
181181

182182
let publish_sequence = if let Some(name) = self.name {
183183
let sequence = client.query_publisher_sequence(&name, stream).await?;
184-
Arc::new(AtomicU64::new(sequence))
184+
185+
let first_sequence = if sequence == 0 { 0 } else { sequence + 1 };
186+
187+
Arc::new(AtomicU64::new(first_sequence))
185188
} else {
186189
Arc::new(AtomicU64::new(0))
187190
};

tests/integration/client_test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ async fn client_create_super_stream_error_test() {
6363

6464
assert_eq!(&ResponseCode::StreamAlreadyExists, response.code());
6565
}
66+
67+
#[tokio::test(flavor = "multi_thread")]
6668
async fn client_delete_stream_test() {
6769
let test = TestClient::create().await;
6870

tests/integration/common.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
use std::collections::HashMap;
1+
use std::{collections::HashMap, future::Future, sync::Arc};
22

33
use fake::{Fake, Faker};
44
use rabbitmq_stream_client::{Client, ClientOptions, Environment};
55
use rabbitmq_stream_protocol::commands::generic::GenericResponse;
66
use rabbitmq_stream_protocol::ResponseCode;
7+
use tokio::sync::Semaphore;
78

89
pub struct TestClient {
910
pub client: Client,
@@ -12,6 +13,28 @@ pub struct TestClient {
1213
pub partitions: Vec<String>,
1314
}
1415

16+
#[derive(Clone)]
17+
pub struct Countdown(Arc<Semaphore>);
18+
19+
impl Drop for Countdown {
20+
fn drop(&mut self) {
21+
self.0.add_permits(1);
22+
}
23+
}
24+
25+
impl Countdown {
26+
pub fn new(n: u32) -> (Self, impl Future + Send) {
27+
let sem = Arc::new(Semaphore::new(0));
28+
let latch = Self(sem.clone());
29+
30+
let wait = async move {
31+
let _ = sem.acquire_many(n).await;
32+
};
33+
34+
(latch, wait)
35+
}
36+
}
37+
1538
pub struct TestEnvironment {
1639
pub env: Environment,
1740
pub stream: String,

tests/integration/producer_test.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use std::{collections::HashSet, sync::Arc};
2+
13
use chrono::Utc;
24
use fake::{Fake, Faker};
3-
use futures::StreamExt;
5+
use futures::{lock::Mutex, StreamExt};
46
use tokio::sync::mpsc::channel;
57

68
use rabbitmq_stream_client::types::{
@@ -41,6 +43,57 @@ async fn producer_send_no_name_ok() {
4143
consumer.handle().close().await.unwrap();
4244
}
4345

46+
#[tokio::test(flavor = "multi_thread")]
47+
async fn producer_send_name_deduplication_unique_ids() {
48+
let env = TestEnvironment::create().await;
49+
50+
type Ids = Arc<Mutex<HashSet<u64>>>;
51+
52+
let ids = Ids::default();
53+
54+
let run = move |times, env: Environment, stream: String, ids: Ids| async move {
55+
let (countdown, wait) = Countdown::new(times);
56+
let mut producer = env
57+
.producer()
58+
.name("my_producer")
59+
.build(&stream)
60+
.await
61+
.unwrap();
62+
63+
for _ in 0..times {
64+
let cloned_ids = ids.clone();
65+
let countdown = countdown.clone();
66+
let _ = producer
67+
.send(
68+
Message::builder().body(b"message".to_vec()).build(),
69+
move |result| {
70+
let value = cloned_ids.clone();
71+
72+
let countdown = countdown.clone();
73+
async move {
74+
let _countdown = countdown;
75+
let id = result.unwrap().publishing_id();
76+
let mut guard = value.lock().await;
77+
guard.insert(id);
78+
}
79+
},
80+
)
81+
.await
82+
.unwrap();
83+
}
84+
85+
wait.await;
86+
87+
producer.close().await.unwrap();
88+
};
89+
90+
run(10, env.env.clone(), env.stream.to_string(), ids.clone()).await;
91+
92+
run(10, env.env.clone(), env.stream.to_string(), ids.clone()).await;
93+
94+
assert_eq!(20, ids.lock().await.len());
95+
}
96+
4497
#[tokio::test(flavor = "multi_thread")]
4598
async fn producer_send_name_with_deduplication_ok() {
4699
let env = TestEnvironment::create().await;

0 commit comments

Comments
 (0)