|
1 | 1 | <?php
|
2 | 2 | /**
|
3 |
| - * Copyright © Magento, Inc. All rights reserved. |
4 |
| - * See COPYING.txt for license details. |
| 3 | + * Copyright 2015 Adobe |
| 4 | + * All Rights Reserved. |
5 | 5 | */
|
6 | 6 | namespace Magento\MysqlMq\Model\ResourceModel;
|
7 | 7 |
|
8 | 8 | use Magento\Framework\DB\Select;
|
9 | 9 | use Magento\Framework\DB\Sql\Expression;
|
| 10 | +use Magento\Framework\Model\ResourceModel\Db\Context; |
10 | 11 | use Magento\MysqlMq\Model\QueueManagement;
|
11 | 12 |
|
12 | 13 | /**
|
13 | 14 | * Resource model for queue.
|
14 | 15 | */
|
15 | 16 | class Queue extends \Magento\Framework\Model\ResourceModel\Db\AbstractDb
|
16 | 17 | {
|
| 18 | + /** |
| 19 | + * |
| 20 | + */ |
| 21 | + private const CHUNK_SIZE = 10000; |
| 22 | + |
| 23 | + /** |
| 24 | + * @var int|mixed |
| 25 | + */ |
| 26 | + private int $chunkSize; |
| 27 | + |
| 28 | + /** |
| 29 | + * @param Context $context |
| 30 | + * @param string|null $connectionName |
| 31 | + * @param int|null $chunkSize |
| 32 | + */ |
| 33 | + public function __construct(Context $context, $connectionName = null, int $chunkSize = null) |
| 34 | + { |
| 35 | + parent::__construct($context, $connectionName); |
| 36 | + if ($chunkSize) { |
| 37 | + $this->chunkSize = $chunkSize; |
| 38 | + } else { |
| 39 | + $this->chunkSize = self::CHUNK_SIZE; |
| 40 | + } |
| 41 | + } |
| 42 | + |
17 | 43 | /**
|
18 | 44 | * Model initialization
|
19 | 45 | *
|
@@ -163,22 +189,29 @@ public function getMessages($queueName, $limit = null)
|
163 | 189 | }
|
164 | 190 |
|
165 | 191 | /**
|
166 |
| - * Delete messages if there is no queue whrere the message is not in status TO BE DELETED |
| 192 | + * Delete messages if there is no queue where the message is not in status TO BE DELETED |
167 | 193 | *
|
168 | 194 | * @return void
|
169 | 195 | */
|
170 |
| - public function deleteMarkedMessages() |
| 196 | + public function deleteMarkedMessages(): void |
171 | 197 | {
|
172 | 198 | $connection = $this->getConnection();
|
173 |
| - |
174 | 199 | $select = $connection->select()
|
175 | 200 | ->from(['queue_message_status' => $this->getMessageStatusTable()], ['message_id'])
|
176 |
| - ->where('status <> ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED) |
| 201 | + ->joinLeft( |
| 202 | + ['message_status2' => $this->getMessageStatusTable()], |
| 203 | + 'queue_message_status.message_id = message_status2.message_id AND message_status2.status <> ' . |
| 204 | + QueueManagement::MESSAGE_STATUS_TO_BE_DELETED, |
| 205 | + [] |
| 206 | + ) |
| 207 | + ->where('queue_message_status.status = ?', QueueManagement::MESSAGE_STATUS_TO_BE_DELETED) |
| 208 | + ->where('message_status2.message_id IS NULL') |
177 | 209 | ->distinct();
|
178 |
| - $messageIds = $connection->fetchCol($select); |
179 | 210 |
|
180 |
| - $condition = count($messageIds) > 0 ? ['id NOT IN (?)' => $messageIds] : null; |
181 |
| - $connection->delete($this->getMessageTable(), $condition); |
| 211 | + $messageIds = $connection->fetchCol($select); |
| 212 | + foreach (array_chunk($messageIds, $this->chunkSize) as $messageIdsChunk) { |
| 213 | + $connection->delete($this->getMessageTable(), ['id IN (?)' => $messageIdsChunk]); |
| 214 | + } |
182 | 215 | }
|
183 | 216 |
|
184 | 217 | /**
|
|
0 commit comments