Skip to content

Commit b553bc3

Browse files
committed
ACP2E-2138: [Cloud]RabbitMQ Broken pipe or closed connection errors during indexing
1 parent 0a834cc commit b553bc3

File tree

6 files changed

+189
-26
lines changed

6 files changed

+189
-26
lines changed

app/code/Magento/Indexer/Model/ProcessManager.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
namespace Magento\Indexer\Model;
99

10+
use Magento\Framework\Amqp\ConfigPool as AmqpConfigPool;
1011
use Magento\Framework\App\ObjectManager;
1112
use Psr\Log\LoggerInterface;
1213

@@ -18,7 +19,7 @@ class ProcessManager
1819
/**
1920
* Threads count environment variable name
2021
*/
21-
const THREADS_COUNT = 'MAGE_INDEXER_THREADS_COUNT';
22+
public const THREADS_COUNT = 'MAGE_INDEXER_THREADS_COUNT';
2223

2324
/** @var bool */
2425
private $failInChildProcess = false;
@@ -37,17 +38,24 @@ class ProcessManager
3738
*/
3839
private $logger;
3940

41+
/**
42+
* @var AmqpConfigPool
43+
*/
44+
private AmqpConfigPool $amqpConfigPool;
45+
4046
/**
4147
* @param \Magento\Framework\App\ResourceConnection $resource
4248
* @param \Magento\Framework\Registry $registry
4349
* @param int|null $threadsCount
4450
* @param LoggerInterface|null $logger
51+
* @param AmqpConfigPool|null $amqpConfigPool
4552
*/
4653
public function __construct(
4754
\Magento\Framework\App\ResourceConnection $resource,
4855
\Magento\Framework\Registry $registry = null,
4956
int $threadsCount = null,
50-
LoggerInterface $logger = null
57+
LoggerInterface $logger = null,
58+
AmqpConfigPool $amqpConfigPool = null
5159
) {
5260
$this->resource = $resource;
5361
if (null === $registry) {
@@ -60,6 +68,7 @@ public function __construct(
6068
$this->logger = $logger ?? ObjectManager::getInstance()->get(
6169
LoggerInterface::class
6270
);
71+
$this->amqpConfigPool = $amqpConfigPool ?? ObjectManager::getInstance()->get(AmqpConfigPool::class);
6372
}
6473

6574
/**
@@ -99,6 +108,8 @@ private function simpleThreadExecute($userFunctions)
99108
private function multiThreadsExecute($userFunctions)
100109
{
101110
$this->resource->closeConnection(null);
111+
$this->amqpConfigPool->closeConnections();
112+
102113
$threadNumber = 0;
103114
foreach ($userFunctions as $userFunction) {
104115
// phpcs:ignore Magento2.Functions.DiscouragedFunction

app/code/Magento/Indexer/Test/Unit/Model/ProcessManagerTest.php

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77

88
namespace Magento\Indexer\Test\Unit\Model;
99

10+
use Magento\Framework\Amqp\ConfigPool as AmqpConfigPool;
1011
use Magento\Framework\App\ResourceConnection;
1112
use Magento\Indexer\Model\ProcessManager;
1213
use PHPUnit\Framework\TestCase;
14+
use Psr\Log\LoggerInterface;
1315

1416
/**
1517
* Class covers process manager execution test logic
@@ -28,12 +30,21 @@ class ProcessManagerTest extends TestCase
2830
public function testFailureInChildProcessHandleMultiThread(array $userFunctions, int $threadsCount): void
2931
{
3032
$connectionMock = $this->createMock(ResourceConnection::class);
33+
$loggerMock = $this->createMock(LoggerInterface::class);
34+
$amqpConfigPoolMock = $this->createMock(AmqpConfigPool::class);
3135
$processManager = new ProcessManager(
3236
$connectionMock,
3337
null,
34-
$threadsCount
38+
$threadsCount,
39+
$loggerMock,
40+
$amqpConfigPoolMock
3541
);
3642

43+
$connectionMock->expects($this->once())
44+
->method('closeConnection');
45+
$amqpConfigPoolMock->expects($this->once())
46+
->method('closeConnections');
47+
3748
try {
3849
$processManager->execute($userFunctions);
3950
$this->fail('Exception was not handled');
@@ -111,12 +122,21 @@ function () {
111122
public function testSuccessChildProcessHandleMultiThread(array $userFunctions, int $threadsCount): void
112123
{
113124
$connectionMock = $this->createMock(ResourceConnection::class);
125+
$loggerMock = $this->createMock(LoggerInterface::class);
126+
$amqpConfigPoolMock = $this->createMock(AmqpConfigPool::class);
114127
$processManager = new ProcessManager(
115128
$connectionMock,
116129
null,
117-
$threadsCount
130+
$threadsCount,
131+
$loggerMock,
132+
$amqpConfigPoolMock
118133
);
119134

135+
$connectionMock->expects($this->once())
136+
->method('closeConnection');
137+
$amqpConfigPoolMock->expects($this->once())
138+
->method('closeConnections');
139+
120140
try {
121141
$processManager->execute($userFunctions);
122142
} catch (\RuntimeException $exception) {

lib/internal/Magento/Framework/Amqp/Config.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,16 @@ private function createConnection(): AbstractConnection
169169
*/
170170
public function getChannel()
171171
{
172-
if (!isset($this->connection) || !isset($this->channel)) {
172+
if (!isset($this->connection)) {
173173
$this->connection = $this->createConnection();
174-
174+
}
175+
if (!isset($this->channel)
176+
|| !$this->channel->getConnection()
177+
|| !$this->channel->getConnection()->isConnected()
178+
) {
175179
$this->channel = $this->connection->channel();
176180
}
181+
177182
return $this->channel;
178183
}
179184

lib/internal/Magento/Framework/Amqp/ConfigPool.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,18 @@ public function get($connectionName)
4343
}
4444
return $this->pool[$connectionName];
4545
}
46+
47+
/**
48+
* Close all opened connections.
49+
*
50+
* @return void
51+
*/
52+
public function closeConnections(): void
53+
{
54+
foreach ($this->pool as $config) {
55+
$connection = $config->getChannel()->getConnection();
56+
$config->getChannel()->close();
57+
$connection?->close();
58+
}
59+
}
4660
}

lib/internal/Magento/Framework/Amqp/Test/Unit/ConfigPoolTest.php

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,58 @@
1010
use Magento\Framework\Amqp\Config;
1111
use Magento\Framework\Amqp\ConfigFactory;
1212
use Magento\Framework\Amqp\ConfigPool;
13+
use PhpAmqpLib\Channel\AMQPChannel;
14+
use PhpAmqpLib\Connection\AbstractConnection;
15+
use PHPUnit\Framework\MockObject\MockObject;
1316
use PHPUnit\Framework\TestCase;
1417

1518
class ConfigPoolTest extends TestCase
1619
{
20+
/**
21+
* @var ConfigFactory|MockObject
22+
*/
23+
private $factory;
24+
25+
/**
26+
* @var ConfigPool
27+
*/
28+
private $model;
29+
30+
protected function setUp(): void
31+
{
32+
$this->factory = $this->createMock(ConfigFactory::class);
33+
$this->model = new ConfigPool($this->factory);
34+
}
35+
1736
public function testGetConnection()
1837
{
19-
$factory = $this->createMock(ConfigFactory::class);
2038
$config = $this->createMock(Config::class);
21-
$factory->expects($this->once())->method('create')->with(['connectionName' => 'amqp'])->willReturn($config);
22-
$model = new ConfigPool($factory);
23-
$this->assertEquals($config, $model->get('amqp'));
39+
$this->factory->expects($this->once())->method('create')->with(['connectionName' => 'amqp'])->willReturn($config);
40+
$this->assertEquals($config, $this->model->get('amqp'));
2441
//test that object is cached
25-
$this->assertEquals($config, $model->get('amqp'));
42+
$this->assertEquals($config, $this->model->get('amqp'));
43+
}
44+
45+
public function testCloseConnections(): void
46+
{
47+
$config = $this->createMock(Config::class);
48+
$this->factory->method('create')
49+
->willReturn($config);
50+
$this->model->get('amqp');
51+
52+
$channel = $this->createMock(AMQPChannel::class);
53+
$config->expects($this->atLeastOnce())
54+
->method('getChannel')
55+
->willReturn($channel);
56+
$connection = $this->createMock(AbstractConnection::class);
57+
$channel->expects($this->atLeastOnce())
58+
->method('getConnection')
59+
->willReturn($connection);
60+
$channel->expects($this->once())
61+
->method('close');
62+
$connection->expects($this->once())
63+
->method('close');
64+
65+
$this->model->closeConnections();
2666
}
2767
}

lib/internal/Magento/Framework/Amqp/Test/Unit/ConfigTest.php

Lines changed: 88 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,20 @@
1111
use Magento\Framework\Amqp\Connection\Factory as ConnectionFactory;
1212
use Magento\Framework\Amqp\Connection\FactoryOptions;
1313
use Magento\Framework\App\DeploymentConfig;
14+
use PhpAmqpLib\Channel\AMQPChannel;
15+
use PhpAmqpLib\Connection\AbstractConnection;
1416
use PHPUnit\Framework\MockObject\MockObject;
1517
use PHPUnit\Framework\TestCase;
1618

1719
class ConfigTest extends TestCase
1820
{
21+
private const DEFAULT_CONFIG = [
22+
Config::HOST => 'localhost',
23+
Config::PORT => '5672',
24+
Config::USERNAME => 'user',
25+
Config::PASSWORD => 'pass',
26+
];
27+
1928
/**
2029
* @var MockObject
2130
*/
@@ -176,30 +185,94 @@ public function configDataProvider(): array
176185
{
177186
return [
178187
[
179-
[
180-
Config::HOST => 'localhost',
181-
Config::PORT => '5672',
182-
Config::USERNAME => 'user',
183-
Config::PASSWORD => 'pass',
184-
Config::VIRTUALHOST => '/',
185-
],
188+
self::DEFAULT_CONFIG,
186189
[
187190
'isSslEnabled' => false
188191
]
189192
],
190193
[
191-
[
192-
Config::HOST => 'localhost',
193-
Config::PORT => '5672',
194-
Config::USERNAME => 'user',
195-
Config::PASSWORD => 'pass',
196-
Config::VIRTUALHOST => '/',
197-
Config::SSL => ' true ',
198-
],
194+
self::DEFAULT_CONFIG + [Config::SSL => ' true '],
199195
[
200196
'isSslEnabled' => true
201197
]
202198
]
203199
];
204200
}
201+
202+
public function testGetChannel(): void
203+
{
204+
$this->deploymentConfigMock->expects($this->once())
205+
->method('getConfigData')
206+
->with(Config::QUEUE_CONFIG)
207+
->willReturn([Config::AMQP_CONFIG => self::DEFAULT_CONFIG]);
208+
$connectionMock = $this->createMock(AbstractConnection::class);
209+
$this->connectionFactory->expects($this->once())
210+
->method('create')
211+
->willReturn($connectionMock);
212+
213+
$channelMock = $this->createMock(AMQPChannel::class);
214+
$connectionMock->expects($this->once())
215+
->method('channel')
216+
->willReturn($channelMock);
217+
$channelMock->expects($this->atLeastOnce())
218+
->method('getConnection')
219+
->willReturn($connectionMock);
220+
$connectionMock->expects($this->atLeastOnce())
221+
->method('isConnected')
222+
->willReturn(true);
223+
224+
$this->assertEquals($channelMock, $this->amqpConfig->getChannel());
225+
$this->assertEquals($channelMock, $this->amqpConfig->getChannel());
226+
}
227+
228+
public function testGetChannelWithoutConnection(): void
229+
{
230+
$this->deploymentConfigMock->expects($this->once())
231+
->method('getConfigData')
232+
->with(Config::QUEUE_CONFIG)
233+
->willReturn([Config::AMQP_CONFIG => self::DEFAULT_CONFIG]);
234+
$connectionMock = $this->createMock(AbstractConnection::class);
235+
$this->connectionFactory->expects($this->once())
236+
->method('create')
237+
->willReturn($connectionMock);
238+
239+
$channel1Mock = $this->createMock(AMQPChannel::class);
240+
$channel2Mock = $this->createMock(AMQPChannel::class);
241+
$connectionMock->expects($this->exactly(2))
242+
->method('channel')
243+
->willReturnOnConsecutiveCalls($channel1Mock, $channel2Mock);
244+
$this->amqpConfig->getChannel();
245+
$channel1Mock->expects($this->once())
246+
->method('getConnection')
247+
->willReturn(null);
248+
249+
$this->assertEquals($channel2Mock, $this->amqpConfig->getChannel());
250+
}
251+
252+
public function testGetChannelWithDisconnectedConnection(): void
253+
{
254+
$this->deploymentConfigMock->expects($this->once())
255+
->method('getConfigData')
256+
->with(Config::QUEUE_CONFIG)
257+
->willReturn([Config::AMQP_CONFIG => self::DEFAULT_CONFIG]);
258+
$connectionMock = $this->createMock(AbstractConnection::class);
259+
$this->connectionFactory->expects($this->once())
260+
->method('create')
261+
->willReturn($connectionMock);
262+
263+
$channel1Mock = $this->createMock(AMQPChannel::class);
264+
$channel2Mock = $this->createMock(AMQPChannel::class);
265+
$connectionMock->expects($this->exactly(2))
266+
->method('channel')
267+
->willReturnOnConsecutiveCalls($channel1Mock, $channel2Mock);
268+
$this->amqpConfig->getChannel();
269+
$channel1Mock->expects($this->atLeastOnce())
270+
->method('getConnection')
271+
->willReturn($connectionMock);
272+
$connectionMock->expects($this->atLeastOnce())
273+
->method('isConnected')
274+
->willReturn(false);
275+
276+
$this->assertEquals($channel2Mock, $this->amqpConfig->getChannel());
277+
}
205278
}

0 commit comments

Comments
 (0)