From 9a89e3e11fda2cd59b1ac34600468f3372248198 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Fri, 4 Oct 2024 14:01:10 +0200 Subject: [PATCH 1/2] WIP: implementing create and delete superstream commands --- protocol/src/commands/create_super_stream.rs | 100 +++++++++++++++++++ protocol/src/commands/delete_super_stream.rs | 74 ++++++++++++++ protocol/src/commands/mod.rs | 2 + protocol/src/protocol.rs | 2 + protocol/src/request/mod.rs | 43 +++++++- protocol/src/request/shims.rs | 15 +++ protocol/src/response/mod.rs | 2 + src/client/mod.rs | 32 ++++++ src/environment.rs | 15 +++ src/stream_creator.rs | 42 ++++++++ tests/integration/client_test.rs | 33 ++++++ tests/integration/environment_test.rs | 18 ++++ 12 files changed, 373 insertions(+), 5 deletions(-) create mode 100644 protocol/src/commands/create_super_stream.rs create mode 100644 protocol/src/commands/delete_super_stream.rs diff --git a/protocol/src/commands/create_super_stream.rs b/protocol/src/commands/create_super_stream.rs new file mode 100644 index 00000000..60fddc16 --- /dev/null +++ b/protocol/src/commands/create_super_stream.rs @@ -0,0 +1,100 @@ +use std::collections::HashMap; +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_CREATE_SUPER_STREAM, +}; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct CreateSuperStreamCommand { + correlation_id: u32, + super_stream_name: String, + partitions: Vec, + binding_keys: Vec, + args: HashMap, +} + +impl CreateSuperStreamCommand { + pub fn new( + correlation_id: u32, + super_stream_name: String, + partitions: Vec, + binding_keys: Vec, + args: HashMap, + ) -> Self { + Self { + correlation_id, + super_stream_name, + partitions, + binding_keys, + args, + } + } +} + +impl Encoder for CreateSuperStreamCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + + self.super_stream_name.as_str().encoded_size() + + self.partitions.encoded_size() + + self.binding_keys.encoded_size() + + self.args.encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.super_stream_name.as_str().encode(writer)?; + self.partitions.encode(writer)?; + self.binding_keys.encode(writer)?; + self.args.encode(writer)?; + Ok(()) + } +} + +impl Decoder for CreateSuperStreamCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, super_stream_name) = Option::decode(input)?; + let (input, partitions) = >::decode(input)?; + let (input, binding_keys) = >::decode(input)?; + let (input, args) = HashMap::decode(input)?; + + Ok(( + input, + CreateSuperStreamCommand { + correlation_id, + super_stream_name: super_stream_name.unwrap(), + partitions, + binding_keys, + args, + }, + )) + } +} + +impl Command for CreateSuperStreamCommand { + fn key(&self) -> u16 { + COMMAND_CREATE_SUPER_STREAM + } +} + +#[cfg(test)] +mod tests { + use crate::commands::create_stream::CreateStreamCommand; + use crate::commands::tests::command_encode_decode_test; + + use super::CreateSuperStreamCommand; + + #[test] + fn create_super_stream_request_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/delete_super_stream.rs b/protocol/src/commands/delete_super_stream.rs new file mode 100644 index 00000000..21a57994 --- /dev/null +++ b/protocol/src/commands/delete_super_stream.rs @@ -0,0 +1,74 @@ +use std::io::Write; + +#[cfg(test)] +use fake::Fake; + +use crate::{ + codec::{Decoder, Encoder}, + error::{DecodeError, EncodeError}, + protocol::commands::COMMAND_DELETE_SUPER_STREAM, +}; + +use super::Command; + +#[cfg_attr(test, derive(fake::Dummy))] +#[derive(PartialEq, Eq, Debug)] +pub struct DeleteSuperStreamCommand { + correlation_id: u32, + super_stream_name: String, +} + +impl DeleteSuperStreamCommand { + pub fn new(correlation_id: u32, super_stream_name: String) -> Self { + Self { + correlation_id, + super_stream_name, + } + } +} + +impl Encoder for DeleteSuperStreamCommand { + fn encoded_size(&self) -> u32 { + self.correlation_id.encoded_size() + self.super_stream_name.as_str().encoded_size() + } + + fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> { + self.correlation_id.encode(writer)?; + self.super_stream_name.as_str().encode(writer)?; + Ok(()) + } +} + +impl Decoder for DeleteSuperStreamCommand { + fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> { + let (input, correlation_id) = u32::decode(input)?; + let (input, super_stream_name) = Option::decode(input)?; + + Ok(( + input, + DeleteSuperStreamCommand { + correlation_id, + super_stream_name: super_stream_name.unwrap(), + }, + )) + } +} + +impl Command for DeleteSuperStreamCommand { + fn key(&self) -> u16 { + COMMAND_DELETE_SUPER_STREAM + } +} + +#[cfg(test)] +mod tests { + use crate::commands::create_super_stream::CreateSuperStreamCommand; + use crate::commands::tests::command_encode_decode_test; + + use super::DeleteSuperStreamCommand; + + #[test] + fn delete_super_stream_request_test() { + command_encode_decode_test::(); + } +} diff --git a/protocol/src/commands/mod.rs b/protocol/src/commands/mod.rs index 0a4ae942..5392b07a 100644 --- a/protocol/src/commands/mod.rs +++ b/protocol/src/commands/mod.rs @@ -2,10 +2,12 @@ use crate::protocol::version::PROTOCOL_VERSION; pub mod close; pub mod create_stream; +pub mod create_super_stream; pub mod credit; pub mod declare_publisher; pub mod delete; pub mod delete_publisher; +pub mod delete_super_stream; pub mod deliver; pub mod exchange_command_versions; pub mod generic; diff --git a/protocol/src/protocol.rs b/protocol/src/protocol.rs index d87be920..5dcb0d3c 100644 --- a/protocol/src/protocol.rs +++ b/protocol/src/protocol.rs @@ -30,6 +30,8 @@ pub mod commands { pub const COMMAND_CONSUMER_UPDATE: u16 = 26; pub const COMMAND_EXCHANGE_COMMAND_VERSIONS: u16 = 27; pub const COMMAND_STREAMS_STATS: u16 = 28; + pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29; + pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30; } // server responses diff --git a/protocol/src/request/mod.rs b/protocol/src/request/mod.rs index fc616436..380450a8 100644 --- a/protocol/src/request/mod.rs +++ b/protocol/src/request/mod.rs @@ -3,9 +3,10 @@ use std::io::Write; use crate::{ codec::{decoder::read_u32, Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, + close::CloseRequest, create_stream::CreateStreamCommand, + create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, - delete_publisher::DeletePublisherCommand, + delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand, publish::PublishCommand, query_offset::QueryOffsetRequest, @@ -20,6 +21,7 @@ use crate::{ }; use byteorder::{BigEndian, WriteBytesExt}; + mod shims; #[derive(Debug, PartialEq, Eq)] pub struct Request { @@ -60,6 +62,8 @@ pub enum RequestKind { StoreOffset(StoreOffset), Unsubscribe(UnSubscribeCommand), ExchangeCommandVersions(ExchangeCommandVersionsRequest), + CreateSuperStream(CreateSuperStreamCommand), + DeleteSuperStream(DeleteSuperStreamCommand), } impl Encoder for RequestKind { @@ -87,6 +91,12 @@ impl Encoder for RequestKind { RequestKind::ExchangeCommandVersions(exchange_command_versions) => { exchange_command_versions.encoded_size() } + RequestKind::CreateSuperStream(create_super_stream) => { + create_super_stream.encoded_size() + } + RequestKind::DeleteSuperStream(delete_super_stream) => { + delete_super_stream.encoded_size() + } } } @@ -114,6 +124,12 @@ impl Encoder for RequestKind { RequestKind::ExchangeCommandVersions(exchange_command_versions) => { exchange_command_versions.encode(writer) } + RequestKind::CreateSuperStream(create_super_stream) => { + create_super_stream.encode(writer) + } + RequestKind::DeleteSuperStream(delete_super_stream) => { + delete_super_stream.encode(writer) + } } } } @@ -182,6 +198,12 @@ impl Decoder for Request { COMMAND_EXCHANGE_COMMAND_VERSIONS => { ExchangeCommandVersionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))? } + COMMAND_CREATE_SUPER_STREAM => { + CreateSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))? + } + COMMAND_DELETE_SUPER_STREAM => { + DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))? + } n => return Err(DecodeError::UnsupportedResponseType(n)), }; Ok((input, Request { header, kind: cmd })) @@ -194,9 +216,11 @@ mod tests { use crate::{ codec::{Decoder, Encoder}, commands::{ - close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, + close::CloseRequest, create_stream::CreateStreamCommand, + create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, + delete_super_stream::DeleteSuperStreamCommand, exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand, publish::PublishCommand, @@ -209,9 +233,8 @@ mod tests { use std::fmt::Debug; - use fake::{Dummy, Fake, Faker}; - use super::Request; + use fake::{Dummy, Fake, Faker}; #[test] fn request_open_test() { @@ -324,4 +347,14 @@ mod tests { fn request_exchange_command_versions_test() { request_encode_decode_test::() } + + #[test] + fn request_create_super_stream_test() { + request_encode_decode_test::() + } + + #[test] + fn request_delete_super_stream_test() { + request_encode_decode_test::() + } } diff --git a/protocol/src/request/shims.rs b/protocol/src/request/shims.rs index 323d4d51..0f7964a2 100644 --- a/protocol/src/request/shims.rs +++ b/protocol/src/request/shims.rs @@ -1,3 +1,5 @@ +use crate::commands::create_super_stream::CreateSuperStreamCommand; +use crate::commands::delete_super_stream::DeleteSuperStreamCommand; use crate::{ commands::{ close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand, @@ -14,6 +16,7 @@ use crate::{ types::Header, Request, RequestKind, }; + impl From for Request where T: Into + Command, @@ -135,3 +138,15 @@ impl From for RequestKind { RequestKind::ExchangeCommandVersions(cmd) } } + +impl From for RequestKind { + fn from(cmd: CreateSuperStreamCommand) -> Self { + RequestKind::CreateSuperStream(cmd) + } +} + +impl From for RequestKind { + fn from(cmd: DeleteSuperStreamCommand) -> Self { + RequestKind::DeleteSuperStream(cmd) + } +} diff --git a/protocol/src/response/mod.rs b/protocol/src/response/mod.rs index 905a0954..89bc9243 100644 --- a/protocol/src/response/mod.rs +++ b/protocol/src/response/mod.rs @@ -139,6 +139,8 @@ impl Decoder for Response { | COMMAND_SUBSCRIBE | COMMAND_UNSUBSCRIBE | COMMAND_CREATE_STREAM + | COMMAND_CREATE_SUPER_STREAM + | COMMAND_DELETE_SUPER_STREAM | COMMAND_DELETE_STREAM => { GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))? } diff --git a/src/client/mod.rs b/src/client/mod.rs index 83e2a096..69549e8b 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -42,10 +42,12 @@ use rabbitmq_stream_protocol::{ commands::{ close::{CloseRequest, CloseResponse}, create_stream::CreateStreamCommand, + create_super_stream::CreateSuperStreamCommand, credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete, delete_publisher::DeletePublisherCommand, + delete_super_stream::DeleteSuperStreamCommand, generic::GenericResponse, heart_beat::HeartBeatCommand, metadata::MetadataCommand, @@ -306,11 +308,40 @@ impl Client { .await } + pub async fn create_super_stream( + &self, + super_stream: &str, + partitions: Vec, + binding_keys: Vec, + options: HashMap, + ) -> RabbitMQStreamResult { + self.send_and_receive(|correlation_id| { + CreateSuperStreamCommand::new( + correlation_id, + super_stream.to_owned(), + partitions, + binding_keys, + options, + ) + }) + .await + } + pub async fn delete_stream(&self, stream: &str) -> RabbitMQStreamResult { self.send_and_receive(|correlation_id| Delete::new(correlation_id, stream.to_owned())) .await } + pub async fn delete_super_stream( + &self, + super_stream: &str, + ) -> RabbitMQStreamResult { + self.send_and_receive(|correlation_id| { + DeleteSuperStreamCommand::new(correlation_id, super_stream.to_owned()) + }) + .await + } + pub async fn credit(&self, subscription_id: u8, credit: u16) -> RabbitMQStreamResult<()> { self.send(CreditCommand::new(subscription_id, credit)).await } @@ -574,6 +605,7 @@ impl Client { M: FnOnce(u32) -> R, { let Some((correlation_id, mut receiver)) = self.dispatcher.response_channel() else { + trace!("Connection si closed here"); return Err(ClientError::ConnectionClosed); }; diff --git a/src/environment.rs b/src/environment.rs index 8b1a590d..84817a6b 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -77,6 +77,21 @@ impl Environment { }) } } + + pub async fn delete_super_stream(&self, super_stream: &str) -> Result<(), StreamDeleteError> { + let client = self.create_client().await?; + let response = client.delete_super_stream(super_stream).await?; + client.close().await?; + + if response.is_ok() { + Ok(()) + } else { + Err(StreamDeleteError::Delete { + stream: super_stream.to_owned(), + status: response.code().clone(), + }) + } + } } /// Builder for [`Environment`] diff --git a/src/stream_creator.rs b/src/stream_creator.rs index 085eb164..5625aee4 100644 --- a/src/stream_creator.rs +++ b/src/stream_creator.rs @@ -34,6 +34,48 @@ impl StreamCreator { } } + pub async fn create_super_stream( + self, + super_stream: &str, + number_of_partitions: usize, + binding_keys: Option>, + ) -> Result<(), StreamCreateError> { + let mut partitions_names = Vec::with_capacity(number_of_partitions); + let mut new_binding_keys: Vec = Vec::with_capacity(number_of_partitions); + + if binding_keys.is_none() { + for i in 0..number_of_partitions { + new_binding_keys.push(i.to_string()); + partitions_names.push(super_stream.to_owned() + "-" + i.to_string().as_str()) + } + } else { + new_binding_keys = binding_keys.unwrap(); + for binding_key in new_binding_keys.iter() { + partitions_names.push(super_stream.to_owned() + "-" + binding_key) + } + } + + let client = self.env.create_client().await?; + let response = client + .create_super_stream( + super_stream, + partitions_names, + new_binding_keys, + self.options, + ) + .await?; + client.close().await?; + + if response.is_ok() { + Ok(()) + } else { + Err(StreamCreateError::Create { + stream: super_stream.to_owned(), + status: response.code().clone(), + }) + } + } + pub fn max_age(mut self, max_age: Duration) -> Self { self.options .insert("max-age".to_owned(), format!("{}s", max_age.as_secs())); diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index c9dae67f..fab736fb 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -41,6 +41,39 @@ async fn client_create_stream_error_test() { } #[tokio::test(flavor = "multi_thread")] +async fn client_create_and_delete_super_stream_test() { + let super_stream_name = "test-super-stream"; + + let client = Client::connect(ClientOptions::default()).await.unwrap(); + + let partitions: Vec = [ + "test-super-stream-0", + "test-super-stream-1", + "test-super-stream-2", + ] + .iter() + .map(|&x| x.into()) + .collect(); + + let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); + + let response = client + .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let response = client + .delete_super_stream(&super_stream_name) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); + + let _ = client.close().await; +} + async fn client_delete_stream_test() { let test = TestClient::create().await; diff --git a/tests/integration/environment_test.rs b/tests/integration/environment_test.rs index abf900a3..d8cc87be 100644 --- a/tests/integration/environment_test.rs +++ b/tests/integration/environment_test.rs @@ -13,6 +13,24 @@ async fn environment_create_test() { let _ = TestEnvironment::create().await; } +#[tokio::test(flavor = "multi_thread")] +async fn environment_create_and_delete_super_stream_test() { + let super_stream = "super_stream_test"; + let env = Environment::builder().build().await.unwrap(); + + let response = env + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create_super_stream(super_stream, 3, None) + .await; + + assert_eq!(response.is_ok(), true); + + let response = env.delete_super_stream(super_stream).await; + + assert_eq!(response.is_ok(), true); +} + #[tokio::test(flavor = "multi_thread")] async fn environment_fail_to_connect_wrong_config() { // test the wrong config From 551df267bf729c8645c4d89f6d821697ef2e049d Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 8 Oct 2024 09:17:40 +0200 Subject: [PATCH 2/2] adding superstream error-case test --- tests/integration/client_test.rs | 42 ++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index fab736fb..c4f2650b 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -73,7 +73,49 @@ async fn client_create_and_delete_super_stream_test() { let _ = client.close().await; } +#[tokio::test(flavor = "multi_thread")] +async fn client_create_super_stream_error_test() { + let super_stream_name = "test-super-stream-error"; + + let client = Client::connect(ClientOptions::default()).await.unwrap(); + + let partitions: Vec = [ + "test-super-stream-error-0", + "test-super-stream-error-1", + "test-super-stream-error-2", + ] + .iter() + .map(|&x| x.into()) + .collect(); + + let binding_keys: Vec = ["0", "1", "2"].iter().map(|&x| x.into()).collect(); + + let response = client + .create_super_stream( + &super_stream_name, + partitions.clone(), + binding_keys.clone(), + HashMap::new(), + ) + .await + .unwrap(); + assert_eq!(&ResponseCode::Ok, response.code()); + + let response = client + .create_super_stream(&super_stream_name, partitions, binding_keys, HashMap::new()) + .await + .unwrap(); + + assert_eq!(&ResponseCode::StreamAlreadyExists, response.code()); + + let response = client + .delete_super_stream(&super_stream_name) + .await + .unwrap(); + + assert_eq!(&ResponseCode::Ok, response.code()); +} async fn client_delete_stream_test() { let test = TestClient::create().await;