diff --git a/src/client/dispatcher.rs b/src/client/dispatcher.rs index 6d688604..c8b1291d 100644 --- a/src/client/dispatcher.rs +++ b/src/client/dispatcher.rs @@ -344,7 +344,7 @@ mod tests { #[tokio::test] async fn should_reject_requests_after_closing() { - let mock_source = MockIO::push(); + let _mock_source = MockIO::push(); let dispatcher = Dispatcher::with_handler(|_| async { Ok(()) }); diff --git a/src/producer.rs b/src/producer.rs index 89126795..17775b76 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -181,7 +181,10 @@ impl ProducerBuilder { let publish_sequence = if let Some(name) = self.name { let sequence = client.query_publisher_sequence(&name, stream).await?; - Arc::new(AtomicU64::new(sequence)) + + let first_sequence = if sequence == 0 { 0 } else { sequence + 1 }; + + Arc::new(AtomicU64::new(first_sequence)) } else { Arc::new(AtomicU64::new(0)) }; diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index 70368458..822fd66e 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -42,7 +42,7 @@ async fn client_create_stream_error_test() { #[tokio::test(flavor = "multi_thread")] async fn client_create_and_delete_super_stream_test() { - let test = TestClient::create_super_stream().await; + let _test = TestClient::create_super_stream().await; } #[tokio::test(flavor = "multi_thread")] @@ -63,6 +63,8 @@ async fn client_create_super_stream_error_test() { assert_eq!(&ResponseCode::StreamAlreadyExists, response.code()); } + +#[tokio::test(flavor = "multi_thread")] async fn client_delete_stream_test() { let test = TestClient::create().await; diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 98b4fec2..5c6782c3 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -1,8 +1,9 @@ -use std::collections::HashMap; +use std::{collections::HashMap, future::Future, sync::Arc}; use fake::{Fake, Faker}; use rabbitmq_stream_client::{Client, ClientOptions, Environment}; use rabbitmq_stream_protocol::ResponseCode; +use tokio::sync::Semaphore; pub struct TestClient { pub client: Client, @@ -11,6 +12,28 @@ pub struct TestClient { pub partitions: Vec, } +#[derive(Clone)] +pub struct Countdown(Arc); + +impl Drop for Countdown { + fn drop(&mut self) { + self.0.add_permits(1); + } +} + +impl Countdown { + pub fn new(n: u32) -> (Self, impl Future + Send) { + let sem = Arc::new(Semaphore::new(0)); + let latch = Self(sem.clone()); + + let wait = async move { + let _ = sem.acquire_many(n).await; + }; + + (latch, wait) + } +} + pub struct TestEnvironment { pub env: Environment, pub stream: String, diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index 035c8c68..847c1074 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -332,7 +332,7 @@ async fn consumer_test_with_store_offset() { // Store an offset if i == offset_to_store { //Store the 5th element produced - let result = consumer_store + let _result = consumer_store .store_offset(delivery.unwrap().offset()) .await; } diff --git a/tests/integration/producer_test.rs b/tests/integration/producer_test.rs index 2caf5751..1de01809 100644 --- a/tests/integration/producer_test.rs +++ b/tests/integration/producer_test.rs @@ -1,11 +1,16 @@ +use std::{collections::HashSet, sync::Arc}; + use chrono::Utc; use fake::{Fake, Faker}; -use futures::StreamExt; +use futures::{lock::Mutex, StreamExt}; use tokio::sync::mpsc::channel; -use rabbitmq_stream_client::types::{Message, OffsetSpecification, SimpleValue}; +use rabbitmq_stream_client::{ + types::{Message, OffsetSpecification, SimpleValue}, + Environment, +}; -use crate::common::TestEnvironment; +use crate::common::{Countdown, TestEnvironment}; #[tokio::test(flavor = "multi_thread")] async fn producer_send_no_name_ok() { @@ -35,6 +40,57 @@ async fn producer_send_no_name_ok() { consumer.handle().close().await.unwrap(); } +#[tokio::test(flavor = "multi_thread")] +async fn producer_send_name_deduplication_unique_ids() { + let env = TestEnvironment::create().await; + + type Ids = Arc>>; + + let ids = Ids::default(); + + let run = move |times, env: Environment, stream: String, ids: Ids| async move { + let (countdown, wait) = Countdown::new(times); + let mut producer = env + .producer() + .name("my_producer") + .build(&stream) + .await + .unwrap(); + + for _ in 0..times { + let cloned_ids = ids.clone(); + let countdown = countdown.clone(); + let _ = producer + .send( + Message::builder().body(b"message".to_vec()).build(), + move |result| { + let value = cloned_ids.clone(); + + let countdown = countdown.clone(); + async move { + let _countdown = countdown; + let id = result.unwrap().publishing_id(); + let mut guard = value.lock().await; + guard.insert(id); + } + }, + ) + .await + .unwrap(); + } + + wait.await; + + producer.close().await.unwrap(); + }; + + run(10, env.env.clone(), env.stream.to_string(), ids.clone()).await; + + run(10, env.env.clone(), env.stream.to_string(), ids.clone()).await; + + assert_eq!(20, ids.lock().await.len()); +} + #[tokio::test(flavor = "multi_thread")] async fn producer_send_name_with_deduplication_ok() { let env = TestEnvironment::create().await;