@@ -142,3 +142,91 @@ impl EventPublisher for RabbitMqEventPublisher {
142
142
}
143
143
}
144
144
}
145
+
146
+ #[ cfg( test) ]
147
+ #[ cfg( feature = "integration-tests-events-rabbitmq" ) ]
148
+ mod integration_tests_events_rabbitmq {
149
+ use super :: * ;
150
+ use lapin:: {
151
+ options:: { BasicAckOptions , BasicConsumeOptions , QueueBindOptions , QueueDeclareOptions } ,
152
+ types:: FieldTable ,
153
+ Channel , Connection ,
154
+ } ;
155
+ use ldk_server_protos:: events:: event_envelope:: Event ;
156
+ use ldk_server_protos:: events:: PaymentForwarded ;
157
+ use std:: io;
158
+ use std:: time:: Duration ;
159
+ use tokio;
160
+
161
+ use futures_util:: stream:: StreamExt ;
162
+ #[ tokio:: test]
163
+ async fn test_publish_and_consume_event ( ) {
164
+ let config = RabbitMqConfig {
165
+ connection_string : "amqp://guest:guest@localhost:5672/%2f" . to_string ( ) ,
166
+ exchange_name : "test_exchange" . to_string ( ) ,
167
+ } ;
168
+
169
+ let publisher = RabbitMqEventPublisher :: new ( config. clone ( ) ) ;
170
+
171
+ let conn = Connection :: connect ( & config. connection_string , ConnectionProperties :: default ( ) )
172
+ . await
173
+ . expect ( "Failed make rabbitmq connection" ) ;
174
+ let channel = conn. create_channel ( ) . await . expect ( "Failed to create rabbitmq channel" ) ;
175
+
176
+ let queue_name = "test_queue" ;
177
+ setup_queue ( & queue_name, & channel, & config) . await ;
178
+
179
+ let event =
180
+ EventEnvelope { event : Some ( Event :: PaymentForwarded ( PaymentForwarded :: default ( ) ) ) } ;
181
+ publisher. publish ( event. clone ( ) ) . await . expect ( "Failed to publish event" ) ;
182
+
183
+ consume_event ( & queue_name, & channel, & event) . await . expect ( "Failed to consume event" ) ;
184
+ }
185
+
186
+ async fn setup_queue ( queue_name : & str , channel : & Channel , config : & RabbitMqConfig ) {
187
+ channel
188
+ . queue_declare ( queue_name, QueueDeclareOptions :: default ( ) , FieldTable :: default ( ) )
189
+ . await
190
+ . unwrap ( ) ;
191
+ channel
192
+ . exchange_declare (
193
+ & config. exchange_name ,
194
+ ExchangeKind :: Fanout ,
195
+ ExchangeDeclareOptions { durable : true , ..Default :: default ( ) } ,
196
+ FieldTable :: default ( ) ,
197
+ )
198
+ . await
199
+ . unwrap ( ) ;
200
+
201
+ channel
202
+ . queue_bind (
203
+ queue_name,
204
+ & config. exchange_name ,
205
+ "" ,
206
+ QueueBindOptions :: default ( ) ,
207
+ FieldTable :: default ( ) ,
208
+ )
209
+ . await
210
+ . unwrap ( ) ;
211
+ }
212
+
213
+ async fn consume_event (
214
+ queue_name : & str , channel : & Channel , expected_event : & EventEnvelope ,
215
+ ) -> io:: Result < ( ) > {
216
+ let mut consumer = channel
217
+ . basic_consume (
218
+ queue_name,
219
+ "test_consumer" ,
220
+ BasicConsumeOptions :: default ( ) ,
221
+ FieldTable :: default ( ) ,
222
+ )
223
+ . await
224
+ . unwrap ( ) ;
225
+ let delivery =
226
+ tokio:: time:: timeout ( Duration :: from_secs ( 10 ) , consumer. next ( ) ) . await ?. unwrap ( ) . unwrap ( ) ;
227
+ let received_event = EventEnvelope :: decode ( & * delivery. data ) ?;
228
+ assert_eq ! ( received_event, * expected_event, "Event mismatch" ) ;
229
+ channel. basic_ack ( delivery. delivery_tag , BasicAckOptions :: default ( ) ) . await . unwrap ( ) ;
230
+ Ok ( ( ) )
231
+ }
232
+ }
0 commit comments