Skip to content

Commit 4dc096a

Browse files
committed
making Delivery export client in order to use store_offset and review super_stream example
1 parent 7b19c5f commit 4dc096a

File tree

3 files changed

+34
-3
lines changed

3 files changed

+34
-3
lines changed

examples/single_active_consumer/single_active_consumer_super_stream.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4141
.offset(OffsetSpecification::First)
4242
.enable_single_active_consumer(true)
4343
.client_provided_name("my super stream consumer for hello rust")
44-
.consumer_update(move |active, message_context| {
45-
println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream());
46-
OffsetSpecification::First
44+
.consumer_update(move |active, message_context| async move {
45+
let name = message_context.name();
46+
let stream = message_context.stream();
47+
let client = message_context.client();
48+
49+
println!(
50+
"single active consumer: is active: {} on stream: {} with consumer_name: {}",
51+
active, stream, name
52+
);
53+
let stored_offset = client.query_offset(name, stream.as_str()).await;
54+
55+
if let Err(e) = stored_offset {
56+
return OffsetSpecification::First;
57+
}
58+
let stored_offset_u = stored_offset.unwrap();
59+
println!("stored_offset_u {}", stored_offset_u.clone());
60+
OffsetSpecification::Offset(stored_offset_u)
61+
4762
})
4863
.build(super_stream)
4964
.await
@@ -63,6 +78,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6378
delivery.stream(),
6479
delivery.offset()
6580
);
81+
82+
// Store an offset for every consumer
83+
if delivery.consumer_name().is_some() && delivery.offset() == 1000 {
84+
super_stream_consumer.client().store_offset(delivery.consumer_name().unwrap().as_str(), delivery.stream().as_str(), delivery.offset()).await;
85+
}
86+
6687
}
6788
}
6889

src/consumer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ impl MessageHandler for ConsumerMessageHandler {
427427
.0
428428
.sender
429429
.send(Ok(Delivery {
430+
name: self.0.name.clone(),
430431
stream: self.0.stream.clone(),
431432
subscription_id: self.0.subscription_id,
432433
message,
@@ -493,6 +494,7 @@ impl MessageHandler for ConsumerMessageHandler {
493494
/// Envelope from incoming message
494495
#[derive(Debug)]
495496
pub struct Delivery {
497+
name: Option<String>,
496498
stream: String,
497499
subscription_id: u8,
498500
message: Message,
@@ -519,4 +521,8 @@ impl Delivery {
519521
pub fn offset(&self) -> u64 {
520522
self.offset
521523
}
524+
525+
pub fn consumer_name(&self) -> Option<String> {
526+
self.name.clone()
527+
}
522528
}

src/superstream_consumer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ impl SuperStreamConsumer {
178178
pub fn handle(&self) -> SuperStreamConsumerHandle {
179179
SuperStreamConsumerHandle(self.internal.clone())
180180
}
181+
182+
pub fn client(&self) -> Client {
183+
self.internal.client.clone()
184+
}
181185
}
182186

183187
impl SuperStreamConsumerInternal {

0 commit comments

Comments
 (0)