Skip to content

Commit 15a194e

Browse files
committed
MC-18477: Deadlocks when consumers run from cron
1 parent 2b26598 commit 15a194e

File tree

5 files changed

+60
-48
lines changed

5 files changed

+60
-48
lines changed

app/code/Magento/MessageQueue/Console/StartConsumerCommand.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public function __construct(
6464
}
6565

6666
/**
67-
* {@inheritdoc}
67+
* @inheritdoc
6868
*/
6969
protected function execute(InputInterface $input, OutputInterface $output)
7070
{
@@ -79,13 +79,13 @@ protected function execute(InputInterface $input, OutputInterface $output)
7979

8080
$singleThread = $input->getOption(self::OPTION_SINGLE_THREAD);
8181

82-
if ($singleThread && $this->lockManager->isLocked(md5($consumerName))) {
82+
if ($singleThread && $this->lockManager->isLocked(md5($consumerName))) { //phpcs:ignore
8383
$output->writeln('<error>Consumer with the same name is running</error>');
8484
return \Magento\Framework\Console\Cli::RETURN_FAILURE;
8585
}
8686

8787
if ($singleThread) {
88-
$this->lockManager->lock(md5($consumerName));
88+
$this->lockManager->lock(md5($consumerName)); //phpcs:ignore
8989
}
9090

9191
$this->appState->setAreaCode($areaCode ?? 'global');
@@ -94,14 +94,14 @@ protected function execute(InputInterface $input, OutputInterface $output)
9494
$consumer->process($numberOfMessages);
9595

9696
if ($singleThread) {
97-
$this->lockManager->unlock(md5($consumerName));
97+
$this->lockManager->unlock(md5($consumerName)); //phpcs:ignore
9898
}
9999

100100
return \Magento\Framework\Console\Cli::RETURN_SUCCESS;
101101
}
102102

103103
/**
104-
* {@inheritdoc}
104+
* @inheritdoc
105105
*/
106106
protected function configure()
107107
{
@@ -136,13 +136,13 @@ protected function configure()
136136
self::OPTION_SINGLE_THREAD,
137137
null,
138138
InputOption::VALUE_NONE,
139-
'Does not allow to run a consumer if it was run before.'
139+
'This option prevents running multiple copies of one consumer simultaneously.'
140140
);
141141
$this->addOption(
142142
self::PID_FILE_PATH,
143143
null,
144144
InputOption::VALUE_REQUIRED,
145-
'The file path for saving PID'
145+
'The file path for saving PID (This option is deprecated, use --single-thread instead)'
146146
);
147147
$this->setHelp(
148148
<<<HELP
@@ -164,7 +164,7 @@ protected function configure()
164164
165165
<comment>%command.full_name% someConsumer --area-code='adminhtml'</comment>
166166
167-
To do not allow to run a consumer if it was run before:
167+
To do not run multiple copies of one consumer simultaneously:
168168
169169
<comment>%command.full_name% someConsumer --single-thread'</comment>
170170

app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class ConsumersRunner
7171
* @param ConsumerConfigInterface $consumerConfig The consumer config provider
7272
* @param DeploymentConfig $deploymentConfig The application deployment configuration
7373
* @param ShellInterface $shellBackground The shell command line wrapper for executing command in background
74+
* @param LockManagerInterface $lockManager The lock manager
7475
* @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
7576
* @param LoggerInterface $logger Logger
7677
*/
@@ -146,20 +147,22 @@ private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $al
146147
return false;
147148
}
148149

149-
if ($this->lockManager->isLocked(md5($consumerName))) {
150+
if ($this->lockManager->isLocked(md5($consumerName))) { //phpcs:ignore
150151
return false;
151152
}
152153

153154
$connectionName = $consumerConfig->getConnection();
154155
try {
155156
$this->mqConnectionTypeResolver->getConnectionType($connectionName);
156157
} catch (\LogicException $e) {
157-
$this->logger->info(sprintf(
158-
'Consumer "%s" skipped as required connection "%s" is not configured. %s',
159-
$consumerName,
160-
$connectionName,
161-
$e->getMessage()
162-
));
158+
$this->logger->info(
159+
sprintf(
160+
'Consumer "%s" skipped as required connection "%s" is not configured. %s',
161+
$consumerName,
162+
$connectionName,
163+
$e->getMessage()
164+
)
165+
);
163166
return false;
164167
}
165168

app/code/Magento/MessageQueue/Test/Unit/Console/StartConsumerCommandTest.php

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,15 @@ public function testExecute(
131131

132132
$this->lockManagerMock->expects($this->exactly($isLockedExpects))
133133
->method('isLocked')
134-
->with(md5($consumerName))
134+
->with(md5($consumerName)) //phpcs:ignore
135135
->willReturn($isLocked);
136136

137137
$this->lockManagerMock->expects($this->exactly($lockExpects))
138138
->method('lock')
139-
->with(md5($consumerName));
139+
->with(md5($consumerName)); //phpcs:ignore
140140
$this->lockManagerMock->expects($this->exactly($unlockExpects))
141141
->method('unlock')
142-
->with(md5($consumerName));
143-
142+
->with(md5($consumerName)); //phpcs:ignore
144143

145144
$this->assertEquals(
146145
$expectedReturn,

app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
use Symfony\Component\Process\PhpExecutableFinder;
1616
use Magento\Framework\Lock\LockManagerInterface;
1717

18+
/**
19+
* Unit tests for ConsumersRunner.
20+
*/
1821
class ConsumersRunnerTest extends \PHPUnit\Framework\TestCase
1922
{
2023
/**
@@ -90,11 +93,13 @@ public function testRunDisabled()
9093
{
9194
$this->deploymentConfigMock->expects($this->once())
9295
->method('get')
93-
->willReturnMap([
94-
['cron_consumers_runner/cron_run', true, false],
95-
['cron_consumers_runner/max_messages', 10000, 10000],
96-
['cron_consumers_runner/consumers', [], []],
97-
]);
96+
->willReturnMap(
97+
[
98+
['cron_consumers_runner/cron_run', true, false],
99+
['cron_consumers_runner/max_messages', 10000, 10000],
100+
['cron_consumers_runner/consumers', [], []],
101+
]
102+
);
98103

99104
$this->consumerConfigMock->expects($this->never())
100105
->method('getConsumers');
@@ -131,11 +136,13 @@ public function testRun(
131136

132137
$this->deploymentConfigMock->expects($this->exactly(3))
133138
->method('get')
134-
->willReturnMap([
135-
['cron_consumers_runner/cron_run', true, true],
136-
['cron_consumers_runner/max_messages', 10000, $maxMessages],
137-
['cron_consumers_runner/consumers', [], $allowedConsumers],
138-
]);
139+
->willReturnMap(
140+
[
141+
['cron_consumers_runner/cron_run', true, true],
142+
['cron_consumers_runner/max_messages', 10000, $maxMessages],
143+
['cron_consumers_runner/consumers', [], $allowedConsumers],
144+
]
145+
);
139146

140147
/** @var ConsumerConfigInterface|MockObject $firstCunsumer */
141148
$consumer = $this->getMockBuilder(ConsumerConfigItemInterface::class)
@@ -154,7 +161,7 @@ public function testRun(
154161

155162
$this->lockManagerMock->expects($this->exactly($isRunExpects))
156163
->method('isLocked')
157-
->with(md5($consumerName))
164+
->with(md5($consumerName)) //phpcs:ignore
158165
->willReturn($isLocked);
159166

160167
$this->shellBackgroundMock->expects($this->exactly($shellBackgroundExpects))

dev/tests/integration/testsuite/Magento/MessageQueue/Model/Cron/ConsumersRunnerTest.php

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -97,16 +97,18 @@ protected function setUp()
9797

9898
$this->shellMock->expects($this->any())
9999
->method('execute')
100-
->willReturnCallback(function ($command, $arguments) {
101-
$command = vsprintf($command, $arguments);
102-
$params = \Magento\TestFramework\Helper\Bootstrap::getInstance()->getAppInitParams();
103-
$params['MAGE_DIRS']['base']['path'] = BP;
104-
$params = 'INTEGRATION_TEST_PARAMS="' . urldecode(http_build_query($params)) . '"';
105-
$command = str_replace('bin/magento', 'dev/tests/integration/bin/magento', $command);
106-
$command = $params . ' ' . $command;
107-
108-
return exec("{$command} >/dev/null &");
109-
});
100+
->willReturnCallback(
101+
function ($command, $arguments) {
102+
$command = vsprintf($command, $arguments);
103+
$params = \Magento\TestFramework\Helper\Bootstrap::getInstance()->getAppInitParams();
104+
$params['MAGE_DIRS']['base']['path'] = BP;
105+
$params = 'INTEGRATION_TEST_PARAMS="' . urldecode(http_build_query($params)) . '"';
106+
$command = str_replace('bin/magento', 'dev/tests/integration/bin/magento', $command);
107+
$command = $params . ' ' . $command;
108+
109+
return exec("{$command} >/dev/null &"); //phpcs:ignore
110+
}
111+
);
110112
}
111113

112114
/**
@@ -120,16 +122,16 @@ public function testSpecificConsumerAndRerun()
120122
$config = $this->config;
121123
$config['cron_consumers_runner'] = ['consumers' => [$specificConsumer], 'max_messages' => 0];
122124
$this->writeConfig($config);
123-
$this->reRunConsumersAndCheckPidFiles($specificConsumer);
124-
$this->reRunConsumersAndCheckPidFiles($specificConsumer);
125-
$this->assertTrue($this->lockManager->isLocked(md5($specificConsumer)));
125+
$this->reRunConsumersAndCheckLocks($specificConsumer);
126+
$this->reRunConsumersAndCheckLocks($specificConsumer);
127+
$this->assertTrue($this->lockManager->isLocked(md5($specificConsumer))); //phpcs:ignore
126128
}
127129

128130
/**
129131
* @param string $specificConsumer
130132
* @return void
131133
*/
132-
private function reRunConsumersAndCheckPidFiles($specificConsumer)
134+
private function reRunConsumersAndCheckLocks($specificConsumer)
133135
{
134136
$this->consumersRunner->run();
135137

@@ -139,9 +141,9 @@ private function reRunConsumersAndCheckPidFiles($specificConsumer)
139141
$consumerName = $consumer->getName();
140142

141143
if ($consumerName === $specificConsumer) {
142-
$this->assertTrue($this->lockManager->isLocked(md5($consumerName)));
144+
$this->assertTrue($this->lockManager->isLocked(md5($consumerName))); //phpcs:ignore
143145
} else {
144-
$this->assertFalse($this->lockManager->isLocked(md5($consumerName)));
146+
$this->assertFalse($this->lockManager->isLocked(md5($consumerName))); //phpcs:ignore
145147
}
146148
}
147149
}
@@ -163,7 +165,7 @@ public function testCronJobDisabled()
163165
sleep(20);
164166

165167
foreach ($this->consumerConfig->getConsumers() as $consumer) {
166-
$this->assertFalse($this->lockManager->isLocked(md5($consumer->getName())));
168+
$this->assertFalse($this->lockManager->isLocked(md5($consumer->getName()))); //phpcs:ignore
167169
}
168170
}
169171

@@ -193,7 +195,7 @@ protected function tearDown()
193195
{
194196
foreach ($this->consumerConfig->getConsumers() as $consumer) {
195197
foreach ($this->getConsumerProcessIds($consumer->getName()) as $consumerProcessId) {
196-
exec("kill {$consumerProcessId}");
198+
exec("kill {$consumerProcessId}"); //phpcs:ignore
197199
}
198200
}
199201

@@ -213,6 +215,7 @@ protected function tearDown()
213215
*/
214216
private function getConsumerProcessIds($consumer)
215217
{
218+
//phpcs:ignore
216219
exec("ps ax | grep -v grep | grep 'queue:consumers:start {$consumer}' | awk '{print $1}'", $output);
217220
return $output;
218221
}

0 commit comments

Comments
 (0)