Skip to content

Commit 2b56d90

Browse files
committed
B2B-2171: Enable WebapiAsync work with redis connection
1 parent 6e52223 commit 2b56d90

File tree

8 files changed

+43
-172
lines changed

8 files changed

+43
-172
lines changed

app/code/Magento/AsynchronousOperations/Model/MassPublisher.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public function publish($topicName, $data)
9191
[
9292
'body' => $message,
9393
'properties' => [
94+
'topic_name' => $topicName,
9495
'delivery_mode' => 2,
9596
'message_id' => $this->messageIdGenerator->generate($topicName),
9697
]

app/code/Magento/MysqlMq/Model/Driver/Bulk/Exchange.php

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
* Copyright © Magento, Inc. All rights reserved.
44
* See COPYING.txt for license details.
55
*/
6+
declare(strict_types=1);
67

78
namespace Magento\MysqlMq\Model\Driver\Bulk;
89

910
use Magento\Framework\MessageQueue\Bulk\ExchangeInterface;
11+
use Magento\Framework\MessageQueue\Topology\Config\ExchangeConfigItem\BindingInterface;
1012
use Magento\Framework\MessageQueue\Topology\ConfigInterface as MessageQueueConfig;
1113
use Magento\MysqlMq\Model\ConnectionTypeResolver;
1214
use Magento\MysqlMq\Model\QueueManagement;
@@ -59,8 +61,7 @@ public function enqueue($topic, array $envelopes)
5961
$connection = $exchange->getConnection();
6062
if ($this->connectionTypeResolver->getConnectionType($connection)) {
6163
foreach ($exchange->getBindings() as $binding) {
62-
// This only supports exact matching of topics.
63-
if ($binding->getTopic() === $topic) {
64+
if ($this->isMatchedBinding($binding, $topic)) {
6465
$queueNames[] = $binding->getDestination();
6566
}
6667
}
@@ -77,4 +78,17 @@ function ($envelope) {
7778

7879
return null;
7980
}
81+
82+
/**
83+
* Check if the binding is matched by the topic
84+
*
85+
* @param BindingInterface $binding
86+
* @param string $topic
87+
* @return bool
88+
*/
89+
private function isMatchedBinding(BindingInterface $binding, string $topic): bool
90+
{
91+
$pattern = '/^' . str_replace (['.', '*', '#'], ['\.', '[^.]+?', '(.*?)'], $binding->getTopic()) . '$/';
92+
return preg_match($pattern, $topic) ? true : false;
93+
}
8094
}

app/code/Magento/WebapiAsync/Code/Generator/Config/RemoteServiceReader/Consumer.php

Lines changed: 0 additions & 71 deletions
This file was deleted.

app/code/Magento/WebapiAsync/Code/Generator/Config/RemoteServiceReader/Publisher.php

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public function read($scope = null)
6060
'topic' => $topicName,
6161
'disabled' => false,
6262
'connections' => [
63-
$this->getConnection() => [
64-
'name' => $this->getConnection(),
63+
$this->defaultValueProvider->getConnection() => [
64+
'name' => $this->defaultValueProvider->getConnection(),
6565
'exchange' => 'magento',
6666
'disabled' => false,
6767
],
@@ -71,16 +71,4 @@ public function read($scope = null)
7171

7272
return $result;
7373
}
74-
75-
/**
76-
* Get connection
77-
*
78-
* @return string
79-
*/
80-
private function getConnection()
81-
{
82-
$connection = $this->defaultValueProvider->getConnection();
83-
// if db connection, return amqp instead.
84-
return $connection === 'db' ? WebApiAsyncConfig::DEFAULT_CONSUMER_CONNECTION : $connection;
85-
}
8674
}

app/code/Magento/WebapiAsync/Code/Generator/Config/RemoteServiceReader/Topology.php

Lines changed: 0 additions & 83 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
9+
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
10+
<consumer name="async.operations.all" queue="async.operations.all"
11+
consumerInstance="Magento\AsynchronousOperations\Model\MassConsumer"/>
12+
</config>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
/**
4+
* Copyright © Magento, Inc. All rights reserved.
5+
* See COPYING.txt for license details.
6+
*/
7+
-->
8+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
9+
<exchange name="magento">
10+
<binding id="async.operations.all" topic="async.#" destination="async.operations.all"/>
11+
</exchange>
12+
</config>

app/etc/di.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,7 +1682,6 @@
16821682
<type name="Magento\Framework\MessageQueue\Consumer\Config\CompositeReader">
16831683
<arguments>
16841684
<argument name="readers" xsi:type="array">
1685-
<item name="asyncServiceReader" xsi:type="object" sortOrder="0">Magento\WebapiAsync\Code\Generator\Config\RemoteServiceReader\Consumer</item>
16861685
<item name="xmlReader" xsi:type="object" sortOrder="10">Magento\Framework\MessageQueue\Consumer\Config\Xml\Reader</item>
16871686
<item name="envReader" xsi:type="object" sortOrder="20">Magento\Framework\MessageQueue\Consumer\Config\Env\Reader</item>
16881687
</argument>
@@ -1728,7 +1727,6 @@
17281727
<type name="Magento\Framework\MessageQueue\Topology\Config\CompositeReader">
17291728
<arguments>
17301729
<argument name="readers" xsi:type="array">
1731-
<item name="asyncServiceReader" xsi:type="object" sortOrder="0">Magento\WebapiAsync\Code\Generator\Config\RemoteServiceReader\Topology</item>
17321730
<item name="remoteServiceReader" xsi:type="object" sortOrder="10">Magento\Framework\MessageQueue\Topology\Config\RemoteService\Reader</item>
17331731
<item name="xmlReader" xsi:type="object" sortOrder="20">Magento\Framework\MessageQueue\Topology\Config\Xml\Reader</item>
17341732
</argument>

0 commit comments

Comments
 (0)