Skip to content

Commit c99c90f

Browse files
committed
Merge pull request #215 from intexsys/multiple-consumer-queues-provider
Added multiple consumer queues provider support
2 parents 271e51b + 05beea1 commit c99c90f

File tree

9 files changed

+293
-26
lines changed

9 files changed

+293
-26
lines changed

CHANGELOG

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
- 2014-11-27
2+
* Added interface `OldSound\RabbitMqBundle\Provider\QueuesProviderInterface`
3+
* Added `queues_provider` configuration for multiple consumer
4+
15
- 2014-07-21
26
* Added `reconnect` method into `OldSound\RabbitMqBundle\RabbitMq\BaseAmqp`
37

DependencyInjection/Configuration.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
139139
->booleanNode('global')->defaultFalse()->end()
140140
->end()
141141
->end()
142+
->scalarNode('queues_provider')->defaultNull()->end()
142143
->end()
143144
->end()
144145
->end()
@@ -243,7 +244,7 @@ protected function getQueueConfiguration()
243244
protected function getMultipleQueuesConfiguration()
244245
{
245246
$node = new ArrayNodeDefinition('queues');
246-
$prototypeNode = $node->requiresAtLeastOneElement()->prototype('array');
247+
$prototypeNode = $node->prototype('array');
247248

248249
$this->addQueueNodeConfiguration($prototypeNode);
249250

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ protected function loadMultipleConsumers()
173173
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
174174
$queues = array();
175175

176+
if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
177+
throw new InvalidConfigurationException(
178+
"Error on loading $key multiple consumer. " .
179+
"Either 'queues' or 'queues_provider' parameters should be defined."
180+
);
181+
}
182+
176183
foreach ($consumer['queues'] as $queueName => $queueOptions) {
177184
$queues[$queueOptions['name']] = $queueOptions;
178185
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
@@ -185,6 +192,13 @@ protected function loadMultipleConsumers()
185192
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
186193
->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
187194

195+
if ($consumer['queues_provider']) {
196+
$definition->addMethodCall(
197+
'setQueuesProvider',
198+
array(new Reference($consumer['queues_provider']))
199+
);
200+
}
201+
188202
if (array_key_exists('qos_options', $consumer)) {
189203
$definition->addMethodCall('setQosOptions', array(
190204
$consumer['qos_options']['prefetch_size'],

Provider/QueuesProviderInterface.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Provider;
4+
5+
/**
6+
* Queues provider interface
7+
*
8+
* @author Sergey Chernecov <sergey.chernecov@intexsys.lv>
9+
*/
10+
interface QueuesProviderInterface
11+
{
12+
/**
13+
* Return array of queues
14+
*
15+
* @return array
16+
*/
17+
public function getQueues();
18+
}

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ multiple_consumers:
488488
upload:
489489
connection: default
490490
exchange_options: {name: 'upload', type: direct}
491+
queues_provider: queues_provider_service
491492
queues:
492493
upload-picture:
493494
name: upload_picture
@@ -509,6 +510,9 @@ All the options of `queues-options` in the consumer are available for each queue
509510
510511
Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.
511512
513+
The `queues_provider` is a optional service that dynamically provides queues.
514+
It must implement `QueuesProviderInterface`
515+
512516
### Anonymous Consumers ###
513517
514518
Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading.

RabbitMq/MultipleConsumer.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,34 @@
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
44

5+
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
56
use OldSound\RabbitMqBundle\RabbitMq\Exception\QueueNotFoundException;
67
use PhpAmqpLib\Message\AMQPMessage;
78

89
class MultipleConsumer extends Consumer
910
{
1011
protected $queues = array();
1112

13+
/**
14+
* Queues provider
15+
*
16+
* @var QueuesProviderInterface
17+
*/
18+
protected $queuesProvider = null;
19+
20+
/**
21+
* QueuesProvider setter
22+
*
23+
* @param QueuesProviderInterface $queuesProvider
24+
*
25+
* @return self
26+
*/
27+
public function setQueuesProvider(QueuesProviderInterface $queuesProvider)
28+
{
29+
$this->queuesProvider = $queuesProvider;
30+
return $this;
31+
}
32+
1233
public function getQueueConsumerTag($queue)
1334
{
1435
return sprintf('%s-%s', $this->getConsumerTag(), $queue);
@@ -21,6 +42,8 @@ public function setQueues(array $queues)
2142

2243
protected function setupConsumer()
2344
{
45+
$this->mergeQueues();
46+
2447
if ($this->autoSetupFabric) {
2548
$this->setupFabric();
2649
}
@@ -72,4 +95,17 @@ public function stopConsuming()
7295
$this->getChannel()->basic_cancel($this->getQueueConsumerTag($name));
7396
}
7497
}
98+
99+
/**
100+
* Merges static and provided queues into one array
101+
*/
102+
protected function mergeQueues()
103+
{
104+
if ($this->queuesProvider) {
105+
$this->queues = array_merge(
106+
$this->queues,
107+
$this->queuesProvider->getQueues()
108+
);
109+
}
110+
}
75111
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ old_sound_rabbit_mq:
112112
- 'android.upload'
113113
- 'iphone.upload'
114114
callback: foo.multiple_test2.callback
115+
queues_provider: foo.queues_provider
115116

116117
anon_consumers:
117118
foo_anon_consumer:

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,12 @@ public function testMultipleConsumerDefinition()
320320
)
321321
)
322322
)
323+
),
324+
array(
325+
'setQueuesProvider',
326+
array(
327+
new Reference('foo.queues_provider')
328+
)
323329
)
324330
),
325331
$definition->getMethodCalls()

0 commit comments

Comments
 (0)