Skip to content

Commit 1eca3b8

Browse files
committed
add client_provided_name
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent b21981d commit 1eca3b8

File tree

9 files changed

+67
-5
lines changed

9 files changed

+67
-5
lines changed

examples/environment.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2828

2929
let mut producer = environment
3030
.producer()
31-
.name("test_producer")
31+
.client_provided_name("my producer")
3232
.build("test")
3333
.await?;
3434

@@ -42,6 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4242

4343
let mut consumer = environment
4444
.consumer()
45+
.client_provided_name("my consumer")
4546
.offset(OffsetSpecification::First)
4647
.build("test")
4748
.await

src/client/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,14 @@ impl Client {
231231
client_properties: HashMap::new(),
232232
};
233233

234+
const VERSION: &str = env!("CARGO_PKG_VERSION");
235+
234236
client
235237
.client_properties
236238
.insert(String::from("product"), String::from("RabbitMQ"));
239+
client
240+
.client_properties
241+
.insert(String::from("version"), String::from(VERSION));
237242
client
238243
.client_properties
239244
.insert(String::from("platform"), String::from("Rust"));
@@ -259,6 +264,7 @@ impl Client {
259264
client.filtering_supported = true
260265
}
261266

267+
println!("Connect {}", client.opts.client_provided_name.clone());
262268
Ok(client)
263269
}
264270

src/client/options.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ impl ClientOptions {
7272
pub fn set_port(&mut self, port: u16) {
7373
self.port = port;
7474
}
75+
76+
pub fn set_client_provided_name(&mut self, name: &str) {
77+
self.client_provided_name = name.to_owned();
78+
}
7579
}
7680

7781
pub struct ClientOptionsBuilder(ClientOptions);

src/consumer.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,21 @@ pub struct ConsumerBuilder {
8888
pub(crate) environment: Environment,
8989
pub(crate) offset_specification: OffsetSpecification,
9090
pub(crate) filter_configuration: Option<FilterConfiguration>,
91+
pub(crate) client_provided_name: String,
9192
}
9293

9394
impl ConsumerBuilder {
9495
pub async fn build(self, stream: &str) -> Result<Consumer, ConsumerCreateError> {
9596
// Connect to the user specified node first, then look for a random replica to connect to instead.
96-
// This is recommended for load balancing purposes.
97-
let mut client = self.environment.create_client().await?;
97+
// This is recommended for load balancing purposes
98+
99+
let mut opt_with_client_provided_name = self.environment.options.client_options.clone();
100+
opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone();
101+
102+
let mut client = self
103+
.environment
104+
.create_client_with_options(opt_with_client_provided_name)
105+
.await?;
98106
let collector = self.environment.options.client_options.collector.clone();
99107
if let Some(metadata) = client.metadata(vec![stream.to_string()]).await?.get(stream) {
100108
// If there are no replicas we do not reassign client, meaning we just keep reading from the leader.
@@ -197,6 +205,11 @@ impl ConsumerBuilder {
197205
self
198206
}
199207

208+
pub fn client_provided_name(mut self, name: &str) -> Self {
209+
self.client_provided_name = String::from(name);
210+
self
211+
}
212+
200213
pub fn name(mut self, consumer_name: &str) -> Self {
201214
self.consumer_name = Some(String::from(consumer_name));
202215
self

src/environment.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ impl Environment {
4646
batch_publishing_delay: Duration::from_millis(100),
4747
data: PhantomData,
4848
filter_value_extractor: None,
49+
client_provided_name: String::from("rust-stream-producer"),
4950
}
5051
}
5152

@@ -56,11 +57,18 @@ impl Environment {
5657
environment: self.clone(),
5758
offset_specification: OffsetSpecification::Next,
5859
filter_configuration: None,
60+
client_provided_name: String::from("rust-stream-consumer"),
5961
}
6062
}
6163
pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
6264
Client::connect(self.options.client_options.clone()).await
6365
}
66+
pub(crate) async fn create_client_with_options(
67+
&self,
68+
opts: impl Into<ClientOptions>,
69+
) -> RabbitMQStreamResult<Client> {
70+
Client::connect(opts).await
71+
}
6472

6573
/// Delete a stream
6674
pub async fn delete_stream(&self, stream: &str) -> Result<(), StreamDeleteError> {
@@ -148,6 +156,11 @@ impl EnvironmentBuilder {
148156
self.0.client_options.load_balancer_mode = load_balancer_mode;
149157
self
150158
}
159+
160+
pub fn client_provided_name(mut self, name: &str) -> EnvironmentBuilder {
161+
self.0.client_options.client_provided_name = name.to_owned();
162+
self
163+
}
151164
}
152165
#[derive(Clone, Default)]
153166
pub struct EnvironmentOptions {

src/producer.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ pub struct ProducerBuilder<T> {
105105
pub batch_publishing_delay: Duration,
106106
pub(crate) data: PhantomData<T>,
107107
pub filter_value_extractor: Option<FilterValueExtractor>,
108+
pub(crate) client_provided_name: String,
108109
}
109110

110111
#[derive(Clone)]
@@ -117,7 +118,14 @@ impl<T> ProducerBuilder<T> {
117118
// Connect to the user specified node first, then look for the stream leader.
118119
// The leader is the recommended node for writing, because writing to a replica will redundantly pass these messages
119120
// to the leader anyway - it is the only one capable of writing.
120-
let mut client = self.environment.create_client().await?;
121+
122+
let mut opt_with_client_provided_name = self.environment.options.client_options.clone();
123+
opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone();
124+
125+
let mut client = self
126+
.environment
127+
.create_client_with_options(opt_with_client_provided_name.clone())
128+
.await?;
121129

122130
let mut publish_version = 1;
123131

@@ -155,7 +163,7 @@ impl<T> ProducerBuilder<T> {
155163
client = Client::connect(ClientOptions {
156164
host: metadata.leader.host.clone(),
157165
port: metadata.leader.port as u16,
158-
..self.environment.options.client_options
166+
..opt_with_client_provided_name.clone()
159167
})
160168
.await?
161169
};
@@ -225,6 +233,12 @@ impl<T> ProducerBuilder<T> {
225233
self.batch_publishing_delay = delay;
226234
self
227235
}
236+
237+
pub fn client_provided_name(mut self, name: &str) -> Self {
238+
self.client_provided_name = String::from(name);
239+
self
240+
}
241+
228242
pub fn name(mut self, name: &str) -> ProducerBuilder<Dedup> {
229243
self.name = Some(name.to_owned());
230244
ProducerBuilder {
@@ -234,6 +248,7 @@ impl<T> ProducerBuilder<T> {
234248
batch_publishing_delay: self.batch_publishing_delay,
235249
data: PhantomData,
236250
filter_value_extractor: None,
251+
client_provided_name: String::from("rust-stream-producer"),
237252
}
238253
}
239254

tests/integration/client_test.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,15 @@ use crate::common::TestClient;
1717
#[tokio::test]
1818
async fn client_connection_test() {
1919
let client = Client::connect(ClientOptions::default()).await.unwrap();
20+
assert_ne!(client.server_properties().await.len(), 0);
21+
assert_ne!(client.connection_properties().await.len(), 0);
22+
}
2023

24+
#[tokio::test]
25+
async fn client_connection_with_properties_test() {
26+
let mut opts = ClientOptions::default();
27+
opts.set_client_provided_name("my_connection_name");
28+
let client = Client::connect(opts).await.unwrap();
2129
assert_ne!(client.server_properties().await.len(), 0);
2230
assert_ne!(client.connection_properties().await.len(), 0);
2331
}

tests/integration/consumer_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ async fn consumer_test() {
3232
let mut consumer = env
3333
.env
3434
.consumer()
35+
.client_provided_name("my consumer")
3536
.offset(OffsetSpecification::Next)
3637
.build(&env.stream)
3738
.await

tests/integration/producer_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ async fn producer_send_name_deduplication_unique_ids() {
5353
let mut producer = env
5454
.producer()
5555
.name("my_producer")
56+
.client_provided_name("my producer")
5657
.build(&stream)
5758
.await
5859
.unwrap();

0 commit comments

Comments
 (0)