Skip to content

add ability to include properties to the consumers during subscriptions #249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ pub struct ConsumerBuilder {
pub(crate) offset_specification: OffsetSpecification,
pub(crate) filter_configuration: Option<FilterConfiguration>,
pub(crate) client_provided_name: String,
pub(crate) properties: HashMap<String, String>,
}

impl ConsumerBuilder {
pub async fn build(self, stream: &str) -> Result<Consumer, ConsumerCreateError> {
pub async fn build(mut self, stream: &str) -> Result<Consumer, ConsumerCreateError> {
// Connect to the user specified node first, then look for a random replica to connect to instead.
// This is recommended for load balancing purposes

Expand Down Expand Up @@ -161,18 +162,17 @@ impl ConsumerBuilder {
let msg_handler = ConsumerMessageHandler(consumer.clone());
client.set_handler(msg_handler).await;

let mut properties = HashMap::new();
if let Some(filter_input) = self.filter_configuration {
if !client.filtering_supported() {
return Err(ConsumerCreateError::FilteringNotSupport);
}
for (index, item) in filter_input.filter_values.iter().enumerate() {
let key = format!("filter.{}", index);
properties.insert(key, item.to_owned());
self.properties.insert(key, item.to_owned());
}

let match_unfiltered_key = "match-unfiltered".to_string();
properties.insert(
self.properties.insert(
match_unfiltered_key,
filter_input.match_unfiltered.to_string(),
);
Expand All @@ -184,7 +184,7 @@ impl ConsumerBuilder {
stream,
self.offset_specification,
1,
properties,
self.properties.clone(),
)
.await?;

Expand Down Expand Up @@ -221,6 +221,11 @@ impl ConsumerBuilder {
self.filter_configuration = filter_configuration;
self
}

pub fn properties(mut self, properties: HashMap<String, String>) -> Self {
self.properties = properties;
self
}
}

impl Consumer {
Expand Down
3 changes: 3 additions & 0 deletions src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

use crate::producer::NoDedup;
use crate::types::OffsetSpecification;
use std::collections::HashMap;

use crate::{
client::{Client, ClientOptions, MetricsCollector},
Expand Down Expand Up @@ -74,6 +75,7 @@ impl Environment {
offset_specification: OffsetSpecification::Next,
filter_configuration: None,
client_provided_name: String::from("rust-stream-consumer"),
properties: HashMap::new(),
}
}

Expand All @@ -83,6 +85,7 @@ impl Environment {
offset_specification: OffsetSpecification::Next,
filter_configuration: None,
client_provided_name: String::from("rust-super-stream-consumer"),
properties: HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/superstream_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct SuperStreamConsumerBuilder {
pub(crate) offset_specification: OffsetSpecification,
pub(crate) filter_configuration: Option<FilterConfiguration>,
pub(crate) client_provided_name: String,
pub(crate) properties: HashMap<String, String>,
}

impl SuperStreamConsumerBuilder {
Expand Down Expand Up @@ -63,6 +64,7 @@ impl SuperStreamConsumerBuilder {
.offset(self.offset_specification.clone())
.client_provided_name(self.client_provided_name.as_str())
.filter_input(self.filter_configuration.clone())
.properties(self.properties.clone())
.build(partition.as_str())
.await
.unwrap();
Expand Down
Loading