Skip to content

Commit 05beea1

Browse files
author
Sergey Chernecov
committed
Added multiple consumer queues provider support
1 parent d49ead3 commit 05beea1

File tree

9 files changed

+293
-26
lines changed

9 files changed

+293
-26
lines changed

CHANGELOG

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
- 2014-11-27
2+
* Added interface `OldSound\RabbitMqBundle\Provider\QueuesProviderInterface`
3+
* Added `queues_provider` configuration for multiple consumer
4+
15
- 2014-07-21
26
* Added `reconnect` method into `OldSound\RabbitMqBundle\RabbitMq\BaseAmqp`
37

DependencyInjection/Configuration.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
139139
->booleanNode('global')->defaultFalse()->end()
140140
->end()
141141
->end()
142+
->scalarNode('queues_provider')->defaultNull()->end()
142143
->end()
143144
->end()
144145
->end()
@@ -243,7 +244,7 @@ protected function getQueueConfiguration()
243244
protected function getMultipleQueuesConfiguration()
244245
{
245246
$node = new ArrayNodeDefinition('queues');
246-
$prototypeNode = $node->requiresAtLeastOneElement()->prototype('array');
247+
$prototypeNode = $node->prototype('array');
247248

248249
$this->addQueueNodeConfiguration($prototypeNode);
249250

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ protected function loadMultipleConsumers()
173173
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
174174
$queues = array();
175175

176+
if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
177+
throw new InvalidConfigurationException(
178+
"Error on loading $key multiple consumer. " .
179+
"Either 'queues' or 'queues_provider' parameters should be defined."
180+
);
181+
}
182+
176183
foreach ($consumer['queues'] as $queueName => $queueOptions) {
177184
$queues[$queueOptions['name']] = $queueOptions;
178185
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
@@ -185,6 +192,13 @@ protected function loadMultipleConsumers()
185192
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
186193
->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
187194

195+
if ($consumer['queues_provider']) {
196+
$definition->addMethodCall(
197+
'setQueuesProvider',
198+
array(new Reference($consumer['queues_provider']))
199+
);
200+
}
201+
188202
if (array_key_exists('qos_options', $consumer)) {
189203
$definition->addMethodCall('setQosOptions', array(
190204
$consumer['qos_options']['prefetch_size'],

Provider/QueuesProviderInterface.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Provider;
4+
5+
/**
6+
* Queues provider interface
7+
*
8+
* @author Sergey Chernecov <sergey.chernecov@intexsys.lv>
9+
*/
10+
interface QueuesProviderInterface
11+
{
12+
/**
13+
* Return array of queues
14+
*
15+
* @return array
16+
*/
17+
public function getQueues();
18+
}

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ multiple_consumers:
487487
upload:
488488
connection: default
489489
exchange_options: {name: 'upload', type: direct}
490+
queues_provider: queues_provider_service
490491
queues:
491492
upload-picture:
492493
name: upload_picture
@@ -508,6 +509,9 @@ All the options of `queues-options` in the consumer are available for each queue
508509
509510
Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.
510511
512+
The `queues_provider` is a optional service that dynamically provides queues.
513+
It must implement `QueuesProviderInterface`
514+
511515
### Anonymous Consumers ###
512516
513517
Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading.

RabbitMq/MultipleConsumer.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,34 @@
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
44

5+
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
56
use OldSound\RabbitMqBundle\RabbitMq\Exception\QueueNotFoundException;
67
use PhpAmqpLib\Message\AMQPMessage;
78

89
class MultipleConsumer extends Consumer
910
{
1011
protected $queues = array();
1112

13+
/**
14+
* Queues provider
15+
*
16+
* @var QueuesProviderInterface
17+
*/
18+
protected $queuesProvider = null;
19+
20+
/**
21+
* QueuesProvider setter
22+
*
23+
* @param QueuesProviderInterface $queuesProvider
24+
*
25+
* @return self
26+
*/
27+
public function setQueuesProvider(QueuesProviderInterface $queuesProvider)
28+
{
29+
$this->queuesProvider = $queuesProvider;
30+
return $this;
31+
}
32+
1233
public function getQueueConsumerTag($queue)
1334
{
1435
return sprintf('%s-%s', $this->getConsumerTag(), $queue);
@@ -21,6 +42,8 @@ public function setQueues(array $queues)
2142

2243
protected function setupConsumer()
2344
{
45+
$this->mergeQueues();
46+
2447
if ($this->autoSetupFabric) {
2548
$this->setupFabric();
2649
}
@@ -72,4 +95,17 @@ public function stopConsuming()
7295
$this->getChannel()->basic_cancel($this->getQueueConsumerTag($name));
7396
}
7497
}
98+
99+
/**
100+
* Merges static and provided queues into one array
101+
*/
102+
protected function mergeQueues()
103+
{
104+
if ($this->queuesProvider) {
105+
$this->queues = array_merge(
106+
$this->queues,
107+
$this->queuesProvider->getQueues()
108+
);
109+
}
110+
}
75111
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ old_sound_rabbit_mq:
112112
- 'android.upload'
113113
- 'iphone.upload'
114114
callback: foo.multiple_test2.callback
115+
queues_provider: foo.queues_provider
115116

116117
anon_consumers:
117118
foo_anon_consumer:

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,12 @@ public function testMultipleConsumerDefinition()
320320
)
321321
)
322322
)
323+
),
324+
array(
325+
'setQueuesProvider',
326+
array(
327+
new Reference('foo.queues_provider')
328+
)
323329
)
324330
),
325331
$definition->getMethodCalls()

0 commit comments

Comments
 (0)