@@ -158,14 +158,17 @@ protected function loadConsumers()
158
158
$ this ->injectLoggedChannel ($ definition , $ key , $ consumer ['connection ' ]);
159
159
}
160
160
161
- $ this ->container ->setDefinition (sprintf ('old_sound_rabbit_mq.%s_consumer ' , $ key ), $ definition );
161
+ $ name = sprintf ('old_sound_rabbit_mq.%s_consumer ' , $ key );
162
+ $ this ->container ->setDefinition ($ name , $ definition );
163
+ $ this ->addDequeuerAwareCall ($ consumer ['callback ' ], $ name );
162
164
}
163
165
}
164
166
165
167
protected function loadMultipleConsumers ()
166
168
{
167
169
foreach ($ this ->config ['multiple_consumers ' ] as $ key => $ consumer ) {
168
170
$ queues = array ();
171
+ $ callbacks = array ();
169
172
170
173
if (empty ($ consumer ['queues ' ]) && empty ($ consumer ['queues_provider ' ])) {
171
174
throw new InvalidConfigurationException (
@@ -177,6 +180,7 @@ protected function loadMultipleConsumers()
177
180
foreach ($ consumer ['queues ' ] as $ queueName => $ queueOptions ) {
178
181
$ queues [$ queueOptions ['name ' ]] = $ queueOptions ;
179
182
$ queues [$ queueOptions ['name ' ]]['callback ' ] = array (new Reference ($ queueOptions ['callback ' ]), 'execute ' );
183
+ $ callbacks [] = new Reference ($ queueOptions ['callback ' ]);
180
184
}
181
185
182
186
$ definition = new Definition ('%old_sound_rabbit_mq.multi_consumer.class% ' );
@@ -213,7 +217,14 @@ protected function loadMultipleConsumers()
213
217
$ this ->injectLoggedChannel ($ definition , $ key , $ consumer ['connection ' ]);
214
218
}
215
219
216
- $ this ->container ->setDefinition (sprintf ('old_sound_rabbit_mq.%s_multiple ' , $ key ), $ definition );
220
+ $ name = sprintf ('old_sound_rabbit_mq.%s_multiple ' , $ key );
221
+ $ this ->container ->setDefinition ($ name , $ definition );
222
+ if ($ consumer ['queues_provider ' ]) {
223
+ $ this ->addDequeuerAwareCall ($ consumer ['queues_provider ' ], $ name );
224
+ }
225
+ foreach ($ callbacks as $ callback ) {
226
+ $ this ->addDequeuerAwareCall ($ callback , $ name );
227
+ }
217
228
}
218
229
}
219
230
@@ -231,7 +242,9 @@ protected function loadAnonConsumers()
231
242
$ this ->injectLoggedChannel ($ definition , $ key , $ anon ['connection ' ]);
232
243
}
233
244
234
- $ this ->container ->setDefinition (sprintf ('old_sound_rabbit_mq.%s_anon ' , $ key ), $ definition );
245
+ $ name = sprintf ('old_sound_rabbit_mq.%s_anon ' , $ key );
246
+ $ this ->container ->setDefinition ($ name , $ definition );
247
+ $ this ->addDequeuerAwareCall ($ anon ['callback ' ], $ name );
235
248
}
236
249
}
237
250
@@ -355,4 +368,23 @@ public function getAlias()
355
368
{
356
369
return 'old_sound_rabbit_mq ' ;
357
370
}
371
+
372
+ /**
373
+ * Add proper dequeuer aware call
374
+ *
375
+ * @param string $callback
376
+ * @param string $name
377
+ */
378
+ protected function addDequeuerAwareCall ($ callback , $ name )
379
+ {
380
+ if (! $ this ->container ->has ($ callback )) {
381
+ return ;
382
+ }
383
+
384
+ $ callbackDefinition = $ this ->container ->findDefinition ($ callback );
385
+ $ refClass = new \ReflectionClass ($ callbackDefinition ->getClass ());
386
+ if ($ refClass ->implementsInterface ('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface ' )) {
387
+ $ callbackDefinition ->addMethodCall ('setDequeuer ' , array (new Reference ($ name )));
388
+ }
389
+ }
358
390
}
0 commit comments