diff --git a/examples/environment.rs b/examples/environment.rs index 9c52470f..2fe3d07a 100644 --- a/examples/environment.rs +++ b/examples/environment.rs @@ -28,7 +28,7 @@ async fn main() -> Result<(), Box> { let mut producer = environment .producer() - .name("test_producer") + .client_provided_name("my producer") .build("test") .await?; @@ -42,6 +42,7 @@ async fn main() -> Result<(), Box> { let mut consumer = environment .consumer() + .client_provided_name("my consumer") .offset(OffsetSpecification::First) .build("test") .await diff --git a/examples/receive_super_stream.rs b/examples/receive_super_stream.rs index 125d4d59..6adbc2c9 100644 --- a/examples/receive_super_stream.rs +++ b/examples/receive_super_stream.rs @@ -35,6 +35,7 @@ async fn main() -> Result<(), Box> { let mut super_stream_consumer: SuperStreamConsumer = environment .super_stream_consumer() .offset(OffsetSpecification::First) + .client_provided_name("my super stream consumer for hello rust") .build(super_stream) .await .unwrap(); diff --git a/examples/send_super_stream.rs b/examples/send_super_stream.rs index 3b6269b6..a0318b90 100644 --- a/examples/send_super_stream.rs +++ b/examples/send_super_stream.rs @@ -73,6 +73,7 @@ async fn main() -> Result<(), Box> { routing_extractor: &hash_strategy_value_extractor, }, )) + .client_provided_name("my super stream producer for hello rust") .build(super_stream) .await .unwrap(); diff --git a/src/client/mod.rs b/src/client/mod.rs index 458072cd..dc242d6e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -200,6 +200,7 @@ pub struct Client { tune_notifier: Arc, publish_sequence: Arc, filtering_supported: bool, + client_properties: HashMap, } impl Client { @@ -227,8 +228,34 @@ impl Client { tune_notifier: Arc::new(Notify::new()), publish_sequence: Arc::new(AtomicU64::new(1)), filtering_supported: false, + client_properties: HashMap::new(), }; + const VERSION: &str = env!("CARGO_PKG_VERSION"); + + client + .client_properties + .insert(String::from("product"), String::from("RabbitMQ")); + client + .client_properties + .insert(String::from("version"), String::from(VERSION)); + client + .client_properties + .insert(String::from("platform"), String::from("Rust")); + client.client_properties.insert( + String::from("copyright"), + String::from("Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.")); + client.client_properties.insert( + String::from("information"), + String::from( + "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/", + ), + ); + client.client_properties.insert( + String::from("connection_name"), + client.opts.client_provided_name.clone(), + ); + client.initialize(receiver).await?; let command_versions = client.exchange_command_versions().await?; @@ -236,7 +263,6 @@ impl Client { if max_version >= 2 { client.filtering_supported = true } - Ok(client) } @@ -676,7 +702,7 @@ impl Client { async fn peer_properties(&self) -> Result, ClientError> { self.send_and_receive::(|correlation_id| { - PeerPropertiesCommand::new(correlation_id, HashMap::new()) + PeerPropertiesCommand::new(correlation_id, self.client_properties.clone()) }) .await .map(|peer_properties| peer_properties.server_properties) diff --git a/src/client/options.rs b/src/client/options.rs index 829c0f1d..a3824921 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -15,6 +15,7 @@ pub struct ClientOptions { pub(crate) load_balancer_mode: bool, pub(crate) tls: TlsConfiguration, pub(crate) collector: Arc, + pub(crate) client_provided_name: String, } impl Debug for ClientOptions { @@ -27,6 +28,7 @@ impl Debug for ClientOptions { .field("v_host", &self.v_host) .field("heartbeat", &self.heartbeat) .field("max_frame_size", &self.max_frame_size) + .field("client_provided_name", &self.client_provided_name) .finish() } } @@ -49,6 +51,7 @@ impl Default for ClientOptions { client_certificates_path: String::from(""), client_keys_path: String::from(""), }, + client_provided_name: String::from("rust-stream"), } } } @@ -69,6 +72,10 @@ impl ClientOptions { pub fn set_port(&mut self, port: u16) { self.port = port; } + + pub fn set_client_provided_name(&mut self, name: &str) { + self.client_provided_name = name.to_owned(); + } } pub struct ClientOptionsBuilder(ClientOptions); diff --git a/src/consumer.rs b/src/consumer.rs index 23661cc8..a6d63457 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -88,13 +88,21 @@ pub struct ConsumerBuilder { pub(crate) environment: Environment, pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, + pub(crate) client_provided_name: String, } impl ConsumerBuilder { pub async fn build(self, stream: &str) -> Result { // Connect to the user specified node first, then look for a random replica to connect to instead. - // This is recommended for load balancing purposes. - let mut client = self.environment.create_client().await?; + // This is recommended for load balancing purposes + + let mut opt_with_client_provided_name = self.environment.options.client_options.clone(); + opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone(); + + let mut client = self + .environment + .create_client_with_options(opt_with_client_provided_name) + .await?; let collector = self.environment.options.client_options.collector.clone(); if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) { // If there are no replicas we do not reassign client, meaning we just keep reading from the leader. @@ -199,6 +207,11 @@ impl ConsumerBuilder { self } + pub fn client_provided_name(mut self, name: &str) -> Self { + self.client_provided_name = String::from(name); + self + } + pub fn name(mut self, consumer_name: &str) -> Self { self.consumer_name = Some(String::from(consumer_name)); self diff --git a/src/environment.rs b/src/environment.rs index 32a7b88e..7b34e0a1 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -49,6 +49,7 @@ impl Environment { batch_publishing_delay: Duration::from_millis(100), data: PhantomData, filter_value_extractor: None, + client_provided_name: String::from("rust-stream-producer"), } } @@ -61,6 +62,7 @@ impl Environment { data: PhantomData, filter_value_extractor: None, route_strategy: routing_strategy, + client_provided_name: String::from("rust-super-stream-producer"), } } @@ -71,6 +73,7 @@ impl Environment { environment: self.clone(), offset_specification: OffsetSpecification::Next, filter_configuration: None, + client_provided_name: String::from("rust-stream-consumer"), } } @@ -79,12 +82,19 @@ impl Environment { environment: self.clone(), offset_specification: OffsetSpecification::Next, filter_configuration: None, + client_provided_name: String::from("rust-super-stream-consumer"), } } pub(crate) async fn create_client(&self) -> RabbitMQStreamResult { Client::connect(self.options.client_options.clone()).await } + pub(crate) async fn create_client_with_options( + &self, + opts: impl Into, + ) -> RabbitMQStreamResult { + Client::connect(opts).await + } /// Delete a stream pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> { @@ -172,6 +182,11 @@ impl EnvironmentBuilder { self.0.client_options.load_balancer_mode = load_balancer_mode; self } + + pub fn client_provided_name(mut self, name: &str) -> EnvironmentBuilder { + self.0.client_options.client_provided_name = name.to_owned(); + self + } } #[derive(Clone, Default)] pub struct EnvironmentOptions { diff --git a/src/producer.rs b/src/producer.rs index c781587c..158d9096 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -105,6 +105,7 @@ pub struct ProducerBuilder { pub batch_publishing_delay: Duration, pub(crate) data: PhantomData, pub filter_value_extractor: Option, + pub(crate) client_provided_name: String, } #[derive(Clone)] @@ -117,7 +118,14 @@ impl ProducerBuilder { // Connect to the user specified node first, then look for the stream leader. // The leader is the recommended node for writing, because writing to a replica will redundantly pass these messages // to the leader anyway - it is the only one capable of writing. - let mut client = self.environment.create_client().await?; + + let mut opt_with_client_provided_name = self.environment.options.client_options.clone(); + opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone(); + + let mut client = self + .environment + .create_client_with_options(opt_with_client_provided_name.clone()) + .await?; let mut publish_version = 1; @@ -157,7 +165,7 @@ impl ProducerBuilder { client = Client::connect(ClientOptions { host: metadata.leader.host.clone(), port: metadata.leader.port as u16, - ..self.environment.options.client_options + ..opt_with_client_provided_name.clone() }) .await? }; @@ -227,6 +235,12 @@ impl ProducerBuilder { self.batch_publishing_delay = delay; self } + + pub fn client_provided_name(mut self, name: &str) -> Self { + self.client_provided_name = String::from(name); + self + } + pub fn name(mut self, name: &str) -> ProducerBuilder { self.name = Some(name.to_owned()); ProducerBuilder { @@ -236,6 +250,7 @@ impl ProducerBuilder { batch_publishing_delay: self.batch_publishing_delay, data: PhantomData, filter_value_extractor: None, + client_provided_name: String::from("rust-stream-producer"), } } diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index b9938880..23857f54 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -32,6 +32,7 @@ pub struct SuperStreamConsumerBuilder { pub(crate) environment: Environment, pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, + pub(crate) client_provided_name: String, } impl SuperStreamConsumerBuilder { @@ -59,6 +60,7 @@ impl SuperStreamConsumerBuilder { .environment .consumer() .offset(self.offset_specification.clone()) + .client_provided_name(self.client_provided_name.as_str()) .filter_input(self.filter_configuration.clone()) .build(partition.as_str()) .await @@ -95,6 +97,11 @@ impl SuperStreamConsumerBuilder { self.filter_configuration = filter_configuration; self } + + pub fn client_provided_name(mut self, name: &str) -> Self { + self.client_provided_name = String::from(name); + self + } } impl Stream for SuperStreamConsumer { diff --git a/src/superstream_producer.rs b/src/superstream_producer.rs index 7598afab..b0678d69 100644 --- a/src/superstream_producer.rs +++ b/src/superstream_producer.rs @@ -29,6 +29,7 @@ pub struct SuperStreamProducerBuilder { pub filter_value_extractor: Option, pub route_strategy: RoutingStrategy, pub(crate) data: PhantomData, + pub(crate) client_provided_name: String, } pub struct SuperStreamProducerInternal { @@ -36,6 +37,7 @@ pub struct SuperStreamProducerInternal { client: Client, // TODO: implement filtering for superstream filter_value_extractor: Option, + client_provided_name: String, } impl SuperStreamProducer { @@ -70,6 +72,7 @@ impl SuperStreamProducer { .0 .environment .producer() + .client_provided_name(self.0.client_provided_name.as_str()) .filter_value_extractor_arc(self.0.filter_value_extractor.clone()) .build(route.as_str()) .await?; @@ -126,6 +129,7 @@ impl SuperStreamProducerBuilder { environment: self.environment.clone(), client, filter_value_extractor: self.filter_value_extractor, + client_provided_name: self.client_provided_name, }; let internal_producer = Arc::new(super_stream_producer); @@ -148,4 +152,9 @@ impl SuperStreamProducerBuilder { self.filter_value_extractor = Some(f); self } + + pub fn client_provided_name(mut self, name: &str) -> Self { + self.client_provided_name = String::from(name); + self + } } diff --git a/tests/integration/client_test.rs b/tests/integration/client_test.rs index 822fd66e..4051c484 100644 --- a/tests/integration/client_test.rs +++ b/tests/integration/client_test.rs @@ -17,7 +17,15 @@ use crate::common::TestClient; #[tokio::test] async fn client_connection_test() { let client = Client::connect(ClientOptions::default()).await.unwrap(); + assert_ne!(client.server_properties().await.len(), 0); + assert_ne!(client.connection_properties().await.len(), 0); +} +#[tokio::test] +async fn client_connection_with_properties_test() { + let mut opts = ClientOptions::default(); + opts.set_client_provided_name("my_connection_name"); + let client = Client::connect(opts).await.unwrap(); assert_ne!(client.server_properties().await.len(), 0); assert_ne!(client.connection_properties().await.len(), 0); } diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index d2bd68ea..f4f18996 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -39,6 +39,7 @@ async fn consumer_test() { let mut consumer = env .env .consumer() + .client_provided_name("my consumer") .offset(OffsetSpecification::Next) .build(&env.stream) .await @@ -78,6 +79,7 @@ async fn super_stream_consumer_test() { routing_extractor: &hash_strategy_value_extractor, }, )) + .client_provided_name("test super stream consumer ") .build(&env.super_stream) .await .unwrap(); diff --git a/tests/integration/producer_test.rs b/tests/integration/producer_test.rs index b26d070e..588ef203 100644 --- a/tests/integration/producer_test.rs +++ b/tests/integration/producer_test.rs @@ -60,6 +60,7 @@ async fn producer_send_name_deduplication_unique_ids() { let mut producer = env .producer() .name("my_producer") + .client_provided_name("my producer") .build(&stream) .await .unwrap(); @@ -543,6 +544,7 @@ async fn hash_super_steam_producer_test() { routing_extractor: &hash_strategy_value_extractor, }, )) + .client_provided_name("test super stream producer ") .build(&env.super_stream) .await .unwrap();