Skip to content

Commit 5c30493

Browse files
committed
B2B-2155: Use Redis as message queue
- fix integration test failures
1 parent c0a3d59 commit 5c30493

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
/**
3+
* Copyright © Magento, Inc. All rights reserved.
4+
* See COPYING.txt for license details.
5+
*/
6+
declare(strict_types=1);
7+
8+
namespace Magento\TestFramework\MessageQueue;
9+
10+
use Magento\Framework\Exception\LocalizedException;
11+
use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface;
12+
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig;
13+
use Magento\Framework\MessageQueue\ConsumerFactory;
14+
use Magento\Framework\MessageQueue\QueueRepository;
15+
16+
class ClearQueueProcessor
17+
{
18+
/**
19+
* @var ConsumerConfig
20+
*/
21+
private $consumerConfig;
22+
23+
/**
24+
* @var ConsumerFactory
25+
*/
26+
private $consumerFactory;
27+
28+
/**
29+
* @var QueueRepository
30+
*/
31+
private $queueRepository;
32+
33+
/**
34+
* ClearQueueProcessor constructor.
35+
*
36+
* @param ConsumerConfig $consumerConfig
37+
* @param ConsumerFactory $consumerFactory
38+
* @param QueueRepository $queueRepository
39+
*/
40+
public function __construct(
41+
ConsumerConfig $consumerConfig,
42+
ConsumerFactory $consumerFactory,
43+
QueueRepository $queueRepository
44+
) {
45+
$this->consumerConfig = $consumerConfig;
46+
$this->consumerFactory = $consumerFactory;
47+
$this->queueRepository = $queueRepository;
48+
}
49+
50+
/**
51+
* Clear queue
52+
*
53+
* @param string $consumerName
54+
* @throws LocalizedException
55+
* return void
56+
*/
57+
public function execute(string $consumerName): void
58+
{
59+
/** @var ConsumerConfigItemInterface $consumerConfig */
60+
$consumerConfig = $this->consumerConfig->getConsumer($consumerName);
61+
$queue = $this->queueRepository->get($consumerConfig->getConnection(), $consumerConfig->getQueue());
62+
while ($message = $queue->dequeue()) {
63+
$queue->acknowledge($message);
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)