Skip to content

Commit dc96d6e

Browse files
makasimskafandri
authored andcommitted
Add idle event and ability to configure exit or not. (#419)
* Add idle event and ability to configure exit or not. * fix tests on php 5.3. remove ::class const * Update documentation about idle event
1 parent a6b6474 commit dc96d6e

File tree

6 files changed

+193
-5
lines changed

6 files changed

+193
-5
lines changed

Event/AMQPEvent.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
class AMQPEvent extends Event
1616
{
1717
const ON_CONSUME = 'on_consume';
18+
const ON_IDLE = 'on_idle';
1819
const BEFORE_PROCESSING_MESSAGE = 'before_processing';
1920
const AFTER_PROCESSING_MESSAGE = 'after_processing';
2021

Event/OnIdleEvent.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
7+
/**
8+
* Class OnIdleEvent
9+
*
10+
* @package OldSound\RabbitMqBundle\Command
11+
*/
12+
class OnIdleEvent extends AMQPEvent
13+
{
14+
const NAME = AMQPEvent::ON_IDLE;
15+
16+
/**
17+
* @var bool
18+
*/
19+
private $forceStop;
20+
21+
/**
22+
* OnConsumeEvent constructor.
23+
*
24+
* @param Consumer $consumer
25+
*/
26+
public function __construct(Consumer $consumer)
27+
{
28+
$this->setConsumer($consumer);
29+
30+
$this->forceStop = true;
31+
}
32+
33+
/**
34+
* @return boolean
35+
*/
36+
public function isForceStop()
37+
{
38+
return $this->forceStop;
39+
}
40+
41+
/**
42+
* @param boolean $forceStop
43+
*/
44+
public function setForceStop($forceStop)
45+
{
46+
$this->forceStop = $forceStop;
47+
}
48+
}

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,32 @@ class AfterProcessingMessageEvent extends AMQPEvent
351351
Event raised after processing a AMQPMessage.
352352
If the process message will throw an Exception the event will not raise.
353353
354+
##### IDLE MESSAGE #####
355+
356+
```php
357+
<?php
358+
class OnIdleEvent extends AMQPEvent
359+
{
360+
const NAME = AMQPEvent::ON_IDLE;
361+
362+
/**
363+
* OnIdleEvent constructor.
364+
*
365+
* @param AMQPMessage $AMQPMessage
366+
*/
367+
public function __construct(Consumer $consumer)
368+
{
369+
$this->setConsumer($consumer);
370+
371+
$this->forceStop = true;
372+
}
373+
}
374+
```
375+
376+
Event raised when `wait` method exit by timeout without receiving a message.
377+
In order to make use of this event a consumer `idle_timeout` has to be [configured](#idle-timeout).
378+
By default process exit on idle timeout, you can prevent it by setting `$event->setForceStop(false)` in a listener.
379+
354380
#### Idle timeout ####
355381
356382
If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds.

RabbitMq/Consumer.php

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
66
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
77
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8+
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
89
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
910
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
1011
use PhpAmqpLib\Exception\AMQPTimeoutException;
@@ -55,10 +56,15 @@ public function consume($msgAmount)
5556
try {
5657
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
5758
} catch (AMQPTimeoutException $e) {
58-
if (null !== $this->getIdleTimeoutExitCode()) {
59-
return $this->getIdleTimeoutExitCode();
60-
} else {
61-
throw $e;
59+
$idleEvent = new OnIdleEvent($this);
60+
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
61+
62+
if ($idleEvent->isForceStop()) {
63+
if (null !== $this->getIdleTimeoutExitCode()) {
64+
return $this->getIdleTimeoutExitCode();
65+
} else {
66+
throw $e;
67+
}
6268
}
6369
}
6470
}

Tests/Event/OnIdleEventTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\Event;
4+
5+
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
6+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
7+
8+
/**
9+
* Class OnIdleEventTest
10+
*
11+
* @package OldSound\RabbitMqBundle\Tests\Event
12+
*/
13+
class OnIdleEventTest extends \PHPUnit_Framework_TestCase
14+
{
15+
protected function getConsumer()
16+
{
17+
return new Consumer(
18+
$this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection')
19+
->disableOriginalConstructor()
20+
->getMock(),
21+
$this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
22+
->disableOriginalConstructor()
23+
->getMock()
24+
);
25+
}
26+
27+
public function testShouldAllowGetConsumerSetInConstructor()
28+
{
29+
$consumer = $this->getConsumer();
30+
$event = new OnIdleEvent($consumer);
31+
32+
$this->assertSame($consumer, $event->getConsumer());
33+
}
34+
35+
public function testShouldSetForceStopToTrueInConstructor()
36+
{
37+
$consumer = $this->getConsumer();
38+
$event = new OnIdleEvent($consumer);
39+
40+
$this->assertTrue($event->isForceStop());
41+
}
42+
43+
public function testShouldReturnPreviouslySetForceStop()
44+
{
45+
$consumer = $this->getConsumer();
46+
$event = new OnIdleEvent($consumer);
47+
48+
//guard
49+
$this->assertTrue($event->isForceStop());
50+
51+
$event->setForceStop(false);
52+
$this->assertFalse($event->isForceStop());
53+
}
54+
}

Tests/RabbitMq/ConsumerTest.php

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
66
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
77
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8+
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
89
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
910
use PhpAmqpLib\Exception\AMQPTimeoutException;
1011
use PhpAmqpLib\Message\AMQPMessage;
@@ -144,7 +145,7 @@ public function testConsume($data)
144145
$amqpChannel->callbacks = $consumerCallBacks;
145146

146147
/**
147-
* Mock ait method and use a callback to remove one element each time from callbacks
148+
* Mock wait method and use a callback to remove one element each time from callbacks
148149
* This will simulate a basic consumer consume with provided messages count
149150
*/
150151
$amqpChannel->expects($this->exactly(count($consumerCallBacks)))
@@ -202,4 +203,56 @@ public function testIdleTimeoutExitCode()
202203

203204
$this->assertTrue(2 == $consumer->consume(1));
204205
}
206+
207+
public function testShouldAllowContinueConsumptionAfterIdleTimeout()
208+
{
209+
// set up amqp connection
210+
$amqpConnection = $this->prepareAMQPConnection();
211+
// set up amqp channel
212+
$amqpChannel = $this->prepareAMQPChannel();
213+
$amqpChannel->expects($this->atLeastOnce())
214+
->method('getChannelId')
215+
->with()
216+
->willReturn(true);
217+
$amqpChannel->expects($this->once())
218+
->method('basic_consume')
219+
->withAnyParameters()
220+
->willReturn(true);
221+
222+
// set up consumer
223+
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
224+
// disable autosetup fabric so we do not mock more objects
225+
$consumer->disableAutoSetupFabric();
226+
$consumer->setChannel($amqpChannel);
227+
$consumer->setIdleTimeout(2);
228+
$amqpChannel->callbacks = array('idle_timeout_exit_code');
229+
230+
$amqpChannel->expects($this->exactly(2))
231+
->method('wait')
232+
->with(null, false, $consumer->getIdleTimeout())
233+
->willThrowException(new AMQPTimeoutException());
234+
235+
// set up event dispatcher
236+
$eventDispatcher = $this->getMockBuilder('Symfony\Component\EventDispatcher\EventDispatcher')
237+
->disableOriginalConstructor()
238+
->getMock();
239+
240+
$eventDispatcher->expects($this->at(1))
241+
->method('dispatch')
242+
->with(OnIdleEvent::NAME, $this->isInstanceOf('OldSound\RabbitMqBundle\Event\OnIdleEvent'))
243+
->willReturnCallback(function($eventName, OnIdleEvent $event) {
244+
$event->setForceStop(false);
245+
});
246+
$eventDispatcher->expects($this->at(3))
247+
->method('dispatch')
248+
->with(OnIdleEvent::NAME, $this->isInstanceOf('OldSound\RabbitMqBundle\Event\OnIdleEvent'))
249+
->willReturn(function($eventName, OnIdleEvent $event) {
250+
$event->setForceStop(true);
251+
});
252+
253+
$consumer->setEventDispatcher($eventDispatcher);
254+
255+
$this->setExpectedException('PhpAmqpLib\Exception\AMQPTimeoutException');
256+
$consumer->consume(10);
257+
}
205258
}

0 commit comments

Comments
 (0)