Skip to content

Commit 68f81e0

Browse files
BaDosshiftedreality
authored andcommitted
MAGECLOUD-4071: Re-work consumers to terminate as soon as there is nothing left to process (#594)
1 parent ec1f037 commit 68f81e0

14 files changed

+605
-40
lines changed

dist/.magento.env.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,24 @@
307307
# number_of_shards: 3 #
308308
# number_of_replicas: 3 #
309309
#######################################################################################################################
310+
# CONSUMERS_WAIT_FOR_MAX_MESSAGES - use this variable to configure how consumers process messages #
311+
# If this value is true, a consumer waits to process the number of messages #
312+
# (max_messages) from the message queue specified in the CRONS_CONSUMERS_RUNNER #
313+
# variable before closing the connection and terminating consumer process. If you #
314+
# use workers to run consumers instead of using a cron job, set #
315+
# the CONSUMERS_WAIT_FOR_MAX_MESSAGES variable to true. #
316+
# If this value is false, consumers process available messages in the queue, close #
317+
# the TCP connection and terminate. Consumers do not wait for additional messages #
318+
# to enter the queue, even if the number of processed messages is less than #
319+
# the max_messages value. #
320+
# Magento Version: 2.2.0 and later #
321+
# Default value: false #
322+
# Stages: deploy #
323+
# Example: #
324+
# stage: #
325+
# deploy: #
326+
# CONSUMERS_WAIT_FOR_MAX_MESSAGES: true #
327+
#######################################################################################################################
310328
# CRON_CONSUMERS_RUNNER - use this variable to make sure message queues are running after a deployment. #
311329
# By default, the deployment process overwrites all settings in the env.php file #
312330
# cron_run — a boolean value that enables or disables the consumers_runner cron job. #

patches.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@
176176
"2.2.0": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.2.0.patch",
177177
"2.2.1 - 2.2.9": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.2.1.patch",
178178
"2.3.0 - 2.3.2": "MAGECLOUD-3806__error_code_fix_for_setup_upgrade__2.3.0.patch"
179+
},
180+
"Re-work consumers to terminate as soon as there is nothing left to process": {
181+
"2.2.0 - 2.3.1": "MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.2.0.patch",
182+
"2.3.2 - 2.3.3": "MAGECLOUD-4071__terminate_consumers_if_the_queue_is_empty__2.3.2.patch"
179183
}
180184
},
181185
"monolog/monolog": {
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
diff -Naur a/vendor/magento/framework-message-queue/CallbackInvoker.php b/vendor/magento/framework-message-queue/CallbackInvoker.php
2+
--- a/vendor/magento/framework-message-queue/CallbackInvoker.php
3+
+++ b/vendor/magento/framework-message-queue/CallbackInvoker.php
4+
@@ -6,11 +6,28 @@
5+
6+
namespace Magento\Framework\MessageQueue;
7+
8+
+use Magento\Framework\App\DeploymentConfig;
9+
+
10+
/**
11+
* Class CallbackInvoker to invoke callbacks for consumer classes
12+
*/
13+
class CallbackInvoker
14+
{
15+
+ /**
16+
+ * @var DeploymentConfig
17+
+ */
18+
+ private $deploymentConfig;
19+
+
20+
+ /**
21+
+ * CallbackInvoker constructor.
22+
+ * @param DeploymentConfig $deploymentConfig
23+
+ */
24+
+ public function __construct(
25+
+ DeploymentConfig $deploymentConfig
26+
+ ) {
27+
+ $this->deploymentConfig = $deploymentConfig;
28+
+ }
29+
+
30+
/**
31+
* Run short running process
32+
*
33+
@@ -24,8 +41,23 @@ class CallbackInvoker
34+
for ($i = $maxNumberOfMessages; $i > 0; $i--) {
35+
do {
36+
$message = $queue->dequeue();
37+
- } while ($message === null && (sleep(1) === 0));
38+
+ } while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0));
39+
+
40+
+ if ($message === null) {
41+
+ break;
42+
+ }
43+
+
44+
$callback($message);
45+
}
46+
}
47+
+
48+
+ /**
49+
+ * Checks if consumers should wait for message from the queue
50+
+ *
51+
+ * @return bool
52+
+ */
53+
+ private function isWaitingNextMessage(): bool
54+
+ {
55+
+ return $this->deploymentConfig->get('queue/consumers_wait_for_messages', 1) === 1;
56+
+ }
57+
}
58+
diff -Naur a/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
59+
new file mode 100644
60+
--- /dev/null
61+
+++ b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
62+
@@ -0,0 +1,108 @@
63+
+<?php
64+
+/**
65+
+ * Copyright © Magento, Inc. All rights reserved.
66+
+ * See COPYING.txt for license details.
67+
+ */
68+
+declare(strict_types=1);
69+
+
70+
+namespace Magento\MessageQueue\Setup;
71+
+
72+
+use Magento\Framework\Setup\ConfigOptionsListInterface;
73+
+use Magento\Framework\Setup\Option\SelectConfigOption;
74+
+use Magento\Framework\App\DeploymentConfig;
75+
+use Magento\Framework\Config\Data\ConfigData;
76+
+use Magento\Framework\Config\File\ConfigFilePool;
77+
+
78+
+/**
79+
+ * Deployment configuration consumers options needed for Setup application
80+
+ */
81+
+class ConfigOptionsList implements ConfigOptionsListInterface
82+
+{
83+
+ /**
84+
+ * Input key for the option
85+
+ */
86+
+ const INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES ='consumers-wait-for-messages';
87+
+
88+
+ /**
89+
+ * Path to the value in the deployment config
90+
+ */
91+
+ const CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES = 'queue/consumers_wait_for_messages';
92+
+
93+
+ /**
94+
+ * Default value
95+
+ */
96+
+ const DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES = 1;
97+
+
98+
+ /**
99+
+ * The available configuration values
100+
+ *
101+
+ * @var array
102+
+ */
103+
+ private $selectOptions = [0, 1];
104+
+
105+
+ /**
106+
+ * @inheritdoc
107+
+ */
108+
+ public function getOptions()
109+
+ {
110+
+ return [
111+
+ new SelectConfigOption(
112+
+ self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
113+
+ SelectConfigOption::FRONTEND_WIZARD_SELECT,
114+
+ $this->selectOptions,
115+
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
116+
+ 'Should consumers wait for a message from the queue? 1 - Yes, 0 - No',
117+
+ self::DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES
118+
+ ),
119+
+ ];
120+
+ }
121+
+
122+
+ /**
123+
+ * @inheritdoc
124+
+ * @SuppressWarnings(PHPMD.UnusedFormalParameter)
125+
+ */
126+
+ public function createConfig(array $data, DeploymentConfig $deploymentConfig)
127+
+ {
128+
+ $configData = new ConfigData(ConfigFilePool::APP_ENV);
129+
+
130+
+ if (!$this->isDataEmpty($data, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)) {
131+
+ $configData->set(
132+
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
133+
+ (int)$data[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES]
134+
+ );
135+
+ }
136+
+
137+
+ return [$configData];
138+
+ }
139+
+
140+
+ /**
141+
+ * @inheritdoc
142+
+ */
143+
+ public function validate(array $options, DeploymentConfig $deploymentConfig)
144+
+ {
145+
+ $errors = [];
146+
+
147+
+ if (!$this->isDataEmpty($options, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)
148+
+ && !in_array($options[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES], $this->selectOptions)) {
149+
+ $errors[] = 'You can use only 1 or 0 for ' . self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES . ' option';
150+
+ }
151+
+
152+
+ return $errors;
153+
+ }
154+
+
155+
+ /**
156+
+ * Check if data ($data) with key ($key) is empty
157+
+ *
158+
+ * @param array $data
159+
+ * @param string $key
160+
+ * @return bool
161+
+ */
162+
+ private function isDataEmpty(array $data, $key)
163+
+ {
164+
+ if (isset($data[$key]) && $data[$key] !== '') {
165+
+ return false;
166+
+ }
167+
+
168+
+ return true;
169+
+ }
170+
+}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
diff -Naur a/vendor/magento/framework-message-queue/CallbackInvoker.php b/vendor/magento/framework-message-queue/CallbackInvoker.php
2+
--- a/vendor/magento/framework-message-queue/CallbackInvoker.php
3+
+++ b/vendor/magento/framework-message-queue/CallbackInvoker.php
4+
@@ -8,6 +8,7 @@ namespace Magento\Framework\MessageQueue;
5+
6+
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillCompareInterface;
7+
use Magento\Framework\MessageQueue\PoisonPill\PoisonPillReadInterface;
8+
+use Magento\Framework\App\DeploymentConfig;
9+
10+
/**
11+
* Class CallbackInvoker to invoke callbacks for consumer classes
12+
@@ -29,16 +30,24 @@ class CallbackInvoker implements CallbackInvokerInterface
13+
*/
14+
private $poisonPillCompare;
15+
16+
+ /**
17+
+ * @var DeploymentConfig
18+
+ */
19+
+ private $deploymentConfig;
20+
+
21+
/**
22+
* @param PoisonPillReadInterface $poisonPillRead
23+
* @param PoisonPillCompareInterface $poisonPillCompare
24+
+ * @param DeploymentConfig $deploymentConfig
25+
*/
26+
public function __construct(
27+
PoisonPillReadInterface $poisonPillRead,
28+
- PoisonPillCompareInterface $poisonPillCompare
29+
+ PoisonPillCompareInterface $poisonPillCompare,
30+
+ DeploymentConfig $deploymentConfig
31+
) {
32+
$this->poisonPillRead = $poisonPillRead;
33+
$this->poisonPillCompare = $poisonPillCompare;
34+
+ $this->deploymentConfig = $deploymentConfig;
35+
}
36+
37+
/**
38+
@@ -56,13 +65,29 @@ class CallbackInvoker implements CallbackInvokerInterface
39+
do {
40+
$message = $queue->dequeue();
41+
// phpcs:ignore Magento2.Functions.DiscouragedFunction
42+
- } while ($message === null && (sleep(1) === 0));
43+
+ } while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0));
44+
+
45+
+ if ($message === null) {
46+
+ break;
47+
+ }
48+
+
49+
if (false === $this->poisonPillCompare->isLatestVersion($this->poisonPillVersion)) {
50+
$queue->reject($message);
51+
// phpcs:ignore Magento2.Security.LanguageConstruct.ExitUsage
52+
exit(0);
53+
}
54+
+
55+
$callback($message);
56+
}
57+
}
58+
+
59+
+ /**
60+
+ * Checks if consumers should wait for message from the queue
61+
+ *
62+
+ * @return bool
63+
+ */
64+
+ private function isWaitingNextMessage(): bool
65+
+ {
66+
+ return $this->deploymentConfig->get('queue/consumers_wait_for_messages', 1) === 1;
67+
+ }
68+
}
69+
diff -Naur a/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
70+
new file mode 100644
71+
--- /dev/null
72+
+++ b/vendor/magento/module-message-queue/Setup/ConfigOptionsList.php
73+
@@ -0,0 +1,108 @@
74+
+<?php
75+
+/**
76+
+ * Copyright © Magento, Inc. All rights reserved.
77+
+ * See COPYING.txt for license details.
78+
+ */
79+
+declare(strict_types=1);
80+
+
81+
+namespace Magento\MessageQueue\Setup;
82+
+
83+
+use Magento\Framework\Setup\ConfigOptionsListInterface;
84+
+use Magento\Framework\Setup\Option\SelectConfigOption;
85+
+use Magento\Framework\App\DeploymentConfig;
86+
+use Magento\Framework\Config\Data\ConfigData;
87+
+use Magento\Framework\Config\File\ConfigFilePool;
88+
+
89+
+/**
90+
+ * Deployment configuration consumers options needed for Setup application
91+
+ */
92+
+class ConfigOptionsList implements ConfigOptionsListInterface
93+
+{
94+
+ /**
95+
+ * Input key for the option
96+
+ */
97+
+ const INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES ='consumers-wait-for-messages';
98+
+
99+
+ /**
100+
+ * Path to the value in the deployment config
101+
+ */
102+
+ const CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES = 'queue/consumers_wait_for_messages';
103+
+
104+
+ /**
105+
+ * Default value
106+
+ */
107+
+ const DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES = 1;
108+
+
109+
+ /**
110+
+ * The available configuration values
111+
+ *
112+
+ * @var array
113+
+ */
114+
+ private $selectOptions = [0, 1];
115+
+
116+
+ /**
117+
+ * @inheritdoc
118+
+ */
119+
+ public function getOptions()
120+
+ {
121+
+ return [
122+
+ new SelectConfigOption(
123+
+ self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
124+
+ SelectConfigOption::FRONTEND_WIZARD_SELECT,
125+
+ $this->selectOptions,
126+
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
127+
+ 'Should consumers wait for a message from the queue? 1 - Yes, 0 - No',
128+
+ self::DEFAULT_CONSUMERS_WAIT_FOR_MESSAGES
129+
+ ),
130+
+ ];
131+
+ }
132+
+
133+
+ /**
134+
+ * @inheritdoc
135+
+ * @SuppressWarnings(PHPMD.UnusedFormalParameter)
136+
+ */
137+
+ public function createConfig(array $data, DeploymentConfig $deploymentConfig)
138+
+ {
139+
+ $configData = new ConfigData(ConfigFilePool::APP_ENV);
140+
+
141+
+ if (!$this->isDataEmpty($data, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)) {
142+
+ $configData->set(
143+
+ self::CONFIG_PATH_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES,
144+
+ (int)$data[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES]
145+
+ );
146+
+ }
147+
+
148+
+ return [$configData];
149+
+ }
150+
+
151+
+ /**
152+
+ * @inheritdoc
153+
+ */
154+
+ public function validate(array $options, DeploymentConfig $deploymentConfig)
155+
+ {
156+
+ $errors = [];
157+
+
158+
+ if (!$this->isDataEmpty($options, self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES)
159+
+ && !in_array($options[self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES], $this->selectOptions)) {
160+
+ $errors[] = 'You can use only 1 or 0 for ' . self::INPUT_KEY_QUEUE_CONSUMERS_WAIT_FOR_MESSAGES . ' option';
161+
+ }
162+
+
163+
+ return $errors;
164+
+ }
165+
+
166+
+ /**
167+
+ * Check if data ($data) with key ($key) is empty
168+
+ *
169+
+ * @param array $data
170+
+ * @param string $key
171+
+ * @return bool
172+
+ */
173+
+ private function isDataEmpty(array $data, $key)
174+
+ {
175+
+ if (isset($data[$key]) && $data[$key] !== '') {
176+
+ return false;
177+
+ }
178+
+
179+
+ return true;
180+
+ }
181+
+}

src/App/Container.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ function () {
390390

391391
$this->container->when(CronKill::class)
392392
->needs(ProcessInterface::class)
393-
->give(DeployProcess\CronProcessKill::class);
393+
->give(DeployProcess\BackgroundProcessKill::class);
394394
$this->container->when(ModuleRefresh::class)
395395
->needs(ProcessInterface::class)
396396
->give(BuildProcess\RefreshModules::class);

0 commit comments

Comments
 (0)