1
1
use crate :: consumer:: Delivery ;
2
- use crate :: error:: ConsumerDeliveryError ;
2
+ use crate :: error:: { ConsumerCloseError , ConsumerDeliveryError } ;
3
3
use crate :: superstream:: DefaultSuperStreamMetadata ;
4
- use crate :: { error:: ConsumerCreateError , Environment } ;
4
+ use crate :: { error:: ConsumerCreateError , ConsumerHandle , Environment } ;
5
+ use futures:: task:: AtomicWaker ;
5
6
use futures:: { Stream , StreamExt } ;
6
7
use rabbitmq_stream_protocol:: commands:: subscribe:: OffsetSpecification ;
7
8
use std:: pin:: Pin ;
9
+ use std:: sync:: atomic:: AtomicBool ;
10
+ use std:: sync:: atomic:: Ordering :: { Relaxed , SeqCst } ;
11
+ use std:: sync:: Arc ;
8
12
use std:: task:: { Context , Poll } ;
9
13
use tokio:: sync:: mpsc:: { channel, Receiver } ;
10
14
use tokio:: task;
@@ -13,11 +17,14 @@ use tokio::task;
13
17
14
18
/// API for consuming RabbitMQ stream messages
15
19
pub struct SuperStreamConsumer {
16
- internal : SuperStreamConsumerInternal ,
20
+ internal : Arc < SuperStreamConsumerInternal > ,
21
+ receiver : Receiver < Result < Delivery , ConsumerDeliveryError > > ,
17
22
}
18
23
19
24
struct SuperStreamConsumerInternal {
20
- receiver : Receiver < Result < Delivery , ConsumerDeliveryError > > ,
25
+ closed : Arc < AtomicBool > ,
26
+ handlers : Vec < ConsumerHandle > ,
27
+ waker : AtomicWaker ,
21
28
}
22
29
23
30
/// Builder for [`Consumer`]
@@ -44,6 +51,7 @@ impl SuperStreamConsumerBuilder {
44
51
} ;
45
52
let partitions = super_stream_metadata. partitions ( ) . await ;
46
53
54
+ let mut handlers = Vec :: < ConsumerHandle > :: new ( ) ;
47
55
for partition in partitions. into_iter ( ) {
48
56
let tx_cloned = tx. clone ( ) ;
49
57
let mut consumer = self
@@ -54,17 +62,24 @@ impl SuperStreamConsumerBuilder {
54
62
. await
55
63
. unwrap ( ) ;
56
64
65
+ handlers. push ( consumer. handle ( ) ) ;
66
+
57
67
task:: spawn ( async move {
58
68
while let Some ( d) = consumer. next ( ) . await {
59
69
_ = tx_cloned. send ( d) . await ;
60
70
}
61
71
} ) ;
62
72
}
63
73
64
- let super_stream_consumer_internal = SuperStreamConsumerInternal { receiver : rx } ;
74
+ let super_stream_consumer_internal = SuperStreamConsumerInternal {
75
+ closed : Arc :: new ( AtomicBool :: new ( false ) ) ,
76
+ handlers,
77
+ waker : AtomicWaker :: new ( ) ,
78
+ } ;
65
79
66
80
Ok ( SuperStreamConsumer {
67
- internal : super_stream_consumer_internal,
81
+ internal : Arc :: new ( super_stream_consumer_internal) ,
82
+ receiver : rx,
68
83
} )
69
84
}
70
85
@@ -78,6 +93,46 @@ impl Stream for SuperStreamConsumer {
78
93
type Item = Result < Delivery , ConsumerDeliveryError > ;
79
94
80
95
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
81
- Pin :: new ( & mut self . internal . receiver ) . poll_recv ( cx)
96
+ self . internal . waker . register ( cx. waker ( ) ) ;
97
+ let poll = Pin :: new ( & mut self . receiver ) . poll_recv ( cx) ;
98
+ match ( self . is_closed ( ) , poll. is_ready ( ) ) {
99
+ ( true , false ) => Poll :: Ready ( None ) ,
100
+ _ => poll,
101
+ }
102
+ }
103
+ }
104
+
105
+ impl SuperStreamConsumer {
106
+ /// Check if the consumer is closed
107
+ pub fn is_closed ( & self ) -> bool {
108
+ self . internal . is_closed ( )
109
+ }
110
+
111
+ pub fn handle ( & self ) -> SuperStreamConsumerHandle {
112
+ SuperStreamConsumerHandle ( self . internal . clone ( ) )
113
+ }
114
+ }
115
+
116
+ impl SuperStreamConsumerInternal {
117
+ fn is_closed ( & self ) -> bool {
118
+ self . closed . load ( Relaxed )
119
+ }
120
+ }
121
+
122
+ pub struct SuperStreamConsumerHandle ( Arc < SuperStreamConsumerInternal > ) ;
123
+
124
+ impl SuperStreamConsumerHandle {
125
+ /// Close the [`Consumer`] associated to this handle
126
+ pub async fn close ( self ) -> Result < ( ) , ConsumerCloseError > {
127
+ self . 0 . waker . wake ( ) ;
128
+ match self . 0 . closed . compare_exchange ( false , true , SeqCst , SeqCst ) {
129
+ Ok ( false ) => {
130
+ for handle in & self . 0 . handlers {
131
+ handle. internal_close ( ) . await . unwrap ( ) ;
132
+ }
133
+ Ok ( ( ) )
134
+ }
135
+ _ => Err ( ConsumerCloseError :: AlreadyClosed ) ,
136
+ }
82
137
}
83
138
}
0 commit comments