Skip to content

Commit b78ac22

Browse files
committed
Merge pull request #230 from bburnichon/feature/dequeuer-aware-interface
add dequeuer and dequeuer aware interface
2 parents f2124c4 + 1d4fc0f commit b78ac22

File tree

6 files changed

+124
-7
lines changed

6 files changed

+124
-7
lines changed

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,17 @@ protected function loadConsumers()
158158
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
159159
}
160160

161-
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_consumer', $key), $definition);
161+
$name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
162+
$this->container->setDefinition($name, $definition);
163+
$this->addDequeuerAwareCall($consumer['callback'], $name);
162164
}
163165
}
164166

165167
protected function loadMultipleConsumers()
166168
{
167169
foreach ($this->config['multiple_consumers'] as $key => $consumer) {
168170
$queues = array();
171+
$callbacks = array();
169172

170173
if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
171174
throw new InvalidConfigurationException(
@@ -177,6 +180,7 @@ protected function loadMultipleConsumers()
177180
foreach ($consumer['queues'] as $queueName => $queueOptions) {
178181
$queues[$queueOptions['name']] = $queueOptions;
179182
$queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
183+
$callbacks[] = new Reference($queueOptions['callback']);
180184
}
181185

182186
$definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
@@ -213,7 +217,14 @@ protected function loadMultipleConsumers()
213217
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
214218
}
215219

216-
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_multiple', $key), $definition);
220+
$name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
221+
$this->container->setDefinition($name, $definition);
222+
if ($consumer['queues_provider']) {
223+
$this->addDequeuerAwareCall($consumer['queues_provider'], $name);
224+
}
225+
foreach ($callbacks as $callback) {
226+
$this->addDequeuerAwareCall($callback, $name);
227+
}
217228
}
218229
}
219230

@@ -231,7 +242,9 @@ protected function loadAnonConsumers()
231242
$this->injectLoggedChannel($definition, $key, $anon['connection']);
232243
}
233244

234-
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_anon', $key), $definition);
245+
$name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
246+
$this->container->setDefinition($name, $definition);
247+
$this->addDequeuerAwareCall($anon['callback'], $name);
235248
}
236249
}
237250

@@ -355,4 +368,23 @@ public function getAlias()
355368
{
356369
return 'old_sound_rabbit_mq';
357370
}
371+
372+
/**
373+
* Add proper dequeuer aware call
374+
*
375+
* @param string $callback
376+
* @param string $name
377+
*/
378+
protected function addDequeuerAwareCall($callback, $name)
379+
{
380+
if (! $this->container->has($callback)) {
381+
return;
382+
}
383+
384+
$callbackDefinition = $this->container->findDefinition($callback);
385+
$refClass = new \ReflectionClass($callbackDefinition->getClass());
386+
if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
387+
$callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
388+
}
389+
}
358390
}

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,11 @@ All the options of `queues-options` in the consumer are available for each queue
516516
Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.
517517
518518
The `queues_provider` is a optional service that dynamically provides queues.
519-
It must implement `QueuesProviderInterface`
519+
It must implement `QueuesProviderInterface`.
520+
521+
Be aware that queues providers are responsible for the proper calls to `setDequeuer` and that callbacks are callables
522+
(not `ConsumerInterface`). In case service providing queues implements `DequeuerAwareInterface`, a call to
523+
`setDequeuer` is added to the definition of the service with a `DequeuerInterface` currently being a `MultipleConsumer`.
520524
521525
### Anonymous Consumers ###
522526

RabbitMq/BaseConsumer.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
44

5-
use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
6-
7-
abstract class BaseConsumer extends BaseAmqp
5+
abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface
86
{
97
protected $target;
108

RabbitMq/DequeuerAwareInterface.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\RabbitMq;
4+
5+
interface DequeuerAwareInterface
6+
{
7+
public function setDequeuer(DequeuerInterface $dequeuer);
8+
}

RabbitMq/DequeuerInterface.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\RabbitMq;
4+
5+
interface DequeuerInterface
6+
{
7+
/**
8+
* Stop dequeuing messages.
9+
*
10+
* @return void
11+
*/
12+
public function forceStopConsumer();
13+
14+
/**
15+
* Set idle timeout
16+
*
17+
* @param $idleTimeout
18+
*
19+
* @return void
20+
*/
21+
public function setIdleTimeout($idleTimeout);
22+
23+
/**
24+
* Get current idle timeout
25+
*
26+
* @return int
27+
*/
28+
public function getIdleTimeout();
29+
}

Tests/RabbitMq/BaseConsumerTest.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
6+
7+
class BaseConsumerTest extends \PHPUnit_Framework_TestCase
8+
{
9+
/** @var BaseConsumer */
10+
protected $consumer;
11+
12+
protected function setUp()
13+
{
14+
$amqpConnection = $this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection')
15+
->disableOriginalConstructor()
16+
->getMock();
17+
18+
$this->consumer = $this->getMockBuilder('\OldSound\RabbitMqBundle\RabbitMq\BaseConsumer')
19+
->setConstructorArgs(array($amqpConnection))
20+
->getMockForAbstractClass();
21+
}
22+
23+
public function testItExtendsBaseAmqpInterface()
24+
{
25+
$this->assertInstanceOf('OldSound\RabbitMqBundle\RabbitMq\BaseAmqp', $this->consumer);
26+
}
27+
28+
public function testItImplementsDequeuerInterface()
29+
{
30+
$this->assertInstanceOf('OldSound\RabbitMqBundle\RabbitMq\DequeuerInterface', $this->consumer);
31+
}
32+
33+
public function testItsIdleTimeoutIsMutable()
34+
{
35+
$this->assertEquals(0, $this->consumer->getIdleTimeout());
36+
$this->consumer->setIdleTimeout(42);
37+
$this->assertEquals(42, $this->consumer->getIdleTimeout());
38+
}
39+
40+
public function testForceStopConsumer()
41+
{
42+
$this->assertAttributeEquals(false, 'forceStop', $this->consumer);
43+
$this->consumer->forceStopConsumer();
44+
$this->assertAttributeEquals(true, 'forceStop', $this->consumer);
45+
}
46+
}

0 commit comments

Comments
 (0)