|
7 | 7 |
|
8 | 8 | use Magento\Framework\DB\Select;
|
9 | 9 | use Magento\Framework\DB\Sql\Expression;
|
| 10 | +use Magento\Framework\Model\ResourceModel\Db\Context; |
10 | 11 | use Magento\MysqlMq\Model\QueueManagement;
|
11 | 12 |
|
12 | 13 | /**
|
13 | 14 | * Resource model for queue.
|
14 | 15 | */
|
15 | 16 | class Queue extends \Magento\Framework\Model\ResourceModel\Db\AbstractDb
|
16 | 17 | {
|
| 18 | + /** |
| 19 | + * |
| 20 | + */ |
| 21 | + private const CHUNK_SIZE = 10000; |
| 22 | + |
| 23 | + /** |
| 24 | + * @var int|mixed |
| 25 | + */ |
| 26 | + private int $chunkSize; |
| 27 | + |
| 28 | + /** |
| 29 | + * @param Context $context |
| 30 | + * @param null $connectionName |
| 31 | + * @param int|null $chunkSize |
| 32 | + */ |
| 33 | + public function __construct(Context $context, $connectionName = null, int $chunkSize = null) |
| 34 | + { |
| 35 | + parent::__construct($context, $connectionName); |
| 36 | + if ($chunkSize) { |
| 37 | + $this->chunkSize = $chunkSize; |
| 38 | + } else { |
| 39 | + $this->chunkSize = self::CHUNK_SIZE; |
| 40 | + } |
| 41 | + } |
| 42 | + |
17 | 43 | /**
|
18 | 44 | * Model initialization
|
19 | 45 | *
|
@@ -170,13 +196,20 @@ public function getMessages($queueName, $limit = null)
|
170 | 196 | public function deleteMarkedMessages(): void
|
171 | 197 | {
|
172 | 198 | $connection = $this->getConnection();
|
173 |
| - |
174 | 199 | $select = $connection->select()
|
175 | 200 | ->from(['queue_message_status' => $this->getMessageStatusTable()], ['message_id'])
|
176 |
| - ->where('status = ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED) |
| 201 | + ->joinLeft( |
| 202 | + ['message_status2' => $this->getMessageStatusTable()], |
| 203 | + 'queue_message_status.message_id = message_status2.message_id AND message_status2.status <> ' . |
| 204 | + QueueManagement::MESSAGE_STATUS_TO_BE_DELETED, |
| 205 | + [] |
| 206 | + ) |
| 207 | + ->where('queue_message_status.status = ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED) |
| 208 | + ->where('message_status2.message_id IS NULL') |
177 | 209 | ->distinct();
|
| 210 | + |
178 | 211 | $messageIds = $connection->fetchCol($select);
|
179 |
| - foreach (array_chunk($messageIds, 10000) as $messageIdsChunk) { |
| 212 | + foreach (array_chunk($messageIds, $this->chunkSize) as $messageIdsChunk) { |
180 | 213 | $connection->delete($this->getMessageTable(), ['id IN (?)' => $messageIdsChunk]);
|
181 | 214 | }
|
182 | 215 | }
|
|
0 commit comments