Skip to content

Commit 7b19c5f

Browse files
committed
making consumer_update callback able to call async methods
1 parent 116193f commit 7b19c5f

File tree

5 files changed

+67
-129
lines changed

5 files changed

+67
-129
lines changed

examples/single_active_consumer/example_for_enrico.rs

Lines changed: 0 additions & 96 deletions
This file was deleted.

examples/single_active_consumer/single_active_consumer.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use rabbitmq_stream_client::types::{
44
ByteCapacity, OffsetSpecification, ResponseCode,
55
};
66

7+
78
#[tokio::main]
89
async fn main() -> Result<(), Box<dyn std::error::Error>> {
910
use rabbitmq_stream_client::Environment;
1011
let environment = Environment::builder().build().await?;
1112
let message_count = 1000000;
12-
let stream = "hello-rust-stream";
13+
let stream = "hello-rust-super-stream-2";
1314

1415
let create_response = environment
1516
.stream_creator()
@@ -40,15 +41,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4041
.offset(OffsetSpecification::First)
4142
.enable_single_active_consumer(true)
4243
.client_provided_name("my super stream consumer for hello rust")
43-
.consumer_update(move |active, message_context| {
44-
println!("single active consumer: is active: {} on stream {}", active, message_context.get_stream());
45-
OffsetSpecification::First
44+
.consumer_update(move |active, message_context| async move {
45+
let name = message_context.name();
46+
let stream = message_context.stream();
47+
let client = message_context.client();
48+
49+
println!(
50+
"single active consumer: is active: {} on stream: {} with consumer_name: {}",
51+
active, stream, name
52+
);
53+
let stored_offset = client.query_offset(name, stream.as_str()).await;
54+
55+
if let Err(e) = stored_offset {
56+
return OffsetSpecification::First;
57+
}
58+
59+
let stored_offset_u = stored_offset.unwrap();
60+
println!("restarting from stored_offset: {}", stored_offset_u);
61+
OffsetSpecification::Offset(stored_offset_u)
62+
4663
})
4764
.build(stream)
4865
.await
4966
.unwrap();
5067

51-
for _ in 0..message_count {
68+
for i in 0..message_count {
5269
let delivery = consumer.next().await.unwrap();
5370
{
5471
let delivery = delivery.unwrap();
@@ -62,6 +79,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6279
delivery.stream(),
6380
delivery.offset()
6481
);
82+
83+
//store an offset
84+
if i == 10000 {
85+
let _ = consumer
86+
.store_offset(i)
87+
.await
88+
.unwrap_or_else(|e| println!("Err: {}", e));
89+
}
6590
}
6691
}
6792

src/consumer.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use rabbitmq_stream_protocol::{
1616
};
1717

1818
use core::option::Option::None;
19-
19+
use futures::FutureExt;
20+
use std::future::Future;
2021
use tokio::sync::mpsc::{channel, Receiver, Sender};
2122
use tracing::trace;
2223

@@ -27,14 +28,14 @@ use crate::{
2728
error::{ConsumerCloseError, ConsumerCreateError, ConsumerDeliveryError},
2829
Client, ClientOptions, Environment, MetricsCollector,
2930
};
30-
use futures::{task::AtomicWaker, Stream};
31+
use futures::{future::BoxFuture, task::AtomicWaker, Stream};
3132
use rand::rngs::StdRng;
3233
use rand::{seq::SliceRandom, SeedableRng};
3334

3435
type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
3536

3637
pub type ConsumerUpdateListener =
37-
Arc<dyn Fn(u8, &MessageContext) -> OffsetSpecification + Send + Sync>;
38+
Arc<dyn Fn(u8, MessageContext) -> BoxFuture<'static, OffsetSpecification> + Send + Sync>;
3839

3940
/// API for consuming RabbitMQ stream messages
4041
pub struct Consumer {
@@ -277,14 +278,14 @@ impl ConsumerBuilder {
277278
self
278279
}
279280

280-
pub fn consumer_update(
281+
pub fn consumer_update<Fut>(
281282
mut self,
282-
consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification
283-
+ Send
284-
+ Sync
285-
+ 'static,
286-
) -> Self {
287-
let f = Arc::new(consumer_update_listener);
283+
consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static,
284+
) -> Self
285+
where
286+
Fut: Future<Output = OffsetSpecification> + Send + Sync + 'static,
287+
{
288+
let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed());
288289
self.consumer_update_listener = Some(f);
289290
self
290291
}
@@ -464,7 +465,7 @@ impl MessageHandler for ConsumerMessageHandler {
464465
let consumer_update_listener_callback =
465466
self.0.consumer_update_listener.clone().unwrap();
466467
let offset_specification =
467-
consumer_update_listener_callback(is_active, &message_context);
468+
consumer_update_listener_callback(is_active, message_context).await;
468469
let _ = self
469470
.0
470471
.client

src/superstream_consumer.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ use crate::{
66
error::ConsumerCreateError, ConsumerHandle, Environment, FilterConfiguration, MessageContext,
77
};
88
use futures::task::AtomicWaker;
9-
use futures::{Stream, StreamExt};
9+
use futures::FutureExt;
10+
use futures::Stream;
11+
use futures::StreamExt;
1012
use rabbitmq_stream_protocol::commands::subscribe::OffsetSpecification;
1113
use std::collections::HashMap;
14+
use std::future::Future;
1215
use std::pin::Pin;
1316
use std::sync::atomic::AtomicBool;
1417
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
@@ -130,15 +133,14 @@ impl SuperStreamConsumerBuilder {
130133
self.filter_configuration = filter_configuration;
131134
self
132135
}
133-
134136
pub fn consumer_update<Fut>(
135137
mut self,
136-
consumer_update_listener: impl Fn(u8, &MessageContext) -> OffsetSpecification
137-
+ Send
138-
+ Sync
139-
+ 'static,
140-
) -> Self {
141-
let f = Arc::new(consumer_update_listener);
138+
consumer_update_listener: impl Fn(u8, MessageContext) -> Fut + Send + Sync + 'static,
139+
) -> Self
140+
where
141+
Fut: Future<Output = OffsetSpecification> + Send + Sync + 'static,
142+
{
143+
let f = Arc::new(move |a, b| consumer_update_listener(a, b).boxed());
142144
self.consumer_update_listener = Some(f);
143145
self
144146
}

tests/integration/consumer_test.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -829,10 +829,12 @@ async fn super_stream_single_active_consumer_test_with_callback() {
829829
.consumer_update(move |active, message_context| {
830830
let mut result_stream_name_int = result_stream_name_outer.clone();
831831
let mut result_consumer_name_int = result_name_1_outer.clone();
832-
*result_stream_name_int.lock().unwrap() = message_context.get_stream().clone();
833-
*result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap();
832+
async move {
833+
*result_stream_name_int.lock().unwrap() = message_context.stream().clone();
834+
*result_consumer_name_int.lock().unwrap() = message_context.name().clone();
834835

835-
OffsetSpecification::First
836+
OffsetSpecification::First
837+
}
836838
})
837839
.build(&env.super_stream)
838840
.await
@@ -847,9 +849,11 @@ async fn super_stream_single_active_consumer_test_with_callback() {
847849
.consumer_update(move |active, message_context| {
848850
let mut result_stream_name_int = result_stream_name_2_outer.clone();
849851
let mut result_consumer_name_int = result_name_2_outer.clone();
850-
*result_stream_name_int.lock().unwrap() = message_context.get_stream().clone();
851-
*result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap();
852-
OffsetSpecification::First
852+
async move {
853+
*result_stream_name_int.lock().unwrap() = message_context.stream().clone();
854+
*result_consumer_name_int.lock().unwrap() = message_context.name().clone();
855+
OffsetSpecification::First
856+
}
853857
})
854858
.build(&env.super_stream)
855859
.await
@@ -864,9 +868,11 @@ async fn super_stream_single_active_consumer_test_with_callback() {
864868
.consumer_update(move |active, message_context| {
865869
let mut result_stream_name_int = result_stream_name_3_outer.clone();
866870
let mut result_consumer_name_int = result_name_3_outer.clone();
867-
*result_stream_name_int.lock().unwrap() = message_context.get_stream().clone();
868-
*result_consumer_name_int.lock().unwrap() = message_context.get_name().clone().unwrap();
869-
OffsetSpecification::First
871+
async move {
872+
*result_stream_name_int.lock().unwrap() = message_context.stream().clone();
873+
*result_consumer_name_int.lock().unwrap() = message_context.name().clone();
874+
OffsetSpecification::First
875+
}
870876
})
871877
.build(&env.super_stream)
872878
.await

0 commit comments

Comments
 (0)