Skip to content

Commit 915f68f

Browse files
committed
Merge pull request #266 from tiborb/rabbitmq-dynamic-consumer
Added rabbitmq:dynamic-consumer command
2 parents b88c68d + 80862d6 commit 915f68f

File tree

13 files changed

+424
-15
lines changed

13 files changed

+424
-15
lines changed

Command/BaseConsumerCommand.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,18 @@ protected function execute(InputInterface $input, OutputInterface $output)
8787
if (0 > $this->amount) {
8888
throw new \InvalidArgumentException("The -m option should be null or greater than 0");
8989
}
90-
90+
$this->initConsumer($input);
91+
92+
$this->consumer->consume($this->amount);
93+
}
94+
95+
protected function initConsumer($input) {
9196
$this->consumer = $this->getContainer()
92-
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
97+
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
9398

94-
if (!is_null($input->getOption('memory-limit')) && ctype_digit((string)$input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) {
99+
if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) {
95100
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
96101
}
97102
$this->consumer->setRoutingKey($input->getOption('route'));
98-
$this->consumer->consume($this->amount);
99103
}
100104
}

Command/DynamicConsumerCommand.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/**
4+
* DynamicConsumerCommand
5+
*
6+
* The context argument is passed to the consumer instance
7+
* which can decide about the queue and routings it uses.
8+
*
9+
* @author Tibor Barna <tibor.barna@wiredminds.de>
10+
*/
11+
namespace OldSound\RabbitMqBundle\Command;
12+
13+
use OldSound\RabbitMqBundle\Command\BaseConsumerCommand;
14+
use Symfony\Component\Console\Input\InputArgument;
15+
16+
class DynamicConsumerCommand extends BaseConsumerCommand
17+
{
18+
protected function configure()
19+
{
20+
parent::configure();
21+
22+
$this
23+
->setName('rabbitmq:dynamic-consumer')
24+
->setDescription('Context-aware consumer')
25+
->addArgument('context', InputArgument::REQUIRED, 'Context the consumer runs in')
26+
;
27+
}
28+
29+
protected function getConsumerService()
30+
{
31+
return 'old_sound_rabbit_mq.%s_dynamic';
32+
}
33+
34+
protected function initConsumer($input)
35+
{
36+
parent::initConsumer($input);
37+
$this->consumer->setContext($input->getArgument('context'));
38+
}
39+
}

DependencyInjection/Configuration.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public function getConfigTreeBuilder()
3131
$this->addProducers($rootNode);
3232
$this->addConsumers($rootNode);
3333
$this->addMultipleConsumers($rootNode);
34+
$this->addDynamicConsumers($rootNode);
3435
$this->addAnonConsumers($rootNode);
3536
$this->addRpcClients($rootNode);
3637
$this->addRpcServers($rootNode);
@@ -152,6 +153,37 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
152153
->end()
153154
;
154155
}
156+
157+
protected function addDynamicConsumers(ArrayNodeDefinition $node)
158+
{
159+
$node
160+
->fixXmlConfig('dynamic_consumer')
161+
->children()
162+
->arrayNode('dynamic_consumers')
163+
->canBeUnset()
164+
->useAttributeAsKey('key')
165+
->prototype('array')
166+
->append($this->getExchangeConfiguration())
167+
->children()
168+
->scalarNode('connection')->defaultValue('default')->end()
169+
->scalarNode('callback')->isRequired()->end()
170+
->scalarNode('idle_timeout')->end()
171+
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
172+
->arrayNode('qos_options')
173+
->canBeUnset()
174+
->children()
175+
->scalarNode('prefetch_size')->defaultValue(0)->end()
176+
->scalarNode('prefetch_count')->defaultValue(0)->end()
177+
->booleanNode('global')->defaultFalse()->end()
178+
->end()
179+
->end()
180+
->scalarNode('queue_options_provider')->isRequired()->end()
181+
->end()
182+
->end()
183+
->end()
184+
->end()
185+
;
186+
}
155187

156188
protected function addAnonConsumers(ArrayNodeDefinition $node)
157189
{

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public function load(array $configs, ContainerBuilder $container)
4949
$this->loadProducers();
5050
$this->loadConsumers();
5151
$this->loadMultipleConsumers();
52+
$this->loadDynamicConsumers();
5253
$this->loadAnonConsumers();
5354
$this->loadRpcClients();
5455
$this->loadRpcServers();
@@ -233,6 +234,57 @@ protected function loadMultipleConsumers()
233234
}
234235
}
235236
}
237+
238+
protected function loadDynamicConsumers()
239+
{
240+
foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
241+
242+
if (empty($consumer['queue_options_provider'])) {
243+
throw new InvalidConfigurationException(
244+
"Error on loading $key dynamic consumer. " .
245+
"'queue_provider' parameter should be defined."
246+
);
247+
}
248+
249+
$definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
250+
$definition
251+
->addTag('old_sound_rabbit_mq.base_amqp')
252+
->addTag('old_sound_rabbit_mq.consumer')
253+
->addTag('old_sound_rabbit_mq.dynamic_consumer')
254+
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
255+
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
256+
257+
if (array_key_exists('qos_options', $consumer)) {
258+
$definition->addMethodCall('setQosOptions', array(
259+
$consumer['qos_options']['prefetch_size'],
260+
$consumer['qos_options']['prefetch_count'],
261+
$consumer['qos_options']['global']
262+
));
263+
}
264+
265+
$definition->addMethodCall(
266+
'setQueueOptionsProvider',
267+
array(new Reference($consumer['queue_options_provider']))
268+
);
269+
270+
if(isset($consumer['idle_timeout'])) {
271+
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
272+
}
273+
if (!$consumer['auto_setup_fabric']) {
274+
$definition->addMethodCall('disableAutoSetupFabric');
275+
}
276+
277+
$this->injectConnection($definition, $consumer['connection']);
278+
if ($this->collectorEnabled) {
279+
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
280+
}
281+
282+
$name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
283+
$this->container->setDefinition($name, $definition);
284+
$this->addDequeuerAwareCall($consumer['callback'], $name);
285+
$this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
286+
}
287+
}
236288

237289
protected function loadAnonConsumers()
238290
{
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Provider;
4+
5+
/**
6+
* Queue provider interface
7+
*
8+
* @author Tibor Barna <tibor.barna@wiredminds.de>
9+
*/
10+
interface QueueOptionsProviderInterface
11+
{
12+
/**
13+
* Return queue options
14+
*
15+
* Example:
16+
* array(
17+
* 'name' => 'example_context',
18+
* 'durable' => true,
19+
* 'routing_keys' => array('key.*')
20+
* )
21+
*
22+
* @return array
23+
*
24+
*/
25+
public function getQueueOptions($context = null);
26+
}

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,32 @@ Be aware that queues providers are responsible for the proper calls to `setDeque
516516
(not `ConsumerInterface`). In case service providing queues implements `DequeuerAwareInterface`, a call to
517517
`setDequeuer` is added to the definition of the service with a `DequeuerInterface` currently being a `MultipleConsumer`.
518518

519+
### Dynamic Consumers ###
520+
521+
Sometimes you have to change the consumer's configuration on the fly.
522+
Dynamic consumers allow you to define the consumers queue options programatically, based on the context.
523+
524+
e.g. In a scenario when the defined consumer must be responsible for a dynamic number of topics and you do not want (or can't) change it's configuration every time.
525+
526+
Define a service `queue_options_provider` that implements the `QueueOptionsProviderInterface`, and add it to your `dynamic_consumers` configuration.
527+
528+
```yaml
529+
dynamic_consumers:
530+
proc_logs:
531+
connection: default
532+
exchange_options: {name: 'logs', type: topic}
533+
callback: parse_logs_service
534+
queue_options_provider: queue_options_provider_service
535+
```
536+
537+
Example Usage:
538+
539+
```bash
540+
$ ./app/console rabbitmq:dynamic-consumer proc_logs server1
541+
```
542+
543+
In this case the `proc_logs` consumer runs for `server1` and it can decide over the queue options it uses.
544+
519545
### Anonymous Consumers ###
520546

521547
Now, why will we ever need anonymous consumers? This sounds like some internet threat or something… Keep reading.

RabbitMq/DynamicConsumer.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\RabbitMq;
4+
5+
use OldSound\RabbitMqBundle\Provider\QueueOptionsProviderInterface;
6+
7+
class DynamicConsumer extends Consumer{
8+
9+
/**
10+
* Queue provider
11+
*
12+
* @var QueueOptionsProviderInterface
13+
*/
14+
protected $queueOptionsProvider = null;
15+
16+
/**
17+
* Context the consumer runs in
18+
*
19+
* @var string
20+
*/
21+
protected $context = null;
22+
23+
/**
24+
* QueueOptionsProvider setter
25+
*
26+
* @param QueueOptionsProviderInterface $queueOptionsProvider
27+
*
28+
* @return self
29+
*/
30+
public function setQueueOptionsProvider(QueueOptionsProviderInterface $queueOptionsProvider)
31+
{
32+
$this->queueOptionsProvider = $queueOptionsProvider;
33+
return $this;
34+
}
35+
36+
public function setContext($context)
37+
{
38+
$this->context = $context;
39+
}
40+
41+
42+
protected function setupConsumer()
43+
{
44+
$this->mergeQueueOptions();
45+
parent::setupConsumer();
46+
}
47+
48+
protected function mergeQueueOptions()
49+
{
50+
$this->queueOptions = array_merge($this->queueOptions, $this->queueOptionsProvider->getQueueOptions($this->context));
51+
}
52+
}

Resources/config/rabbitmq.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
<parameter key="old_sound_rabbit_mq.producer.class">OldSound\RabbitMqBundle\RabbitMq\Producer</parameter>
1111
<parameter key="old_sound_rabbit_mq.consumer.class">OldSound\RabbitMqBundle\RabbitMq\Consumer</parameter>
1212
<parameter key="old_sound_rabbit_mq.multi_consumer.class">OldSound\RabbitMqBundle\RabbitMq\MultipleConsumer</parameter>
13+
<parameter key="old_sound_rabbit_mq.dynamic_consumer.class">OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer</parameter>
1314
<parameter key="old_sound_rabbit_mq.anon_consumer.class">OldSound\RabbitMqBundle\RabbitMq\AnonConsumer</parameter>
1415
<parameter key="old_sound_rabbit_mq.rpc_client.class">OldSound\RabbitMqBundle\RabbitMq\RpcClient</parameter>
1516
<parameter key="old_sound_rabbit_mq.rpc_server.class">OldSound\RabbitMqBundle\RabbitMq\RpcServer</parameter>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\Command;
4+
5+
use OldSound\RabbitMqBundle\Command\DynamicConsumerCommand;
6+
7+
use Symfony\Component\Console\Input\InputOption;
8+
9+
class DynamicConsumerCommandTest extends BaseCommandTest{
10+
11+
protected function setUp()
12+
{
13+
parent::setUp();
14+
$this->definition->expects($this->any())
15+
->method('getOptions')
16+
->will($this->returnValue(array(
17+
new InputOption('--verbose', '-v', InputOption::VALUE_NONE, 'Increase verbosity of messages.'),
18+
new InputOption('--env', '-e', InputOption::VALUE_REQUIRED, 'The Environment name.', 'dev'),
19+
new InputOption('--no-debug', null, InputOption::VALUE_NONE, 'Switches off debug mode.'),
20+
)));
21+
$this->application->expects($this->once())
22+
->method('getHelperSet')
23+
->will($this->returnValue($this->helperSet));
24+
25+
$this->command = new DynamicConsumerCommand();
26+
$this->command->setApplication($this->application);
27+
}
28+
29+
/**
30+
* testInputsDefinitionCommand
31+
*/
32+
public function testInputsDefinitionCommand()
33+
{
34+
// check argument
35+
$this->assertTrue($this->command->getDefinition()->hasArgument('name'));
36+
$this->assertTrue($this->command->getDefinition()->getArgument('name')->isRequired()); // Name is required to find the service
37+
38+
$this->assertTrue($this->command->getDefinition()->hasArgument('context'));
39+
$this->assertTrue($this->command->getDefinition()->getArgument('context')->isRequired()); // Context is required for the queue options provider
40+
41+
//check options
42+
$this->assertTrue($this->command->getDefinition()->hasOption('messages'));
43+
$this->assertTrue($this->command->getDefinition()->getOption('messages')->isValueOptional()); // It should accept value
44+
45+
$this->assertTrue($this->command->getDefinition()->hasOption('route'));
46+
$this->assertTrue($this->command->getDefinition()->getOption('route')->isValueOptional()); // It should accept value
47+
48+
$this->assertTrue($this->command->getDefinition()->hasOption('without-signals'));
49+
$this->assertFalse($this->command->getDefinition()->getOption('without-signals')->acceptValue()); // It shouldn't accept value because it is a true/false input
50+
51+
$this->assertTrue($this->command->getDefinition()->hasOption('debug'));
52+
$this->assertFalse($this->command->getDefinition()->getOption('debug')->acceptValue()); // It shouldn't accept value because it is a true/false input
53+
}
54+
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,23 @@ old_sound_rabbit_mq:
122122
- 'iphone.upload'
123123
callback: foo.multiple_test2.callback
124124
queues_provider: foo.queues_provider
125-
125+
126+
dynamic_consumers:
127+
foo_dyn_consumer:
128+
connection: foo_default
129+
exchange_options:
130+
name: foo_dynamic_exchange
131+
type: direct
132+
callback: foo.dynamic.callback
133+
queue_options_provider: foo.dynamic.provider
134+
bar_dyn_consumer:
135+
connection: bar_default
136+
exchange_options:
137+
name: bar_dynamic_exchange
138+
type: direct
139+
callback: bar.dynamic.callback
140+
queue_options_provider: bar.dynamic.provider
141+
126142
anon_consumers:
127143
foo_anon_consumer:
128144
connection: foo_connection

0 commit comments

Comments
 (0)