Skip to content

Commit 6116169

Browse files
committed
Extended message queue consumer configuration with parameters "maxIdleTime", "sleep" and "onlySpawnWhenMessageAvailable". Created logic for checking/handling these parameters during firing/executing queue consumers. Updated unit tests.
1 parent 6b6f428 commit 6116169

26 files changed

+802
-75
lines changed

app/code/Magento/AsynchronousOperations/Model/MassConsumer.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,19 @@ public function process($maxNumberOfMessages = null)
6969
$this->registry->register('isSecureArea', true, true);
7070

7171
$queue = $this->configuration->getQueue();
72+
$maxIdleTime = $this->configuration->getMaxIdleTime();
73+
$sleep = $this->configuration->getSleep();
7274

7375
if (!isset($maxNumberOfMessages)) {
7476
$queue->subscribe($this->getTransactionCallback($queue));
7577
} else {
76-
$this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
78+
$this->invoker->invoke(
79+
$queue,
80+
$maxNumberOfMessages,
81+
$this->getTransactionCallback($queue),
82+
$maxIdleTime,
83+
$sleep
84+
);
7785
}
7886

7987
$this->registry->unregister('isSecureArea');
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\MessageQueue\Model;
9+
10+
use Magento\Framework\MessageQueue\QueueRepository;
11+
12+
/**
13+
* Class CheckIsAvailableMessagesInQueue for checking messages available in queue
14+
*/
15+
class CheckIsAvailableMessagesInQueue
16+
{
17+
/**
18+
* @var QueueRepository
19+
*/
20+
private $queueRepository;
21+
22+
/**
23+
* Initialize dependencies.
24+
*
25+
* @param QueueRepository $queueRepository
26+
*/
27+
public function __construct(QueueRepository $queueRepository)
28+
{
29+
$this->queueRepository = $queueRepository;
30+
}
31+
32+
/**
33+
* Checks if there is available messages in the queue
34+
*
35+
* @param string $connectionName connection name
36+
* @param string $queueName queue name
37+
* @return bool
38+
* @throws \LogicException if queue is not available
39+
*/
40+
public function execute($connectionName, $queueName)
41+
{
42+
$queue = $this->queueRepository->get($connectionName, $queueName);
43+
$message = $queue->dequeue();
44+
if ($message) {
45+
$queue->reject($message);
46+
return true;
47+
}
48+
return false;
49+
}
50+
}

app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Psr\Log\LoggerInterface;
1515
use Symfony\Component\Process\PhpExecutableFinder;
1616
use Magento\Framework\Lock\LockManagerInterface;
17+
use Magento\MessageQueue\Model\CheckIsAvailableMessagesInQueue;
1718

1819
/**
1920
* Class for running consumers processes by cron
@@ -65,6 +66,11 @@ class ConsumersRunner
6566
*/
6667
private $lockManager;
6768

69+
/**
70+
* @var CheckIsAvailableMessagesInQueue
71+
*/
72+
private $checkIsAvailableMessages;
73+
6874
/**
6975
* @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed
7076
* for the PHP executable
@@ -74,6 +80,7 @@ class ConsumersRunner
7480
* @param LockManagerInterface $lockManager The lock manager
7581
* @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
7682
* @param LoggerInterface $logger Logger
83+
* @param CheckIsAvailableMessagesInQueue $checkIsAvailableMessages
7784
*/
7885
public function __construct(
7986
PhpExecutableFinder $phpExecutableFinder,
@@ -82,7 +89,8 @@ public function __construct(
8289
ShellInterface $shellBackground,
8390
LockManagerInterface $lockManager,
8491
ConnectionTypeResolver $mqConnectionTypeResolver = null,
85-
LoggerInterface $logger = null
92+
LoggerInterface $logger = null,
93+
CheckIsAvailableMessagesInQueue $checkIsAvailableMessages = null
8694
) {
8795
$this->phpExecutableFinder = $phpExecutableFinder;
8896
$this->consumerConfig = $consumerConfig;
@@ -93,6 +101,8 @@ public function __construct(
93101
?: ObjectManager::getInstance()->get(ConnectionTypeResolver::class);
94102
$this->logger = $logger
95103
?: ObjectManager::getInstance()->get(LoggerInterface::class);
104+
$this->checkIsAvailableMessages = $checkIsAvailableMessages
105+
?: ObjectManager::getInstance()->get(CheckIsAvailableMessagesInQueue::class);
96106
}
97107

98108
/**
@@ -166,6 +176,25 @@ private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $al
166176
return false;
167177
}
168178

179+
if ($consumerConfig->getOnlySpawnWhenMessageAvailable()) {
180+
try {
181+
return $this->checkIsAvailableMessages->execute(
182+
$connectionName,
183+
$consumerConfig->getQueue()
184+
);
185+
} catch (\LogicException $e) {
186+
$this->logger->info(
187+
sprintf(
188+
'Consumer "%s" skipped as its related queue "%s" is not available. %s',
189+
$consumerName,
190+
$consumerConfig->getQueue(),
191+
$e->getMessage()
192+
)
193+
);
194+
return false;
195+
}
196+
}
197+
169198
return true;
170199
}
171200
}

app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66
namespace Magento\MessageQueue\Test\Unit\Model\Cron;
77

88
use Magento\Framework\MessageQueue\ConnectionTypeResolver;
9-
use \PHPUnit_Framework_MockObject_MockObject as MockObject;
9+
use PHPUnit\Framework\MockObject\MockObject as MockObject;
1010
use Magento\Framework\ShellInterface;
1111
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface;
1212
use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface;
1313
use Magento\Framework\App\DeploymentConfig;
1414
use Magento\MessageQueue\Model\Cron\ConsumersRunner;
15+
use Magento\MessageQueue\Model\CheckIsAvailableMessagesInQueue;
1516
use Symfony\Component\Process\PhpExecutableFinder;
1617
use Magento\Framework\Lock\LockManagerInterface;
1718

@@ -45,10 +46,15 @@ class ConsumersRunnerTest extends \PHPUnit\Framework\TestCase
4546
*/
4647
private $phpExecutableFinderMock;
4748

49+
/**
50+
* @var CheckIsAvailableMessagesInQueue|MockObject
51+
*/
52+
private $checkIsAvailableMessagesMock;
53+
4854
/**
4955
* @var ConnectionTypeResolver
5056
*/
51-
private $connectionTypeResover;
57+
private $connectionTypeResolver;
5258

5359
/**
5460
* @var ConsumersRunner
@@ -74,18 +80,21 @@ protected function setUp()
7480
$this->deploymentConfigMock = $this->getMockBuilder(DeploymentConfig::class)
7581
->disableOriginalConstructor()
7682
->getMock();
77-
$this->connectionTypeResover = $this->getMockBuilder(ConnectionTypeResolver::class)
83+
$this->checkIsAvailableMessagesMock = $this->createMock(CheckIsAvailableMessagesInQueue::class);
84+
$this->connectionTypeResolver = $this->getMockBuilder(ConnectionTypeResolver::class)
7885
->disableOriginalConstructor()
7986
->getMock();
80-
$this->connectionTypeResover->method('getConnectionType')->willReturn('something');
87+
$this->connectionTypeResolver->method('getConnectionType')->willReturn('something');
8188

8289
$this->consumersRunner = new ConsumersRunner(
8390
$this->phpExecutableFinderMock,
8491
$this->consumerConfigMock,
8592
$this->deploymentConfigMock,
8693
$this->shellBackgroundMock,
8794
$this->lockManagerMock,
88-
$this->connectionTypeResover
95+
$this->connectionTypeResolver,
96+
null,
97+
$this->checkIsAvailableMessagesMock
8998
);
9099
}
91100

@@ -259,4 +268,95 @@ public function runDataProvider()
259268
],
260269
];
261270
}
271+
272+
/**
273+
* @param boolean $onlySpawnWhenMessageAvailable
274+
* @param boolean $isMassagesAvailableInTheQueue
275+
* @param int $shellBackgroundExpects
276+
* @dataProvider runBasedOnOnlySpawnWhenMessageAvailableConsumerConfigurationDataProvider
277+
*/
278+
public function testRunBasedOnOnlySpawnWhenMessageAvailableConsumerConfiguration(
279+
$onlySpawnWhenMessageAvailable,
280+
$isMassagesAvailableInTheQueue,
281+
$shellBackgroundExpects
282+
) {
283+
$consumerName = 'consumerName';
284+
$connectionName = 'connectionName';
285+
$queueName = 'queueName';
286+
$this->deploymentConfigMock->expects($this->exactly(3))
287+
->method('get')
288+
->willReturnMap(
289+
[
290+
['cron_consumers_runner/cron_run', true, true],
291+
['cron_consumers_runner/max_messages', 10000, 1000],
292+
['cron_consumers_runner/consumers', [], []],
293+
]
294+
);
295+
296+
/** @var ConsumerConfigInterface|MockObject $firstCunsumer */
297+
$consumer = $this->getMockBuilder(ConsumerConfigItemInterface::class)
298+
->getMockForAbstractClass();
299+
$consumer->expects($this->any())
300+
->method('getName')
301+
->willReturn($consumerName);
302+
$consumer->expects($this->once())
303+
->method('getConnection')
304+
->willReturn($connectionName);
305+
$consumer->expects($this->any())
306+
->method('getQueue')
307+
->willReturn($queueName);
308+
$consumer->expects($this->once())
309+
->method('getOnlySpawnWhenMessageAvailable')
310+
->willReturn($onlySpawnWhenMessageAvailable);
311+
$this->consumerConfigMock->expects($this->once())
312+
->method('getConsumers')
313+
->willReturn([$consumer]);
314+
315+
$this->phpExecutableFinderMock->expects($this->once())
316+
->method('find')
317+
->willReturn('');
318+
319+
$this->lockManagerMock->expects($this->once())
320+
->method('isLocked')
321+
->willReturn(false);
322+
323+
$this->checkIsAvailableMessagesMock->expects($this->exactly((int)$onlySpawnWhenMessageAvailable))
324+
->method('execute')
325+
->willReturn($isMassagesAvailableInTheQueue);
326+
327+
$this->shellBackgroundMock->expects($this->exactly($shellBackgroundExpects))
328+
->method('execute');
329+
330+
$this->consumersRunner->run();
331+
}
332+
333+
/**
334+
* @return array
335+
*/
336+
public function runBasedOnOnlySpawnWhenMessageAvailableConsumerConfigurationDataProvider()
337+
{
338+
return [
339+
[
340+
'onlySpawnWhenMessageAvailable' => true,
341+
'isMassagesAvailableInTheQueue' => true,
342+
'shellBackgroundExpects' => 1
343+
],
344+
[
345+
'onlySpawnWhenMessageAvailable' => true,
346+
'isMassagesAvailableInTheQueue' => false,
347+
'shellBackgroundExpects' => 0
348+
],
349+
[
350+
'onlySpawnWhenMessageAvailable' => false,
351+
'isMassagesAvailableInTheQueue' => true,
352+
'shellBackgroundExpects' => 1
353+
],
354+
[
355+
'onlySpawnWhenMessageAvailable' => false,
356+
'isMassagesAvailableInTheQueue' => false,
357+
'shellBackgroundExpects' => 1
358+
],
359+
360+
];
361+
}
262362
}

lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,31 @@ public function __construct(
5656
* @param QueueInterface $queue
5757
* @param int $maxNumberOfMessages
5858
* @param \Closure $callback
59+
* @param int|null $maxIdleTime
60+
* @param int|null $sleep
5961
* @return void
62+
*
63+
* @SuppressWarnings(PHPMD.CyclomaticComplexity)
6064
*/
61-
public function invoke(QueueInterface $queue, $maxNumberOfMessages, $callback)
62-
{
65+
public function invoke(
66+
QueueInterface $queue,
67+
$maxNumberOfMessages,
68+
$callback,
69+
$maxIdleTime = null,
70+
$sleep = null
71+
) {
6372
$this->poisonPillVersion = $this->poisonPillRead->getLatestVersion();
73+
$sleep = (int) $sleep ?? 1;
74+
$maxIdleTime = $maxIdleTime ? (int) $maxIdleTime : PHP_INT_MAX;
6475
for ($i = $maxNumberOfMessages; $i > 0; $i--) {
76+
$idleStartTime = microtime(true);
6577
do {
6678
$message = $queue->dequeue();
79+
if (!$message && microtime(true) - $idleStartTime > $maxIdleTime) {
80+
break 2;
81+
}
6782
// phpcs:ignore Magento2.Functions.DiscouragedFunction
68-
} while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0));
83+
} while ($message === null && $this->isWaitingNextMessage() && (sleep($sleep) === 0));
6984

7085
if ($message === null) {
7186
break;

lib/internal/Magento/Framework/MessageQueue/CallbackInvokerInterface.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
namespace Magento\Framework\MessageQueue;
99

1010
/**
11-
* Callback invoker interface
11+
* Callback invoker interface. Invoke callbacks for consumer classes.
1212
*/
1313
interface CallbackInvokerInterface
1414
{
@@ -18,7 +18,15 @@ interface CallbackInvokerInterface
1818
* @param QueueInterface $queue
1919
* @param int $maxNumberOfMessages
2020
* @param \Closure $callback
21+
* @param int|null $maxIdleTime
22+
* @param int|null $sleep
2123
* @return void
2224
*/
23-
public function invoke(QueueInterface $queue, $maxNumberOfMessages, $callback);
25+
public function invoke(
26+
QueueInterface $queue,
27+
$maxNumberOfMessages,
28+
$callback,
29+
$maxIdleTime = null,
30+
$sleep = null
31+
);
2432
}

lib/internal/Magento/Framework/MessageQueue/Config/Consumer/ConfigReaderPlugin.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ private function getConsumerConfigDataFromQueueConfig()
6868
'consumerInstance' => $consumerData['instance_type'],
6969
'handlers' => $handlers,
7070
'connection' => $consumerData['connection'],
71-
'maxMessages' => $consumerData['max_messages']
71+
'maxMessages' => $consumerData['max_messages'],
72+
'maxIdleTime' => null,
73+
'sleep' => null,
74+
'onlySpawnWhenMessageAvailable' => false
7275
];
7376
}
7477

lib/internal/Magento/Framework/MessageQueue/Consumer.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,18 @@ public function __construct(
108108
public function process($maxNumberOfMessages = null)
109109
{
110110
$queue = $this->configuration->getQueue();
111-
111+
$maxIdleTime = $this->configuration->getMaxIdleTime();
112+
$sleep = $this->configuration->getSleep();
112113
if (!isset($maxNumberOfMessages)) {
113114
$queue->subscribe($this->getTransactionCallback($queue));
114115
} else {
115-
$this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
116+
$this->invoker->invoke(
117+
$queue,
118+
$maxNumberOfMessages,
119+
$this->getTransactionCallback($queue),
120+
$maxIdleTime,
121+
$sleep
122+
);
116123
}
117124
}
118125

0 commit comments

Comments
 (0)