Skip to content

Commit 96fc241

Browse files
committed
MCP-771: [MCP-608] Multiple consumers prototype development
1 parent 04fd266 commit 96fc241

File tree

2 files changed

+52
-19
lines changed

2 files changed

+52
-19
lines changed

app/code/Magento/MessageQueue/Console/StartConsumerCommand.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class StartConsumerCommand extends Command
2323
const OPTION_BATCH_SIZE = 'batch-size';
2424
const OPTION_AREACODE = 'area-code';
2525
const OPTION_SINGLE_THREAD = 'single-thread';
26+
const OPTION_MULTI_PROCESS = 'multi-process';
2627
const PID_FILE_PATH = 'pid-file-path';
2728
const COMMAND_QUEUE_CONSUMERS_START = 'queue:consumers:start';
2829

@@ -88,9 +89,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
8889

8990
$consumer = $this->consumerFactory->get($consumerName, $batchSize);
9091
$consumer->process($numberOfMessages);
91-
if ($singleThread) {
92-
$this->lockManager->unlock(md5($consumerName)); //phpcs:ignore
93-
}
92+
$this->lockManager->unlock(md5($consumerName)); //phpcs:ignore
9493

9594
return \Magento\Framework\Console\Cli::RETURN_SUCCESS;
9695
}
@@ -133,6 +132,12 @@ protected function configure()
133132
InputOption::VALUE_NONE,
134133
'This option prevents running multiple copies of one consumer simultaneously.'
135134
);
135+
$this->addOption(
136+
self::OPTION_MULTI_PROCESS,
137+
null,
138+
InputOption::VALUE_OPTIONAL,
139+
'The number of processes per consumer.'
140+
);
136141
$this->addOption(
137142
self::PID_FILE_PATH,
138143
null,
@@ -166,6 +171,9 @@ protected function configure()
166171
To save PID enter path (This option is deprecated, use --single-thread instead):
167172
168173
<comment>%command.full_name% someConsumer --pid-file-path='/var/someConsumer.pid'</comment>
174+
To define the number of processes per consumer:
175+
176+
<comment>%command.full_name% someConsumer --multi-process'</comment>
169177
HELP
170178
);
171179
parent::configure();

app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public function __construct(
111111
public function run()
112112
{
113113
$runByCron = $this->deploymentConfig->get('cron_consumers_runner/cron_run', true);
114+
$multipleProcesses = $this->deploymentConfig->get('cron_consumers_runner/multiple_processes', []);
114115

115116
if (!$runByCron) {
116117
return;
@@ -125,19 +126,47 @@ public function run()
125126
continue;
126127
}
127128

128-
$arguments = [
129-
$consumer->getName(),
130-
'--single-thread'
131-
];
132-
133-
if ($maxMessages) {
134-
$arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
129+
if (array_key_exists($consumer->getName(), $multipleProcesses)) {
130+
$numberOfProcesses = $multipleProcesses[$consumer->getName()];
131+
132+
for ($i = 1; $i <= $numberOfProcesses; $i++) {
133+
if ($this->lockManager->isLocked(md5($consumer->getName()))) { //phpcs:ignore
134+
continue;
135+
}
136+
$arguments = [
137+
$consumer->getName(),
138+
'--multi-process=' . $i
139+
];
140+
141+
if ($maxMessages) {
142+
$arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
143+
}
144+
145+
$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
146+
. ($maxMessages ? ' %s' : '');
147+
148+
$this->shellBackground->execute($command, $arguments);
149+
}
150+
$this->lockManager->lock(md5($consumer->getName()));
151+
} else {
152+
if ($this->lockManager->isLocked(md5($consumer->getName()))) { //phpcs:ignore
153+
return false;
154+
}
155+
156+
$arguments = [
157+
$consumer->getName(),
158+
'--single-thread'
159+
];
160+
161+
if ($maxMessages) {
162+
$arguments[] = '--max-messages=' . min($consumer->getMaxMessages() ?? $maxMessages, $maxMessages);
163+
}
164+
165+
$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
166+
. ($maxMessages ? ' %s' : '');
167+
168+
$this->shellBackground->execute($command, $arguments);
135169
}
136-
137-
$command = $php . ' ' . BP . '/bin/magento queue:consumers:start %s %s'
138-
. ($maxMessages ? ' %s' : '');
139-
140-
$this->shellBackground->execute($command, $arguments);
141170
}
142171
}
143172

@@ -157,10 +186,6 @@ private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $al
157186
return false;
158187
}
159188

160-
if ($this->lockManager->isLocked(md5($consumerName))) { //phpcs:ignore
161-
return false;
162-
}
163-
164189
$connectionName = $consumerConfig->getConnection();
165190
try {
166191
$this->mqConnectionTypeResolver->getConnectionType($connectionName);

0 commit comments

Comments
 (0)