From 69e5d42e094bbf6b2d1ef51a8a95e991ebc0497c Mon Sep 17 00:00:00 2001 From: JiaYing Zhang Date: Sat, 27 Jul 2024 10:57:34 +0800 Subject: [PATCH 1/4] add the load_malance_made attribute to the Client Options. --- README.md | 13 +++++++++++++ src/client/options.rs | 9 +++++++++ src/consumer.rs | 27 ++++++++++++++++++++------- src/environment.rs | 5 +++++ src/producer.rs | 29 ++++++++++++++++++++++------- 5 files changed, 69 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 4ba1e33d..aa8b5ddc 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,19 @@ let environment = Environment::builder() .build() ``` +##### Building the environment with a load balancer + +```rust,no_run +use rabbitmq_stream_client::Environment; + + +let environment = Environment::builder() + .load_balancer_mode(true) + .build() +``` + + + ##### Publishing messages ```rust,no_run diff --git a/src/client/options.rs b/src/client/options.rs index 16ae1b72..829c0f1d 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -12,6 +12,7 @@ pub struct ClientOptions { pub(crate) v_host: String, pub(crate) heartbeat: u32, pub(crate) max_frame_size: u32, + pub(crate) load_balancer_mode: bool, pub(crate) tls: TlsConfiguration, pub(crate) collector: Arc, } @@ -39,6 +40,7 @@ impl Default for ClientOptions { v_host: "/".to_owned(), heartbeat: 60, max_frame_size: 1048576, + load_balancer_mode: false, collector: Arc::new(NopMetricsCollector {}), tls: TlsConfiguration { enabled: false, @@ -117,6 +119,11 @@ impl ClientOptionsBuilder { self } + pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> Self { + self.0.load_balancer_mode = load_balancer_mode; + self + } + pub fn build(self) -> ClientOptions { self.0 } @@ -145,6 +152,7 @@ mod tests { client_keys_path: String::from(""), }) .collector(Arc::new(NopMetricsCollector {})) + .load_balancer_mode(true) .build(); assert_eq!(options.host, "test"); assert_eq!(options.port, 8888); @@ -154,5 +162,6 @@ mod tests { assert_eq!(options.heartbeat, 10000); assert_eq!(options.max_frame_size, 1); assert_eq!(options.tls.enabled, true); + assert_eq!(options.load_balancer_mode, true); } } diff --git a/src/consumer.rs b/src/consumer.rs index 59971d3f..58959028 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -76,12 +76,26 @@ impl ConsumerBuilder { metadata.replicas, stream ); - client = Client::connect(ClientOptions { - host: replica.host.clone(), - port: replica.port as u16, - ..self.environment.options.client_options - }) - .await?; + let load_balancer_mode = self.environment.options.client_options.load_balancer_mode; + if load_balancer_mode { + let options = self.environment.options.client_options.clone(); + loop { + let temp_client = Client::connect(options.clone()).await?; + let mapping = temp_client.connection_properties().await; + let advertised_host = mapping.get("advertised_host").unwrap(); + if *advertised_host == replica.host.clone() { + client = temp_client; + break; + } + } + } else { + client = Client::connect(ClientOptions { + host: replica.host.clone(), + port: replica.port as u16, + ..self.environment.options.client_options + }) + .await?; + } } } else { return Err(ConsumerCreateError::StreamDoesNotExist { @@ -100,7 +114,6 @@ impl ConsumerBuilder { waker: AtomicWaker::new(), metrics_collector: collector, }); - let msg_handler = ConsumerMessageHandler(consumer.clone()); client.set_handler(msg_handler).await; diff --git a/src/environment.rs b/src/environment.rs index 84bd11fd..b63077be 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -126,6 +126,11 @@ impl EnvironmentBuilder { self.0.client_options.collector = Arc::new(collector); self } + + pub fn load_balancer_mode(mut self, load_balancer_mode: bool) -> EnvironmentBuilder { + self.0.client_options.load_balancer_mode = load_balancer_mode; + self + } } #[derive(Clone, Default)] pub struct EnvironmentOptions { diff --git a/src/producer.rs b/src/producer.rs index 39ad8bce..aa289187 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -119,13 +119,28 @@ impl ProducerBuilder { metadata.leader, stream ); - client.close().await?; - client = Client::connect(ClientOptions { - host: metadata.leader.host.clone(), - port: metadata.leader.port as u16, - ..self.environment.options.client_options - }) - .await?; + let load_balancer_mode: bool = self.environment.options.client_options.load_balancer_mode; + if load_balancer_mode { + // Producer must connect to leader node + let options: ClientOptions = self.environment.options.client_options.clone(); + loop { + let temp_client = Client::connect(options.clone()).await?; + let mapping = temp_client.connection_properties().await; + let advertised_host = mapping.get("advertised_host").unwrap(); + if *advertised_host == metadata.leader.host.clone() { + client = temp_client; + break; + } + } + } else { + client.close().await?; + client = Client::connect(ClientOptions { + host: metadata.leader.host.clone(), + port: metadata.leader.port as u16, + ..self.environment.options.client_options + }) + .await? + }; } else { return Err(ProducerCreateError::StreamDoesNotExist { stream: stream.into(), From 76550c11773efee163131cf562a4c98089074837 Mon Sep 17 00:00:00 2001 From: JiaYing Zhang Date: Tue, 30 Jul 2024 09:08:44 +0800 Subject: [PATCH 2/4] fixup cargo fmt --- src/producer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/producer.rs b/src/producer.rs index aa289187..7b3ec5e0 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -119,7 +119,7 @@ impl ProducerBuilder { metadata.leader, stream ); - let load_balancer_mode: bool = self.environment.options.client_options.load_balancer_mode; + let load_balancer_mode = self.environment.options.client_options.load_balancer_mode; if load_balancer_mode { // Producer must connect to leader node let options: ClientOptions = self.environment.options.client_options.clone(); From 9faf502a0bc717d22ffbd1af3baae35ff21494ed Mon Sep 17 00:00:00 2001 From: JiaYing Zhang Date: Tue, 30 Jul 2024 16:37:45 +0800 Subject: [PATCH 3/4] fixup: remove unwrap and skip the temp_client if no advertised_host property --- src/consumer.rs | 9 +++++---- src/producer.rs | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index 58959028..409cd374 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -82,10 +82,11 @@ impl ConsumerBuilder { loop { let temp_client = Client::connect(options.clone()).await?; let mapping = temp_client.connection_properties().await; - let advertised_host = mapping.get("advertised_host").unwrap(); - if *advertised_host == replica.host.clone() { - client = temp_client; - break; + if let Some(advertised_host) = mapping.get("advertised_host") { + if *advertised_host == replica.host.clone() { + client = temp_client; + break; + } } } } else { diff --git a/src/producer.rs b/src/producer.rs index 7b3ec5e0..db8169ca 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -126,10 +126,11 @@ impl ProducerBuilder { loop { let temp_client = Client::connect(options.clone()).await?; let mapping = temp_client.connection_properties().await; - let advertised_host = mapping.get("advertised_host").unwrap(); - if *advertised_host == metadata.leader.host.clone() { - client = temp_client; - break; + if let Some(advertised_host) = mapping.get("advertised_host") { + if *advertised_host == metadata.leader.host.clone() { + client = temp_client; + break; + } } } } else { From 17eb38d585ec69ff3f676e3d154b1b9c98a8cb5a Mon Sep 17 00:00:00 2001 From: JiaYing Zhang Date: Tue, 30 Jul 2024 18:18:11 +0800 Subject: [PATCH 4/4] fixup: fixup clippy error --- src/environment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/environment.rs b/src/environment.rs index b63077be..cf472729 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -121,7 +121,7 @@ impl EnvironmentBuilder { } pub fn metrics_collector( mut self, - collector: impl MetricsCollector + Send + Sync + 'static, + collector: impl MetricsCollector + 'static, ) -> EnvironmentBuilder { self.0.client_options.collector = Arc::new(collector); self