diff --git a/src/producer.rs b/src/producer.rs index 3175c0dc..8b74045f 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -9,6 +9,7 @@ use std::{ }; use dashmap::DashMap; +use futures::executor::block_on; use futures::{future::BoxFuture, FutureExt}; use tokio::sync::mpsc::channel; use tokio::sync::{mpsc, Mutex}; @@ -76,6 +77,19 @@ pub struct ProducerInternal { #[derive(Clone)] pub struct Producer(Arc, PhantomData); +/// drop implementation for Producer needed to properly close the connection. +impl Drop for Producer { + fn drop(&mut self) { + if !self.is_closed() { + // producer connection must be closed before dropping. + // drop method can't be async, block it. + block_on(async { + let _ = self.close().await; + }) + } + } +} + /// Builder for [`Producer`] pub struct ProducerBuilder { pub(crate) environment: Environment, @@ -501,7 +515,7 @@ impl Producer { self.0.closed.load(Ordering::Relaxed) } // TODO handle producer state after close - pub async fn close(self) -> Result<(), ProducerCloseError> { + pub async fn close(&self) -> Result<(), ProducerCloseError> { match self .0 .closed diff --git a/tests/producer_test.rs b/tests/producer_test.rs index 77fddd9a..4b2610b1 100644 --- a/tests/producer_test.rs +++ b/tests/producer_test.rs @@ -683,6 +683,22 @@ async fn super_stream_producer_send_filtering_message() { } } +#[tokio::test(flavor = "multi_thread")] +async fn connect_and_drop() { + use tokio::time::Duration; + + let env = TestEnvironment::create().await; + for _count in 0..50 { + let producer = env.env.producer().build(&env.stream).await.unwrap(); + + drop(producer); + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // all connections dropped, waiting a few seconds... + tokio::time::sleep(Duration::from_secs(5)).await; +} + #[tokio::test(flavor = "multi_thread")] async fn producer_drop_connection() { let _ = tracing_subscriber::fmt::try_init();