Skip to content

Commit 072cdb7

Browse files
committed
MC-42514: [Magento Cloud] Broken pipe error when processing bulk actions
- Introduced prefetch count value to limit the number of unacknowledged messages for each consumer
1 parent bf4cdad commit 072cdb7

File tree

2 files changed

+86
-3
lines changed

2 files changed

+86
-3
lines changed

lib/internal/Magento/Framework/Amqp/Queue.php

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
use Psr\Log\LoggerInterface;
1515

1616
/**
17-
* Class Queue
18-
*
1917
* @api
2018
* @since 103.0.0
2119
*/
@@ -41,24 +39,34 @@ class Queue implements QueueInterface
4139
*/
4240
private $logger;
4341

42+
/**
43+
* The prefetch value is used to specify how many messages that are being sent to the consumer at the same time.
44+
* @see https://www.rabbitmq.com/consumer-prefetch.html
45+
* @var int
46+
*/
47+
private $prefetchCount;
48+
4449
/**
4550
* Initialize dependencies.
4651
*
4752
* @param Config $amqpConfig
4853
* @param EnvelopeFactory $envelopeFactory
4954
* @param string $queueName
5055
* @param LoggerInterface $logger
56+
* @param int $prefetchCount
5157
*/
5258
public function __construct(
5359
Config $amqpConfig,
5460
EnvelopeFactory $envelopeFactory,
5561
$queueName,
56-
LoggerInterface $logger
62+
LoggerInterface $logger,
63+
$prefetchCount = 100
5764
) {
5865
$this->amqpConfig = $amqpConfig;
5966
$this->queueName = $queueName;
6067
$this->envelopeFactory = $envelopeFactory;
6168
$this->logger = $logger;
69+
$this->prefetchCount = (int)$prefetchCount;
6270
}
6371

6472
/**
@@ -144,6 +152,7 @@ public function subscribe($callback)
144152

145153
$channel = $this->amqpConfig->getChannel();
146154
// @codingStandardsIgnoreStart
155+
$channel->basic_qos(0, $this->prefetchCount, false);
147156
$channel->basic_consume($this->queueName, '', false, false, false, false, $callbackConverter);
148157
// @codingStandardsIgnoreEnd
149158
while (count($channel->callbacks)) {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\Framework\Amqp\Test\Unit;
9+
10+
use Magento\Framework\Amqp\Config;
11+
use Magento\Framework\Amqp\Queue;
12+
use Magento\Framework\MessageQueue\EnvelopeFactory;
13+
use PhpAmqpLib\Channel\AMQPChannel;
14+
use PHPUnit\Framework\MockObject\MockObject;
15+
use PHPUnit\Framework\TestCase;
16+
use Psr\Log\LoggerInterface;
17+
18+
class QueueTest extends TestCase
19+
{
20+
private const PREFETCH_COUNT = 100;
21+
/**
22+
* @var Config|MockObject
23+
*/
24+
private $config;
25+
26+
/**
27+
* @var EnvelopeFactory|MockObject
28+
*/
29+
private $envelopeFactory;
30+
31+
/**
32+
* @var LoggerInterface|MockObject
33+
*/
34+
private $logger;
35+
36+
/**
37+
* @var Queue
38+
*/
39+
private $model;
40+
41+
protected function setUp(): void
42+
{
43+
$this->config = $this->createMock(Config::class);
44+
$this->envelopeFactory = $this->createMock(EnvelopeFactory::class);
45+
$this->logger = $this->createMock(LoggerInterface::class);
46+
47+
$this->model = new Queue(
48+
$this->config,
49+
$this->envelopeFactory,
50+
'testQueue',
51+
$this->logger,
52+
self::PREFETCH_COUNT
53+
);
54+
}
55+
56+
/**
57+
* Test verifies that prefetch value is used to specify how many messages
58+
* are being sent to the consumer at the same time.
59+
*/
60+
public function testSubscribe()
61+
{
62+
$callback = function () {
63+
};
64+
$amqpChannel = $this->createMock(AMQPChannel::class);
65+
$amqpChannel->expects($this->once())
66+
->method('basic_qos')
67+
->with(0, self::PREFETCH_COUNT, false);
68+
$this->config->expects($this->once())
69+
->method('getChannel')
70+
->willReturn($amqpChannel);
71+
72+
$this->model->subscribe($callback);
73+
}
74+
}

0 commit comments

Comments
 (0)