From 39ad6265763898575a3673b81c29b84ba971d38b Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Wed, 6 Nov 2024 10:48:21 +0100 Subject: [PATCH 01/14] implementing Consumer_Update command --- protocol/src/commands/consumer_update.rs | 84 ++++++++++++++++++++++++ protocol/src/commands/mod.rs | 1 + protocol/src/protocol.rs | 3 + protocol/src/response/mod.rs | 29 +++++++- 4 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 protocol/src/commands/consumer_update.rs diff --git a/protocol/src/commands/consumer_update.rs b/protocol/src/commands/consumer_update.rs new file mode 100644 index 00000000..1bd92e05 --- /dev/null +++ b/protocol/src/commands/consumer_update.rs @@ -0,0 +1,84 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_CONSUMER_UPDATE, +}; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct ConsumerUpdateCommand { + pub(crate) correlation_id: u32, + subscription_id: u8, + active: u8, +} + +impl ConsumerUpdateCommand { + pub fn new( + correlation_id: u32, + subscription_id: u8, + active: u8, + ) -> Self { + Self { + correlation_id, + subscription_id, + active, + } + } +} + +impl Encoder for ConsumerUpdateCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.subscription_id.encoded_size() + + self.active.encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.subscription_id.encode(writer)?; + self.active.encode(writer)?; + Ok(()) + } +} + +impl Decoder for ConsumerUpdateCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, subscription_id) = u8::decode(input)?; + let (input, active) = u8::decode(input)?; + + Ok(( + input, + ConsumerUpdateCommand { + correlation_id, + subscription_id, + active, + }, + )) + } +} + +impl Command for ConsumerUpdateCommand { + fn key(&self) -> u16 { + COMMAND_CONSUMER_UPDATE + } +} + +#[cfg(test)] +mod tests { + use crate::commands::tests::command_encode_decode_test; + + use super::{ConsumerUpdateCommand}; + + #[test] + fn subscribe_request_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/mod.rs b/protocol/src/commands/mod.rs index f6ecf1f8..3f5e9456 100644 --- a/protocol/src/commands/mod.rs +++ b/protocol/src/commands/mod.rs @@ -29,6 +29,7 @@ pub mod superstream_partitions; pub mod superstream_route; pub mod tune; pub mod unsubscribe; +pub mod consumer_update; pub trait Command { fn key(&self) -> u16; diff --git a/protocol/src/protocol.rs b/protocol/src/protocol.rs index 5dcb0d3c..12d52e41 100644 --- a/protocol/src/protocol.rs +++ b/protocol/src/protocol.rs @@ -32,6 +32,9 @@ pub mod commands { pub const COMMAND_STREAMS_STATS: u16 = 28; pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29; pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30; + pub const COMMAND_CONSUMER_UPDATE_REQUEST: u16 = 32794; + + } // server responses diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 0f394c14..4cf0bcae 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -14,7 +14,7 @@ use crate::{ publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse, query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse, 
superstream_partitions::SuperStreamPartitionsResponse, - superstream_route::SuperStreamRouteResponse, tune::TunesCommand, + superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand }, error::DecodeError, protocol::commands::*, @@ -72,6 +72,7 @@ pub enum ResponseKind { ExchangeCommandVersions(ExchangeCommandVersionsResponse), SuperStreamPartitions(SuperStreamPartitionsResponse), SuperStreamRoute(SuperStreamRouteResponse), + ConsumerUpdate(ConsumerUpdateCommand), } impl Response { @@ -107,6 +108,9 @@ impl Response { ResponseKind::SuperStreamRoute(super_stream_route_command) => { Some(super_stream_route_command.correlation_id) } + ResponseKind::ConsumerUpdate(consumer_update_command) => { + Some(consumer_update_command.correlation_id) + } } } @@ -188,6 +192,8 @@ impl Decoder for Response { .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?, COMMAND_ROUTE => SuperStreamRouteResponse::decode(input) .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?, + COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input) + .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?, n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Response { header, kind })) @@ -228,7 +234,7 @@ mod tests { query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse, superstream_partitions::SuperStreamPartitionsResponse, - superstream_route::SuperStreamRouteResponse, tune::TunesCommand, + superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand }, protocol::{ commands::{ @@ -236,7 +242,7 @@ mod tests { COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM, COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, - COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, + COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, COMMAND_CONSUMER_UPDATE, }, version::PROTOCOL_VERSION, }, @@ -244,6 +250,8 @@ mod tests { types::Header, ResponseCode, }; + use crate::protocol::commands::COMMAND_CONSUMER_UPDATE; + impl Encoder for ResponseKind { fn encoded_size(&self) -> u32 { match self { @@ -273,6 +281,9 @@ mod tests { ResponseKind::SuperStreamRoute(super_stream_response) => { super_stream_response.encoded_size() } + ResponseKind::ConsumerUpdate(consumer_update_response) => { + consumer_update_response.encoded_size() + } } } @@ -307,6 +318,9 @@ mod tests { ResponseKind::SuperStreamRoute(super_stream_command_versions) => { super_stream_command_versions.encode(writer) } + ResponseKind::ConsumerUpdate(consumer_update_command_version) => { + consumer_update_command_version.encode(writer) + } } } } @@ -494,4 +508,13 @@ mod tests { COMMAND_ROUTE ); } + + #[test] + fn consumer_update_response_test() { + response_test!( + ConsumerUpdateCommand, + ResponseKind::ConsumerUpdate, + COMMAND_CONSUMER_UPDATE + ); + } } From 5a0bd3e0debb21ecd57f982af9f837b3ff3328ce Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Wed, 6 Nov 2024 11:44:07 +0100 Subject: [PATCH 02/14] implementing ConsumerUpdateRequest command --- protocol/src/commands/consumer_update.rs | 8 +- .../src/commands/consumer_update_request.rs | 86 +++++++++++++++++++ protocol/src/commands/mod.rs | 3 +- protocol/src/protocol.rs | 2 - protocol/src/request/mod.rs | 27 ++++-- protocol/src/request/shims.rs | 9 +- protocol/src/response/mod.rs | 45 +++++++--- src/client/mod.rs | 3 +- 
tests/integration/client_test.rs | 13 +++ 9 files changed, 166 insertions(+), 30 deletions(-) create mode 100644 protocol/src/commands/consumer_update_request.rs diff --git a/protocol/src/commands/consumer_update.rs b/protocol/src/commands/consumer_update.rs index 1bd92e05..a850ce2a 100644 --- a/protocol/src/commands/consumer_update.rs +++ b/protocol/src/commands/consumer_update.rs @@ -20,11 +20,7 @@ pub struct ConsumerUpdateCommand { } impl ConsumerUpdateCommand { - pub fn new( - correlation_id: u32, - subscription_id: u8, - active: u8, - ) -> Self { + pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self { Self { correlation_id, subscription_id, @@ -75,7 +71,7 @@ impl Command for ConsumerUpdateCommand { mod tests { use crate::commands::tests::command_encode_decode_test; - use super::{ConsumerUpdateCommand}; + use super::ConsumerUpdateCommand; #[test] fn subscribe_request_test() { diff --git a/protocol/src/commands/consumer_update_request.rs b/protocol/src/commands/consumer_update_request.rs new file mode 100644 index 00000000..d736b7a7 --- /dev/null +++ b/protocol/src/commands/consumer_update_request.rs @@ -0,0 +1,86 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST, +}; + +use crate::commands::subscribe::OffsetSpecification; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct ConsumerUpdateRequestCommand { + pub(crate) correlation_id: u32, + response_code: u16, + offset_specification: OffsetSpecification, +} + +impl ConsumerUpdateRequestCommand { + pub fn new( + correlation_id: u32, + response_code: u16, + offset_specification: OffsetSpecification, + ) -> Self { + Self { + correlation_id, + response_code, + offset_specification, + } + } +} + +impl Encoder for ConsumerUpdateRequestCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.response_code.encoded_size() + + self.offset_specification.encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.response_code.encode(writer)?; + self.offset_specification.encode(writer)?; + Ok(()) + } +} + +impl Decoder for ConsumerUpdateRequestCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, response_code) = u16::decode(input)?; + let (input, offset_specification) = OffsetSpecification::decode(input)?; + + Ok(( + input, + ConsumerUpdateRequestCommand { + correlation_id, + response_code, + offset_specification, + }, + )) + } +} + +impl Command for ConsumerUpdateRequestCommand { + fn key(&self) -> u16 { + COMMAND_CONSUMER_UPDATE_REQUEST + } +} + +#[cfg(test)] +mod tests { + use crate::commands::tests::command_encode_decode_test; + + use super::ConsumerUpdateRequestCommand; + + #[test] + fn subscribe_request_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/mod.rs b/protocol/src/commands/mod.rs index 3f5e9456..c8f85170 100644 --- a/protocol/src/commands/mod.rs +++ b/protocol/src/commands/mod.rs @@ -1,6 +1,8 @@ use crate::protocol::version::PROTOCOL_VERSION; pub mod close; +pub mod consumer_update; +pub mod consumer_update_request; pub mod create_stream; pub mod create_super_stream; pub mod credit; @@ -29,7 +31,6 @@ pub mod superstream_partitions; pub mod superstream_route; pub mod tune; pub mod 
unsubscribe; -pub mod consumer_update; pub trait Command { fn key(&self) -> u16; diff --git a/protocol/src/protocol.rs b/protocol/src/protocol.rs index 12d52e41..2c41cbed 100644 --- a/protocol/src/protocol.rs +++ b/protocol/src/protocol.rs @@ -33,8 +33,6 @@ pub mod commands { pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29; pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30; pub const COMMAND_CONSUMER_UPDATE_REQUEST: u16 = 32794; - - } // server responses diff --git a/protocol/src/request/mod.rs b/protocol/src/request/mod.rs index 1eea47bd..31bd805e 100644 --- a/protocol/src/request/mod.rs +++ b/protocol/src/request/mod.rs @@ -3,9 +3,9 @@ use std::io::Write; use crate::{ codec::{decoder::read_u32, Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, - create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, - declare_publisher::DeclarePublisherCommand, delete::Delete, + close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand, + create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand, + credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand, @@ -68,6 +68,7 @@ pub enum RequestKind { DeleteSuperStream(DeleteSuperStreamCommand), SuperStreamPartitions(SuperStreamPartitionsRequest), SuperStreamRoute(SuperStreamRouteRequest), + ConsumerUpdateRequest(ConsumerUpdateRequestCommand), } impl Encoder for RequestKind { @@ -105,6 +106,9 @@ impl Encoder for RequestKind { super_stream_partitions.encoded_size() } RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(), + RequestKind::ConsumerUpdateRequest(consumer_update_request) => { + consumer_update_request.encoded_size() + } } } @@ -142,6 +146,9 @@ impl Encoder for RequestKind { super_stream_partition.encode(writer) } RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer), + RequestKind::ConsumerUpdateRequest(consumer_update_request) => { + consumer_update_request.encode(writer) + } } } } @@ -222,6 +229,9 @@ impl Decoder for Request { COMMAND_ROUTE => { SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))? } + COMMAND_CONSUMER_UPDATE_REQUEST => { + ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))? 
+ } n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Request { header, kind: cmd })) @@ -234,9 +244,9 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, - create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, - declare_publisher::DeclarePublisherCommand, delete::Delete, + close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand, + create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand, + credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, @@ -387,4 +397,9 @@ mod tests { fn request_route_command() { request_encode_decode_test::() } + + #[test] + fn request_consumer_update_request_command() { + request_encode_decode_test::() + } } diff --git a/protocol/src/request/shims.rs b/protocol/src/request/shims.rs index 102efe6f..2d8c73b4 100644 --- a/protocol/src/request/shims.rs +++ b/protocol/src/request/shims.rs @@ -2,7 +2,8 @@ use crate::commands::create_super_stream::CreateSuperStreamCommand; use crate::commands::delete_super_stream::DeleteSuperStreamCommand; use crate::{ commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, + close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand, + create_stream::CreateStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, @@ -164,3 +165,9 @@ impl From for RequestKind { RequestKind::SuperStreamRoute(cmd) } } + +impl From for RequestKind { + fn from(cmd: ConsumerUpdateRequestCommand) -> Self { + RequestKind::ConsumerUpdateRequest(cmd) + } +} diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 4cf0bcae..9f3ff419 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -6,15 +6,16 @@ use crate::{ Decoder, }, commands::{ - close::CloseResponse, credit::CreditResponse, deliver::DeliverCommand, - exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse, - heart_beat::HeartbeatResponse, metadata::MetadataResponse, + close::CloseResponse, consumer_update::ConsumerUpdateCommand, + consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse, + deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse, + generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse, metadata_update::MetadataUpdateCommand, open::OpenResponse, peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm, publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse, query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse, superstream_partitions::SuperStreamPartitionsResponse, - superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand + superstream_route::SuperStreamRouteResponse, tune::TunesCommand, }, error::DecodeError, protocol::commands::*, @@ -73,6 +74,7 @@ pub enum ResponseKind { SuperStreamPartitions(SuperStreamPartitionsResponse), SuperStreamRoute(SuperStreamRouteResponse), ConsumerUpdate(ConsumerUpdateCommand), + ConsumerUpdateRequest(ConsumerUpdateRequestCommand), } impl Response 
{ @@ -111,6 +113,9 @@ impl Response { ResponseKind::ConsumerUpdate(consumer_update_command) => { Some(consumer_update_command.correlation_id) } + ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => { + Some(consumer_update_request_command.correlation_id) + } } } @@ -155,6 +160,7 @@ impl Decoder for Response { | COMMAND_CREATE_STREAM | COMMAND_CREATE_SUPER_STREAM | COMMAND_DELETE_SUPER_STREAM + | COMMAND_CONSUMER_UPDATE_REQUEST | COMMAND_DELETE_STREAM => { GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))? } @@ -225,7 +231,8 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseResponse, deliver::DeliverCommand, + close::CloseResponse, consumer_update::ConsumerUpdateCommand, + consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse, metadata_update::MetadataUpdateCommand, open::OpenResponse, @@ -234,15 +241,15 @@ mod tests { query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse, superstream_partitions::SuperStreamPartitionsResponse, - superstream_route::SuperStreamRouteResponse, tune::TunesCommand, consumer_update::ConsumerUpdateCommand + superstream_route::SuperStreamRouteResponse, tune::TunesCommand, }, protocol::{ commands::{ - COMMAND_CLOSE, COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, - COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, - COMMAND_PUBLISH_CONFIRM, COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, - COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, - COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, COMMAND_CONSUMER_UPDATE, + COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST, + COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE, + COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM, + COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE, + COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, }, version::PROTOCOL_VERSION, }, @@ -250,7 +257,6 @@ mod tests { types::Header, ResponseCode, }; - use crate::protocol::commands::COMMAND_CONSUMER_UPDATE; impl Encoder for ResponseKind { fn encoded_size(&self) -> u32 { @@ -284,6 +290,9 @@ mod tests { ResponseKind::ConsumerUpdate(consumer_update_response) => { consumer_update_response.encoded_size() } + ResponseKind::ConsumerUpdateRequest(consumer_update_request_response) => { + consumer_update_request_response.encoded_size() + } } } @@ -321,6 +330,9 @@ mod tests { ResponseKind::ConsumerUpdate(consumer_update_command_version) => { consumer_update_command_version.encode(writer) } + ResponseKind::ConsumerUpdateRequest(consumer_update_request_command_version) => { + consumer_update_request_command_version.encode(writer) + } } } } @@ -517,4 +529,13 @@ mod tests { COMMAND_CONSUMER_UPDATE ); } + + #[test] + fn consumer_update_request_response_test() { + response_test!( + ConsumerUpdateRequestCommand, + ResponseKind::ConsumerUpdateRequest, + COMMAND_CONSUMER_UPDATE_REQUEST + ); + } } diff --git a/src/client/mod.rs b/src/client/mod.rs index dc242d6e..920186b6 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -34,6 +34,7 @@ use tokio_rustls::{rustls, TlsConnector}; use tokio_util::codec::Framed; use tracing::trace; +use crate::{error::ClientError, RabbitMQStreamResult}; pub use 
message::ClientMessage; pub use metadata::{Broker, StreamMetadata}; pub use metrics::MetricsCollector; @@ -71,8 +72,6 @@ use rabbitmq_stream_protocol::{ FromResponse, Request, Response, ResponseCode, ResponseKind, }; -use crate::{error::ClientError, RabbitMQStreamResult}; - pub use self::handler::{MessageHandler, MessageResult}; use self::{ channel::{channel, ChannelReceiver, ChannelSender}, diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index 4051c484..03c9a4c4 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -457,3 +457,16 @@ async fn client_test_route_test() { test.partitions.get(0).unwrap() ); } + +#[tokio::test(flavor = "multi_thread")] +async fn client_consumer_update_request_test() { + let test = TestClient::create().await; + + let response = test + .client + .consumer_update(OffsetSpecification::Next) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); +} From 13faec776f47045d9acc5cf000ba5830a2cf89db Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Fri, 8 Nov 2024 15:38:11 +0100 Subject: [PATCH 03/14] SAC: starting implementation --- protocol/src/commands/consumer_update.rs | 2 +- .../src/commands/consumer_update_request.rs | 2 +- protocol/src/response/mod.rs | 9 ----- src/client/dispatcher.rs | 7 ++-- src/consumer.rs | 36 ++++++++++++++++--- src/superstream_consumer.rs | 5 +++ 6 files changed, 43 insertions(+), 18 deletions(-) diff --git a/protocol/src/commands/consumer_update.rs b/protocol/src/commands/consumer_update.rs index a850ce2a..463ad0ca 100644 --- a/protocol/src/commands/consumer_update.rs +++ b/protocol/src/commands/consumer_update.rs @@ -74,7 +74,7 @@ mod tests { use super::ConsumerUpdateCommand; #[test] - fn subscribe_request_test() { + fn consumer_update_response_test() { command_encode_decode_test::(); } } diff --git a/protocol/src/commands/consumer_update_request.rs b/protocol/src/commands/consumer_update_request.rs index d736b7a7..38c1f5f9 100644 --- a/protocol/src/commands/consumer_update_request.rs +++ b/protocol/src/commands/consumer_update_request.rs @@ -80,7 +80,7 @@ mod tests { use super::ConsumerUpdateRequestCommand; #[test] - fn subscribe_request_test() { + fn consumer_update_request_test() { command_encode_decode_test::(); } } diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 9f3ff419..9c1e920f 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -529,13 +529,4 @@ mod tests { COMMAND_CONSUMER_UPDATE ); } - - #[test] - fn consumer_update_request_response_test() { - response_test!( - ConsumerUpdateRequestCommand, - ResponseKind::ConsumerUpdateRequest, - COMMAND_CONSUMER_UPDATE_REQUEST - ); - } } diff --git a/src/client/dispatcher.rs b/src/client/dispatcher.rs index c8b1291d..5ac3b7fd 100644 --- a/src/client/dispatcher.rs +++ b/src/client/dispatcher.rs @@ -1,5 +1,5 @@ use futures::Stream; -use rabbitmq_stream_protocol::Response; +use rabbitmq_stream_protocol::{Response, ResponseKind}; use std::sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, @@ -167,7 +167,10 @@ where while let Some(result) = stream.next().await { match result { Ok(item) => match item.correlation_id() { - Some(correlation_id) => state.dispatch(correlation_id, item).await, + Some(correlation_id) => match item.kind_ref() { + ResponseKind::ConsumerUpdate(consumer_update) => state.notify(item).await, + _ => state.dispatch(correlation_id, item).await, + }, None => state.notify(item).await, }, Err(e) => { diff --git a/src/consumer.rs 
b/src/consumer.rs index 519cc54c..a4650ce1 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -26,11 +26,14 @@ use crate::{ Client, ClientOptions, Environment, MetricsCollector, }; use futures::{task::AtomicWaker, Stream}; +use rabbitmq_stream_protocol::commands::consumer_update::ConsumerUpdateCommand; use rand::rngs::StdRng; use rand::{seq::SliceRandom, SeedableRng}; type FilterPredicate = Option bool + Send + Sync>>; +type ConsumerUpdateListener = Option u64 + Send + Sync>>; + /// API for consuming RabbitMQ stream messages pub struct Consumer { // Mandatory in case of manual offset tracking @@ -82,6 +85,26 @@ impl FilterConfiguration { } } +pub struct MessageContext { + consumer: Consumer, + subscriber_name: String, + reference: String, +} + +impl MessageContext { + pub fn get_consumer(self) -> Consumer { + self.consumer + } + + pub fn get_subscriber_name(self) -> String { + self.subscriber_name + } + + pub fn get_reference(self) -> String { + self.reference + } +} + /// Builder for [`Consumer`] pub struct ConsumerBuilder { pub(crate) consumer_name: Option, @@ -317,27 +340,27 @@ impl MessageHandler for ConsumerMessageHandler { async fn handle_message(&self, item: MessageResult) -> crate::RabbitMQStreamResult<()> { match item { Some(Ok(response)) => { - if let ResponseKind::Deliver(delivery) = response.kind() { + if let ResponseKind::Deliver(delivery) = response.kind_ref() { let mut offset = delivery.chunk_first_offset; let len = delivery.messages.len(); + let d = delivery.clone(); trace!("Got delivery with messages {}", len); // // client filter let messages = match &self.0.filter_configuration { Some(filter_input) => { if let Some(f) = &filter_input.predicate { - delivery - .messages + d.messages .into_iter() .filter(|message| f(message)) .collect::>() } else { - delivery.messages + d.messages } } - None => delivery.messages, + None => d.messages, }; for message in messages { @@ -363,6 +386,8 @@ impl MessageHandler for ConsumerMessageHandler { // TODO handle credit fail let _ = self.0.client.credit(self.0.subscription_id, 1).await; self.0.metrics_collector.consume(len as u64).await; + } else { + println!("other message arrived"); } } Some(Err(err)) => { @@ -377,6 +402,7 @@ impl MessageHandler for ConsumerMessageHandler { Ok(()) } } + /// Envelope from incoming message #[derive(Debug)] pub struct Delivery { diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index 708dd421..b7c61e14 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -105,6 +105,11 @@ impl SuperStreamConsumerBuilder { self.client_provided_name = String::from(name); self } + + pub fn properties(mut self, properties: HashMap) -> Self { + self.properties = properties; + self + } } impl Stream for SuperStreamConsumer { From 68ef2ad987d577082a3039abb0090cd35b5aba64 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Sun, 10 Nov 2024 17:21:00 +0100 Subject: [PATCH 04/14] Implementing callback support and consumer_update response --- protocol/src/commands/consumer_update.rs | 8 +++ protocol/src/request/mod.rs | 5 -- src/client/dispatcher.rs | 2 +- src/client/mod.rs | 12 ++++ src/consumer.rs | 87 +++++++++++++++++++----- src/environment.rs | 2 + src/lib.rs | 6 +- src/superstream_consumer.rs | 20 +++++- tests/integration/client_test.rs | 13 ---- 9 files changed, 116 insertions(+), 39 deletions(-) diff --git a/protocol/src/commands/consumer_update.rs b/protocol/src/commands/consumer_update.rs index 463ad0ca..6d074727 100644 --- a/protocol/src/commands/consumer_update.rs +++ 
b/protocol/src/commands/consumer_update.rs @@ -27,6 +27,14 @@ impl ConsumerUpdateCommand { active, } } + + pub fn get_correlation_id(&self) -> u32 { + self.correlation_id + } + + pub fn is_active(&self) -> u8 { + self.active + } } impl Encoder for ConsumerUpdateCommand { diff --git a/protocol/src/request/mod.rs b/protocol/src/request/mod.rs index 31bd805e..fe488038 100644 --- a/protocol/src/request/mod.rs +++ b/protocol/src/request/mod.rs @@ -397,9 +397,4 @@ mod tests { fn request_route_command() { request_encode_decode_test::() } - - #[test] - fn request_consumer_update_request_command() { - request_encode_decode_test::() - } } diff --git a/src/client/dispatcher.rs b/src/client/dispatcher.rs index 5ac3b7fd..1abcf43b 100644 --- a/src/client/dispatcher.rs +++ b/src/client/dispatcher.rs @@ -168,7 +168,7 @@ where match result { Ok(item) => match item.correlation_id() { Some(correlation_id) => match item.kind_ref() { - ResponseKind::ConsumerUpdate(consumer_update) => state.notify(item).await, + ResponseKind::ConsumerUpdate(_) => state.notify(item).await, _ => state.dispatch(correlation_id, item).await, }, None => state.notify(item).await, diff --git a/src/client/mod.rs b/src/client/mod.rs index 920186b6..98bab85b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -42,6 +42,7 @@ pub use options::ClientOptions; use rabbitmq_stream_protocol::{ commands::{ close::{CloseRequest, CloseResponse}, + consumer_update_request::ConsumerUpdateRequestCommand, create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, @@ -851,4 +852,15 @@ impl Client { Ok(config) } + + pub async fn consumer_update( + &self, + correlation_id: u32, + offset_specification: OffsetSpecification, + ) -> RabbitMQStreamResult { + self.send_and_receive(|_| { + ConsumerUpdateRequestCommand::new(correlation_id, 1, offset_specification) + }) + .await + } } diff --git a/src/consumer.rs b/src/consumer.rs index a4650ce1..f651b945 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -15,6 +15,8 @@ use rabbitmq_stream_protocol::{ commands::subscribe::OffsetSpecification, message::Message, ResponseKind, }; +use core::option::Option::None; + use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::trace; @@ -26,13 +28,13 @@ use crate::{ Client, ClientOptions, Environment, MetricsCollector, }; use futures::{task::AtomicWaker, Stream}; -use rabbitmq_stream_protocol::commands::consumer_update::ConsumerUpdateCommand; use rand::rngs::StdRng; use rand::{seq::SliceRandom, SeedableRng}; type FilterPredicate = Option bool + Send + Sync>>; -type ConsumerUpdateListener = Option u64 + Send + Sync>>; +pub type ConsumerUpdateListener = + Arc OffsetSpecification + Send + Sync>; /// API for consuming RabbitMQ stream messages pub struct Consumer { @@ -43,6 +45,7 @@ pub struct Consumer { } struct ConsumerInternal { + name: Option, client: Client, stream: String, offset_specification: OffsetSpecification, @@ -52,6 +55,7 @@ struct ConsumerInternal { waker: AtomicWaker, metrics_collector: Arc, filter_configuration: Option, + consumer_update_listener: Option, } impl ConsumerInternal { @@ -86,22 +90,17 @@ impl FilterConfiguration { } pub struct MessageContext { - consumer: Consumer, - subscriber_name: String, - reference: String, + consumer_name: Option, + stream: String, } impl MessageContext { - pub fn get_consumer(self) -> Consumer { - self.consumer + pub fn get_name(self) -> Option { + self.consumer_name } - pub fn get_subscriber_name(self) -> String { - self.subscriber_name - } - - pub fn 
get_reference(self) -> String { - self.reference + pub fn get_stream(self) -> String { + self.stream } } @@ -111,6 +110,7 @@ pub struct ConsumerBuilder { pub(crate) environment: Environment, pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, + pub(crate) consumer_update_listener: Option, pub(crate) client_provided_name: String, pub(crate) properties: HashMap, } @@ -172,6 +172,7 @@ impl ConsumerBuilder { let subscription_id = 1; let (tx, rx) = channel(10000); let consumer = Arc::new(ConsumerInternal { + name: self.consumer_name.clone(), subscription_id, stream: stream.to_string(), client: client.clone(), @@ -181,6 +182,7 @@ impl ConsumerBuilder { waker: AtomicWaker::new(), metrics_collector: collector, filter_configuration: self.filter_configuration.clone(), + consumer_update_listener: self.consumer_update_listener.clone(), }); let msg_handler = ConsumerMessageHandler(consumer.clone()); client.set_handler(msg_handler).await; @@ -213,7 +215,7 @@ impl ConsumerBuilder { if response.is_ok() { Ok(Consumer { - name: self.consumer_name, + name: self.consumer_name.clone(), receiver: rx, internal: consumer, }) @@ -245,6 +247,26 @@ impl ConsumerBuilder { self } + pub fn consumer_update( + mut self, + consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification + + Send + + Sync + + 'static, + ) -> Self { + let f = Arc::new(consumer_update_listener); + self.consumer_update_listener = Some(f); + self + } + + pub fn consumer_update_arc( + mut self, + consumer_update_listener: Option, + ) -> Self { + self.consumer_update_listener = consumer_update_listener; + self + } + pub fn properties(mut self, properties: HashMap) -> Self { self.properties = properties; self @@ -386,8 +408,41 @@ impl MessageHandler for ConsumerMessageHandler { // TODO handle credit fail let _ = self.0.client.credit(self.0.subscription_id, 1).await; self.0.metrics_collector.consume(len as u64).await; - } else { - println!("other message arrived"); + } else if let ResponseKind::ConsumerUpdate(consumer_update) = response.kind_ref() { + trace!("Received a ConsumerUpdate message"); + // If no callback is provided by the user we will restart from Next by protocol + // We need to respond to the server too + if self.0.consumer_update_listener.is_none() { + trace!("User defined callback is not provided"); + let offset_specification = OffsetSpecification::Next; + let _ = self + .0 + .client + .consumer_update( + consumer_update.get_correlation_id(), + offset_specification, + ) + .await; + } else { + // Otherwise the Offset specification is returned by the user callback + let is_active = consumer_update.is_active(); + let message_context = MessageContext { + consumer_name: self.0.name.clone(), + stream: self.0.stream.clone(), + }; + let consumer_update_listener_callback = + self.0.consumer_update_listener.clone().unwrap(); + let offset_specification = + consumer_update_listener_callback(is_active, &message_context); + let _ = self + .0 + .client + .consumer_update( + consumer_update.get_correlation_id(), + offset_specification, + ) + .await; + } } } Some(Err(err)) => { diff --git a/src/environment.rs b/src/environment.rs index 290415d1..283bad4e 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -74,6 +74,7 @@ impl Environment { environment: self.clone(), offset_specification: OffsetSpecification::Next, filter_configuration: None, + consumer_update_listener: None, client_provided_name: String::from("rust-stream-consumer"), properties: HashMap::new(), } @@ -84,6 +85,7 @@ impl 
Environment { environment: self.clone(), offset_specification: OffsetSpecification::Next, filter_configuration: None, + consumer_update_listener: None, client_provided_name: String::from("rust-super-stream-consumer"), properties: HashMap::new(), } diff --git a/src/lib.rs b/src/lib.rs index 2fe8c661..67d15e64 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,14 +87,16 @@ pub type RabbitMQStreamResult = Result; pub use crate::client::{Client, ClientOptions, MetricsCollector}; -pub use crate::consumer::{Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration}; +pub use crate::consumer::{ + Consumer, ConsumerBuilder, ConsumerHandle, FilterConfiguration, MessageContext, +}; pub use crate::environment::{Environment, EnvironmentBuilder, TlsConfiguration}; pub use crate::producer::{Dedup, NoDedup, Producer, ProducerBuilder}; pub mod types { pub use crate::byte_capacity::ByteCapacity; pub use crate::client::{Broker, MessageResult, StreamMetadata}; - pub use crate::consumer::Delivery; + pub use crate::consumer::{Delivery, MessageContext}; pub use crate::offset_specification::OffsetSpecification; pub use crate::stream_creator::StreamCreator; pub use crate::superstream::HashRoutingMurmurStrategy; diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index b7c61e14..cdb299c0 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -1,8 +1,10 @@ use crate::client::Client; -use crate::consumer::Delivery; +use crate::consumer::{ConsumerUpdateListener, Delivery}; use crate::error::{ConsumerCloseError, ConsumerDeliveryError}; use crate::superstream::DefaultSuperStreamMetadata; -use crate::{error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration}; +use crate::{ + error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration, MessageContext, +}; use futures::task::AtomicWaker; use futures::{Stream, StreamExt}; use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification; @@ -33,6 +35,7 @@ pub struct SuperStreamConsumerBuilder { pub(crate) environment: Environment, pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, + pub(crate) consumer_update_listener: Option, pub(crate) client_provided_name: String, pub(crate) properties: HashMap, } @@ -64,6 +67,7 @@ impl SuperStreamConsumerBuilder { .offset(self.offset_specification.clone()) .client_provided_name(self.client_provided_name.as_str()) .filter_input(self.filter_configuration.clone()) + .consumer_update_arc(self.consumer_update_listener.clone()) .properties(self.properties.clone()) .build(partition.as_str()) .await @@ -101,6 +105,18 @@ impl SuperStreamConsumerBuilder { self } + pub fn consumer_update( + mut self, + consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification + + Send + + Sync + + 'static, + ) -> Self { + let f = Arc::new(consumer_update_listener); + self.consumer_update_listener = Some(f); + self + } + pub fn client_provided_name(mut self, name: &str) -> Self { self.client_provided_name = String::from(name); self diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index 03c9a4c4..4051c484 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -457,16 +457,3 @@ async fn client_test_route_test() { test.partitions.get(0).unwrap() ); } - -#[tokio::test(flavor = "multi_thread")] -async fn client_consumer_update_request_test() { - let test = TestClient::create().await; - - let response = test - .client - 
.consumer_update(OffsetSpecification::Next) - .await - .unwrap(); - - assert_eq!(&ResponseCode::Ok, response.code()); -} From fc9578dd2f265121f8ba66f9d89ecc35f295b85f Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Mon, 11 Nov 2024 10:50:48 +0100 Subject: [PATCH 05/14] adding basic test --- src/consumer.rs | 4 +- tests/integration/consumer_test.rs | 129 ++++++++++++++++++++++++++++- 2 files changed, 128 insertions(+), 5 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index f651b945..4eb3567d 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -99,8 +99,8 @@ impl MessageContext { self.consumer_name } - pub fn get_stream(self) -> String { - self.stream + pub fn get_stream(&self) -> String { + self.stream.clone() } } diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index f4f18996..0f26aacd 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::common::TestEnvironment; +use crate::common::{consumer_update_handler, TestEnvironment}; use fake::{Fake, Faker}; use futures::StreamExt; use rabbitmq_stream_client::{ @@ -11,10 +11,10 @@ use rabbitmq_stream_client::{ types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer}, Consumer, FilterConfiguration, NoDedup, Producer, }; -use tokio::task; -use tokio::time::sleep; +use std::collections::HashMap; use crate::producer_test::routing_key_strategy_value_extractor; +use rabbitmq_stream_client::types::MessageContext; use rabbitmq_stream_client::types::{ HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy, }; @@ -667,3 +667,126 @@ async fn consumer_test_with_filtering_match_unfiltered() { assert!(repsonse_length == filtering_response_length); } + +#[tokio::test(flavor = "multi_thread")] +async fn super_stream_single_active_consumer_test() { + let env = TestEnvironment::create_super_stream().await; + + let message_count = 1000; + let mut super_stream_producer = env + .env + .super_stream_producer(RoutingStrategy::HashRoutingStrategy( + HashRoutingMurmurStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .client_provided_name("test super stream consumer ") + .build(&env.super_stream) + .await + .unwrap(); + + let mut properties = HashMap::new(); + + properties.insert("single-active-consumer".to_string(), "true".to_string()); + properties.insert("name".to_string(), "consumer-group-1".to_string()); + properties.insert("super-stream".to_string(), env.super_stream.clone()); + + let mut super_stream_consumer: SuperStreamConsumer = env + .env + .super_stream_consumer() + .offset(OffsetSpecification::First) + .properties(properties) + .build(&env.super_stream) + .await + .unwrap(); + + for n in 0..message_count { + let msg = Message::builder().body(format!("message{}", n)).build(); + let _ = super_stream_producer + .send(msg, |confirmation_status| async move {}) + .await + .unwrap(); + } + + let mut received_messages = 0; + let handle = super_stream_consumer.handle(); + + while let _ = super_stream_consumer.next().await.unwrap() { + received_messages = received_messages + 1; + if received_messages == message_count { + break; + } + } + + assert!(received_messages == message_count); + + super_stream_producer.close().await.unwrap(); + _ = handle.close().await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn super_stream_single_active_consumer_test_with_callback() { + let env = TestEnvironment::create_super_stream().await; + + let message_count = 1000; + let mut 
super_stream_producer = env + .env + .super_stream_producer(RoutingStrategy::HashRoutingStrategy( + HashRoutingMurmurStrategy { + routing_extractor: &hash_strategy_value_extractor, + }, + )) + .client_provided_name("test super stream consumer ") + .build(&env.super_stream) + .await + .unwrap(); + + let mut properties = HashMap::new(); + + properties.insert("single-active-consumer".to_string(), "true".to_string()); + properties.insert("name".to_string(), "consumer-group-1".to_string()); + properties.insert("super-stream".to_string(), env.super_stream.clone()); + + let mut is_active: u8 = 0; + let mut stream: String = String::from(""); + + let mut super_stream_consumer: SuperStreamConsumer = env + .env + .super_stream_consumer() + .offset(OffsetSpecification::First) + .properties(properties) + .consumer_update(move |active, message_context| { + let mut stream_int = stream.clone(); + async move { + is_active = active; + stream_int = message_context.get_stream().clone(); + }; + OffsetSpecification::First + }) + .build(&env.super_stream) + .await + .unwrap(); + + for n in 0..message_count { + let msg = Message::builder().body(format!("message{}", n)).build(); + let _ = super_stream_producer + .send(msg, |confirmation_status| async move {}) + .await + .unwrap(); + } + + let mut received_messages = 0; + let handle = super_stream_consumer.handle(); + + while let _ = super_stream_consumer.next().await.unwrap() { + received_messages = received_messages + 1; + if received_messages == message_count { + break; + } + } + + assert!(received_messages == message_count); + + super_stream_producer.close().await.unwrap(); + _ = handle.close().await; +} From 5040aa4b3d92f432afe8f1bfe17d250b069b3d37 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Mon, 11 Nov 2024 15:02:08 +0100 Subject: [PATCH 06/14] improved test --- tests/integration/consumer_test.rs | 84 ++++++++++++++++++++++++++---- 1 file changed, 73 insertions(+), 11 deletions(-) diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index 0f26aacd..eb2251c1 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::common::{consumer_update_handler, TestEnvironment}; +use crate::common::TestEnvironment; use fake::{Fake, Faker}; use futures::StreamExt; use rabbitmq_stream_client::{ @@ -20,6 +20,8 @@ use rabbitmq_stream_client::types::{ }; use rabbitmq_stream_protocol::ResponseCode; use std::sync::atomic::{AtomicU32, Ordering}; +use tokio::sync::Notify; +use tokio::task; use {std::sync::Arc, std::sync::Mutex}; #[tokio::test(flavor = "multi_thread")] @@ -686,6 +688,7 @@ async fn super_stream_single_active_consumer_test() { .unwrap(); let mut properties = HashMap::new(); + let notify_received_messages = Arc::new(Notify::new()); properties.insert("single-active-consumer".to_string(), "true".to_string()); properties.insert("name".to_string(), "consumer-group-1".to_string()); @@ -695,7 +698,25 @@ async fn super_stream_single_active_consumer_test() { .env .super_stream_consumer() .offset(OffsetSpecification::First) - .properties(properties) + .properties(properties.clone()) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_2: SuperStreamConsumer = env + .env + .super_stream_consumer() + .offset(OffsetSpecification::First) + .properties(properties.clone()) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_3: SuperStreamConsumer = env + .env + .super_stream_consumer() + 
.offset(OffsetSpecification::First) + .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); @@ -708,20 +729,61 @@ async fn super_stream_single_active_consumer_test() { .unwrap(); } - let mut received_messages = 0; - let handle = super_stream_consumer.handle(); + let mut received_messages = Arc::new(AtomicU32::new(1)); + let handle_consumer_1 = super_stream_consumer.handle(); + let handle_consumer_2 = super_stream_consumer_2.handle(); + let handle_consumer_3 = super_stream_consumer_3.handle(); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); - while let _ = super_stream_consumer.next().await.unwrap() { - received_messages = received_messages + 1; - if received_messages == message_count { - break; + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_2.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } } - } + }); - assert!(received_messages == message_count); + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_3.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + notify_received_messages.notified().await; + + assert!(received_messages.load(Ordering::Relaxed) == message_count + 1); super_stream_producer.close().await.unwrap(); - _ = handle.close().await; + _ = handle_consumer_1.close().await; + _ = handle_consumer_2.close().await; + _ = handle_consumer_3.close().await; } #[tokio::test(flavor = "multi_thread")] From ff83eb7ea6d409e45e658d276a6331531dc3882c Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 12 Nov 2024 10:30:48 +0100 Subject: [PATCH 07/14] expand unit test scope --- src/consumer.rs | 4 +- tests/integration/consumer_test.rs | 116 ++++++++++++++++++++++++----- 2 files changed, 101 insertions(+), 19 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index 4eb3567d..bca28d6d 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -95,8 +95,8 @@ pub struct MessageContext { } impl MessageContext { - pub fn get_name(self) -> Option { - self.consumer_name + pub fn get_name(&self) -> Option { + self.consumer_name.clone() } pub fn get_stream(&self) -> String { diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index eb2251c1..6d2e56c6 100644 --- a/tests/integration/consumer_test.rs +++ 
b/tests/integration/consumer_test.rs @@ -804,27 +804,59 @@ async fn super_stream_single_active_consumer_test_with_callback() { .unwrap(); let mut properties = HashMap::new(); + let notify_received_messages = Arc::new(Notify::new()); + + let mut result_stream_name_1 = Arc::new(Mutex::new(String::from(""))); + let mut result_stream_name_2 = Arc::new(Mutex::new(String::from(""))); + let mut result_stream_name_3 = Arc::new(Mutex::new(String::from(""))); properties.insert("single-active-consumer".to_string(), "true".to_string()); properties.insert("name".to_string(), "consumer-group-1".to_string()); properties.insert("super-stream".to_string(), env.super_stream.clone()); - let mut is_active: u8 = 0; - let mut stream: String = String::from(""); + let mut result_stream_name_outer = result_stream_name_1.clone(); + let mut result_stream_name_2_outer = result_stream_name_2.clone(); + let mut result_stream_name_3_outer = result_stream_name_3.clone(); let mut super_stream_consumer: SuperStreamConsumer = env .env .super_stream_consumer() .offset(OffsetSpecification::First) - .properties(properties) .consumer_update(move |active, message_context| { - let mut stream_int = stream.clone(); - async move { - is_active = active; - stream_int = message_context.get_stream().clone(); - }; + let mut result_consumer_name_int = result_stream_name_outer.clone(); + *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); + + OffsetSpecification::First + }) + .properties(properties.clone()) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_2: SuperStreamConsumer = env + .env + .super_stream_consumer() + .offset(OffsetSpecification::First) + .consumer_update(move |active, message_context| { + let mut result_consumer_name_int = result_stream_name_2_outer.clone(); + *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); OffsetSpecification::First }) + .properties(properties.clone()) + .build(&env.super_stream) + .await + .unwrap(); + + let mut super_stream_consumer_3: SuperStreamConsumer = env + .env + .super_stream_consumer() + .offset(OffsetSpecification::First) + .consumer_update(move |active, message_context| { + let mut result_consumer_name_int = result_stream_name_3_outer.clone(); + *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); + OffsetSpecification::First + }) + .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); @@ -837,18 +869,68 @@ async fn super_stream_single_active_consumer_test_with_callback() { .unwrap(); } - let mut received_messages = 0; - let handle = super_stream_consumer.handle(); + let mut received_messages = Arc::new(AtomicU32::new(1)); + let handle_consumer_1 = super_stream_consumer.handle(); + let handle_consumer_2 = super_stream_consumer_2.handle(); + let handle_consumer_3 = super_stream_consumer_3.handle(); - while let _ = super_stream_consumer.next().await.unwrap() { - received_messages = received_messages + 1; - if received_messages == message_count { - break; + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + 
notify_received_messages_inner.notify_one(); + break; + } } - } + }); - assert!(received_messages == message_count); + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_2.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + let received_message_outer = received_messages.clone(); + let notify_received_messages_outer = notify_received_messages.clone(); + task::spawn(async move { + let received_messages_int = received_message_outer.clone(); + let notify_received_messages_inner = notify_received_messages_outer.clone(); + while let _ = super_stream_consumer_3.next().await.unwrap() { + let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); + if message_count == rec_msg { + notify_received_messages_inner.notify_one(); + break; + } + } + }); + + notify_received_messages.notified().await; + + assert!(received_messages.load(Ordering::Relaxed) == message_count + 1); + assert!(env + .partitions + .contains(&(*result_stream_name_1.clone().lock().unwrap()))); + assert!(env + .partitions + .contains(&(*result_stream_name_2.clone().lock().unwrap()))); + assert!(env + .partitions + .contains(&(*result_stream_name_3.clone().lock().unwrap()))); super_stream_producer.close().await.unwrap(); - _ = handle.close().await; + _ = handle_consumer_1.close().await; + _ = handle_consumer_2.close().await; + _ = handle_consumer_3.close().await; } From d1c21c58053a3e8c46097c808b42041a49a5222d Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 12 Nov 2024 14:40:37 +0100 Subject: [PATCH 08/14] adding example --- .../single_active_consumer_super_stream.rs | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 examples/single_active_consumer/single_active_consumer_super_stream.rs diff --git a/examples/single_active_consumer/single_active_consumer_super_stream.rs b/examples/single_active_consumer/single_active_consumer_super_stream.rs new file mode 100644 index 00000000..3edf03a9 --- /dev/null +++ b/examples/single_active_consumer/single_active_consumer_super_stream.rs @@ -0,0 +1,77 @@ +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ + ByteCapacity, OffsetSpecification, ResponseCode, SuperStreamConsumer, +}; +use std::collections::HashMap; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 1000000; + let super_stream = "hello-rust-super-stream"; + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, None) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Super stream consumer example, consuming messages from the super stream {}", + super_stream + ); + + let mut properties = HashMap::new(); + + 
properties.insert("single-active-consumer".to_string(), "true".to_string()); + properties.insert("name".to_string(), "consumer-group-1".to_string()); + properties.insert("super-stream".to_string(), "hello-rust-super-stream".to_string()); + + let mut super_stream_consumer: SuperStreamConsumer = environment + .super_stream_consumer() + .offset(OffsetSpecification::First) + .client_provided_name("my super stream consumer for hello rust") + .consumer_update(move |active, message_context| { + println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream()); + OffsetSpecification::First + }) + .properties(properties) + .build(super_stream) + .await + .unwrap(); + + for _ in 0..message_count { + let delivery = super_stream_consumer.next().await.unwrap(); + { + let delivery = delivery.unwrap(); + println!( + "Got message: {:#?} from stream: {} with offset: {}", + delivery + .message() + .data() + .map(|data| String::from_utf8(data.to_vec()).unwrap()) + .unwrap(), + delivery.stream(), + delivery.offset() + ); + } + } + + println!("Stopping super stream consumer..."); + let _ = super_stream_consumer.handle().close().await; + println!("Super stream consumer stopped"); + Ok(()) +} \ No newline at end of file From 79044113ec65e9b32a93d40f7e2b54433c80a300 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Wed, 13 Nov 2024 09:37:32 +0100 Subject: [PATCH 09/14] Adding README --- examples/single_active_consumer/README.md | 23 +++++++++++++++++++ .../single_active_consumer_super_stream.rs | 2 ++ 2 files changed, 25 insertions(+) create mode 100644 examples/single_active_consumer/README.md diff --git a/examples/single_active_consumer/README.md b/examples/single_active_consumer/README.md new file mode 100644 index 00000000..8ef27bea --- /dev/null +++ b/examples/single_active_consumer/README.md @@ -0,0 +1,23 @@ +Single active consumer +--- + +This example shows how to enable the single active consumer functionality for super-streams: +https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams +https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams + +This folder contains a super-stream consumer configured to enable it. +You can use the example in the super-stream folder to produce messages for a super-stream. + +You can then run the consumer in this folder. +Assuming the super-stream is composed of three streams, you will see that the consumer consumes messages from all the streams that are part of the super-stream. + +You can then run another consumer in parallel. +Now you'll see that one of the two consumers consumes from two streams while the other consumes from one stream. + +If you run a third one, you'll see that every consumer reads from a single stream. + +If you then stop one of the consumers, you'll notice that its stream is now read by one of the consumers that are still running.
+ + + + diff --git a/examples/single_active_consumer/single_active_consumer_super_stream.rs b/examples/single_active_consumer/single_active_consumer_super_stream.rs index 3edf03a9..37e066b5 100644 --- a/examples/single_active_consumer/single_active_consumer_super_stream.rs +++ b/examples/single_active_consumer/single_active_consumer_super_stream.rs @@ -44,6 +44,8 @@ async fn main() -> Result<(), Box> { .super_stream_consumer() .offset(OffsetSpecification::First) .client_provided_name("my super stream consumer for hello rust") + /*We can decide a strategy to manage Offset specification in single active consumer based on is_active flag + By default if this clousure is not present the default strategy OffsetSpecification::NEXT will be set.*/ .consumer_update(move |active, message_context| { println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream()); OffsetSpecification::First From fcc11c1661b14c1e3733877bb84add0849f17c7c Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Thu, 14 Nov 2024 15:36:53 +0100 Subject: [PATCH 10/14] enabling naming for super_stream consumers and setting up sac properties internally --- .../single_active_consumer_super_stream.rs | 12 ++------ src/consumer.rs | 26 ++++++++++++++++- src/environment.rs | 3 ++ src/error.rs | 3 ++ src/superstream_consumer.rs | 28 ++++++++++++++++++- tests/integration/consumer_test.rs | 28 ++++++++----------- 6 files changed, 73 insertions(+), 27 deletions(-) diff --git a/examples/single_active_consumer/single_active_consumer_super_stream.rs b/examples/single_active_consumer/single_active_consumer_super_stream.rs index 37e066b5..51b74e44 100644 --- a/examples/single_active_consumer/single_active_consumer_super_stream.rs +++ b/examples/single_active_consumer/single_active_consumer_super_stream.rs @@ -34,23 +34,17 @@ async fn main() -> Result<(), Box> { super_stream ); - let mut properties = HashMap::new(); - - properties.insert("single-active-consumer".to_string(), "true".to_string()); - properties.insert("name".to_string(), "consumer-group-1".to_string()); - properties.insert("super-stream".to_string(), "hello-rust-super-stream".to_string()); - let mut super_stream_consumer: SuperStreamConsumer = environment .super_stream_consumer() + // Mandatory if sac is enabled + .name("consumer-group-1") .offset(OffsetSpecification::First) + .enable_single_active_consumer(true) .client_provided_name("my super stream consumer for hello rust") - /*We can decide a strategy to manage Offset specification in single active consumer based on is_active flag - By default if this clousure is not present the default strategy OffsetSpecification::NEXT will be set.*/ .consumer_update(move |active, message_context| { println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream()); OffsetSpecification::First }) - .properties(properties) .build(super_stream) .await .unwrap(); diff --git a/src/consumer.rs b/src/consumer.rs index bca28d6d..97857c32 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -113,13 +113,20 @@ pub struct ConsumerBuilder { pub(crate) consumer_update_listener: Option, pub(crate) client_provided_name: String, pub(crate) properties: HashMap, + pub(crate) is_single_active_consumer: bool, } impl ConsumerBuilder { pub async fn build(mut self, stream: &str) -> Result { + if (self.is_single_active_consumer + || self.properties.contains_key("single-active-consumer")) + && self.consumer_name.is_none() + { + return Err(ConsumerCreateError::SingleActiveConsumerNotSupported); + 
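// A minimal sketch of what this guard means for callers (illustrative only;
// `environment` and the stream name are placeholders): enabling single active
// consumer without calling `.name(...)` is expected to fail at build time with
// `ConsumerCreateError::SingleActiveConsumerNotSupported`:
//
//     let result = environment
//         .consumer()
//         .enable_single_active_consumer(true)
//         // note: no `.name(...)` call here
//         .build("some-stream")
//         .await;
//     assert!(matches!(
//         result,
//         Err(ConsumerCreateError::SingleActiveConsumerNotSupported)
//     ));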
} + // Connect to the user specified node first, then look for a random replica to connect to instead. // This is recommended for load balancing purposes - let mut opt_with_client_provided_name = self.environment.options.client_options.clone(); opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone(); @@ -203,6 +210,13 @@ impl ConsumerBuilder { ); } + if self.is_single_active_consumer { + self.properties + .insert("single-active-consumer".to_string(), "true".to_string()); + self.properties + .insert("name".to_string(), self.consumer_name.clone().unwrap()); + } + let response = client .subscribe( subscription_id, @@ -242,6 +256,16 @@ impl ConsumerBuilder { self } + pub fn name_optional(mut self, consumer_name: Option) -> Self { + self.consumer_name = consumer_name; + self + } + + pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self { + self.is_single_active_consumer = is_single_active_consumer; + self + } + pub fn filter_input(mut self, filter_configuration: Option) -> Self { self.filter_configuration = filter_configuration; self diff --git a/src/environment.rs b/src/environment.rs index 283bad4e..859cd845 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -77,17 +77,20 @@ impl Environment { consumer_update_listener: None, client_provided_name: String::from("rust-stream-consumer"), properties: HashMap::new(), + is_single_active_consumer: false, } } pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder { SuperStreamConsumerBuilder { + super_stream_consumer_name: None, environment: self.clone(), offset_specification: OffsetSpecification::Next, filter_configuration: None, consumer_update_listener: None, client_provided_name: String::from("rust-super-stream-consumer"), properties: HashMap::new(), + is_single_active_consumer: false, } } diff --git a/src/error.rs b/src/error.rs index 250f82e5..f138a699 100644 --- a/src/error.rs +++ b/src/error.rs @@ -159,6 +159,9 @@ pub enum ConsumerCreateError { #[error("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")] FilteringNotSupport, + + #[error("if you set single active consumer a consumer and super_stream consumer name need to be setup")] + SingleActiveConsumerNotSupported, } #[derive(Error, Debug)] diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index cdb299c0..43b9f5b3 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -32,21 +32,30 @@ struct SuperStreamConsumerInternal { /// Builder for [`Consumer`] pub struct SuperStreamConsumerBuilder { + pub(crate) super_stream_consumer_name: Option, pub(crate) environment: Environment, pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, pub(crate) consumer_update_listener: Option, pub(crate) client_provided_name: String, + pub(crate) is_single_active_consumer: bool, pub(crate) properties: HashMap, } impl SuperStreamConsumerBuilder { pub async fn build( - self, + &mut self, super_stream: &str, ) -> Result { // Connect to the user specified node first, then look for a random replica to connect to instead. // This is recommended for load balancing purposes. 
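// As in ConsumerBuilder::build above, fail fast when single active consumer is
// requested (via the flag or the raw property) but no consumer name has been set.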
+ if (self.is_single_active_consumer + || self.properties.contains_key("single-active-consumer")) + && self.super_stream_consumer_name.is_none() + { + return Err(ConsumerCreateError::SingleActiveConsumerNotSupported); + } + let client = self.environment.create_client().await?; let (tx, rx) = channel(10000); @@ -58,17 +67,24 @@ impl SuperStreamConsumerBuilder { }; let partitions = super_stream_metadata.partitions().await; + if self.is_single_active_consumer { + self.properties + .insert("super-stream".to_string(), super_stream.to_string()); + } + let mut handlers = Vec::::new(); for partition in partitions.into_iter() { let tx_cloned = tx.clone(); let mut consumer = self .environment .consumer() + .name_optional(self.super_stream_consumer_name.clone()) .offset(self.offset_specification.clone()) .client_provided_name(self.client_provided_name.as_str()) .filter_input(self.filter_configuration.clone()) .consumer_update_arc(self.consumer_update_listener.clone()) .properties(self.properties.clone()) + .enable_single_active_consumer(self.is_single_active_consumer) .build(partition.as_str()) .await .unwrap(); @@ -100,6 +116,16 @@ impl SuperStreamConsumerBuilder { self } + pub fn name(mut self, consumer_name: &str) -> Self { + self.super_stream_consumer_name = Some(String::from(consumer_name)); + self + } + + pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self { + self.is_single_active_consumer = is_single_active_consumer; + self + } + pub fn filter_input(mut self, filter_configuration: Option) -> Self { self.filter_configuration = filter_configuration; self diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index 6d2e56c6..4c2ce0a0 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -687,18 +687,14 @@ async fn super_stream_single_active_consumer_test() { .await .unwrap(); - let mut properties = HashMap::new(); let notify_received_messages = Arc::new(Notify::new()); - properties.insert("single-active-consumer".to_string(), "true".to_string()); - properties.insert("name".to_string(), "consumer-group-1".to_string()); - properties.insert("super-stream".to_string(), env.super_stream.clone()); - let mut super_stream_consumer: SuperStreamConsumer = env .env .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) .offset(OffsetSpecification::First) - .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); @@ -706,8 +702,9 @@ async fn super_stream_single_active_consumer_test() { let mut super_stream_consumer_2: SuperStreamConsumer = env .env .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) .offset(OffsetSpecification::First) - .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); @@ -715,8 +712,9 @@ async fn super_stream_single_active_consumer_test() { let mut super_stream_consumer_3: SuperStreamConsumer = env .env .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) .offset(OffsetSpecification::First) - .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); @@ -803,17 +801,12 @@ async fn super_stream_single_active_consumer_test_with_callback() { .await .unwrap(); - let mut properties = HashMap::new(); let notify_received_messages = Arc::new(Notify::new()); let mut result_stream_name_1 = Arc::new(Mutex::new(String::from(""))); let mut result_stream_name_2 = 
Arc::new(Mutex::new(String::from(""))); let mut result_stream_name_3 = Arc::new(Mutex::new(String::from(""))); - properties.insert("single-active-consumer".to_string(), "true".to_string()); - properties.insert("name".to_string(), "consumer-group-1".to_string()); - properties.insert("super-stream".to_string(), env.super_stream.clone()); - let mut result_stream_name_outer = result_stream_name_1.clone(); let mut result_stream_name_2_outer = result_stream_name_2.clone(); let mut result_stream_name_3_outer = result_stream_name_3.clone(); @@ -821,6 +814,8 @@ async fn super_stream_single_active_consumer_test_with_callback() { let mut super_stream_consumer: SuperStreamConsumer = env .env .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) .offset(OffsetSpecification::First) .consumer_update(move |active, message_context| { let mut result_consumer_name_int = result_stream_name_outer.clone(); @@ -828,7 +823,6 @@ async fn super_stream_single_active_consumer_test_with_callback() { OffsetSpecification::First }) - .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); @@ -836,13 +830,14 @@ async fn super_stream_single_active_consumer_test_with_callback() { let mut super_stream_consumer_2: SuperStreamConsumer = env .env .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) .offset(OffsetSpecification::First) .consumer_update(move |active, message_context| { let mut result_consumer_name_int = result_stream_name_2_outer.clone(); *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); OffsetSpecification::First }) - .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); @@ -850,13 +845,14 @@ async fn super_stream_single_active_consumer_test_with_callback() { let mut super_stream_consumer_3: SuperStreamConsumer = env .env .super_stream_consumer() + .name("super-stream-with-sac-enabled") + .enable_single_active_consumer(true) .offset(OffsetSpecification::First) .consumer_update(move |active, message_context| { let mut result_consumer_name_int = result_stream_name_3_outer.clone(); *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); OffsetSpecification::First }) - .properties(properties.clone()) .build(&env.super_stream) .await .unwrap(); From bb6ee6fd8d59745f93972f64831f1c0346f4fd39 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Thu, 14 Nov 2024 15:51:02 +0100 Subject: [PATCH 11/14] expanding test --- tests/integration/consumer_test.rs | 32 +++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index 4c2ce0a0..d388e787 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -787,6 +787,7 @@ async fn super_stream_single_active_consumer_test() { #[tokio::test(flavor = "multi_thread")] async fn super_stream_single_active_consumer_test_with_callback() { let env = TestEnvironment::create_super_stream().await; + let super_stream_consumer_name = "super-stream-with-sac-enabled"; let message_count = 1000; let mut super_stream_producer = env @@ -807,19 +808,29 @@ async fn super_stream_single_active_consumer_test_with_callback() { let mut result_stream_name_2 = Arc::new(Mutex::new(String::from(""))); let mut result_stream_name_3 = Arc::new(Mutex::new(String::from(""))); + let mut result_name_1 = Arc::new(Mutex::new(String::from(""))); + let mut result_name_2 = 
Arc::new(Mutex::new(String::from(""))); + let mut result_name_3 = Arc::new(Mutex::new(String::from(""))); + let mut result_stream_name_outer = result_stream_name_1.clone(); let mut result_stream_name_2_outer = result_stream_name_2.clone(); let mut result_stream_name_3_outer = result_stream_name_3.clone(); + let mut result_name_1_outer = result_name_1.clone(); + let mut result_name_2_outer = result_name_2.clone(); + let mut result_name_3_outer = result_name_3.clone(); + let mut super_stream_consumer: SuperStreamConsumer = env .env .super_stream_consumer() - .name("super-stream-with-sac-enabled") + .name(super_stream_consumer_name) .enable_single_active_consumer(true) .offset(OffsetSpecification::First) .consumer_update(move |active, message_context| { - let mut result_consumer_name_int = result_stream_name_outer.clone(); - *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); + let mut result_stream_name_int = result_stream_name_outer.clone(); + let mut result_consumer_name_int = result_name_1_outer.clone(); + *result_stream_name_int.lock().unwrap() = message_context.get_stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap(); OffsetSpecification::First }) @@ -834,8 +845,10 @@ async fn super_stream_single_active_consumer_test_with_callback() { .enable_single_active_consumer(true) .offset(OffsetSpecification::First) .consumer_update(move |active, message_context| { - let mut result_consumer_name_int = result_stream_name_2_outer.clone(); - *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); + let mut result_stream_name_int = result_stream_name_2_outer.clone(); + let mut result_consumer_name_int = result_name_2_outer.clone(); + *result_stream_name_int.lock().unwrap() = message_context.get_stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap(); OffsetSpecification::First }) .build(&env.super_stream) @@ -849,8 +862,10 @@ async fn super_stream_single_active_consumer_test_with_callback() { .enable_single_active_consumer(true) .offset(OffsetSpecification::First) .consumer_update(move |active, message_context| { - let mut result_consumer_name_int = result_stream_name_3_outer.clone(); - *result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone(); + let mut result_stream_name_int = result_stream_name_3_outer.clone(); + let mut result_consumer_name_int = result_name_3_outer.clone(); + *result_stream_name_int.lock().unwrap() = message_context.get_stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap(); OffsetSpecification::First }) .build(&env.super_stream) @@ -924,6 +939,9 @@ async fn super_stream_single_active_consumer_test_with_callback() { assert!(env .partitions .contains(&(*result_stream_name_3.clone().lock().unwrap()))); + assert!(super_stream_consumer_name == *result_name_1.clone().lock().unwrap()); + assert!(super_stream_consumer_name == *result_name_2.clone().lock().unwrap()); + assert!(super_stream_consumer_name == *result_name_3.clone().lock().unwrap()); super_stream_producer.close().await.unwrap(); _ = handle_consumer_1.close().await; From 116193fb8d78fed98bb9cf1ef06e23f03897ae80 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Fri, 15 Nov 2024 14:33:17 +0100 Subject: [PATCH 12/14] few improvements and test for simple SAC --- examples/single_active_consumer/README.md | 4 +- .../example_for_enrico.rs | 96 +++++++++++++++++++ .../single_active_consumer.rs | 
72 ++++++++++++++ src/consumer.rs | 17 +++- src/superstream_consumer.rs | 2 +- 5 files changed, 183 insertions(+), 8 deletions(-) create mode 100644 examples/single_active_consumer/example_for_enrico.rs create mode 100644 examples/single_active_consumer/single_active_consumer.rs diff --git a/examples/single_active_consumer/README.md b/examples/single_active_consumer/README.md index 8ef27bea..29523a84 100644 --- a/examples/single_active_consumer/README.md +++ b/examples/single_active_consumer/README.md @@ -5,10 +5,10 @@ This is an example to enable single active consumer functionality for superstrea https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams -This folder contains a super-stream consumer configured to enable it. +This folder contains a consumer and a super-stream consumer configured to enable it. You can use the example in the super-stream folder to produce messages for a super-stream. -You can then run the consumer in this folder. +You can then run the single_active_consumer_super_stream.rs in this folder. Assuming the super-stream is composed by three streams, you can see that the Consumer will consume messages from all the streams part of the superstream. You can then run another consumer in parallel. diff --git a/examples/single_active_consumer/example_for_enrico.rs b/examples/single_active_consumer/example_for_enrico.rs new file mode 100644 index 00000000..c3bbde83 --- /dev/null +++ b/examples/single_active_consumer/example_for_enrico.rs @@ -0,0 +1,96 @@ +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ + ByteCapacity, OffsetSpecification, ResponseCode, +}; + + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 1000000; + let stream = "hello-rust-super-stream-0"; + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Super stream consumer example, consuming messages from the super stream {}", + stream + ); + + let mut consumer = environment + .consumer() + // Mandatory if sac is enabled + .name("consumer-group-1") + .offset(OffsetSpecification::First) + .enable_single_active_consumer(true) + .client_provided_name("my super stream consumer for hello rust") + .consumer_update(move |active, message_context| async { + let name = message_context.name(); + let stream = message_context.stream(); + let client = message_context.client(); + + let mut stored_offset = 0; + println!("single active consumer: is active: {} on stream: {} with consumer_name: {}", active, stream, name); + let my_async = async { + println!("im hereXXXXXXXXXXX"); + stored_offset = client.query_offset(name, stream.as_str()).await.unwrap(); + println!("stored offset {}", stored_offset); + //OffsetSpecification::Offset(stored_offset) + + }; + println!("im hereXXXXXXXXXXX {}", stored_offset); + my_async.await; + OffsetSpecification::Offset(stored_offset) + + }) + .build(stream) + .await + .unwrap(); + + for 
i in 0..message_count { + let delivery = consumer.next().await.unwrap(); + { + let delivery = delivery.unwrap(); + /*println!( + "Got message: {:#?} from stream: {} with offset: {}", + delivery + .message() + .data() + .map(|data| String::from_utf8(data.to_vec()).unwrap()) + .unwrap(), + delivery.stream(), + delivery.offset() + );*/ + + //store an offset + if i == 10000 { + let _ = consumer + .store_offset(delivery.offset()) + .await + .unwrap_or_else(|e| println!("Err: {}", e)); + } + } + } + + println!("Stopping consumer..."); + let _ = consumer.handle().close().await; + println!("consumer stopped"); + Ok(()) +} \ No newline at end of file diff --git a/examples/single_active_consumer/single_active_consumer.rs b/examples/single_active_consumer/single_active_consumer.rs new file mode 100644 index 00000000..614afc31 --- /dev/null +++ b/examples/single_active_consumer/single_active_consumer.rs @@ -0,0 +1,72 @@ +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ + ByteCapacity, OffsetSpecification, ResponseCode, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 1000000; + let stream = "hello-rust-stream"; + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Super stream consumer example, consuming messages from the super stream {}", + stream + ); + + let mut consumer = environment + .consumer() + // Mandatory if sac is enabled + .name("consumer-group-1") + .offset(OffsetSpecification::First) + .enable_single_active_consumer(true) + .client_provided_name("my super stream consumer for hello rust") + .consumer_update(move |active, message_context| { + println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream()); + OffsetSpecification::First + }) + .build(stream) + .await + .unwrap(); + + for _ in 0..message_count { + let delivery = consumer.next().await.unwrap(); + { + let delivery = delivery.unwrap(); + println!( + "Got message: {:#?} from stream: {} with offset: {}", + delivery + .message() + .data() + .map(|data| String::from_utf8(data.to_vec()).unwrap()) + .unwrap(), + delivery.stream(), + delivery.offset() + ); + } + } + + println!("Stopping consumer..."); + let _ = consumer.handle().close().await; + println!("consumer stopped"); + Ok(()) +} \ No newline at end of file diff --git a/src/consumer.rs b/src/consumer.rs index 97857c32..2ec83b86 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -89,19 +89,25 @@ impl FilterConfiguration { } } +#[derive(Clone)] pub struct MessageContext { - consumer_name: Option, + name: String, stream: String, + client: Client, } impl MessageContext { - pub fn get_name(&self) -> Option { - self.consumer_name.clone() + pub fn name(&self) -> String { + self.name.clone() } - pub fn get_stream(&self) -> String { + pub fn stream(&self) -> String { self.stream.clone() } + + pub fn client(&self) -> Client { + self.client.clone() + } } /// Builder for [`Consumer`] @@ -451,8 +457,9 @@ impl MessageHandler for ConsumerMessageHandler { // Otherwise the 
Offset specification is returned by the user callback let is_active = consumer_update.is_active(); let message_context = MessageContext { - consumer_name: self.0.name.clone(), + name: self.0.name.clone().unwrap(), stream: self.0.stream.clone(), + client: self.0.client.clone(), }; let consumer_update_listener_callback = self.0.consumer_update_listener.clone().unwrap(); diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index 43b9f5b3..54a5cf77 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -131,7 +131,7 @@ impl SuperStreamConsumerBuilder { self } - pub fn consumer_update( + pub fn consumer_update( mut self, consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification + Send From 7b19c5f8ec08ef4b4e6898088e249e2d2e8f47c6 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Sat, 16 Nov 2024 15:48:03 +0100 Subject: [PATCH 13/14] making consumer_update callback able to call async methods --- .../example_for_enrico.rs | 96 ------------------- .../single_active_consumer.rs | 35 ++++++- src/consumer.rs | 23 ++--- src/superstream_consumer.rs | 18 ++-- tests/integration/consumer_test.rs | 24 +++-- 5 files changed, 67 insertions(+), 129 deletions(-) delete mode 100644 examples/single_active_consumer/example_for_enrico.rs diff --git a/examples/single_active_consumer/example_for_enrico.rs b/examples/single_active_consumer/example_for_enrico.rs deleted file mode 100644 index c3bbde83..00000000 --- a/examples/single_active_consumer/example_for_enrico.rs +++ /dev/null @@ -1,96 +0,0 @@ -use futures::StreamExt; -use rabbitmq_stream_client::error::StreamCreateError; -use rabbitmq_stream_client::types::{ - ByteCapacity, OffsetSpecification, ResponseCode, -}; - - -#[tokio::main] -async fn main() -> Result<(), Box> { - use rabbitmq_stream_client::Environment; - let environment = Environment::builder().build().await?; - let message_count = 1000000; - let stream = "hello-rust-super-stream-0"; - - let create_response = environment - .stream_creator() - .max_length(ByteCapacity::GB(5)) - .create(stream) - .await; - - if let Err(e) = create_response { - if let StreamCreateError::Create { stream, status } = e { - match status { - // we can ignore this error because the stream already exists - ResponseCode::StreamAlreadyExists => {} - err => { - println!("Error creating stream: {:?} {:?}", stream, err); - } - } - } - } - println!( - "Super stream consumer example, consuming messages from the super stream {}", - stream - ); - - let mut consumer = environment - .consumer() - // Mandatory if sac is enabled - .name("consumer-group-1") - .offset(OffsetSpecification::First) - .enable_single_active_consumer(true) - .client_provided_name("my super stream consumer for hello rust") - .consumer_update(move |active, message_context| async { - let name = message_context.name(); - let stream = message_context.stream(); - let client = message_context.client(); - - let mut stored_offset = 0; - println!("single active consumer: is active: {} on stream: {} with consumer_name: {}", active, stream, name); - let my_async = async { - println!("im hereXXXXXXXXXXX"); - stored_offset = client.query_offset(name, stream.as_str()).await.unwrap(); - println!("stored offset {}", stored_offset); - //OffsetSpecification::Offset(stored_offset) - - }; - println!("im hereXXXXXXXXXXX {}", stored_offset); - my_async.await; - OffsetSpecification::Offset(stored_offset) - - }) - .build(stream) - .await - .unwrap(); - - for i in 0..message_count { - let delivery = 
consumer.next().await.unwrap(); - { - let delivery = delivery.unwrap(); - /*println!( - "Got message: {:#?} from stream: {} with offset: {}", - delivery - .message() - .data() - .map(|data| String::from_utf8(data.to_vec()).unwrap()) - .unwrap(), - delivery.stream(), - delivery.offset() - );*/ - - //store an offset - if i == 10000 { - let _ = consumer - .store_offset(delivery.offset()) - .await - .unwrap_or_else(|e| println!("Err: {}", e)); - } - } - } - - println!("Stopping consumer..."); - let _ = consumer.handle().close().await; - println!("consumer stopped"); - Ok(()) -} \ No newline at end of file diff --git a/examples/single_active_consumer/single_active_consumer.rs b/examples/single_active_consumer/single_active_consumer.rs index 614afc31..24b47058 100644 --- a/examples/single_active_consumer/single_active_consumer.rs +++ b/examples/single_active_consumer/single_active_consumer.rs @@ -4,12 +4,13 @@ use rabbitmq_stream_client::types::{ ByteCapacity, OffsetSpecification, ResponseCode, }; + #[tokio::main] async fn main() -> Result<(), Box> { use rabbitmq_stream_client::Environment; let environment = Environment::builder().build().await?; let message_count = 1000000; - let stream = "hello-rust-stream"; + let stream = "hello-rust-super-stream-2"; let create_response = environment .stream_creator() @@ -40,15 +41,31 @@ async fn main() -> Result<(), Box> { .offset(OffsetSpecification::First) .enable_single_active_consumer(true) .client_provided_name("my super stream consumer for hello rust") - .consumer_update(move |active, message_context| { - println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream()); - OffsetSpecification::First + .consumer_update(move |active, message_context| async move { + let name = message_context.name(); + let stream = message_context.stream(); + let client = message_context.client(); + + println!( + "single active consumer: is active: {} on stream: {} with consumer_name: {}", + active, stream, name + ); + let stored_offset = client.query_offset(name, stream.as_str()).await; + + if let Err(e) = stored_offset { + return OffsetSpecification::First; + } + + let stored_offset_u = stored_offset.unwrap(); + println!("restarting from stored_offset: {}", stored_offset_u); + OffsetSpecification::Offset(stored_offset_u) + }) .build(stream) .await .unwrap(); - for _ in 0..message_count { + for i in 0..message_count { let delivery = consumer.next().await.unwrap(); { let delivery = delivery.unwrap(); @@ -62,6 +79,14 @@ async fn main() -> Result<(), Box> { delivery.stream(), delivery.offset() ); + + //store an offset + if i == 10000 { + let _ = consumer + .store_offset(i) + .await + .unwrap_or_else(|e| println!("Err: {}", e)); + } } } diff --git a/src/consumer.rs b/src/consumer.rs index 2ec83b86..26674297 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -16,7 +16,8 @@ use rabbitmq_stream_protocol::{ }; use core::option::Option::None; - +use futures::FutureExt; +use std::future::Future; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::trace; @@ -27,14 +28,14 @@ use crate::{ error::{ConsumerCloseError, ConsumerCreateError, ConsumerDeliveryError}, Client, ClientOptions, Environment, MetricsCollector, }; -use futures::{task::AtomicWaker, Stream}; +use futures::{future::BoxFuture, task::AtomicWaker, Stream}; use rand::rngs::StdRng; use rand::{seq::SliceRandom, SeedableRng}; type FilterPredicate = Option bool + Send + Sync>>; pub type ConsumerUpdateListener = - Arc OffsetSpecification + Send + Sync>; + Arc 
BoxFuture<'static, OffsetSpecification> + Send + Sync>; /// API for consuming RabbitMQ stream messages pub struct Consumer { @@ -277,14 +278,14 @@ impl ConsumerBuilder { self } - pub fn consumer_update( + pub fn consumer_update( mut self, - consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification - + Send - + Sync - + 'static, - ) -> Self { - let f = Arc::new(consumer_update_listener); + consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static, + ) -> Self + where + Fut: Future + Send + Sync + 'static, + { + let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed()); self.consumer_update_listener = Some(f); self } @@ -464,7 +465,7 @@ impl MessageHandler for ConsumerMessageHandler { let consumer_update_listener_callback = self.0.consumer_update_listener.clone().unwrap(); let offset_specification = - consumer_update_listener_callback(is_active, &message_context); + consumer_update_listener_callback(is_active, message_context).await; let _ = self .0 .client diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index 54a5cf77..a73b33b2 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -6,9 +6,12 @@ use crate::{ error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration, MessageContext, }; use futures::task::AtomicWaker; -use futures::{Stream, StreamExt}; +use futures::FutureExt; +use futures::Stream; +use futures::StreamExt; use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification; use std::collections::HashMap; +use std::future::Future; use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; @@ -130,15 +133,14 @@ impl SuperStreamConsumerBuilder { self.filter_configuration = filter_configuration; self } - pub fn consumer_update( mut self, - consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification - + Send - + Sync - + 'static, - ) -> Self { - let f = Arc::new(consumer_update_listener); + consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static, + ) -> Self + where + Fut: Future + Send + Sync + 'static, + { + let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed()); self.consumer_update_listener = Some(f); self } diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index d388e787..9b8d7db1 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -829,10 +829,12 @@ async fn super_stream_single_active_consumer_test_with_callback() { .consumer_update(move |active, message_context| { let mut result_stream_name_int = result_stream_name_outer.clone(); let mut result_consumer_name_int = result_name_1_outer.clone(); - *result_stream_name_int.lock().unwrap() = message_context.get_stream().clone(); - *result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap(); + async move { + *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); - OffsetSpecification::First + OffsetSpecification::First + } }) .build(&env.super_stream) .await @@ -847,9 +849,11 @@ async fn super_stream_single_active_consumer_test_with_callback() { .consumer_update(move |active, message_context| { let mut result_stream_name_int = result_stream_name_2_outer.clone(); let mut result_consumer_name_int = result_name_2_outer.clone(); - *result_stream_name_int.lock().unwrap() = 
message_context.get_stream().clone(); - *result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap(); - OffsetSpecification::First + async move { + *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); + OffsetSpecification::First + } }) .build(&env.super_stream) .await @@ -864,9 +868,11 @@ async fn super_stream_single_active_consumer_test_with_callback() { .consumer_update(move |active, message_context| { let mut result_stream_name_int = result_stream_name_3_outer.clone(); let mut result_consumer_name_int = result_name_3_outer.clone(); - *result_stream_name_int.lock().unwrap() = message_context.get_stream().clone(); - *result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap(); - OffsetSpecification::First + async move { + *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); + *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); + OffsetSpecification::First + } }) .build(&env.super_stream) .await From 4dc096a685abda2f535da80817858378b6b010f6 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Sun, 17 Nov 2024 17:37:58 +0100 Subject: [PATCH 14/14] making Delivery export client in order to use store_offset and review super_stream example --- .../single_active_consumer_super_stream.rs | 27 ++++++++++++++++--- src/consumer.rs | 6 +++++ src/superstream_consumer.rs | 4 +++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/examples/single_active_consumer/single_active_consumer_super_stream.rs b/examples/single_active_consumer/single_active_consumer_super_stream.rs index 51b74e44..c66b9aa3 100644 --- a/examples/single_active_consumer/single_active_consumer_super_stream.rs +++ b/examples/single_active_consumer/single_active_consumer_super_stream.rs @@ -41,9 +41,24 @@ async fn main() -> Result<(), Box> { .offset(OffsetSpecification::First) .enable_single_active_consumer(true) .client_provided_name("my super stream consumer for hello rust") - .consumer_update(move |active, message_context| { - println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream()); - OffsetSpecification::First + .consumer_update(move |active, message_context| async move { + let name = message_context.name(); + let stream = message_context.stream(); + let client = message_context.client(); + + println!( + "single active consumer: is active: {} on stream: {} with consumer_name: {}", + active, stream, name + ); + let stored_offset = client.query_offset(name, stream.as_str()).await; + + if let Err(e) = stored_offset { + return OffsetSpecification::First; + } + let stored_offset_u = stored_offset.unwrap(); + println!("stored_offset_u {}", stored_offset_u.clone()); + OffsetSpecification::Offset(stored_offset_u) + }) .build(super_stream) .await @@ -63,6 +78,12 @@ async fn main() -> Result<(), Box> { delivery.stream(), delivery.offset() ); + + // Store an offset for every consumer + if delivery.consumer_name().is_some() && delivery.offset() == 1000 { + super_stream_consumer.client().store_offset(delivery.consumer_name().unwrap().as_str(), delivery.stream().as_str(), delivery.offset()).await; + } + } } diff --git a/src/consumer.rs b/src/consumer.rs index 26674297..8a41bb1b 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -427,6 +427,7 @@ impl MessageHandler for ConsumerMessageHandler { .0 .sender .send(Ok(Delivery { + name: self.0.name.clone(), stream: self.0.stream.clone(), 
subscription_id: self.0.subscription_id, message, @@ -493,6 +494,7 @@ impl MessageHandler for ConsumerMessageHandler { /// Envelope from incoming message #[derive(Debug)] pub struct Delivery { + name: Option, stream: String, subscription_id: u8, message: Message, @@ -519,4 +521,8 @@ impl Delivery { pub fn offset(&self) -> u64 { self.offset } + + pub fn consumer_name(&self) -> Option { + self.name.clone() + } } diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index a73b33b2..1ddce9c8 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -178,6 +178,10 @@ impl SuperStreamConsumer { pub fn handle(&self) -> SuperStreamConsumerHandle { SuperStreamConsumerHandle(self.internal.clone()) } + + pub fn client(&self) -> Client { + self.internal.client.clone() + } } impl SuperStreamConsumerInternal {