Skip to content

Commit fcc11c1

Browse files
committed
enabling naming for super_stream consumers and setting up sac properties internally
1 parent 7904411 commit fcc11c1

File tree

6 files changed

+73
-27
lines changed

6 files changed

+73
-27
lines changed

examples/single_active_consumer/single_active_consumer_super_stream.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3434
super_stream
3535
);
3636

37-
let mut properties = HashMap::new();
38-
39-
properties.insert("single-active-consumer".to_string(), "true".to_string());
40-
properties.insert("name".to_string(), "consumer-group-1".to_string());
41-
properties.insert("super-stream".to_string(), "hello-rust-super-stream".to_string());
42-
4337
let mut super_stream_consumer: SuperStreamConsumer = environment
4438
.super_stream_consumer()
39+
// Mandatory if sac is enabled
40+
.name("consumer-group-1")
4541
.offset(OffsetSpecification::First)
42+
.enable_single_active_consumer(true)
4643
.client_provided_name("my super stream consumer for hello rust")
47-
/*We can decide a strategy to manage Offset specification in single active consumer based on is_active flag
48-
By default if this clousure is not present the default strategy OffsetSpecification::NEXT will be set.*/
4944
.consumer_update(move |active, message_context| {
5045
println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream());
5146
OffsetSpecification::First
5247
})
53-
.properties(properties)
5448
.build(super_stream)
5549
.await
5650
.unwrap();

src/consumer.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,20 @@ pub struct ConsumerBuilder {
113113
pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
114114
pub(crate) client_provided_name: String,
115115
pub(crate) properties: HashMap<String, String>,
116+
pub(crate) is_single_active_consumer: bool,
116117
}
117118

118119
impl ConsumerBuilder {
119120
pub async fn build(mut self, stream: &str) -> Result<Consumer, ConsumerCreateError> {
121+
if (self.is_single_active_consumer
122+
|| self.properties.contains_key("single-active-consumer"))
123+
&& self.consumer_name.is_none()
124+
{
125+
return Err(ConsumerCreateError::SingleActiveConsumerNotSupported);
126+
}
127+
120128
// Connect to the user specified node first, then look for a random replica to connect to instead.
121129
// This is recommended for load balancing purposes
122-
123130
let mut opt_with_client_provided_name = self.environment.options.client_options.clone();
124131
opt_with_client_provided_name.client_provided_name = self.client_provided_name.clone();
125132

@@ -203,6 +210,13 @@ impl ConsumerBuilder {
203210
);
204211
}
205212

213+
if self.is_single_active_consumer {
214+
self.properties
215+
.insert("single-active-consumer".to_string(), "true".to_string());
216+
self.properties
217+
.insert("name".to_string(), self.consumer_name.clone().unwrap());
218+
}
219+
206220
let response = client
207221
.subscribe(
208222
subscription_id,
@@ -242,6 +256,16 @@ impl ConsumerBuilder {
242256
self
243257
}
244258

259+
pub fn name_optional(mut self, consumer_name: Option<String>) -> Self {
260+
self.consumer_name = consumer_name;
261+
self
262+
}
263+
264+
pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self {
265+
self.is_single_active_consumer = is_single_active_consumer;
266+
self
267+
}
268+
245269
pub fn filter_input(mut self, filter_configuration: Option<FilterConfiguration>) -> Self {
246270
self.filter_configuration = filter_configuration;
247271
self

src/environment.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,20 @@ impl Environment {
7777
consumer_update_listener: None,
7878
client_provided_name: String::from("rust-stream-consumer"),
7979
properties: HashMap::new(),
80+
is_single_active_consumer: false,
8081
}
8182
}
8283

8384
pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
8485
SuperStreamConsumerBuilder {
86+
super_stream_consumer_name: None,
8587
environment: self.clone(),
8688
offset_specification: OffsetSpecification::Next,
8789
filter_configuration: None,
8890
consumer_update_listener: None,
8991
client_provided_name: String::from("rust-super-stream-consumer"),
9092
properties: HashMap::new(),
93+
is_single_active_consumer: false,
9194
}
9295
}
9396

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ pub enum ConsumerCreateError {
159159

160160
#[error("Filtering is not supported by the broker (requires RabbitMQ 3.13+ and stream_filtering feature flag activated)")]
161161
FilteringNotSupport,
162+
163+
#[error("if you set single active consumer a consumer and super_stream consumer name need to be setup")]
164+
SingleActiveConsumerNotSupported,
162165
}
163166

164167
#[derive(Error, Debug)]

src/superstream_consumer.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,30 @@ struct SuperStreamConsumerInternal {
3232

3333
/// Builder for [`Consumer`]
3434
pub struct SuperStreamConsumerBuilder {
35+
pub(crate) super_stream_consumer_name: Option<String>,
3536
pub(crate) environment: Environment,
3637
pub(crate) offset_specification: OffsetSpecification,
3738
pub(crate) filter_configuration: Option<FilterConfiguration>,
3839
pub(crate) consumer_update_listener: Option<ConsumerUpdateListener>,
3940
pub(crate) client_provided_name: String,
41+
pub(crate) is_single_active_consumer: bool,
4042
pub(crate) properties: HashMap<String, String>,
4143
}
4244

4345
impl SuperStreamConsumerBuilder {
4446
pub async fn build(
45-
self,
47+
&mut self,
4648
super_stream: &str,
4749
) -> Result<SuperStreamConsumer, ConsumerCreateError> {
4850
// Connect to the user specified node first, then look for a random replica to connect to instead.
4951
// This is recommended for load balancing purposes.
52+
if (self.is_single_active_consumer
53+
|| self.properties.contains_key("single-active-consumer"))
54+
&& self.super_stream_consumer_name.is_none()
55+
{
56+
return Err(ConsumerCreateError::SingleActiveConsumerNotSupported);
57+
}
58+
5059
let client = self.environment.create_client().await?;
5160
let (tx, rx) = channel(10000);
5261

@@ -58,17 +67,24 @@ impl SuperStreamConsumerBuilder {
5867
};
5968
let partitions = super_stream_metadata.partitions().await;
6069

70+
if self.is_single_active_consumer {
71+
self.properties
72+
.insert("super-stream".to_string(), super_stream.to_string());
73+
}
74+
6175
let mut handlers = Vec::<ConsumerHandle>::new();
6276
for partition in partitions.into_iter() {
6377
let tx_cloned = tx.clone();
6478
let mut consumer = self
6579
.environment
6680
.consumer()
81+
.name_optional(self.super_stream_consumer_name.clone())
6782
.offset(self.offset_specification.clone())
6883
.client_provided_name(self.client_provided_name.as_str())
6984
.filter_input(self.filter_configuration.clone())
7085
.consumer_update_arc(self.consumer_update_listener.clone())
7186
.properties(self.properties.clone())
87+
.enable_single_active_consumer(self.is_single_active_consumer)
7288
.build(partition.as_str())
7389
.await
7490
.unwrap();
@@ -100,6 +116,16 @@ impl SuperStreamConsumerBuilder {
100116
self
101117
}
102118

119+
pub fn name(mut self, consumer_name: &str) -> Self {
120+
self.super_stream_consumer_name = Some(String::from(consumer_name));
121+
self
122+
}
123+
124+
pub fn enable_single_active_consumer(mut self, is_single_active_consumer: bool) -> Self {
125+
self.is_single_active_consumer = is_single_active_consumer;
126+
self
127+
}
128+
103129
pub fn filter_input(mut self, filter_configuration: Option<FilterConfiguration>) -> Self {
104130
self.filter_configuration = filter_configuration;
105131
self

tests/integration/consumer_test.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -687,36 +687,34 @@ async fn super_stream_single_active_consumer_test() {
687687
.await
688688
.unwrap();
689689

690-
let mut properties = HashMap::new();
691690
let notify_received_messages = Arc::new(Notify::new());
692691

693-
properties.insert("single-active-consumer".to_string(), "true".to_string());
694-
properties.insert("name".to_string(), "consumer-group-1".to_string());
695-
properties.insert("super-stream".to_string(), env.super_stream.clone());
696-
697692
let mut super_stream_consumer: SuperStreamConsumer = env
698693
.env
699694
.super_stream_consumer()
695+
.name("super-stream-with-sac-enabled")
696+
.enable_single_active_consumer(true)
700697
.offset(OffsetSpecification::First)
701-
.properties(properties.clone())
702698
.build(&env.super_stream)
703699
.await
704700
.unwrap();
705701

706702
let mut super_stream_consumer_2: SuperStreamConsumer = env
707703
.env
708704
.super_stream_consumer()
705+
.name("super-stream-with-sac-enabled")
706+
.enable_single_active_consumer(true)
709707
.offset(OffsetSpecification::First)
710-
.properties(properties.clone())
711708
.build(&env.super_stream)
712709
.await
713710
.unwrap();
714711

715712
let mut super_stream_consumer_3: SuperStreamConsumer = env
716713
.env
717714
.super_stream_consumer()
715+
.name("super-stream-with-sac-enabled")
716+
.enable_single_active_consumer(true)
718717
.offset(OffsetSpecification::First)
719-
.properties(properties.clone())
720718
.build(&env.super_stream)
721719
.await
722720
.unwrap();
@@ -803,60 +801,58 @@ async fn super_stream_single_active_consumer_test_with_callback() {
803801
.await
804802
.unwrap();
805803

806-
let mut properties = HashMap::new();
807804
let notify_received_messages = Arc::new(Notify::new());
808805

809806
let mut result_stream_name_1 = Arc::new(Mutex::new(String::from("")));
810807
let mut result_stream_name_2 = Arc::new(Mutex::new(String::from("")));
811808
let mut result_stream_name_3 = Arc::new(Mutex::new(String::from("")));
812809

813-
properties.insert("single-active-consumer".to_string(), "true".to_string());
814-
properties.insert("name".to_string(), "consumer-group-1".to_string());
815-
properties.insert("super-stream".to_string(), env.super_stream.clone());
816-
817810
let mut result_stream_name_outer = result_stream_name_1.clone();
818811
let mut result_stream_name_2_outer = result_stream_name_2.clone();
819812
let mut result_stream_name_3_outer = result_stream_name_3.clone();
820813

821814
let mut super_stream_consumer: SuperStreamConsumer = env
822815
.env
823816
.super_stream_consumer()
817+
.name("super-stream-with-sac-enabled")
818+
.enable_single_active_consumer(true)
824819
.offset(OffsetSpecification::First)
825820
.consumer_update(move |active, message_context| {
826821
let mut result_consumer_name_int = result_stream_name_outer.clone();
827822
*result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone();
828823

829824
OffsetSpecification::First
830825
})
831-
.properties(properties.clone())
832826
.build(&env.super_stream)
833827
.await
834828
.unwrap();
835829

836830
let mut super_stream_consumer_2: SuperStreamConsumer = env
837831
.env
838832
.super_stream_consumer()
833+
.name("super-stream-with-sac-enabled")
834+
.enable_single_active_consumer(true)
839835
.offset(OffsetSpecification::First)
840836
.consumer_update(move |active, message_context| {
841837
let mut result_consumer_name_int = result_stream_name_2_outer.clone();
842838
*result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone();
843839
OffsetSpecification::First
844840
})
845-
.properties(properties.clone())
846841
.build(&env.super_stream)
847842
.await
848843
.unwrap();
849844

850845
let mut super_stream_consumer_3: SuperStreamConsumer = env
851846
.env
852847
.super_stream_consumer()
848+
.name("super-stream-with-sac-enabled")
849+
.enable_single_active_consumer(true)
853850
.offset(OffsetSpecification::First)
854851
.consumer_update(move |active, message_context| {
855852
let mut result_consumer_name_int = result_stream_name_3_outer.clone();
856853
*result_consumer_name_int.lock().unwrap() = message_context.get_stream().clone();
857854
OffsetSpecification::First
858855
})
859-
.properties(properties.clone())
860856
.build(&env.super_stream)
861857
.await
862858
.unwrap();

0 commit comments

Comments
 (0)