From 9a89e3e11fda2cd59b1ac34600468f3372248198 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Fri, 4 Oct 2024 14:01:10 +0200 Subject: [PATCH 1/4] WIP: implementing create and delete superstream commands --- protocol/src/commands/create_super_stream.rs | 100 +++++++++++++++++++ protocol/src/commands/delete_super_stream.rs | 74 ++++++++++++++ protocol/src/commands/mod.rs | 2 + protocol/src/protocol.rs | 2 + protocol/src/request/mod.rs | 43 +++++++- protocol/src/request/shims.rs | 15 +++ protocol/src/response/mod.rs | 2 + src/client/mod.rs | 32 ++++++ src/environment.rs | 15 +++ src/stream_creator.rs | 42 ++++++++ tests/integration/client_test.rs | 33 ++++++ tests/integration/environment_test.rs | 18 ++++ 12 files changed, 373 insertions(+), 5 deletions(-) create mode 100644 protocol/src/commands/create_super_stream.rs create mode 100644 protocol/src/commands/delete_super_stream.rs diff --git a/protocol/src/commands/create_super_stream.rs b/protocol/src/commands/create_super_stream.rs new file mode 100644 index 00000000..60fddc16 --- /dev/null +++ b/protocol/src/commands/create_super_stream.rs @@ -0,0 +1,100 @@ +use std::collections::HashMap; +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_CREATE_SUPER_STREAM, +}; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct CreateSuperStreamCommand { + correlation_id: u32, + super_stream_name: String, + partitions: Vec, + binding_keys: Vec, + args: HashMap, +} + +impl CreateSuperStreamCommand { + pub fn new( + correlation_id: u32, + super_stream_name: String, + partitions: Vec, + binding_keys: Vec, + args: HashMap, + ) -> Self { + Self { + correlation_id, + super_stream_name, + partitions, + binding_keys, + args, + } + } +} + +impl Encoder for CreateSuperStreamCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.super_stream_name.as_str().encoded_size() + + self.partitions.encoded_size() + + self.binding_keys.encoded_size() + + self.args.encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.super_stream_name.as_str().encode(writer)?; + self.partitions.encode(writer)?; + self.binding_keys.encode(writer)?; + self.args.encode(writer)?; + Ok(()) + } +} + +impl Decoder for CreateSuperStreamCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, super_stream_name) = Option::decode(input)?; + let (input, partitions) = >::decode(input)?; + let (input, binding_keys) = >::decode(input)?; + let (input, args) = HashMap::decode(input)?; + + Ok(( + input, + CreateSuperStreamCommand { + correlation_id, + super_stream_name: super_stream_name.unwrap(), + partitions, + binding_keys, + args, + }, + )) + } +} + +impl Command for CreateSuperStreamCommand { + fn key(&self) -> u16 { + COMMAND_CREATE_SUPER_STREAM + } +} + +#[cfg(test)] +mod tests { + use crate::commands::create_stream::CreateStreamCommand; + use crate::commands::tests::command_encode_decode_test; + + use super::CreateSuperStreamCommand; + + #[test] + fn create_super_stream_request_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/delete_super_stream.rs b/protocol/src/commands/delete_super_stream.rs new file mode 100644 index 00000000..21a57994 --- /dev/null +++ b/protocol/src/commands/delete_super_stream.rs @@ -0,0 +1,74 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_DELETE_SUPER_STREAM, +}; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct DeleteSuperStreamCommand { + correlation_id: u32, + super_stream_name: String, +} + +impl DeleteSuperStreamCommand { + pub fn new(correlation_id: u32, super_stream_name: String) -> Self { + Self { + correlation_id, + super_stream_name, + } + } +} + +impl Encoder for DeleteSuperStreamCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + self.super_stream_name.as_str().encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.super_stream_name.as_str().encode(writer)?; + Ok(()) + } +} + +impl Decoder for DeleteSuperStreamCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, super_stream_name) = Option::decode(input)?; + + Ok(( + input, + DeleteSuperStreamCommand { + correlation_id, + super_stream_name: super_stream_name.unwrap(), + }, + )) + } +} + +impl Command for DeleteSuperStreamCommand { + fn key(&self) -> u16 { + COMMAND_DELETE_SUPER_STREAM + } +} + +#[cfg(test)] +mod tests { + use crate::commands::create_super_stream::CreateSuperStreamCommand; + use crate::commands::tests::command_encode_decode_test; + + use super::DeleteSuperStreamCommand; + + #[test] + fn delete_super_stream_request_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/mod.rs b/protocol/src/commands/mod.rs index 0a4ae942..5392b07a 100644 --- a/protocol/src/commands/mod.rs +++ b/protocol/src/commands/mod.rs @@ -2,10 +2,12 @@ use crate::protocol::version::PROTOCOL_VERSION; pub mod close; pub mod create_stream; +pub mod create_super_stream; pub mod credit; pub mod declare_publisher; pub mod delete; pub mod delete_publisher; +pub mod delete_super_stream; pub mod deliver; pub mod exchange_command_versions; pub mod generic; diff --git a/protocol/src/protocol.rs b/protocol/src/protocol.rs index d87be920..5dcb0d3c 100644 --- a/protocol/src/protocol.rs +++ b/protocol/src/protocol.rs @@ -30,6 +30,8 @@ pub mod commands { pub const COMMAND_CONSUMER_UPDATE: u16 = 26; pub const COMMAND_EXCHANGE_COMMAND_VERSIONS: u16 = 27; pub const COMMAND_STREAMS_STATS: u16 = 28; + pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29; + pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30; } // server responses diff --git a/protocol/src/request/mod.rs b/protocol/src/request/mod.rs index fc616436..380450a8 100644 --- a/protocol/src/request/mod.rs +++ b/protocol/src/request/mod.rs @@ -3,9 +3,10 @@ use std::io::Write; use crate::{ codec::{decoder::read_u32, Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, + close::CloseRequest, create_stream::CreateStreamCommand, + create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, - delete_publisher::DeletePublisherCommand, + delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand, publish::PublishCommand, query_offset::QueryOffsetRequest, @@ -20,6 +21,7 @@ use crate::{ }; use byteorder::{BigEndian, WriteBytesExt}; + mod shims; #[derive(Debug, PartialEq, Eq)] pub struct Request { @@ -60,6 +62,8 @@ pub enum RequestKind { StoreOffset(StoreOffset), Unsubscribe(UnSubscribeCommand), ExchangeCommandVersions(ExchangeCommandVersionsRequest), + CreateSuperStream(CreateSuperStreamCommand), + DeleteSuperStream(DeleteSuperStreamCommand), } impl Encoder for RequestKind { @@ -87,6 +91,12 @@ impl Encoder for RequestKind { RequestKind::ExchangeCommandVersions(exchange_command_versions) => { exchange_command_versions.encoded_size() } + RequestKind::CreateSuperStream(create_super_stream) => { + create_super_stream.encoded_size() + } + RequestKind::DeleteSuperStream(delete_super_stream) => { + delete_super_stream.encoded_size() + } } } @@ -114,6 +124,12 @@ impl Encoder for RequestKind { RequestKind::ExchangeCommandVersions(exchange_command_versions) => { exchange_command_versions.encode(writer) } + RequestKind::CreateSuperStream(create_super_stream) => { + create_super_stream.encode(writer) + } + RequestKind::DeleteSuperStream(delete_super_stream) => { + delete_super_stream.encode(writer) + } } } } @@ -182,6 +198,12 @@ impl Decoder for Request { COMMAND_EXCHANGE_COMMAND_VERSIONS => { ExchangeCommandVersionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))? } + COMMAND_CREATE_SUPER_STREAM => { + CreateSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))? + } + COMMAND_DELETE_SUPER_STREAM => { + DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))? + } n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Request { header, kind: cmd })) @@ -194,9 +216,11 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, + close::CloseRequest, create_stream::CreateStreamCommand, + create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, + delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand, publish::PublishCommand, @@ -209,9 +233,8 @@ mod tests { use std::fmt::Debug; - use fake::{Dummy, Fake, Faker}; - use super::Request; + use fake::{Dummy, Fake, Faker}; #[test] fn request_open_test() { @@ -324,4 +347,14 @@ mod tests { fn request_exchange_command_versions_test() { request_encode_decode_test::() } + + #[test] + fn request_create_super_stream_test() { + request_encode_decode_test::() + } + + #[test] + fn request_delete_super_stream_test() { + request_encode_decode_test::() + } } diff --git a/protocol/src/request/shims.rs b/protocol/src/request/shims.rs index 323d4d51..0f7964a2 100644 --- a/protocol/src/request/shims.rs +++ b/protocol/src/request/shims.rs @@ -1,3 +1,5 @@ +use crate::commands::create_super_stream::CreateSuperStreamCommand; +use crate::commands::delete_super_stream::DeleteSuperStreamCommand; use crate::{ commands::{ close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, @@ -14,6 +16,7 @@ use crate::{ types::Header, Request, RequestKind, }; + impl From for Request where T: Into + Command, @@ -135,3 +138,15 @@ impl From for RequestKind { RequestKind::ExchangeCommandVersions(cmd) } } + +impl From for RequestKind { + fn from(cmd: CreateSuperStreamCommand) -> Self { + RequestKind::CreateSuperStream(cmd) + } +} + +impl From for RequestKind { + fn from(cmd: DeleteSuperStreamCommand) -> Self { + RequestKind::DeleteSuperStream(cmd) + } +} diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 905a0954..89bc9243 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -139,6 +139,8 @@ impl Decoder for Response { | COMMAND_SUBSCRIBE | COMMAND_UNSUBSCRIBE | COMMAND_CREATE_STREAM + | COMMAND_CREATE_SUPER_STREAM + | COMMAND_DELETE_SUPER_STREAM | COMMAND_DELETE_STREAM => { GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))? } diff --git a/src/client/mod.rs b/src/client/mod.rs index 83e2a096..69549e8b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -42,10 +42,12 @@ use rabbitmq_stream_protocol::{ commands::{ close::{CloseRequest, CloseResponse}, create_stream::CreateStreamCommand, + create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, + delete_super_stream::DeleteSuperStreamCommand, generic::GenericResponse, heart_beat::HeartBeatCommand, metadata::MetadataCommand, @@ -306,11 +308,40 @@ impl Client { .await } + pub async fn create_super_stream( + &self, + super_stream: &str, + partitions: Vec, + binding_keys: Vec, + options: HashMap, + ) -> RabbitMQStreamResult { + self.send_and_receive(|correlation_id| { + CreateSuperStreamCommand::new( + correlation_id, + super_stream.to_owned(), + partitions, + binding_keys, + options, + ) + }) + .await + } + pub async fn delete_stream(&self, stream: &str) -> RabbitMQStreamResult { self.send_and_receive(|correlation_id| Delete::new(correlation_id, stream.to_owned())) .await } + pub async fn delete_super_stream( + &self, + super_stream: &str, + ) -> RabbitMQStreamResult { + self.send_and_receive(|correlation_id| { + DeleteSuperStreamCommand::new(correlation_id, super_stream.to_owned()) + }) + .await + } + pub async fn credit(&self, subscription_id: u8, credit: u16) -> RabbitMQStreamResult<()> { self.send(CreditCommand::new(subscription_id, credit)).await } @@ -574,6 +605,7 @@ impl Client { M: FnOnce(u32) -> R, { let Some((correlation_id, mut receiver)) = self.dispatcher.response_channel() else { + trace!("Connection si closed here"); return Err(ClientError::ConnectionClosed); }; diff --git a/src/environment.rs b/src/environment.rs index 8b1a590d..84817a6b 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -77,6 +77,21 @@ impl Environment { }) } } + + pub async fn delete_super_stream(&self, super_stream: &str) -> Result<(), StreamDeleteError> { + let client = self.create_client().await?; + let response = client.delete_super_stream(super_stream).await?; + client.close().await?; + + if response.is_ok() { + Ok(()) + } else { + Err(StreamDeleteError::Delete { + stream: super_stream.to_owned(), + status: response.code().clone(), + }) + } + } } /// Builder for [`Environment`] diff --git a/src/stream_creator.rs b/src/stream_creator.rs index 085eb164..5625aee4 100644 --- a/src/stream_creator.rs +++ b/src/stream_creator.rs @@ -34,6 +34,48 @@ impl StreamCreator { } } + pub async fn create_super_stream( + self, + super_stream: &str, + number_of_partitions: usize, + binding_keys: Option>, + ) -> Result<(), StreamCreateError> { + let mut partitions_names = Vec::with_capacity(number_of_partitions); + let mut new_binding_keys: Vec = Vec::with_capacity(number_of_partitions); + + if binding_keys.is_none() { + for i in 0..number_of_partitions { + new_binding_keys.push(i.to_string()); + partitions_names.push(super_stream.to_owned() + "-" + i.to_string().as_str()) + } + } else { + new_binding_keys = binding_keys.unwrap(); + for binding_key in new_binding_keys.iter() { + partitions_names.push(super_stream.to_owned() + "-" + binding_key) + } + } + + let client = self.env.create_client().await?; + let response = client + .create_super_stream( + super_stream, + partitions_names, + new_binding_keys, + self.options, + ) + .await?; + client.close().await?; + + if response.is_ok() { + Ok(()) + } else { + Err(StreamCreateError::Create { + stream: super_stream.to_owned(), + status: response.code().clone(), + }) + } + } + pub fn max_age(mut self, max_age: Duration) -> Self { self.options .insert("max-age".to_owned(), format!("{}s", max_age.as_secs())); diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index c9dae67f..fab736fb 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -41,6 +41,39 @@ async fn client_create_stream_error_test() { } #[tokio::test(flavor = "multi_thread")] +async fn client_create_and_delete_super_stream_test() { + let super_stream_name = "test-super-stream"; + + let client = Client::connect(ClientOptions::default()).await.unwrap(); + + let partitions: Vec = [ + "test-super-stream-0", + "test-super-stream-1", + "test-super-stream-2", + ] + .iter() + .map(|&x| x.into()) + .collect(); + + let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); + + let response = client + .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let response = client + .delete_super_stream(&super_stream_name) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let _ = client.close().await; +} + async fn client_delete_stream_test() { let test = TestClient::create().await; diff --git a/tests/integration/environment_test.rs b/tests/integration/environment_test.rs index abf900a3..d8cc87be 100644 --- a/tests/integration/environment_test.rs +++ b/tests/integration/environment_test.rs @@ -13,6 +13,24 @@ async fn environment_create_test() { let _ = TestEnvironment::create().await; } +#[tokio::test(flavor = "multi_thread")] +async fn environment_create_and_delete_super_stream_test() { + let super_stream = "super_stream_test"; + let env = Environment::builder().build().await.unwrap(); + + let response = env + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, None) + .await; + + assert_eq!(response.is_ok(), true); + + let response = env.delete_super_stream(super_stream).await; + + assert_eq!(response.is_ok(), true); +} + #[tokio::test(flavor = "multi_thread")] async fn environment_fail_to_connect_wrong_config() { // test the wrong config From 551df267bf729c8645c4d89f6d821697ef2e049d Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 8 Oct 2024 09:17:40 +0200 Subject: [PATCH 2/4] adding superstream error-case test --- tests/integration/client_test.rs | 42 ++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index fab736fb..c4f2650b 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -73,7 +73,49 @@ async fn client_create_and_delete_super_stream_test() { let _ = client.close().await; } +#[tokio::test(flavor = "multi_thread")] +async fn client_create_super_stream_error_test() { + let super_stream_name = "test-super-stream-error"; + + let client = Client::connect(ClientOptions::default()).await.unwrap(); + + let partitions: Vec = [ + "test-super-stream-error-0", + "test-super-stream-error-1", + "test-super-stream-error-2", + ] + .iter() + .map(|&x| x.into()) + .collect(); + + let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); + + let response = client + .create_super_stream( + &super_stream_name, + partitions.clone(), + binding_keys.clone(), + HashMap::new(), + ) + .await + .unwrap(); + assert_eq!(&ResponseCode::Ok, response.code()); + + let response = client + .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) + .await + .unwrap(); + + assert_eq!(&ResponseCode::StreamAlreadyExists, response.code()); + + let response = client + .delete_super_stream(&super_stream_name) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); +} async fn client_delete_stream_test() { let test = TestClient::create().await; From 51590eb8a445be1dda3b6205039d0ab61f0d600c Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 8 Oct 2024 12:11:31 +0200 Subject: [PATCH 3/4] implementing route and partition commands --- Dockerfile | 4 +- protocol/src/commands/create_super_stream.rs | 2 +- protocol/src/commands/mod.rs | 2 + .../src/commands/superstream_partitions.rs | 145 +++++++++++++++++ protocol/src/commands/superstream_route.rs | 150 ++++++++++++++++++ protocol/src/request/mod.rs | 34 +++- protocol/src/request/shims.rs | 16 +- protocol/src/response/mod.rs | 57 ++++++- src/client/mod.rs | 18 ++- tests/integration/client_test.rs | 86 ++++++++++ 10 files changed, 501 insertions(+), 13 deletions(-) create mode 100644 protocol/src/commands/superstream_partitions.rs create mode 100644 protocol/src/commands/superstream_route.rs diff --git a/Dockerfile b/Dockerfile index d832fba8..e113708f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ -FROM rabbitmq:3.13-rc-management +FROM rabbitmq:4.0.1-management COPY .ci/conf/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf COPY .ci/conf/enabled_plugins /etc/rabbitmq/enabled_plugins -COPY .ci/certs /etc/rabbitmq/certs \ No newline at end of file +COPY .ci/certs /etc/rabbitmq/certs diff --git a/protocol/src/commands/create_super_stream.rs b/protocol/src/commands/create_super_stream.rs index 60fddc16..f6b1e532 100644 --- a/protocol/src/commands/create_super_stream.rs +++ b/protocol/src/commands/create_super_stream.rs @@ -88,7 +88,7 @@ impl Command for CreateSuperStreamCommand { #[cfg(test)] mod tests { - use crate::commands::create_stream::CreateStreamCommand; + use crate::commands::tests::command_encode_decode_test; use super::CreateSuperStreamCommand; diff --git a/protocol/src/commands/mod.rs b/protocol/src/commands/mod.rs index 5392b07a..f6ecf1f8 100644 --- a/protocol/src/commands/mod.rs +++ b/protocol/src/commands/mod.rs @@ -25,6 +25,8 @@ pub mod sasl_authenticate; pub mod sasl_handshake; pub mod store_offset; pub mod subscribe; +pub mod superstream_partitions; +pub mod superstream_route; pub mod tune; pub mod unsubscribe; diff --git a/protocol/src/commands/superstream_partitions.rs b/protocol/src/commands/superstream_partitions.rs new file mode 100644 index 00000000..8703ae93 --- /dev/null +++ b/protocol/src/commands/superstream_partitions.rs @@ -0,0 +1,145 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_PARTITIONS, + FromResponse, ResponseCode, +}; +use crate::commands::exchange_command_versions::ExchangeCommandVersion; +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct SuperStreamPartitionsRequest { + correlation_id: u32, + super_stream: String, +} + +impl SuperStreamPartitionsRequest { + pub fn new(correlation_id: u32, super_stream: String) -> Self { + Self { + correlation_id, + super_stream, + } + } +} + +impl Encoder for SuperStreamPartitionsRequest { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.super_stream.as_str().encode(writer)?; + Ok(()) + } +} + +impl Decoder for SuperStreamPartitionsRequest { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, super_stream) = Option::decode(input)?; + + Ok(( + input, + SuperStreamPartitionsRequest { + correlation_id, + super_stream: super_stream.unwrap(), + }, + )) + } +} + +impl Command for SuperStreamPartitionsRequest { + fn key(&self) -> u16 { + COMMAND_PARTITIONS + } +} + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct SuperStreamPartitionsResponse { + pub(crate) correlation_id: u32, + response_code: ResponseCode, + pub streams: Vec, +} + +impl SuperStreamPartitionsResponse { + pub fn new(correlation_id: u32, streams: Vec, response_code: ResponseCode) -> Self { + Self { + correlation_id, + response_code, + streams, + } + } + pub fn is_ok(&self) -> bool { + self.response_code == ResponseCode::Ok + } +} + +impl Encoder for SuperStreamPartitionsResponse { + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.response_code.encode(writer)?; + self.streams.encode(writer)?; + Ok(()) + } + + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.response_code.encoded_size() + + self.streams.encoded_size() + } +} + +impl Decoder for SuperStreamPartitionsResponse { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, response_code) = ResponseCode::decode(input)?; + let (input, streams) = >::decode(input)?; + + Ok(( + input, + SuperStreamPartitionsResponse { + correlation_id, + response_code, + streams, + }, + )) + } +} + +impl FromResponse for SuperStreamPartitionsResponse { + fn from_response(response: crate::Response) -> Option { + match response.kind { + crate::ResponseKind::SuperStreamPartitions(partitions_response) => { + Some(partitions_response) + } + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + + use crate::commands::tests::command_encode_decode_test; + + use super::SuperStreamPartitionsRequest; + use super::SuperStreamPartitionsResponse; + + #[test] + fn super_stream_partition_request_test() { + command_encode_decode_test::(); + } + + #[test] + fn super_stream_partition_response_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/superstream_route.rs b/protocol/src/commands/superstream_route.rs new file mode 100644 index 00000000..6fa280d4 --- /dev/null +++ b/protocol/src/commands/superstream_route.rs @@ -0,0 +1,150 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_ROUTE, + FromResponse, ResponseCode, +}; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct SuperStreamRouteRequest { + correlation_id: u32, + routing_key: String, + super_stream: String, +} + +impl SuperStreamRouteRequest { + pub fn new(correlation_id: u32, routing_key: String, super_stream: String) -> Self { + Self { + correlation_id, + routing_key, + super_stream, + } + } +} + +impl Encoder for SuperStreamRouteRequest { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.routing_key.as_str().encoded_size() + + self.super_stream.as_str().encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.routing_key.as_str().encode(writer)?; + self.super_stream.as_str().encode(writer)?; + Ok(()) + } +} + +impl Decoder for SuperStreamRouteRequest { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, routing_key) = Option::decode(input)?; + let (input, super_stream) = Option::decode(input)?; + + Ok(( + input, + SuperStreamRouteRequest { + correlation_id, + routing_key: routing_key.unwrap(), + super_stream: super_stream.unwrap(), + }, + )) + } +} + +impl Command for SuperStreamRouteRequest { + fn key(&self) -> u16 { + COMMAND_ROUTE + } +} + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct SuperStreamRouteResponse { + pub(crate) correlation_id: u32, + response_code: ResponseCode, + pub streams: Vec, +} + +impl SuperStreamRouteResponse { + pub fn new(correlation_id: u32, streams: Vec, response_code: ResponseCode) -> Self { + Self { + correlation_id, + response_code, + streams, + } + } + pub fn is_ok(&self) -> bool { + self.response_code == ResponseCode::Ok + } +} + +impl Encoder for SuperStreamRouteResponse { + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.response_code.encode(writer)?; + self.streams.encode(writer)?; + Ok(()) + } + + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.streams.encoded_size() + + self.response_code.encoded_size() + } +} + +impl Decoder for SuperStreamRouteResponse { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, response_code) = ResponseCode::decode(input)?; + let (input, streams) = Vec::decode(input)?; + + Ok(( + input, + SuperStreamRouteResponse { + correlation_id, + response_code, + streams, + }, + )) + } +} + +impl FromResponse for SuperStreamRouteResponse { + fn from_response(response: crate::Response) -> Option { + match response.kind { + crate::ResponseKind::SuperStreamRoute(route) => Some(route), + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + + use crate::commands::tests::command_encode_decode_test; + + use super::SuperStreamRouteRequest; + use super::SuperStreamRouteResponse; + + #[test] + fn super_stream_route_request_test() { + command_encode_decode_test::(); + } + + #[test] + fn super_stream_route_response_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/request/mod.rs b/protocol/src/request/mod.rs index 380450a8..1eea47bd 100644 --- a/protocol/src/request/mod.rs +++ b/protocol/src/request/mod.rs @@ -12,7 +12,9 @@ use crate::{ publish::PublishCommand, query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest, sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand, - store_offset::StoreOffset, subscribe::SubscribeCommand, tune::TunesCommand, + store_offset::StoreOffset, subscribe::SubscribeCommand, + superstream_partitions::SuperStreamPartitionsRequest, + superstream_route::SuperStreamRouteRequest, tune::TunesCommand, unsubscribe::UnSubscribeCommand, }, error::{DecodeError, EncodeError}, @@ -64,6 +66,8 @@ pub enum RequestKind { ExchangeCommandVersions(ExchangeCommandVersionsRequest), CreateSuperStream(CreateSuperStreamCommand), DeleteSuperStream(DeleteSuperStreamCommand), + SuperStreamPartitions(SuperStreamPartitionsRequest), + SuperStreamRoute(SuperStreamRouteRequest), } impl Encoder for RequestKind { @@ -97,6 +101,10 @@ impl Encoder for RequestKind { RequestKind::DeleteSuperStream(delete_super_stream) => { delete_super_stream.encoded_size() } + RequestKind::SuperStreamPartitions(super_stream_partitions) => { + super_stream_partitions.encoded_size() + } + RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(), } } @@ -130,6 +138,10 @@ impl Encoder for RequestKind { RequestKind::DeleteSuperStream(delete_super_stream) => { delete_super_stream.encode(writer) } + RequestKind::SuperStreamPartitions(super_stream_partition) => { + super_stream_partition.encode(writer) + } + RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer), } } } @@ -204,6 +216,12 @@ impl Decoder for Request { COMMAND_DELETE_SUPER_STREAM => { DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))? } + COMMAND_PARTITIONS => { + SuperStreamPartitionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))? + } + COMMAND_ROUTE => { + SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))? + } n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Request { header, kind: cmd })) @@ -226,7 +244,9 @@ mod tests { peer_properties::PeerPropertiesCommand, publish::PublishCommand, query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest, sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand, - store_offset::StoreOffset, subscribe::SubscribeCommand, tune::TunesCommand, + store_offset::StoreOffset, subscribe::SubscribeCommand, + superstream_partitions::SuperStreamPartitionsRequest, + superstream_route::SuperStreamRouteRequest, tune::TunesCommand, unsubscribe::UnSubscribeCommand, Command, }, }; @@ -357,4 +377,14 @@ mod tests { fn request_delete_super_stream_test() { request_encode_decode_test::() } + + #[test] + fn request_partitions_command() { + request_encode_decode_test::() + } + + #[test] + fn request_route_command() { + request_encode_decode_test::() + } } diff --git a/protocol/src/request/shims.rs b/protocol/src/request/shims.rs index 0f7964a2..102efe6f 100644 --- a/protocol/src/request/shims.rs +++ b/protocol/src/request/shims.rs @@ -10,7 +10,9 @@ use crate::{ publish::PublishCommand, query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest, sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand, - store_offset::StoreOffset, subscribe::SubscribeCommand, tune::TunesCommand, + store_offset::StoreOffset, subscribe::SubscribeCommand, + superstream_partitions::SuperStreamPartitionsRequest, + superstream_route::SuperStreamRouteRequest, tune::TunesCommand, unsubscribe::UnSubscribeCommand, Command, }, types::Header, @@ -150,3 +152,15 @@ impl From for RequestKind { RequestKind::DeleteSuperStream(cmd) } } + +impl From for RequestKind { + fn from(cmd: SuperStreamPartitionsRequest) -> Self { + RequestKind::SuperStreamPartitions(cmd) + } +} + +impl From for RequestKind { + fn from(cmd: SuperStreamRouteRequest) -> Self { + RequestKind::SuperStreamRoute(cmd) + } +} diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 89bc9243..0f394c14 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -13,12 +13,14 @@ use crate::{ peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm, publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse, query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse, - tune::TunesCommand, + superstream_partitions::SuperStreamPartitionsResponse, + superstream_route::SuperStreamRouteResponse, tune::TunesCommand, }, error::DecodeError, protocol::commands::*, types::Header, }; + mod shims; #[cfg_attr(test, derive(fake::Dummy))] @@ -68,6 +70,8 @@ pub enum ResponseKind { QueryPublisherSequence(QueryPublisherResponse), Credit(CreditResponse), ExchangeCommandVersions(ExchangeCommandVersionsResponse), + SuperStreamPartitions(SuperStreamPartitionsResponse), + SuperStreamRoute(SuperStreamRouteResponse), } impl Response { @@ -97,6 +101,12 @@ impl Response { ResponseKind::ExchangeCommandVersions(exchange_command_versions) => { Some(exchange_command_versions.correlation_id) } + ResponseKind::SuperStreamPartitions(super_stream_partitions_command) => { + Some(super_stream_partitions_command.correlation_id) + } + ResponseKind::SuperStreamRoute(super_stream_route_command) => { + Some(super_stream_route_command.correlation_id) + } } } @@ -174,6 +184,10 @@ impl Decoder for Response { .map(|(remaining, kind)| { (remaining, ResponseKind::ExchangeCommandVersions(kind)) })?, + COMMAND_PARTITIONS => SuperStreamPartitionsResponse::decode(input) + .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?, + COMMAND_ROUTE => SuperStreamRouteResponse::decode(input) + .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?, n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Response { header, kind })) @@ -201,6 +215,7 @@ mod tests { use byteorder::{BigEndian, WriteBytesExt}; + use super::{Response, ResponseKind}; use crate::{ codec::{Decoder, Encoder}, commands::{ @@ -211,14 +226,16 @@ mod tests { peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm, publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse, query_publisher_sequence::QueryPublisherResponse, - sasl_handshake::SaslHandshakeResponse, tune::TunesCommand, + sasl_handshake::SaslHandshakeResponse, + superstream_partitions::SuperStreamPartitionsResponse, + superstream_route::SuperStreamRouteResponse, tune::TunesCommand, }, protocol::{ commands::{ COMMAND_CLOSE, COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, - COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PEER_PROPERTIES, + COMMAND_METADATA_UPDATE, COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM, COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, - COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_SASL_AUTHENTICATE, + COMMAND_QUERY_PUBLISHER_SEQUENCE, COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE, }, version::PROTOCOL_VERSION, @@ -227,8 +244,6 @@ mod tests { types::Header, ResponseCode, }; - - use super::{Response, ResponseKind}; impl Encoder for ResponseKind { fn encoded_size(&self) -> u32 { match self { @@ -252,6 +267,12 @@ mod tests { ResponseKind::ExchangeCommandVersions(exchange_command_versions) => { exchange_command_versions.encoded_size() } + ResponseKind::SuperStreamPartitions(super_stream_response) => { + super_stream_response.encoded_size() + } + ResponseKind::SuperStreamRoute(super_stream_response) => { + super_stream_response.encoded_size() + } } } @@ -280,6 +301,12 @@ mod tests { ResponseKind::ExchangeCommandVersions(exchange_command_versions) => { exchange_command_versions.encode(writer) } + ResponseKind::SuperStreamPartitions(super_stream_command_versions) => { + super_stream_command_versions.encode(writer) + } + ResponseKind::SuperStreamRoute(super_stream_command_versions) => { + super_stream_command_versions.encode(writer) + } } } } @@ -449,4 +476,22 @@ mod tests { COMMAND_EXCHANGE_COMMAND_VERSIONS ); } + + #[test] + fn super_stream_partitions_response_test() { + response_test!( + SuperStreamPartitionsResponse, + ResponseKind::SuperStreamPartitions, + COMMAND_PARTITIONS + ); + } + + #[test] + fn super_stream_route_response_test() { + response_test!( + SuperStreamRouteResponse, + ResponseKind::SuperStreamRoute, + COMMAND_ROUTE + ); + } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 69549e8b..bc7e05cb 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -61,7 +61,9 @@ use rabbitmq_stream_protocol::{ store_offset::StoreOffset, subscribe::{OffsetSpecification, SubscribeCommand}, tune::TunesCommand, - unsubscribe::UnSubscribeCommand, + unsubscribe::UnSubscribeCommand, superstream_partitions::SuperStreamPartitionsResponse, + superstream_partitions::SuperStreamPartitionsRequest, superstream_route::SuperStreamRouteRequest, + superstream_route::SuperStreamRouteResponse, }, types::PublishedMessage, FromResponse, Request, Response, ResponseCode, ResponseKind, @@ -297,6 +299,20 @@ impl Client { .await } + pub async fn partitions(&self, super_stream: String) -> RabbitMQStreamResult { + self.send_and_receive(|correlation_id| { + SuperStreamPartitionsRequest::new(correlation_id, super_stream) + }) + .await + } + + pub async fn route(&self, routing_key: String, super_stream: String) -> RabbitMQStreamResult { + self.send_and_receive(|correlation_id| { + SuperStreamRouteRequest::new(correlation_id, routing_key, super_stream) + }) + .await + } + pub async fn create_stream( &self, stream: &str, diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index c4f2650b..38de6357 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -460,3 +460,89 @@ async fn client_exchange_command_versions() { let response = test.client.exchange_command_versions().await.unwrap(); assert_eq!(&ResponseCode::Ok, response.code()); } + +#[tokio::test(flavor = "multi_thread")] +async fn client_test_partitions_test() { + let super_stream_name = "test-super-stream-partitions"; + + let client = Client::connect(ClientOptions::default()).await.unwrap(); + + let partitions: Vec = [ + "test-super-stream-partitions-0", + "test-super-stream-partitions-1", + "test-super-stream-partitions-2", + ] + .iter() + .map(|&x| x.into()) + .collect(); + + let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); + + let response = client + .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let response = client + .partitions(super_stream_name.to_string()) + .await + .unwrap(); + + assert_eq!(response.streams.get(0).unwrap(), "test-super-stream-partitions-0"); + assert_eq!(response.streams.get(1).unwrap(), "test-super-stream-partitions-1"); + assert_eq!(response.streams.get(2).unwrap(), "test-super-stream-partitions-2"); + + let response = client + .delete_super_stream(&super_stream_name) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let _ = client.close().await; +} + +#[tokio::test(flavor = "multi_thread")] +async fn client_test_route_test() { + let super_stream_name = "test-super-stream-route"; + + let client = Client::connect(ClientOptions::default()).await.unwrap(); + + let partitions: Vec = [ + "test-super-stream-route-0", + "test-super-stream-route-1", + "test-super-stream-route-2", + ] + .iter() + .map(|&x| x.into()) + .collect(); + + let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); + + let response = client + .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let response = client + .route("0".to_string(), super_stream_name.to_string()) + .await + .unwrap(); + + assert_eq!(response.streams.len(), 1); + assert_eq!(response.streams.get(0).unwrap(), "test-super-stream-route-0"); + + + let response = client + .delete_super_stream(&super_stream_name) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let _ = client.close().await; +} \ No newline at end of file From 4a7cb4553b1972a8ab846c4125cd1fc7e295e455 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Wed, 9 Oct 2024 09:40:53 +0200 Subject: [PATCH 4/4] refactoring tests --- .../src/commands/superstream_partitions.rs | 3 +- src/client/mod.rs | 23 ++- tests/integration/client_test.rs | 165 ++++-------------- tests/integration/common.rs | 63 ++++++- 4 files changed, 108 insertions(+), 146 deletions(-) diff --git a/protocol/src/commands/superstream_partitions.rs b/protocol/src/commands/superstream_partitions.rs index 8703ae93..bee07a7c 100644 --- a/protocol/src/commands/superstream_partitions.rs +++ b/protocol/src/commands/superstream_partitions.rs @@ -3,14 +3,13 @@ use std::io::Write; #[cfg(test)] use fake::Fake; +use super::Command; use crate::{ codec::{Decoder, Encoder}, error::{DecodeError, EncodeError}, protocol::commands::COMMAND_PARTITIONS, FromResponse, ResponseCode, }; -use crate::commands::exchange_command_versions::ExchangeCommandVersion; -use super::Command; #[cfg_attr(test, derive(fake::Dummy))] #[derive(PartialEq, Eq, Debug)] diff --git a/src/client/mod.rs b/src/client/mod.rs index bc7e05cb..458072cd 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -60,10 +60,12 @@ use rabbitmq_stream_protocol::{ sasl_handshake::{SaslHandshakeCommand, SaslHandshakeResponse}, store_offset::StoreOffset, subscribe::{OffsetSpecification, SubscribeCommand}, - tune::TunesCommand, - unsubscribe::UnSubscribeCommand, superstream_partitions::SuperStreamPartitionsResponse, - superstream_partitions::SuperStreamPartitionsRequest, superstream_route::SuperStreamRouteRequest, + superstream_partitions::SuperStreamPartitionsRequest, + superstream_partitions::SuperStreamPartitionsResponse, + superstream_route::SuperStreamRouteRequest, superstream_route::SuperStreamRouteResponse, + tune::TunesCommand, + unsubscribe::UnSubscribeCommand, }, types::PublishedMessage, FromResponse, Request, Response, ResponseCode, ResponseKind, @@ -299,18 +301,25 @@ impl Client { .await } - pub async fn partitions(&self, super_stream: String) -> RabbitMQStreamResult { + pub async fn partitions( + &self, + super_stream: String, + ) -> RabbitMQStreamResult { self.send_and_receive(|correlation_id| { SuperStreamPartitionsRequest::new(correlation_id, super_stream) }) - .await + .await } - pub async fn route(&self, routing_key: String, super_stream: String) -> RabbitMQStreamResult { + pub async fn route( + &self, + routing_key: String, + super_stream: String, + ) -> RabbitMQStreamResult { self.send_and_receive(|correlation_id| { SuperStreamRouteRequest::new(correlation_id, routing_key, super_stream) }) - .await + .await } pub async fn create_stream( diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index 38de6357..70368458 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -42,79 +42,26 @@ async fn client_create_stream_error_test() { #[tokio::test(flavor = "multi_thread")] async fn client_create_and_delete_super_stream_test() { - let super_stream_name = "test-super-stream"; - - let client = Client::connect(ClientOptions::default()).await.unwrap(); - - let partitions: Vec = [ - "test-super-stream-0", - "test-super-stream-1", - "test-super-stream-2", - ] - .iter() - .map(|&x| x.into()) - .collect(); - - let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); - - let response = client - .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) - .await - .unwrap(); - - assert_eq!(&ResponseCode::Ok, response.code()); - - let response = client - .delete_super_stream(&super_stream_name) - .await - .unwrap(); - - assert_eq!(&ResponseCode::Ok, response.code()); - - let _ = client.close().await; + let test = TestClient::create_super_stream().await; } + #[tokio::test(flavor = "multi_thread")] async fn client_create_super_stream_error_test() { - let super_stream_name = "test-super-stream-error"; - - let client = Client::connect(ClientOptions::default()).await.unwrap(); - - let partitions: Vec = [ - "test-super-stream-error-0", - "test-super-stream-error-1", - "test-super-stream-error-2", - ] - .iter() - .map(|&x| x.into()) - .collect(); - + let test = TestClient::create_super_stream().await; let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); - let response = client + let response = test + .client .create_super_stream( - &super_stream_name, - partitions.clone(), - binding_keys.clone(), + &test.super_stream, + test.partitions.clone(), + binding_keys, HashMap::new(), ) .await .unwrap(); - assert_eq!(&ResponseCode::Ok, response.code()); - - let response = client - .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) - .await - .unwrap(); - assert_eq!(&ResponseCode::StreamAlreadyExists, response.code()); - - let response = client - .delete_super_stream(&super_stream_name) - .await - .unwrap(); - - assert_eq!(&ResponseCode::Ok, response.code()); } async fn client_delete_stream_test() { let test = TestClient::create().await; @@ -463,86 +410,40 @@ async fn client_exchange_command_versions() { #[tokio::test(flavor = "multi_thread")] async fn client_test_partitions_test() { - let super_stream_name = "test-super-stream-partitions"; - - let client = Client::connect(ClientOptions::default()).await.unwrap(); - - let partitions: Vec = [ - "test-super-stream-partitions-0", - "test-super-stream-partitions-1", - "test-super-stream-partitions-2", - ] - .iter() - .map(|&x| x.into()) - .collect(); - - let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); - - let response = client - .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) - .await - .unwrap(); - - assert_eq!(&ResponseCode::Ok, response.code()); - - let response = client - .partitions(super_stream_name.to_string()) - .await - .unwrap(); - - assert_eq!(response.streams.get(0).unwrap(), "test-super-stream-partitions-0"); - assert_eq!(response.streams.get(1).unwrap(), "test-super-stream-partitions-1"); - assert_eq!(response.streams.get(2).unwrap(), "test-super-stream-partitions-2"); + let test = TestClient::create_super_stream().await; - let response = client - .delete_super_stream(&super_stream_name) + let response = test + .client + .partitions(test.super_stream.to_string()) .await .unwrap(); - assert_eq!(&ResponseCode::Ok, response.code()); - - let _ = client.close().await; + assert_eq!( + response.streams.get(0).unwrap(), + test.partitions.get(0).unwrap() + ); + assert_eq!( + response.streams.get(1).unwrap(), + test.partitions.get(1).unwrap() + ); + assert_eq!( + response.streams.get(2).unwrap(), + test.partitions.get(2).unwrap() + ); } #[tokio::test(flavor = "multi_thread")] async fn client_test_route_test() { - let super_stream_name = "test-super-stream-route"; - - let client = Client::connect(ClientOptions::default()).await.unwrap(); - - let partitions: Vec = [ - "test-super-stream-route-0", - "test-super-stream-route-1", - "test-super-stream-route-2", - ] - .iter() - .map(|&x| x.into()) - .collect(); - - let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); - - let response = client - .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) - .await - .unwrap(); - - assert_eq!(&ResponseCode::Ok, response.code()); - - let response = client - .route("0".to_string(), super_stream_name.to_string()) + let test = TestClient::create_super_stream().await; + let response = test + .client + .route("0".to_string(), test.super_stream.to_string()) .await .unwrap(); assert_eq!(response.streams.len(), 1); - assert_eq!(response.streams.get(0).unwrap(), "test-super-stream-route-0"); - - - let response = client - .delete_super_stream(&super_stream_name) - .await - .unwrap(); - - assert_eq!(&ResponseCode::Ok, response.code()); - - let _ = client.close().await; -} \ No newline at end of file + assert_eq!( + response.streams.get(0).unwrap(), + test.partitions.get(0).unwrap() + ); +} diff --git a/tests/integration/common.rs b/tests/integration/common.rs index 5834c794..98b4fec2 100644 --- a/tests/integration/common.rs +++ b/tests/integration/common.rs @@ -7,6 +7,8 @@ use rabbitmq_stream_protocol::ResponseCode; pub struct TestClient { pub client: Client, pub stream: String, + pub super_stream: String, + pub partitions: Vec, } pub struct TestEnvironment { @@ -22,16 +24,67 @@ impl TestClient { let response = client.create_stream(&stream, HashMap::new()).await.unwrap(); assert_eq!(&ResponseCode::Ok, response.code()); - TestClient { client, stream } + TestClient { + client, + stream, + super_stream: String::new(), + partitions: Vec::new(), + } + } + + pub async fn create_super_stream() -> TestClient { + let super_stream: String = Faker.fake(); + let client = Client::connect(ClientOptions::default()).await.unwrap(); + + let partitions: Vec = [ + super_stream.to_string() + "-0", + super_stream.to_string() + "-1", + super_stream.to_string() + "-2", + ] + .iter() + .map(|x| x.into()) + .collect(); + + let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); + + let response = client + .create_super_stream( + &super_stream, + partitions.clone(), + binding_keys, + HashMap::new(), + ) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + TestClient { + client, + stream: String::new(), + super_stream, + partitions, + } } } impl Drop for TestClient { fn drop(&mut self) { - tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(async { self.client.delete_stream(&self.stream).await.unwrap() }) - }); + if self.stream != "" { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current() + .block_on(async { self.client.delete_stream(&self.stream).await.unwrap() }) + }); + } + if self.super_stream != "" { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + self.client + .delete_super_stream(&self.super_stream) + .await + .unwrap() + }) + }); + } } }