Skip to content

Commit 5542098

Browse files
authored
Consumer timeout wait (#36)
adding `timeout_wait` specifies how long the consumer will wait without receiving a new message before ensuring the current connection is still valid
1 parent 5a8d35d commit 5542098

File tree

7 files changed

+240
-48
lines changed

7 files changed

+240
-48
lines changed

DependencyInjection/Configuration.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ protected function addConsumers(ArrayNodeDefinition $node)
155155
->scalarNode('callback')->isRequired()->end()
156156
->scalarNode('idle_timeout')->end()
157157
->scalarNode('idle_timeout_exit_code')->end()
158+
->scalarNode('timeout_wait')->end()
158159
->arrayNode('graceful_max_execution')
159160
->canBeUnset()
160161
->children()
@@ -193,6 +194,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
193194
->scalarNode('connection')->defaultValue('default')->end()
194195
->scalarNode('idle_timeout')->end()
195196
->scalarNode('idle_timeout_exit_code')->end()
197+
->scalarNode('timeout_wait')->end()
196198
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
197199
->arrayNode('graceful_max_execution')
198200
->canBeUnset()
@@ -217,7 +219,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
217219
->end()
218220
;
219221
}
220-
222+
221223
protected function addDynamicConsumers(ArrayNodeDefinition $node)
222224
{
223225
$node
@@ -233,6 +235,7 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
233235
->scalarNode('callback')->isRequired()->end()
234236
->scalarNode('idle_timeout')->end()
235237
->scalarNode('idle_timeout_exit_code')->end()
238+
->scalarNode('timeout_wait')->end()
236239
->arrayNode('graceful_max_execution')
237240
->canBeUnset()
238241
->children()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ protected function loadConsumers()
213213
if (isset($consumer['idle_timeout_exit_code'])) {
214214
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
215215
}
216+
if (isset($consumer['timeout_wait'])) {
217+
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
218+
}
216219
if (isset($consumer['graceful_max_execution'])) {
217220
$definition->addMethodCall(
218221
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
@@ -290,6 +293,9 @@ protected function loadMultipleConsumers()
290293
if (isset($consumer['idle_timeout_exit_code'])) {
291294
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
292295
}
296+
if (isset($consumer['timeout_wait'])) {
297+
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
298+
}
293299
if (isset($consumer['graceful_max_execution'])) {
294300
$definition->addMethodCall(
295301
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
@@ -363,6 +369,9 @@ protected function loadDynamicConsumers()
363369
if (isset($consumer['idle_timeout_exit_code'])) {
364370
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
365371
}
372+
if (isset($consumer['timeout_wait'])) {
373+
$definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
374+
}
366375
if (isset($consumer['graceful_max_execution'])) {
367376
$definition->addMethodCall(
368377
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
@@ -457,7 +466,7 @@ protected function loadBatchConsumers()
457466
protected function loadAnonConsumers()
458467
{
459468
foreach ($this->config['anon_consumers'] as $key => $anon) {
460-
$definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
469+
$definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
461470
$definition
462471
->setPublic(true)
463472
->addTag('old_sound_rabbit_mq.base_amqp')

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,23 @@ consumers:
435435
idle_timeout_exit_code: 0
436436
```
437437
438+
#### Timeout wait ####
439+
440+
Set the `timeout_wait` in seconds.
441+
The `timeout_wait` specifies how long the consumer will wait without receiving a new message before ensuring the current connection is still valid.
442+
443+
```yaml
444+
consumers:
445+
upload_picture:
446+
connection: default
447+
exchange_options: {name: 'upload-picture', type: direct}
448+
queue_options: {name: 'upload-picture'}
449+
callback: upload_picture_service
450+
idle_timeout: 60
451+
idle_timeout_exit_code: 0
452+
timeout_wait: 10
453+
```
454+
438455
#### Graceful max execution timeout ####
439456

440457
If you'd like your consumer to be running up to certain time and then gracefully exit, then set the `graceful_max_execution.timeout` in seconds.

RabbitMq/Consumer.php

Lines changed: 65 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@
1313

1414
class Consumer extends BaseConsumer
1515
{
16-
const TIMEOUT_TYPE_IDLE = 'idle';
17-
const TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION = 'graceful-max-execution';
18-
1916
/**
2017
* @var int|null $memoryLimit
2118
*/
@@ -32,6 +29,16 @@ class Consumer extends BaseConsumer
3229
*/
3330
protected $gracefulMaxExecutionTimeoutExitCode = 0;
3431

32+
/**
33+
* @var int|null
34+
*/
35+
protected $timeoutWait;
36+
37+
/**
38+
* @var \DateTime|null
39+
*/
40+
protected $lastActivityDateTime;
41+
3542
/**
3643
* Set the memory limit
3744
*
@@ -67,6 +74,7 @@ public function consume($msgAmount)
6774

6875
$this->setupConsumer();
6976

77+
$this->setLastActivityDateTime(new \DateTime());
7078
while (count($this->getChannel()->callbacks)) {
7179
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
7280
$this->maybeStopConsumer();
@@ -76,29 +84,35 @@ public function consume($msgAmount)
7684
* graceful max execution timeout is being used.
7785
*/
7886
$waitTimeout = $this->chooseWaitTimeout();
79-
if (
80-
$waitTimeout['timeoutType'] === self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION
81-
&& $waitTimeout['seconds'] < 1
87+
if ($this->gracefulMaxExecutionDateTime
88+
&& $waitTimeout < 1
8289
) {
8390
return $this->gracefulMaxExecutionTimeoutExitCode;
8491
}
8592

8693
if (!$this->forceStop) {
8794
try {
88-
$this->getChannel()->wait(null, false, $waitTimeout['seconds']);
95+
$this->getChannel()->wait(null, false, $waitTimeout);
96+
$this->setLastActivityDateTime(new \DateTime());
8997
} catch (AMQPTimeoutException $e) {
90-
if (self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION === $waitTimeout['timeoutType']) {
91-
return $this->gracefulMaxExecutionTimeoutExitCode;
92-
}
98+
$now = time();
9399

94-
$idleEvent = new OnIdleEvent($this);
95-
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
96-
97-
if ($idleEvent->isForceStop()) {
98-
if (null !== $this->getIdleTimeoutExitCode()) {
99-
return $this->getIdleTimeoutExitCode();
100-
} else {
101-
throw $e;
100+
if ($this->gracefulMaxExecutionDateTime
101+
&& $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
102+
) {
103+
return $this->gracefulMaxExecutionTimeoutExitCode;
104+
} elseif ($this->getIdleTimeout()
105+
&& ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now)
106+
) {
107+
$idleEvent = new OnIdleEvent($this);
108+
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
109+
110+
if ($idleEvent->isForceStop()) {
111+
if (null !== $this->getIdleTimeoutExitCode()) {
112+
return $this->getIdleTimeoutExitCode();
113+
} else {
114+
throw $e;
115+
}
102116
}
103117
}
104118
}
@@ -115,7 +129,7 @@ public function purge()
115129
{
116130
$this->getChannel()->queue_purge($this->queueOptions['name'], true);
117131
}
118-
132+
119133
/**
120134
* Delete the queue
121135
*/
@@ -239,6 +253,11 @@ public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
239253
$this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
240254
}
241255

256+
public function setTimeoutWait(int $timeoutWait): void
257+
{
258+
$this->timeoutWait = $timeoutWait;
259+
}
260+
242261
/**
243262
* @return \DateTime|null
244263
*/
@@ -255,20 +274,19 @@ public function getGracefulMaxExecutionTimeoutExitCode()
255274
return $this->gracefulMaxExecutionTimeoutExitCode;
256275
}
257276

277+
public function getTimeoutWait(): ?int
278+
{
279+
return $this->timeoutWait;
280+
}
281+
258282
/**
259-
* Choose the timeout to use for the $this->getChannel()->wait() method.
260-
*
261-
* @return array Of structure
262-
* {
263-
* timeoutType: string; // one of self::TIMEOUT_TYPE_*
264-
* seconds: int;
265-
* }
283+
* Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
266284
*/
267-
private function chooseWaitTimeout()
285+
private function chooseWaitTimeout(): int
268286
{
269287
if ($this->gracefulMaxExecutionDateTime) {
270288
$allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
271-
$allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
289+
$allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
272290
+ $allowedExecutionDateInterval->h * 3600
273291
+ $allowedExecutionDateInterval->i * 60
274292
+ $allowedExecutionDateInterval->s;
@@ -281,25 +299,30 @@ private function chooseWaitTimeout()
281299
* Respect the idle timeout if it's set and if it's less than
282300
* the remaining allowed execution.
283301
*/
284-
if (
285-
$this->getIdleTimeout()
302+
if ($this->getIdleTimeout()
286303
&& $this->getIdleTimeout() < $allowedExecutionSeconds
287304
) {
288-
return array(
289-
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
290-
'seconds' => $this->getIdleTimeout(),
291-
);
305+
$waitTimeout = $this->getIdleTimeout();
306+
} else {
307+
$waitTimeout = $allowedExecutionSeconds;
292308
}
309+
} else {
310+
$waitTimeout = $this->getIdleTimeout();
311+
}
293312

294-
return array(
295-
'timeoutType' => self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION,
296-
'seconds' => $allowedExecutionSeconds,
297-
);
313+
if (!is_null($this->getTimeoutWait()) && $this->getTimeoutWait() > 0) {
314+
$waitTimeout = min($waitTimeout, $this->getTimeoutWait());
298315
}
316+
return $waitTimeout;
317+
}
299318

300-
return array(
301-
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
302-
'seconds' => $this->getIdleTimeout(),
303-
);
319+
public function setLastActivityDateTime(\DateTime $dateTime)
320+
{
321+
$this->lastActivityDateTime = $dateTime;
322+
}
323+
324+
protected function getLastActivityDateTime(): ?\DateTime
325+
{
326+
return $this->lastActivityDateTime;
304327
}
305328
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ old_sound_rabbit_mq:
8989
consumers:
9090
foo_consumer:
9191
connection: foo_connection
92+
timeout_wait: 3
9293
exchange_options:
9394
name: foo_exchange
9495
type: direct
@@ -137,6 +138,7 @@ old_sound_rabbit_mq:
137138
multiple_consumers:
138139
multi_test_consumer:
139140
connection: foo_connection
141+
timeout_wait: 3
140142
exchange_options:
141143
name: foo_multiple_exchange
142144
type: direct
@@ -158,7 +160,7 @@ old_sound_rabbit_mq:
158160
- 'iphone.upload'
159161
callback: foo.multiple_test2.callback
160162
queues_provider: foo.queues_provider
161-
163+
162164
dynamic_consumers:
163165
foo_dyn_consumer:
164166
connection: foo_default

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,10 @@ public function testFooConsumerDefinition()
367367
array(
368368
'setCallback',
369369
array(array(new Reference('foo.callback'), 'execute'))
370+
),
371+
array(
372+
'setTimeoutWait',
373+
array(3)
370374
)
371375
),
372376
$definition->getMethodCalls()
@@ -518,6 +522,10 @@ public function testMultipleConsumerDefinition()
518522
array(
519523
new Reference('foo.queues_provider')
520524
)
525+
),
526+
array(
527+
'setTimeoutWait',
528+
array(3)
521529
)
522530
),
523531
$definition->getMethodCalls()

0 commit comments

Comments
 (0)