Skip to content

Commit 8287525

Browse files
committed
add ability to include properties to the consumers during subscriptions
1 parent 028f7e9 commit 8287525

File tree

3 files changed

+15
-4
lines changed

3 files changed

+15
-4
lines changed

src/consumer.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub struct ConsumerBuilder {
8989
pub(crate) offset_specification: OffsetSpecification,
9090
pub(crate) filter_configuration: Option<FilterConfiguration>,
9191
pub(crate) client_provided_name: String,
92+
pub(crate) properties: HashMap<String, String>,
9293
}
9394

9495
impl ConsumerBuilder {
@@ -161,18 +162,18 @@ impl ConsumerBuilder {
161162
let msg_handler = ConsumerMessageHandler(consumer.clone());
162163
client.set_handler(msg_handler).await;
163164

164-
let mut properties = HashMap::new();
165+
let properties = &self.properties;
165166
if let Some(filter_input) = self.filter_configuration {
166167
if !client.filtering_supported() {
167168
return Err(ConsumerCreateError::FilteringNotSupport);
168169
}
169170
for (index, item) in filter_input.filter_values.iter().enumerate() {
170171
let key = format!("filter.{}", index);
171-
properties.insert(key, item.to_owned());
172+
properties.clone().insert(key, item.to_owned());
172173
}
173174

174175
let match_unfiltered_key = "match-unfiltered".to_string();
175-
properties.insert(
176+
properties.clone().insert(
176177
match_unfiltered_key,
177178
filter_input.match_unfiltered.to_string(),
178179
);
@@ -184,7 +185,7 @@ impl ConsumerBuilder {
184185
stream,
185186
self.offset_specification,
186187
1,
187-
properties,
188+
properties.clone(),
188189
)
189190
.await?;
190191

@@ -221,6 +222,11 @@ impl ConsumerBuilder {
221222
self.filter_configuration = filter_configuration;
222223
self
223224
}
225+
226+
pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
227+
self.properties = properties;
228+
self
229+
}
224230
}
225231

226232
impl Consumer {

src/environment.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::time::Duration;
44

55
use crate::producer::NoDedup;
66
use crate::types::OffsetSpecification;
7+
use std::collections::HashMap;
78

89
use crate::{
910
client::{Client, ClientOptions, MetricsCollector},
@@ -74,6 +75,7 @@ impl Environment {
7475
offset_specification: OffsetSpecification::Next,
7576
filter_configuration: None,
7677
client_provided_name: String::from("rust-stream-consumer"),
78+
properties: HashMap::new(),
7779
}
7880
}
7981

@@ -83,6 +85,7 @@ impl Environment {
8385
offset_specification: OffsetSpecification::Next,
8486
filter_configuration: None,
8587
client_provided_name: String::from("rust-super-stream-consumer"),
88+
properties: HashMap::new(),
8689
}
8790
}
8891

src/superstream_consumer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub struct SuperStreamConsumerBuilder {
3434
pub(crate) offset_specification: OffsetSpecification,
3535
pub(crate) filter_configuration: Option<FilterConfiguration>,
3636
pub(crate) client_provided_name: String,
37+
pub(crate) properties: HashMap<String, String>,
3738
}
3839

3940
impl SuperStreamConsumerBuilder {
@@ -63,6 +64,7 @@ impl SuperStreamConsumerBuilder {
6364
.offset(self.offset_specification.clone())
6465
.client_provided_name(self.client_provided_name.as_str())
6566
.filter_input(self.filter_configuration.clone())
67+
.properties(self.properties.clone())
6668
.build(partition.as_str())
6769
.await
6870
.unwrap();

0 commit comments

Comments
 (0)