Skip to content

Commit a7f402f

Browse files
committed
MCP-771: [MCP-608] Multiple consumers prototype development
1 parent e618a1d commit a7f402f

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

app/code/Magento/MessageQueue/Console/StartConsumerCommand.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ protected function execute(InputInterface $input, OutputInterface $output)
7676
}
7777

7878
$singleThread = $input->getOption(self::OPTION_SINGLE_THREAD);
79+
$multiProcess = $input->getOption(self::OPTION_MULTI_PROCESS);
80+
81+
if ($multiProcess && !$this->lockManager->lock(md5($consumerName . '-' . $multiProcess),0)) { //phpcs:ignore
82+
$output->writeln('<error>Consumer with the same name is running</error>');
83+
return \Magento\Framework\Console\Cli::RETURN_FAILURE;
84+
}
7985

8086
if ($singleThread && !$this->lockManager->lock(md5($consumerName),0)) { //phpcs:ignore
8187
$output->writeln('<error>Consumer with the same name is running</error>');
@@ -86,7 +92,13 @@ protected function execute(InputInterface $input, OutputInterface $output)
8692

8793
$consumer = $this->consumerFactory->get($consumerName, $batchSize);
8894
$consumer->process($numberOfMessages);
89-
$this->lockManager->unlock(md5($consumerName)); //phpcs:ignore
95+
96+
if ($singleThread) {
97+
$this->lockManager->unlock(md5($consumerName)); //phpcs:ignore
98+
}
99+
if ($multiProcess) {
100+
$this->lockManager->unlock(md5($consumerName . '-' . $multiProcess)); //phpcs:ignore
101+
}
90102

91103
return \Magento\Framework\Console\Cli::RETURN_SUCCESS;
92104
}

app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public function run()
128128
$numberOfProcesses = $multipleProcesses[$consumer->getName()];
129129

130130
for ($i = 1; $i <= $numberOfProcesses; $i++) {
131-
if ($this->lockManager->isLocked(md5($consumer->getName()))) { //phpcs:ignore
131+
if ($this->lockManager->isLocked(md5($consumer->getName() . '-' . $i))) { //phpcs:ignore
132132
continue;
133133
}
134134
$arguments = [
@@ -146,7 +146,6 @@ public function run()
146146

147147
$this->shellBackground->execute($command, $arguments);
148148
}
149-
$this->lockManager->lock(md5($consumer->getName()));
150149
} else {
151150
if ($this->lockManager->isLocked(md5($consumer->getName()))) { //phpcs:ignore
152151
return false;

0 commit comments

Comments
 (0)