7
7
8
8
namespace Magento \AsynchronousOperations \Model ;
9
9
10
+ use Exception ;
10
11
use Magento \AsynchronousOperations \Api \Data \BulkSummaryInterface ;
11
12
use Magento \AsynchronousOperations \Api \Data \BulkSummaryInterfaceFactory ;
12
13
use Magento \AsynchronousOperations \Api \Data \OperationInterface ;
14
+ use Magento \AsynchronousOperations \Model \ResourceModel \Operation \Collection ;
13
15
use Magento \AsynchronousOperations \Model \ResourceModel \Operation \CollectionFactory ;
14
16
use Magento \Authorization \Model \UserContextInterface ;
15
17
use Magento \Framework \App \ResourceConnection ;
21
23
22
24
/**
23
25
* Asynchronous Bulk Management
26
+ *
27
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
24
28
*/
25
29
class BulkManagement implements BulkManagementInterface
26
30
{
@@ -109,7 +113,7 @@ public function scheduleBulk($bulkUuid, array $operations, $description, $userId
109
113
$ userType = UserContextInterface::USER_TYPE_ADMIN ;
110
114
}
111
115
try {
112
- /** @var \Magento\AsynchronousOperations\Api\Data\ BulkSummaryInterface $bulkSummary */
116
+ /** @var BulkSummaryInterface $bulkSummary */
113
117
$ bulkSummary = $ this ->bulkSummaryFactory ->create ();
114
118
$ this ->entityManager ->load ($ bulkSummary , $ bulkUuid );
115
119
$ bulkSummary ->setBulkId ($ bulkUuid );
@@ -122,7 +126,7 @@ public function scheduleBulk($bulkUuid, array $operations, $description, $userId
122
126
$ this ->publishOperations ($ operations );
123
127
124
128
$ connection ->commit ();
125
- } catch (\ Exception $ exception ) {
129
+ } catch (Exception $ exception ) {
126
130
$ connection ->rollBack ();
127
131
$ this ->logger ->critical ($ exception ->getMessage ());
128
132
return false ;
@@ -140,57 +144,69 @@ public function scheduleBulk($bulkUuid, array $operations, $description, $userId
140
144
*/
141
145
public function retryBulk ($ bulkUuid , array $ errorCodes )
142
146
{
143
- $ metadata = $ this ->metadataPool ->getMetadata (BulkSummaryInterface::class);
144
-
145
- $ connection = $ this ->resourceConnection ->getConnectionByName ($ metadata ->getEntityConnectionName ());
146
- /** @var \Magento\AsynchronousOperations\Model\ResourceModel\Operation[] $retriablyFailedOperations */
147
- $ retriablyFailedOperations = $ this ->operationCollectionFactory ->create ()
148
- ->addFieldToFilter ('error_code ' , ['in ' => $ errorCodes ])
149
- ->addFieldToFilter ('bulk_uuid ' , ['eq ' => $ bulkUuid ])
147
+ /** @var Collection $collection */
148
+ $ collection = $ this ->operationCollectionFactory ->create ();
149
+ /** @var Operation[] $retriablyFailedOperations */
150
+ $ retriablyFailedOperations = $ collection
151
+ ->addFieldToFilter (OperationInterface::BULK_ID , ['eq ' => $ bulkUuid ])
152
+ ->addFieldToFilter (OperationInterface::ERROR_CODE , ['in ' => $ errorCodes ])
150
153
->getItems ();
151
-
152
- // remove corresponding operations from database (i.e. move them to 'open' status)
153
- $ connection ->beginTransaction ();
154
- try {
155
- $ operationIds = [];
156
- $ currentBatchSize = 0 ;
157
- $ maxBatchSize = 10000 ;
158
- /** @var OperationInterface $operation */
154
+ $ affectedOperations = count ($ retriablyFailedOperations );
155
+ if ($ retriablyFailedOperations ) {
156
+ $ operation = reset ($ retriablyFailedOperations );
157
+ //async consumer expects operations to be in the database
158
+ // thus such operation should not be deleted but reopened
159
+ $ shouldReopen = strpos ($ operation ->getTopicName (), ConfigInterface::TOPIC_PREFIX ) === 0 ;
160
+ $ metadata = $ this ->metadataPool ->getMetadata (OperationInterface::class);
161
+ $ linkField = $ metadata ->getLinkField ();
162
+ $ ids = [];
159
163
foreach ($ retriablyFailedOperations as $ operation ) {
160
- if ($ currentBatchSize === $ maxBatchSize ) {
161
- $ whereCondition = $ connection ->quoteInto ('operation_key IN (?) ' , $ operationIds )
162
- . " AND "
163
- . $ connection ->quoteInto ('bulk_uuid = ? ' , $ bulkUuid );
164
- $ connection ->delete (
165
- $ this ->resourceConnection ->getTableName ('magento_operation ' ),
166
- $ whereCondition
167
- );
168
- $ operationIds = [];
169
- $ currentBatchSize = 0 ;
170
- }
171
- $ currentBatchSize ++;
172
- $ operationIds [] = $ operation ->getId ();
164
+ $ ids [] = (int ) $ operation ->getData ($ linkField );
173
165
}
174
- // remove operations from the last batch
175
- if (!empty ($ operationIds )) {
176
- $ whereCondition = $ connection ->quoteInto ('operation_key IN (?) ' , $ operationIds )
177
- . " AND "
178
- . $ connection ->quoteInto ('bulk_uuid = ? ' , $ bulkUuid );
179
- $ connection ->delete (
180
- $ this ->resourceConnection ->getTableName ('magento_operation ' ),
181
- $ whereCondition
182
- );
166
+ $ batchSize = 10000 ;
167
+ $ chunks = array_chunk ($ ids , $ batchSize );
168
+ $ connection = $ this ->resourceConnection ->getConnectionByName ($ metadata ->getEntityConnectionName ());
169
+ $ connection ->beginTransaction ();
170
+ try {
171
+ if ($ shouldReopen ) {
172
+ foreach ($ chunks as $ chunk ) {
173
+ $ connection ->update (
174
+ $ metadata ->getEntityTable (),
175
+ [
176
+ OperationInterface::STATUS => OperationInterface::STATUS_TYPE_OPEN ,
177
+ OperationInterface::RESULT_SERIALIZED_DATA => null ,
178
+ OperationInterface::ERROR_CODE => null ,
179
+ OperationInterface::RESULT_MESSAGE => null ,
180
+ 'started_at ' => null ,
181
+ ],
182
+ [
183
+ $ linkField . ' IN (?) ' => $ chunk ,
184
+ ]
185
+ );
186
+ }
187
+ } else {
188
+ foreach ($ chunks as $ chunk ) {
189
+ $ connection ->delete (
190
+ $ metadata ->getEntityTable (),
191
+ [
192
+ $ linkField . ' IN (?) ' => $ chunk ,
193
+ ]
194
+ );
195
+ }
196
+ }
197
+ $ connection ->commit ();
198
+ } catch (\Throwable $ exception ) {
199
+ $ connection ->rollBack ();
200
+ $ this ->logger ->critical ($ exception ->getMessage ());
201
+ $ affectedOperations = 0 ;
183
202
}
184
203
185
- $ connection ->commit ();
186
- } catch (\Exception $ exception ) {
187
- $ connection ->rollBack ();
188
- $ this ->logger ->critical ($ exception ->getMessage ());
189
- return 0 ;
204
+ if ($ affectedOperations ) {
205
+ $ this ->publishOperations ($ retriablyFailedOperations );
206
+ }
190
207
}
191
- $ this ->publishOperations ($ retriablyFailedOperations );
192
208
193
- return count ( $ retriablyFailedOperations ) ;
209
+ return $ affectedOperations ;
194
210
}
195
211
196
212
/**
0 commit comments