@@ -15,6 +15,8 @@ use rabbitmq_stream_protocol::{
15
15
commands:: subscribe:: OffsetSpecification , message:: Message , ResponseKind ,
16
16
} ;
17
17
18
+ use core:: option:: Option :: None ;
19
+
18
20
use tokio:: sync:: mpsc:: { channel, Receiver , Sender } ;
19
21
use tracing:: trace;
20
22
@@ -26,13 +28,13 @@ use crate::{
26
28
Client , ClientOptions , Environment , MetricsCollector ,
27
29
} ;
28
30
use futures:: { task:: AtomicWaker , Stream } ;
29
- use rabbitmq_stream_protocol:: commands:: consumer_update:: ConsumerUpdateCommand ;
30
31
use rand:: rngs:: StdRng ;
31
32
use rand:: { seq:: SliceRandom , SeedableRng } ;
32
33
33
34
type FilterPredicate = Option < Arc < dyn Fn ( & Message ) -> bool + Send + Sync > > ;
34
35
35
- type ConsumerUpdateListener = Option < Arc < dyn Fn ( bool , & MessageContext ) -> u64 + Send + Sync > > ;
36
+ pub type ConsumerUpdateListener =
37
+ Arc < dyn Fn ( u8 , & MessageContext ) -> OffsetSpecification + Send + Sync > ;
36
38
37
39
/// API for consuming RabbitMQ stream messages
38
40
pub struct Consumer {
@@ -43,6 +45,7 @@ pub struct Consumer {
43
45
}
44
46
45
47
struct ConsumerInternal {
48
+ name : Option < String > ,
46
49
client : Client ,
47
50
stream : String ,
48
51
offset_specification : OffsetSpecification ,
@@ -52,6 +55,7 @@ struct ConsumerInternal {
52
55
waker : AtomicWaker ,
53
56
metrics_collector : Arc < dyn MetricsCollector > ,
54
57
filter_configuration : Option < FilterConfiguration > ,
58
+ consumer_update_listener : Option < ConsumerUpdateListener > ,
55
59
}
56
60
57
61
impl ConsumerInternal {
@@ -86,22 +90,17 @@ impl FilterConfiguration {
86
90
}
87
91
88
92
pub struct MessageContext {
89
- consumer : Consumer ,
90
- subscriber_name : String ,
91
- reference : String ,
93
+ consumer_name : Option < String > ,
94
+ stream : String ,
92
95
}
93
96
94
97
impl MessageContext {
95
- pub fn get_consumer ( self ) -> Consumer {
96
- self . consumer
98
+ pub fn get_name ( self ) -> Option < String > {
99
+ self . consumer_name
97
100
}
98
101
99
- pub fn get_subscriber_name ( self ) -> String {
100
- self . subscriber_name
101
- }
102
-
103
- pub fn get_reference ( self ) -> String {
104
- self . reference
102
+ pub fn get_stream ( self ) -> String {
103
+ self . stream
105
104
}
106
105
}
107
106
@@ -111,6 +110,7 @@ pub struct ConsumerBuilder {
111
110
pub ( crate ) environment : Environment ,
112
111
pub ( crate ) offset_specification : OffsetSpecification ,
113
112
pub ( crate ) filter_configuration : Option < FilterConfiguration > ,
113
+ pub ( crate ) consumer_update_listener : Option < ConsumerUpdateListener > ,
114
114
pub ( crate ) client_provided_name : String ,
115
115
pub ( crate ) properties : HashMap < String , String > ,
116
116
}
@@ -172,6 +172,7 @@ impl ConsumerBuilder {
172
172
let subscription_id = 1 ;
173
173
let ( tx, rx) = channel ( 10000 ) ;
174
174
let consumer = Arc :: new ( ConsumerInternal {
175
+ name : self . consumer_name . clone ( ) ,
175
176
subscription_id,
176
177
stream : stream. to_string ( ) ,
177
178
client : client. clone ( ) ,
@@ -181,6 +182,7 @@ impl ConsumerBuilder {
181
182
waker : AtomicWaker :: new ( ) ,
182
183
metrics_collector : collector,
183
184
filter_configuration : self . filter_configuration . clone ( ) ,
185
+ consumer_update_listener : None ,
184
186
} ) ;
185
187
let msg_handler = ConsumerMessageHandler ( consumer. clone ( ) ) ;
186
188
client. set_handler ( msg_handler) . await ;
@@ -213,7 +215,7 @@ impl ConsumerBuilder {
213
215
214
216
if response. is_ok ( ) {
215
217
Ok ( Consumer {
216
- name : self . consumer_name ,
218
+ name : self . consumer_name . clone ( ) ,
217
219
receiver : rx,
218
220
internal : consumer,
219
221
} )
@@ -245,6 +247,26 @@ impl ConsumerBuilder {
245
247
self
246
248
}
247
249
250
+ pub fn consumer_update (
251
+ mut self ,
252
+ consumer_update_listener : impl Fn ( u8 , & MessageContext ) -> OffsetSpecification
253
+ + Send
254
+ + Sync
255
+ + ' static ,
256
+ ) -> Self {
257
+ let f = Arc :: new ( consumer_update_listener) ;
258
+ self . consumer_update_listener = Some ( f) ;
259
+ self
260
+ }
261
+
262
+ pub fn consumer_update_arc (
263
+ mut self ,
264
+ consumer_update_listener : Option < crate :: consumer:: ConsumerUpdateListener > ,
265
+ ) -> Self {
266
+ self . consumer_update_listener = consumer_update_listener;
267
+ self
268
+ }
269
+
248
270
pub fn properties ( mut self , properties : HashMap < String , String > ) -> Self {
249
271
self . properties = properties;
250
272
self
@@ -386,8 +408,38 @@ impl MessageHandler for ConsumerMessageHandler {
386
408
// TODO handle credit fail
387
409
let _ = self . 0 . client . credit ( self . 0 . subscription_id , 1 ) . await ;
388
410
self . 0 . metrics_collector . consume ( len as u64 ) . await ;
389
- } else {
390
- println ! ( "other message arrived" ) ;
411
+ } else if let ResponseKind :: ConsumerUpdate ( consumer_update) = response. kind_ref ( ) {
412
+ println ! ( "Consumer update arrived" ) ;
413
+
414
+ if self . 0 . consumer_update_listener . is_none ( ) {
415
+ let offset_specification = OffsetSpecification :: Next ;
416
+ let _ = self
417
+ . 0
418
+ . client
419
+ . consumer_update (
420
+ consumer_update. get_correlation_id ( ) ,
421
+ offset_specification,
422
+ )
423
+ . await ;
424
+ } else {
425
+ let is_active = consumer_update. is_active ( ) ;
426
+ let message_context = MessageContext {
427
+ consumer_name : self . 0 . name . clone ( ) ,
428
+ stream : self . 0 . stream . clone ( ) ,
429
+ } ;
430
+ let consumer_update_listener_callback =
431
+ self . 0 . consumer_update_listener . clone ( ) . unwrap ( ) ;
432
+ let offset_specification =
433
+ consumer_update_listener_callback ( is_active, & message_context) ;
434
+ let _ = self
435
+ . 0
436
+ . client
437
+ . consumer_update (
438
+ consumer_update. get_correlation_id ( ) ,
439
+ offset_specification,
440
+ )
441
+ . await ;
442
+ }
391
443
}
392
444
}
393
445
Some ( Err ( err) ) => {
0 commit comments