Skip to content

Commit 656600b

Browse files
authored
fixes #233 by incrementing the publisher sequence (#234)
1 parent d705bfb commit 656600b

File tree

6 files changed

+92
-8
lines changed

6 files changed

+92
-8
lines changed

src/client/dispatcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ mod tests {
344344

345345
#[tokio::test]
346346
async fn should_reject_requests_after_closing() {
347-
let mock_source = MockIO::push();
347+
let _mock_source = MockIO::push();
348348

349349
let dispatcher = Dispatcher::with_handler(|_| async { Ok(()) });
350350

src/producer.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,10 @@ impl<T> ProducerBuilder<T> {
181181

182182
let publish_sequence = if let Some(name) = self.name {
183183
let sequence = client.query_publisher_sequence(&name, stream).await?;
184-
Arc::new(AtomicU64::new(sequence))
184+
185+
let first_sequence = if sequence == 0 { 0 } else { sequence + 1 };
186+
187+
Arc::new(AtomicU64::new(first_sequence))
185188
} else {
186189
Arc::new(AtomicU64::new(0))
187190
};

tests/integration/client_test.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ async fn client_create_stream_error_test() {
4242

4343
#[tokio::test(flavor = "multi_thread")]
4444
async fn client_create_and_delete_super_stream_test() {
45-
let test = TestClient::create_super_stream().await;
45+
let _test = TestClient::create_super_stream().await;
4646
}
4747

4848
#[tokio::test(flavor = "multi_thread")]
@@ -63,6 +63,8 @@ async fn client_create_super_stream_error_test() {
6363

6464
assert_eq!(&ResponseCode::StreamAlreadyExists, response.code());
6565
}
66+
67+
#[tokio::test(flavor = "multi_thread")]
6668
async fn client_delete_stream_test() {
6769
let test = TestClient::create().await;
6870

tests/integration/common.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use std::collections::HashMap;
1+
use std::{collections::HashMap, future::Future, sync::Arc};
22

33
use fake::{Fake, Faker};
44
use rabbitmq_stream_client::{Client, ClientOptions, Environment};
55
use rabbitmq_stream_protocol::ResponseCode;
6+
use tokio::sync::Semaphore;
67

78
pub struct TestClient {
89
pub client: Client,
@@ -11,6 +12,28 @@ pub struct TestClient {
1112
pub partitions: Vec<String>,
1213
}
1314

15+
#[derive(Clone)]
16+
pub struct Countdown(Arc<Semaphore>);
17+
18+
impl Drop for Countdown {
19+
fn drop(&mut self) {
20+
self.0.add_permits(1);
21+
}
22+
}
23+
24+
impl Countdown {
25+
pub fn new(n: u32) -> (Self, impl Future + Send) {
26+
let sem = Arc::new(Semaphore::new(0));
27+
let latch = Self(sem.clone());
28+
29+
let wait = async move {
30+
let _ = sem.acquire_many(n).await;
31+
};
32+
33+
(latch, wait)
34+
}
35+
}
36+
1437
pub struct TestEnvironment {
1538
pub env: Environment,
1639
pub stream: String,

tests/integration/consumer_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ async fn consumer_test_with_store_offset() {
332332
// Store an offset
333333
if i == offset_to_store {
334334
//Store the 5th element produced
335-
let result = consumer_store
335+
let _result = consumer_store
336336
.store_offset(delivery.unwrap().offset())
337337
.await;
338338
}

tests/integration/producer_test.rs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
use std::{collections::HashSet, sync::Arc};
2+
13
use chrono::Utc;
24
use fake::{Fake, Faker};
3-
use futures::StreamExt;
5+
use futures::{lock::Mutex, StreamExt};
46
use tokio::sync::mpsc::channel;
57

6-
use rabbitmq_stream_client::types::{Message, OffsetSpecification, SimpleValue};
8+
use rabbitmq_stream_client::{
9+
types::{Message, OffsetSpecification, SimpleValue},
10+
Environment,
11+
};
712

8-
use crate::common::TestEnvironment;
13+
use crate::common::{Countdown, TestEnvironment};
914

1015
#[tokio::test(flavor = "multi_thread")]
1116
async fn producer_send_no_name_ok() {
@@ -35,6 +40,57 @@ async fn producer_send_no_name_ok() {
3540
consumer.handle().close().await.unwrap();
3641
}
3742

43+
#[tokio::test(flavor = "multi_thread")]
44+
async fn producer_send_name_deduplication_unique_ids() {
45+
let env = TestEnvironment::create().await;
46+
47+
type Ids = Arc<Mutex<HashSet<u64>>>;
48+
49+
let ids = Ids::default();
50+
51+
let run = move |times, env: Environment, stream: String, ids: Ids| async move {
52+
let (countdown, wait) = Countdown::new(times);
53+
let mut producer = env
54+
.producer()
55+
.name("my_producer")
56+
.build(&stream)
57+
.await
58+
.unwrap();
59+
60+
for _ in 0..times {
61+
let cloned_ids = ids.clone();
62+
let countdown = countdown.clone();
63+
let _ = producer
64+
.send(
65+
Message::builder().body(b"message".to_vec()).build(),
66+
move |result| {
67+
let value = cloned_ids.clone();
68+
69+
let countdown = countdown.clone();
70+
async move {
71+
let _countdown = countdown;
72+
let id = result.unwrap().publishing_id();
73+
let mut guard = value.lock().await;
74+
guard.insert(id);
75+
}
76+
},
77+
)
78+
.await
79+
.unwrap();
80+
}
81+
82+
wait.await;
83+
84+
producer.close().await.unwrap();
85+
};
86+
87+
run(10, env.env.clone(), env.stream.to_string(), ids.clone()).await;
88+
89+
run(10, env.env.clone(), env.stream.to_string(), ids.clone()).await;
90+
91+
assert_eq!(20, ids.lock().await.len());
92+
}
93+
3894
#[tokio::test(flavor = "multi_thread")]
3995
async fn producer_send_name_with_deduplication_ok() {
4096
let env = TestEnvironment::create().await;

0 commit comments

Comments
 (0)