diff --git a/src/consumer.rs b/src/consumer.rs index a6d63457..519cc54c 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -89,10 +89,11 @@ pub struct ConsumerBuilder { pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, pub(crate) client_provided_name: String, + pub(crate) properties: HashMap, } impl ConsumerBuilder { - pub async fn build(self, stream: &str) -> Result { + pub async fn build(mut self, stream: &str) -> Result { // Connect to the user specified node first, then look for a random replica to connect to instead. // This is recommended for load balancing purposes @@ -161,18 +162,17 @@ impl ConsumerBuilder { let msg_handler = ConsumerMessageHandler(consumer.clone()); client.set_handler(msg_handler).await; - let mut properties = HashMap::new(); if let Some(filter_input) = self.filter_configuration { if !client.filtering_supported() { return Err(ConsumerCreateError::FilteringNotSupport); } for (index, item) in filter_input.filter_values.iter().enumerate() { let key = format!("filter.{}", index); - properties.insert(key, item.to_owned()); + self.properties.insert(key, item.to_owned()); } let match_unfiltered_key = "match-unfiltered".to_string(); - properties.insert( + self.properties.insert( match_unfiltered_key, filter_input.match_unfiltered.to_string(), ); @@ -184,7 +184,7 @@ impl ConsumerBuilder { stream, self.offset_specification, 1, - properties, + self.properties.clone(), ) .await?; @@ -221,6 +221,11 @@ impl ConsumerBuilder { self.filter_configuration = filter_configuration; self } + + pub fn properties(mut self, properties: HashMap) -> Self { + self.properties = properties; + self + } } impl Consumer { diff --git a/src/environment.rs b/src/environment.rs index 7b34e0a1..290415d1 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -4,6 +4,7 @@ use std::time::Duration; use crate::producer::NoDedup; use crate::types::OffsetSpecification; +use std::collections::HashMap; use crate::{ client::{Client, ClientOptions, MetricsCollector}, @@ -74,6 +75,7 @@ impl Environment { offset_specification: OffsetSpecification::Next, filter_configuration: None, client_provided_name: String::from("rust-stream-consumer"), + properties: HashMap::new(), } } @@ -83,6 +85,7 @@ impl Environment { offset_specification: OffsetSpecification::Next, filter_configuration: None, client_provided_name: String::from("rust-super-stream-consumer"), + properties: HashMap::new(), } } diff --git a/src/superstream_consumer.rs b/src/superstream_consumer.rs index baf2c5b7..708dd421 100644 --- a/src/superstream_consumer.rs +++ b/src/superstream_consumer.rs @@ -34,6 +34,7 @@ pub struct SuperStreamConsumerBuilder { pub(crate) offset_specification: OffsetSpecification, pub(crate) filter_configuration: Option, pub(crate) client_provided_name: String, + pub(crate) properties: HashMap, } impl SuperStreamConsumerBuilder { @@ -63,6 +64,7 @@ impl SuperStreamConsumerBuilder { .offset(self.offset_specification.clone()) .client_provided_name(self.client_provided_name.as_str()) .filter_input(self.filter_configuration.clone()) + .properties(self.properties.clone()) .build(partition.as_str()) .await .unwrap();