From b73c95cc17644edbd1dc5bd8549e99e8b4174faa Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Mon, 17 Feb 2025 12:38:02 +0100 Subject: [PATCH 1/2] FnOnce as Producer::send callback --- src/producer.rs | 164 +++++++++++++++++------------ tests/integration/producer_test.rs | 34 +++++- 2 files changed, 127 insertions(+), 71 deletions(-) diff --git a/src/producer.rs b/src/producer.rs index 96c94a0b..29913bdd 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -28,12 +28,6 @@ use crate::{ type WaiterMap = Arc>; type FilterValueExtractor = Arc String + 'static + Send + Sync>; -type ConfirmCallback = Arc< - dyn Fn(Result) -> BoxFuture<'static, ()> - + Send - + Sync, ->; - #[derive(Debug)] pub struct ConfirmationStatus { publishing_id: u64, @@ -323,7 +317,7 @@ impl Producer { pub async fn send( &self, message: Message, - cb: impl Fn(Result) -> Fut + Send + Sync + 'static, + cb: impl FnOnce(Result) -> Fut + Send + Sync + 'static, ) -> Result<(), ProducerPublishError> where Fut: Future + Send + Sync + 'static, @@ -359,7 +353,7 @@ impl Producer { pub async fn send( &mut self, message: Message, - cb: impl Fn(Result) -> Fut + Send + Sync + 'static, + cb: impl FnOnce(Result) -> Fut + Send + Sync + 'static, ) -> Result<(), ProducerPublishError> where Fut: Future + Send + Sync + 'static, @@ -430,7 +424,7 @@ impl Producer { async fn do_send( &self, message: Message, - cb: impl Fn(Result) -> Fut + Send + Sync + 'static, + cb: impl FnOnce(Result) -> Fut + Send + Sync + 'static, ) -> Result<(), ProducerPublishError> where Fut: Future + Send + Sync + 'static, @@ -442,7 +436,7 @@ impl Producer { async fn internal_send( &self, message: Message, - cb: impl Fn(Result) -> Fut + Send + Sync + 'static, + cb: impl FnOnce(Result) -> Fut + Send + Sync + 'static, ) -> Result<(), ProducerPublishError> where Fut: Future + Send + Sync + 'static, @@ -460,8 +454,10 @@ impl Producer { msg.filter_value_extract(f.as_ref()) } - let waiter = ProducerMessageWaiter::waiter_with_cb(cb, message); - self.0.waiting_confirmations.insert(publishing_id, waiter); + let waiter = OnceProducerMessageWaiter::waiter_with_cb(cb, message); + self.0 + .waiting_confirmations + .insert(publishing_id, ProducerMessageWaiter::Once(waiter)); if self.0.accumulator.add(msg).await? { self.0.batch_send().await?; @@ -485,7 +481,9 @@ impl Producer { let mut wrapped_msgs = Vec::with_capacity(messages.len()); for message in messages { - let waiter = ProducerMessageWaiter::waiter_with_arc_cb(arc_cb.clone(), message.clone()); + let waiter = + SharedProducerMessageWaiter::waiter_with_arc_cb(arc_cb.clone(), message.clone()); + let publishing_id = match message.publishing_id() { Some(publishing_id) => *publishing_id, None => self.0.publish_sequence.fetch_add(1, Ordering::Relaxed), @@ -499,7 +497,7 @@ impl Producer { self.0 .waiting_confirmations - .insert(publishing_id, waiter.clone()); + .insert(publishing_id, ProducerMessageWaiter::Shared(waiter.clone())); } self.0 @@ -542,19 +540,6 @@ struct ProducerConfirmHandler { metrics_collector: Arc, } -impl ProducerConfirmHandler { - async fn with_waiter( - &self, - publishing_id: u64, - cb: impl FnOnce(ProducerMessageWaiter) -> BoxFuture<'static, ()>, - ) { - match self.waiting_confirmations.remove(&publishing_id) { - Some(confirm_sender) => cb(confirm_sender.1).await, - None => todo!(), - } - } -} - #[async_trait::async_trait] impl MessageHandler for ProducerConfirmHandler { async fn handle_message(&self, item: MessageResult) -> RabbitMQStreamResult<()> { @@ -566,13 +551,33 @@ impl MessageHandler for ProducerConfirmHandler { let confirm_len = confirm.publishing_ids.len(); for publishing_id in &confirm.publishing_ids { let id = *publishing_id; - self.with_waiter(*publishing_id, move |waiter| { - async move { - let _ = waiter.handle_confirm(id).await; + + let waiter = match self.waiting_confirmations.remove(publishing_id) { + Some((_, confirm_sender)) => confirm_sender, + None => todo!(), + }; + match waiter { + ProducerMessageWaiter::Once(waiter) => { + let cb = waiter.cb; + cb(Ok(ConfirmationStatus { + publishing_id: id, + confirmed: true, + status: ResponseCode::Ok, + message: waiter.msg, + })) + .await; } - .boxed() - }) - .await; + ProducerMessageWaiter::Shared(waiter) => { + let cb = waiter.cb; + cb(Ok(ConfirmationStatus { + publishing_id: id, + confirmed: true, + status: ResponseCode::Ok, + message: waiter.msg, + })) + .await; + } + } } self.metrics_collector .publish_confirm(confirm_len as u64) @@ -583,13 +588,33 @@ impl MessageHandler for ProducerConfirmHandler { for err in &error.publishing_errors { let code = err.error_code.clone(); let id = err.publishing_id; - self.with_waiter(err.publishing_id, move |waiter| { - async move { - let _ = waiter.handle_error(id, code).await; + + let waiter = match self.waiting_confirmations.remove(&id) { + Some((_, confirm_sender)) => confirm_sender, + None => todo!(), + }; + match waiter { + ProducerMessageWaiter::Once(waiter) => { + let cb = waiter.cb; + cb(Ok(ConfirmationStatus { + publishing_id: id, + confirmed: false, + status: code, + message: waiter.msg, + })) + .await; } - .boxed() - }) - .await; + ProducerMessageWaiter::Shared(waiter) => { + let cb = waiter.cb; + cb(Ok(ConfirmationStatus { + publishing_id: id, + confirmed: false, + status: code, + message: waiter.msg, + })) + .await; + } + } } } _ => {} @@ -608,54 +633,53 @@ impl MessageHandler for ProducerConfirmHandler { } } -#[derive(Clone)] -struct ProducerMessageWaiter { +type ConfirmCallback = Box< + dyn FnOnce(Result) -> BoxFuture<'static, ()> + + Send + + Sync, +>; + +type ArcConfirmCallback = Arc< + dyn Fn(Result) -> BoxFuture<'static, ()> + + Send + + Sync, +>; + +enum ProducerMessageWaiter { + Once(OnceProducerMessageWaiter), + Shared(SharedProducerMessageWaiter), +} + +struct OnceProducerMessageWaiter { cb: ConfirmCallback, msg: Message, } - -impl ProducerMessageWaiter { +impl OnceProducerMessageWaiter { fn waiter_with_cb( - cb: impl Fn(Result) -> Fut + Send + Sync + 'static, + cb: impl FnOnce(Result) -> Fut + Send + Sync + 'static, msg: Message, ) -> Self where Fut: Future + Send + Sync + 'static, { Self { - cb: Arc::new(move |confirm_status| cb(confirm_status).boxed()), + cb: Box::new(move |confirm_status| cb(confirm_status).boxed()), msg, } } +} - fn waiter_with_arc_cb(confirm_callback: ConfirmCallback, msg: Message) -> Self { +#[derive(Clone)] +struct SharedProducerMessageWaiter { + cb: ArcConfirmCallback, + msg: Message, +} + +impl SharedProducerMessageWaiter { + fn waiter_with_arc_cb(confirm_callback: ArcConfirmCallback, msg: Message) -> Self { Self { cb: confirm_callback, msg, } } - async fn handle_confirm(self, publishing_id: u64) -> RabbitMQStreamResult<()> { - (self.cb)(Ok(ConfirmationStatus { - publishing_id, - confirmed: true, - status: ResponseCode::Ok, - message: self.msg, - })) - .await; - Ok(()) - } - async fn handle_error( - self, - publishing_id: u64, - status: ResponseCode, - ) -> RabbitMQStreamResult<()> { - (self.cb)(Ok(ConfirmationStatus { - publishing_id, - confirmed: false, - status, - message: self.msg, - })) - .await; - Ok(()) - } } diff --git a/tests/integration/producer_test.rs b/tests/integration/producer_test.rs index 588ef203..c62c6c35 100644 --- a/tests/integration/producer_test.rs +++ b/tests/integration/producer_test.rs @@ -231,6 +231,38 @@ async fn producer_send_with_callback() { producer.close().await.unwrap(); } +#[tokio::test(flavor = "multi_thread")] +async fn producer_send_with_callback_can_drop() { + let env = TestEnvironment::create().await; + let reference: String = Faker.fake(); + + let mut producer = env + .env + .producer() + .name(&reference) + .build(&env.stream) + .await + .unwrap(); + + // A non copy structure + struct Foo; + + let f = Foo; + producer + .send( + Message::builder().body(b"message".to_vec()).build(), + move |_| { + // this callback is an FnOnce, so we can drop a value here + drop(f); + async {} + }, + ) + .await + .unwrap(); + + producer.close().await.unwrap(); +} + #[tokio::test(flavor = "multi_thread")] async fn producer_batch_send_with_callback() { let env = TestEnvironment::create().await; @@ -449,7 +481,7 @@ async fn producer_send_after_close_error() { ); } -pub fn routing_key_strategy_value_extractor(message: &Message) -> String { +pub fn routing_key_strategy_value_extractor(_: &Message) -> String { return "0".to_string(); } From 66cf3089c397206a763e81c129b2796ae5884035 Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Mon, 17 Feb 2025 14:13:43 +0100 Subject: [PATCH 2/2] Update fake deps --- protocol/Cargo.toml | 3 +-- protocol/src/commands/close.rs | 3 --- protocol/src/commands/consumer_update.rs | 3 --- protocol/src/commands/consumer_update_request.rs | 3 --- protocol/src/commands/create_stream.rs | 3 --- protocol/src/commands/create_super_stream.rs | 3 --- protocol/src/commands/credit.rs | 3 --- protocol/src/commands/declare_publisher.rs | 3 --- protocol/src/commands/delete.rs | 3 --- protocol/src/commands/delete_publisher.rs | 3 --- protocol/src/commands/delete_super_stream.rs | 4 ---- protocol/src/commands/deliver.rs | 4 +--- protocol/src/commands/exchange_command_versions.rs | 3 --- protocol/src/commands/generic.rs | 3 --- protocol/src/commands/metadata.rs | 2 -- protocol/src/commands/metadata_update.rs | 2 -- protocol/src/commands/open.rs | 3 --- protocol/src/commands/peer_properties.rs | 3 --- protocol/src/commands/publish.rs | 3 --- protocol/src/commands/publish_confirm.rs | 3 --- protocol/src/commands/publish_error.rs | 3 --- protocol/src/commands/query_offset.rs | 3 --- protocol/src/commands/query_publisher_sequence.rs | 3 --- protocol/src/commands/sasl_authenticate.rs | 3 --- protocol/src/commands/sasl_handshake.rs | 3 --- protocol/src/commands/store_offset.rs | 3 --- protocol/src/commands/subscribe.rs | 3 --- protocol/src/commands/superstream_partitions.rs | 3 --- protocol/src/commands/superstream_route.rs | 3 --- protocol/src/commands/tune.rs | 3 --- protocol/src/commands/unsubscribe.rs | 3 --- protocol/src/message/amqp/body.rs | 2 +- protocol/src/message/amqp/header.rs | 3 --- protocol/src/message/amqp/message.rs | 3 --- protocol/src/message/amqp/mod.rs | 2 +- protocol/src/message/amqp/properties.rs | 3 --- protocol/src/message/amqp/types/annotations.rs | 2 -- protocol/src/message/amqp/types/descriptor.rs | 3 --- protocol/src/message/amqp/types/mod.rs | 3 --- protocol/src/message/amqp/types/primitives/list.rs | 3 +-- protocol/src/message/amqp/types/primitives/map.rs | 9 ++++----- protocol/src/message/amqp/types/primitives/simple.rs | 9 +++------ protocol/src/message/amqp/types/primitives/value.rs | 3 --- protocol/src/message/amqp/types/symbol.rs | 2 -- protocol/src/request/mod.rs | 6 +++--- protocol/src/response/mod.rs | 2 -- protocol/src/types.rs | 3 --- 47 files changed, 15 insertions(+), 136 deletions(-) diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 9db6284f..c866e5e3 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -19,6 +19,5 @@ num_enum = "0.7.0" derive_more = "0.99" [dev-dependencies] -fake = { version = "3.0", features=['derive', 'chrono','uuid']} -rand = "0.8" pretty_assertions = "1.2.0" +fake = { version = "4.0", features = [ "derive", "chrono", "uuid" ] } diff --git a/protocol/src/commands/close.rs b/protocol/src/commands/close.rs index a00988a2..66245af3 100644 --- a/protocol/src/commands/close.rs +++ b/protocol/src/commands/close.rs @@ -17,9 +17,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct CloseRequest { diff --git a/protocol/src/commands/consumer_update.rs b/protocol/src/commands/consumer_update.rs index 6d074727..79bdde80 100644 --- a/protocol/src/commands/consumer_update.rs +++ b/protocol/src/commands/consumer_update.rs @@ -1,8 +1,5 @@ use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, diff --git a/protocol/src/commands/consumer_update_request.rs b/protocol/src/commands/consumer_update_request.rs index 38c1f5f9..4ed26e8e 100644 --- a/protocol/src/commands/consumer_update_request.rs +++ b/protocol/src/commands/consumer_update_request.rs @@ -1,8 +1,5 @@ use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, diff --git a/protocol/src/commands/create_stream.rs b/protocol/src/commands/create_stream.rs index d7591042..f0f2bdfc 100644 --- a/protocol/src/commands/create_stream.rs +++ b/protocol/src/commands/create_stream.rs @@ -9,9 +9,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct CreateStreamCommand { diff --git a/protocol/src/commands/create_super_stream.rs b/protocol/src/commands/create_super_stream.rs index f6b1e532..9cb7e156 100644 --- a/protocol/src/commands/create_super_stream.rs +++ b/protocol/src/commands/create_super_stream.rs @@ -1,9 +1,6 @@ use std::collections::HashMap; use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, diff --git a/protocol/src/commands/credit.rs b/protocol/src/commands/credit.rs index 7eaf9f06..63cc2f20 100644 --- a/protocol/src/commands/credit.rs +++ b/protocol/src/commands/credit.rs @@ -1,8 +1,5 @@ use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, diff --git a/protocol/src/commands/declare_publisher.rs b/protocol/src/commands/declare_publisher.rs index 81191c9a..f88787a9 100644 --- a/protocol/src/commands/declare_publisher.rs +++ b/protocol/src/commands/declare_publisher.rs @@ -8,9 +8,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct DeclarePublisherCommand { diff --git a/protocol/src/commands/delete.rs b/protocol/src/commands/delete.rs index 83180e9b..6a582f3c 100644 --- a/protocol/src/commands/delete.rs +++ b/protocol/src/commands/delete.rs @@ -8,9 +8,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct Delete { diff --git a/protocol/src/commands/delete_publisher.rs b/protocol/src/commands/delete_publisher.rs index dcf5ffb4..7c5d0e3d 100644 --- a/protocol/src/commands/delete_publisher.rs +++ b/protocol/src/commands/delete_publisher.rs @@ -8,9 +8,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct DeletePublisherCommand { diff --git a/protocol/src/commands/delete_super_stream.rs b/protocol/src/commands/delete_super_stream.rs index 21a57994..116cff0c 100644 --- a/protocol/src/commands/delete_super_stream.rs +++ b/protocol/src/commands/delete_super_stream.rs @@ -1,8 +1,5 @@ use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, @@ -62,7 +59,6 @@ impl Command for DeleteSuperStreamCommand { #[cfg(test)] mod tests { - use crate::commands::create_super_stream::CreateSuperStreamCommand; use crate::commands::tests::command_encode_decode_test; use super::DeleteSuperStreamCommand; diff --git a/protocol/src/commands/deliver.rs b/protocol/src/commands/deliver.rs index ba14d0ed..65c69e0e 100644 --- a/protocol/src/commands/deliver.rs +++ b/protocol/src/commands/deliver.rs @@ -9,8 +9,6 @@ use crate::{ protocol::commands::COMMAND_DELIVER, }; use byteorder::{BigEndian, WriteBytesExt}; -#[cfg(test)] -use fake::Fake; #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug, Clone)] @@ -163,7 +161,7 @@ mod tests { use super::{DeliverCommand, Message}; impl Dummy for Message { - fn dummy_with_rng(_config: &Faker, _rng: &mut R) -> Self { + fn dummy_with_rng(_config: &Faker, _rng: &mut R) -> Self { Message::new(InternalMessage::default()) } } diff --git a/protocol/src/commands/exchange_command_versions.rs b/protocol/src/commands/exchange_command_versions.rs index 786e6495..77cad79e 100644 --- a/protocol/src/commands/exchange_command_versions.rs +++ b/protocol/src/commands/exchange_command_versions.rs @@ -10,9 +10,6 @@ use crate::{ use super::Command; use byteorder::{BigEndian, WriteBytesExt}; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct ExchangeCommandVersion(u16, u16, u16); diff --git a/protocol/src/commands/generic.rs b/protocol/src/commands/generic.rs index 4ddb527f..387c4be9 100644 --- a/protocol/src/commands/generic.rs +++ b/protocol/src/commands/generic.rs @@ -6,9 +6,6 @@ use crate::{ FromResponse, ResponseCode, ResponseKind, }; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct GenericResponse { diff --git a/protocol/src/commands/metadata.rs b/protocol/src/commands/metadata.rs index e753dcb8..9e6589a0 100644 --- a/protocol/src/commands/metadata.rs +++ b/protocol/src/commands/metadata.rs @@ -10,8 +10,6 @@ use crate::{ use super::Command; use byteorder::{BigEndian, WriteBytesExt}; -#[cfg(test)] -use fake::Fake; #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] diff --git a/protocol/src/commands/metadata_update.rs b/protocol/src/commands/metadata_update.rs index 428b8bc5..c888118b 100644 --- a/protocol/src/commands/metadata_update.rs +++ b/protocol/src/commands/metadata_update.rs @@ -5,8 +5,6 @@ use crate::{ use super::Command; use crate::codec::Encoder; -#[cfg(test)] -use fake::Fake; #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] diff --git a/protocol/src/commands/open.rs b/protocol/src/commands/open.rs index 827f9eeb..2440d774 100644 --- a/protocol/src/commands/open.rs +++ b/protocol/src/commands/open.rs @@ -10,9 +10,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct OpenCommand { diff --git a/protocol/src/commands/peer_properties.rs b/protocol/src/commands/peer_properties.rs index 5976afbd..f6d9658c 100644 --- a/protocol/src/commands/peer_properties.rs +++ b/protocol/src/commands/peer_properties.rs @@ -10,9 +10,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct PeerPropertiesCommand { diff --git a/protocol/src/commands/publish.rs b/protocol/src/commands/publish.rs index 7c8f3f3b..39c785ce 100644 --- a/protocol/src/commands/publish.rs +++ b/protocol/src/commands/publish.rs @@ -11,9 +11,6 @@ use super::Command; use crate::types::PublishedMessage; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct PublishCommand { diff --git a/protocol/src/commands/publish_confirm.rs b/protocol/src/commands/publish_confirm.rs index 378281c6..6336dc88 100644 --- a/protocol/src/commands/publish_confirm.rs +++ b/protocol/src/commands/publish_confirm.rs @@ -8,9 +8,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct PublishConfirm { diff --git a/protocol/src/commands/publish_error.rs b/protocol/src/commands/publish_error.rs index 27d7f567..d826435c 100644 --- a/protocol/src/commands/publish_error.rs +++ b/protocol/src/commands/publish_error.rs @@ -10,9 +10,6 @@ use std::io::Write; use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct PublishErrorResponse { diff --git a/protocol/src/commands/query_offset.rs b/protocol/src/commands/query_offset.rs index 2c9a9d9f..ec9e9951 100644 --- a/protocol/src/commands/query_offset.rs +++ b/protocol/src/commands/query_offset.rs @@ -8,9 +8,6 @@ use std::io::Write; use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct QueryOffsetRequest { diff --git a/protocol/src/commands/query_publisher_sequence.rs b/protocol/src/commands/query_publisher_sequence.rs index 218ec7de..71bef33e 100644 --- a/protocol/src/commands/query_publisher_sequence.rs +++ b/protocol/src/commands/query_publisher_sequence.rs @@ -8,9 +8,6 @@ use std::io::Write; use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct QueryPublisherRequest { diff --git a/protocol/src/commands/sasl_authenticate.rs b/protocol/src/commands/sasl_authenticate.rs index 81978127..8580c474 100644 --- a/protocol/src/commands/sasl_authenticate.rs +++ b/protocol/src/commands/sasl_authenticate.rs @@ -8,9 +8,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct SaslAuthenticateCommand { diff --git a/protocol/src/commands/sasl_handshake.rs b/protocol/src/commands/sasl_handshake.rs index 33d95b97..d06427b9 100644 --- a/protocol/src/commands/sasl_handshake.rs +++ b/protocol/src/commands/sasl_handshake.rs @@ -10,9 +10,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct SaslHandshakeCommand { diff --git a/protocol/src/commands/store_offset.rs b/protocol/src/commands/store_offset.rs index 3e613bb1..7cc785c2 100644 --- a/protocol/src/commands/store_offset.rs +++ b/protocol/src/commands/store_offset.rs @@ -7,9 +7,6 @@ use std::io::Write; use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct StoreOffset { diff --git a/protocol/src/commands/subscribe.rs b/protocol/src/commands/subscribe.rs index f1d18bff..5c5df8db 100644 --- a/protocol/src/commands/subscribe.rs +++ b/protocol/src/commands/subscribe.rs @@ -1,9 +1,6 @@ use std::collections::HashMap; use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, diff --git a/protocol/src/commands/superstream_partitions.rs b/protocol/src/commands/superstream_partitions.rs index bee07a7c..a495bdf3 100644 --- a/protocol/src/commands/superstream_partitions.rs +++ b/protocol/src/commands/superstream_partitions.rs @@ -1,8 +1,5 @@ use std::io::Write; -#[cfg(test)] -use fake::Fake; - use super::Command; use crate::{ codec::{Decoder, Encoder}, diff --git a/protocol/src/commands/superstream_route.rs b/protocol/src/commands/superstream_route.rs index 6fa280d4..06da6533 100644 --- a/protocol/src/commands/superstream_route.rs +++ b/protocol/src/commands/superstream_route.rs @@ -1,8 +1,5 @@ use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, diff --git a/protocol/src/commands/tune.rs b/protocol/src/commands/tune.rs index c2c69bda..e30a16f3 100644 --- a/protocol/src/commands/tune.rs +++ b/protocol/src/commands/tune.rs @@ -9,9 +9,6 @@ use crate::{ use super::Command; -#[cfg(test)] -use fake::Fake; - #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] pub struct TunesCommand { diff --git a/protocol/src/commands/unsubscribe.rs b/protocol/src/commands/unsubscribe.rs index 6d7ea2c1..d6c182c3 100644 --- a/protocol/src/commands/unsubscribe.rs +++ b/protocol/src/commands/unsubscribe.rs @@ -1,8 +1,5 @@ use std::io::Write; -#[cfg(test)] -use fake::Fake; - use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, diff --git a/protocol/src/message/amqp/body.rs b/protocol/src/message/amqp/body.rs index 0ad73769..4e305106 100644 --- a/protocol/src/message/amqp/body.rs +++ b/protocol/src/message/amqp/body.rs @@ -78,7 +78,7 @@ mod tests { use super::MessageBody; impl Dummy for MessageBody { - fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { MessageBody { data: config.fake_with_rng(rng), sequence: config.fake_with_rng(rng), diff --git a/protocol/src/message/amqp/header.rs b/protocol/src/message/amqp/header.rs index 529b9e1e..9b363f55 100644 --- a/protocol/src/message/amqp/header.rs +++ b/protocol/src/message/amqp/header.rs @@ -11,9 +11,6 @@ use super::{ types::{list_decoder, Boolean, List}, AmqpDecodeError, AmqpDecoder, AmqpEncodeError, AmqpEncoder, }; -#[cfg(test)] -use fake::Fake; - /// Header of the message #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(test, derive(fake::Dummy))] diff --git a/protocol/src/message/amqp/message.rs b/protocol/src/message/amqp/message.rs index 5d377496..df896b7c 100644 --- a/protocol/src/message/amqp/message.rs +++ b/protocol/src/message/amqp/message.rs @@ -11,9 +11,6 @@ use super::section::MessageSection; use super::types::{ApplicationProperties, DeliveryAnnotations, Footer, MessageAnnotations}; use super::{AmqpDecodeError, AmqpDecoder}; -#[cfg(test)] -use fake::Fake; - #[derive(Debug, Clone, Default, PartialEq, Eq)] #[cfg_attr(test, derive(fake::Dummy))] pub struct Message { diff --git a/protocol/src/message/amqp/mod.rs b/protocol/src/message/amqp/mod.rs index 6c6eca71..34a8cde0 100644 --- a/protocol/src/message/amqp/mod.rs +++ b/protocol/src/message/amqp/mod.rs @@ -52,7 +52,7 @@ mod tests { where T: Dummy + AmqpDecoder + AmqpEncoder + Debug + PartialEq, { - let mut rng = rand::thread_rng(); + let mut rng = fake::rand::rng(); let len: usize = DEFAULT_LEN_RANGE.fake_with_rng(&mut rng); for _ in 0..len { diff --git a/protocol/src/message/amqp/properties.rs b/protocol/src/message/amqp/properties.rs index 2bdd5fb8..5e5f5251 100644 --- a/protocol/src/message/amqp/properties.rs +++ b/protocol/src/message/amqp/properties.rs @@ -15,9 +15,6 @@ use super::{ AmqpEncodeError, AmqpEncoder, }; -#[cfg(test)] -use fake::Fake; - /// Properties of the message #[derive(Clone, Debug, PartialEq, Eq, Default)] #[cfg_attr(test, derive(fake::Dummy))] diff --git a/protocol/src/message/amqp/types/annotations.rs b/protocol/src/message/amqp/types/annotations.rs index 93e47604..50286934 100644 --- a/protocol/src/message/amqp/types/annotations.rs +++ b/protocol/src/message/amqp/types/annotations.rs @@ -4,8 +4,6 @@ use crate::{ }; use super::{Long, Map, Symbol, ULong, Value}; -#[cfg(test)] -use fake::Fake; /// Key for annotations [`Map`] #[derive(Debug, Clone, Hash, PartialEq, Eq)] diff --git a/protocol/src/message/amqp/types/descriptor.rs b/protocol/src/message/amqp/types/descriptor.rs index 70176c3e..45e79295 100644 --- a/protocol/src/message/amqp/types/descriptor.rs +++ b/protocol/src/message/amqp/types/descriptor.rs @@ -5,9 +5,6 @@ use crate::message::amqp::codec::{AmqpDecoder, AmqpEncoder}; use crate::message::amqp::error::{AmqpDecodeError, AmqpEncodeError}; use crate::utils::TupleMapperSecond; -#[cfg(test)] -use fake::Fake; - #[derive(Debug, PartialEq, Eq, Clone, Hash)] #[cfg_attr(test, derive(fake::Dummy))] pub enum Descriptor { diff --git a/protocol/src/message/amqp/types/mod.rs b/protocol/src/message/amqp/types/mod.rs index 027c360c..bb3d7398 100644 --- a/protocol/src/message/amqp/types/mod.rs +++ b/protocol/src/message/amqp/types/mod.rs @@ -6,9 +6,6 @@ pub use primitives::*; use super::{codec::constants::TypeCode, AmqpDecoder, AmqpEncodeError, AmqpEncoder}; -#[cfg(test)] -use fake::Fake; - mod annotations; mod definitions; mod descriptor; diff --git a/protocol/src/message/amqp/types/primitives/list.rs b/protocol/src/message/amqp/types/primitives/list.rs index e44c0d80..951e632f 100644 --- a/protocol/src/message/amqp/types/primitives/list.rs +++ b/protocol/src/message/amqp/types/primitives/list.rs @@ -168,7 +168,6 @@ mod tests { use std::ops::Range; use fake::{Dummy, Fake, Faker}; - use rand::Rng; use crate::message::amqp::{ tests::type_encode_decode_test_fuzzy, @@ -179,7 +178,7 @@ mod tests { const DEFAULT_LEN_RANGE: Range = 0..10; impl Dummy for List { - fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { let len: usize = DEFAULT_LEN_RANGE.fake_with_rng(rng); let mut m = List::new(); diff --git a/protocol/src/message/amqp/types/primitives/map.rs b/protocol/src/message/amqp/types/primitives/map.rs index 246f7cc7..8e96d25f 100644 --- a/protocol/src/message/amqp/types/primitives/map.rs +++ b/protocol/src/message/amqp/types/primitives/map.rs @@ -132,7 +132,6 @@ mod tests { use crate::message::amqp::types::{SimpleValue, Value}; use fake::{Dummy, Fake, Faker}; - use rand::Rng; use std::{hash::Hash, ops::Range}; @@ -142,7 +141,7 @@ mod tests { where K: Dummy + Hash + Eq, { - fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { let len: usize = DEFAULT_LEN_RANGE.fake_with_rng(rng); let mut m = Map::default(); @@ -154,7 +153,7 @@ mod tests { } fn dummy(config: &Faker) -> Self { - let mut r = rand::thread_rng(); + let mut r = fake::rand::rng(); Dummy::::dummy_with_rng(config, &mut r) } } @@ -163,7 +162,7 @@ mod tests { where K: Dummy + Hash + Eq, { - fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { let len: usize = DEFAULT_LEN_RANGE.fake_with_rng(rng); let mut m = Map::default(); @@ -175,7 +174,7 @@ mod tests { } fn dummy(config: &Faker) -> Self { - let mut r = rand::thread_rng(); + let mut r = fake::rand::rng(); Dummy::::dummy_with_rng(config, &mut r) } } diff --git a/protocol/src/message/amqp/types/primitives/simple.rs b/protocol/src/message/amqp/types/primitives/simple.rs index 9c04e4d5..03ac3d26 100644 --- a/protocol/src/message/amqp/types/primitives/simple.rs +++ b/protocol/src/message/amqp/types/primitives/simple.rs @@ -17,9 +17,6 @@ use crate::{ utils::TupleMapperSecond, }; -#[cfg(test)] -use fake::Fake; - /// Primitive AMQP 1.0 data type #[derive(Debug, Eq, PartialEq, Hash, Clone, From, TryInto)] #[try_into(owned, ref, ref_mut)] @@ -691,20 +688,20 @@ mod tests { use crate::message::amqp::{tests::type_encode_decode_test_fuzzy, types::SimpleValue}; impl Dummy for Float { - fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { let num: f32 = config.fake_with_rng(rng); Float(OrderedFloat::from(num)) } } impl Dummy for Double { - fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { let num: f64 = config.fake_with_rng(rng); Double(OrderedFloat::from(num)) } } impl Dummy for Timestamp { - fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { + fn dummy_with_rng(config: &Faker, rng: &mut R) -> Self { let mut dt: DateTime = config.fake_with_rng(rng); dt = dt.with_nanosecond(0).unwrap(); Timestamp(dt) diff --git a/protocol/src/message/amqp/types/primitives/value.rs b/protocol/src/message/amqp/types/primitives/value.rs index 8192ee3f..21aa8bff 100644 --- a/protocol/src/message/amqp/types/primitives/value.rs +++ b/protocol/src/message/amqp/types/primitives/value.rs @@ -14,9 +14,6 @@ use crate::{ }; use derive_more::From; -#[cfg(test)] -use fake::Fake; - /// AMQP 1.0 data types #[derive(Debug, Eq, PartialEq, Hash, Clone)] #[cfg_attr(test, derive(fake::Dummy))] diff --git a/protocol/src/message/amqp/types/symbol.rs b/protocol/src/message/amqp/types/symbol.rs index 4b55697b..ca62732c 100644 --- a/protocol/src/message/amqp/types/symbol.rs +++ b/protocol/src/message/amqp/types/symbol.rs @@ -10,8 +10,6 @@ use crate::{ }; use byteorder::{BigEndian, WriteBytesExt}; -#[cfg(test)] -use fake::Fake; use super::Str; diff --git a/protocol/src/request/mod.rs b/protocol/src/request/mod.rs index fe488038..ee3ec7ec 100644 --- a/protocol/src/request/mod.rs +++ b/protocol/src/request/mod.rs @@ -244,9 +244,9 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand, - create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand, - credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, + close::CloseRequest, create_stream::CreateStreamCommand, + create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, + declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 9c1e920f..4bda7679 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -223,8 +223,6 @@ where #[cfg(test)] mod tests { - use std::collections::HashMap; - use byteorder::{BigEndian, WriteBytesExt}; use super::{Response, ResponseKind}; diff --git a/protocol/src/types.rs b/protocol/src/types.rs index 8bf0ee3f..2961c514 100644 --- a/protocol/src/types.rs +++ b/protocol/src/types.rs @@ -20,9 +20,6 @@ impl Header { } } -#[cfg(test)] -use fake::Fake; - use crate::{message::Message, ResponseCode}; #[cfg_attr(test, derive(fake::Dummy))]