Skip to content

Commit 61a1b8b

Browse files
committed
Merge branch 'B2B-2176' into foxes-pr
2 parents 8ecb5d0 + d1ae71e commit 61a1b8b

File tree

8 files changed

+119
-24
lines changed

8 files changed

+119
-24
lines changed

app/code/Magento/AsynchronousOperations/Model/MassPublisher.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public function __construct(
7979
}
8080

8181
/**
82-
* {@inheritdoc}
82+
* @inheritdoc
8383
*/
8484
public function publish($topicName, $data)
8585
{
@@ -91,6 +91,7 @@ public function publish($topicName, $data)
9191
[
9292
'body' => $message,
9393
'properties' => [
94+
'topic_name' => $topicName,
9495
'delivery_mode' => 2,
9596
'message_id' => $this->messageIdGenerator->generate($topicName),
9697
]

app/code/Magento/MysqlMq/Model/Driver/Bulk/Exchange.php

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6+
declare(strict_types=1);
67

78
namespace Magento\MysqlMq\Model\Driver\Bulk;
89

910
use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
11+
use Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface;
1012
use Magento\Framework\MessageQueue\Topology\ConfigInterface as MessageQueueConfig;
1113
use Magento\MysqlMq\Model\ConnectionTypeResolver;
1214
use Magento\MysqlMq\Model\QueueManagement;
@@ -59,8 +61,7 @@ public function enqueue($topic, array $envelopes)
5961
$connection = $exchange->getConnection();
6062
if ($this->connectionTypeResolver->getConnectionType($connection)) {
6163
foreach ($exchange->getBindings() as $binding) {
62-
// This only supports exact matching of topics.
63-
if ($binding->getTopic() === $topic) {
64+
if ($this->isMatchedBinding($binding, $topic)) {
6465
$queueNames[] = $binding->getDestination();
6566
}
6667
}
@@ -77,4 +78,17 @@ function ($envelope) {
7778

7879
return null;
7980
}
81+
82+
/**
83+
* Check if the binding is matched by the topic
84+
*
85+
* @param BindingInterface $binding
86+
* @param string $topic
87+
* @return bool
88+
*/
89+
private function isMatchedBinding(BindingInterface $binding, string $topic): bool
90+
{
91+
$pattern = '/^' . str_replace(['.', '*', '#'], ['\.', '[^.]+?', '(.*?)'], $binding->getTopic()) . '$/';
92+
return preg_match($pattern, $topic) ? true : false;
93+
}
8094
}

app/code/Magento/WebapiAsync/Code/Generator/Config/RemoteServiceReader/Publisher.php

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
namespace Magento\WebapiAsync\Code\Generator\Config\RemoteServiceReader;
1010

1111
use Magento\AsynchronousOperations\Model\ConfigInterface as WebApiAsyncConfig;
12+
use Magento\Framework\App\ObjectManager;
13+
use Magento\Framework\MessageQueue\DefaultValueProvider;
1214

1315
/**
1416
* Remote service reader with auto generated configuration for queue_publisher.xml
@@ -20,15 +22,24 @@ class Publisher implements \Magento\Framework\Config\ReaderInterface
2022
*/
2123
private $webapiAsyncConfig;
2224

25+
/**
26+
* @var DefaultValueProvider
27+
*/
28+
private $defaultValueProvider;
29+
2330
/**
2431
* Initialize dependencies.
2532
*
2633
* @param WebApiAsyncConfig $webapiAsyncConfig
34+
* @param DefaultValueProvider|null $defaultValueProvider
2735
*/
2836
public function __construct(
29-
WebApiAsyncConfig $webapiAsyncConfig
37+
WebApiAsyncConfig $webapiAsyncConfig,
38+
DefaultValueProvider $defaultValueProvider = null
3039
) {
3140
$this->webapiAsyncConfig = $webapiAsyncConfig;
41+
$this->defaultValueProvider = $defaultValueProvider
42+
?? ObjectManager::getInstance()->get(DefaultValueProvider::class);
3243
}
3344

3445
/**
@@ -49,8 +60,8 @@ public function read($scope = null)
4960
'topic' => $topicName,
5061
'disabled' => false,
5162
'connections' => [
52-
'amqp' => [
53-
'name' => 'amqp',
63+
$this->defaultValueProvider->getConnection() => [
64+
'name' => $this->defaultValueProvider->getConnection(),
5465
'exchange' => 'magento',
5566
'disabled' => false,
5667
],

app/code/Magento/WebapiAsync/etc/queue_consumer.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@
77
-->
88
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
99
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
10-
<consumer name="async.operations.all" queue="async.operations.all" connection="amqp"
10+
<consumer name="async.operations.all" queue="async.operations.all"
1111
consumerInstance="Magento\AsynchronousOperations\Model\MassConsumer"/>
1212
</config>

app/code/Magento/WebapiAsync/etc/queue_topology.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*/
77
-->
88
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
9-
<exchange name="magento" connection="amqp">
9+
<exchange name="magento">
1010
<binding id="async.operations.all" topic="async.#" destination="async.operations.all"/>
1111
</exchange>
1212
</config>

dev/tests/integration/etc/post-install-setup-command-config.php.dist

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,9 @@ return [
1616
'--remote-storage-region' => 'us-east-1'
1717
*/
1818
]
19+
],
20+
[
21+
'command' => 'setup:upgrade',
22+
'config' => []
1923
]
2024
];
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\TestFramework\MessageQueue;
9+
10+
use Magento\Framework\Exception\LocalizedException;
11+
use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface;
12+
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
13+
use Magento\Framework\MessageQueue\ConsumerFactory;
14+
use Magento\Framework\MessageQueue\QueueRepository;
15+
16+
/**
17+
* The processor to clear message queue
18+
*/
19+
class ClearQueueProcessor
20+
{
21+
/**
22+
* @var ConsumerConfig
23+
*/
24+
private $consumerConfig;
25+
26+
/**
27+
* @var ConsumerFactory
28+
*/
29+
private $consumerFactory;
30+
31+
/**
32+
* @var QueueRepository
33+
*/
34+
private $queueRepository;
35+
36+
/**
37+
* ClearQueueProcessor constructor.
38+
*
39+
* @param ConsumerConfig $consumerConfig
40+
* @param ConsumerFactory $consumerFactory
41+
* @param QueueRepository $queueRepository
42+
*/
43+
public function __construct(
44+
ConsumerConfig $consumerConfig,
45+
ConsumerFactory $consumerFactory,
46+
QueueRepository $queueRepository
47+
) {
48+
$this->consumerConfig = $consumerConfig;
49+
$this->consumerFactory = $consumerFactory;
50+
$this->queueRepository = $queueRepository;
51+
}
52+
53+
/**
54+
* Clear queue
55+
*
56+
* @param string $consumerName
57+
* @throws LocalizedException
58+
* return void
59+
*/
60+
public function execute(string $consumerName): void
61+
{
62+
/** @var ConsumerConfigItemInterface $consumerConfig */
63+
$consumerConfig = $this->consumerConfig->getConsumer($consumerName);
64+
$queue = $this->queueRepository->get($consumerConfig->getConnection(), $consumerConfig->getQueue());
65+
while ($message = $queue->dequeue()) {
66+
$queue->acknowledge($message);
67+
}
68+
}
69+
}

dev/tests/integration/framework/Magento/TestFramework/MessageQueue/PublisherConsumerController.php

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@
1010

1111
use Magento\Framework\MessageQueue\PublisherInterface;
1212
use Magento\Framework\OsInfo;
13+
use Magento\TestFramework\Helper\Bootstrap;
1314
use Magento\TestFramework\Helper\Amqp;
1415

15-
/**
16-
* Publisher Consumer Controller
17-
*/
1816
class PublisherConsumerController
1917
{
2018
/**
@@ -52,6 +50,11 @@ class PublisherConsumerController
5250
*/
5351
private $amqpHelper;
5452

53+
/**
54+
* @var ClearQueueProcessor
55+
*/
56+
private $clearQueueProcessor;
57+
5558
/**
5659
* PublisherConsumerController constructor.
5760
* @param PublisherInterface $publisher
@@ -61,6 +64,7 @@ class PublisherConsumerController
6164
* @param array $consumers
6265
* @param array $appInitParams
6366
* @param null|int $maxMessages
67+
* @param ClearQueueProcessor $clearQueueProcessor
6468
*/
6569
public function __construct(
6670
PublisherInterface $publisher,
@@ -69,7 +73,8 @@ public function __construct(
6973
$logFilePath,
7074
$consumers,
7175
$appInitParams,
72-
$maxMessages = null
76+
$maxMessages = null,
77+
ClearQueueProcessor $clearQueueProcessor = null
7378
) {
7479
$this->consumers = $consumers;
7580
$this->publisher = $publisher;
@@ -78,6 +83,8 @@ public function __construct(
7883
$this->osInfo = $osInfo;
7984
$this->appInitParams = $appInitParams;
8085
$this->amqpHelper = $amqpHelper;
86+
$this->clearQueueProcessor = $clearQueueProcessor
87+
?: Bootstrap::getObjectManager()->get(ClearQueueProcessor::class);
8188
}
8289

8390
/**
@@ -90,12 +97,7 @@ public function initialize()
9097
{
9198
$this->validateEnvironmentPreconditions();
9299

93-
$connections = $this->amqpHelper->getConnections();
94-
foreach (array_keys($connections) as $connectionName) {
95-
$this->amqpHelper->deleteConnection($connectionName);
96-
}
97-
$this->amqpHelper->clearQueue("async.operations.all");
98-
100+
$this->clearQueueProcessor->execute("async.operations.all");
99101
$this->stopConsumers();
100102
$this->startConsumers();
101103

@@ -123,12 +125,6 @@ private function validateEnvironmentPreconditions()
123125
"This test relies on *nix shell and should be skipped in Windows environment."
124126
);
125127
}
126-
127-
if (!$this->amqpHelper->isAvailable()) {
128-
throw new PreconditionFailedException(
129-
'This test relies on RabbitMQ Management Plugin.'
130-
);
131-
}
132128
}
133129

134130
/**

0 commit comments

Comments
 (0)