35
35
36
36
/**
37
37
* Class Consumer used to process OperationInterface messages.
38
- * This could be used for both synchronous and asynchronous processing, depending on topic.
39
- *
40
- * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
41
38
*/
42
39
class MassConsumer implements ConsumerInterface
43
40
{
@@ -119,7 +116,6 @@ public function __construct(
119
116
$ this ->jsonHelper = $ jsonHelper ;
120
117
$ this ->operationManagement = $ operationManagement ;
121
118
$ this ->messageController = $ messageController ;
122
-
123
119
$ this ->logger = $ logger ? : \Magento \Framework \App \ObjectManager::getInstance ()->get (LoggerInterface::class);
124
120
}
125
121
@@ -188,7 +184,6 @@ private function getTransactionCallback(QueueInterface $queue)
188
184
*
189
185
* @param EnvelopeInterface $message
190
186
* @throws LocalizedException
191
- * @SuppressWarnings(PHPMD.CyclomaticComplexity)
192
187
*/
193
188
private function dispatchMessage (EnvelopeInterface $ message )
194
189
{
@@ -213,42 +208,10 @@ private function dispatchMessage(EnvelopeInterface $message)
213
208
214
209
if ($ errorCode === null ) {
215
210
foreach ($ handlers as $ callback ) {
216
- try {
217
- call_user_func_array ($ callback , $ entityParams );
218
- $ messages [] = sprintf ('Service execution success %s::%s ' , get_class ($ callback [0 ]), $ callback [1 ]);
219
- } catch (\Zend_Db_Adapter_Exception $ e ) {
220
- $ this ->logger ->critical ($ e ->getMessage ());
221
- if ($ e instanceof LockWaitException
222
- || $ e instanceof DeadlockException
223
- || $ e instanceof ConnectionException
224
- ) {
225
- $ status = OperationInterface::STATUS_TYPE_RETRIABLY_FAILED ;
226
- $ errorCode = $ e ->getCode ();
227
- $ messages [] = __ ($ e ->getMessage ());
228
- } else {
229
- $ status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
230
- $ errorCode = $ e ->getCode ();
231
- $ messages [] =
232
- __ ('Sorry, something went wrong during product prices update. Please see log for details. ' );
233
- }
234
- } catch (NoSuchEntityException $ e ) {
235
- $ this ->logger ->error ($ e ->getMessage ());
236
- $ status = ($ e instanceof TemporaryStateExceptionInterface) ?
237
- OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED :
238
- OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
239
- $ errorCode = $ e ->getCode ();
240
- $ messages [] = $ e ->getMessage ();
241
- } catch (LocalizedException $ e ) {
242
- $ this ->logger ->error ($ e ->getMessage ());
243
- $ status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
244
- $ errorCode = $ e ->getCode ();
245
- $ messages [] = $ e ->getMessage ();
246
- } catch (\Exception $ e ) {
247
- $ this ->logger ->error ($ e ->getMessage ());
248
- $ status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
249
- $ errorCode = $ e ->getCode ();
250
- $ messages [] = $ e ->getMessage ();
251
- }
211
+ $ result = $ this ->executeHandler ($ callback , $ entityParams );
212
+ $ status = $ result ['status ' ];
213
+ $ errorCode = $ result ['error_code ' ];
214
+ $ messages = array_merge ($ messages , $ result ['messages ' ]);
252
215
}
253
216
}
254
217
@@ -261,4 +224,57 @@ private function dispatchMessage(EnvelopeInterface $message)
261
224
$ serializedData
262
225
);
263
226
}
227
+
228
+ /**
229
+ * Execute topic handler
230
+ *
231
+ * @param $callback
232
+ * @param $entityParams
233
+ * @return array
234
+ */
235
+ private function executeHandler ($ callback , $ entityParams )
236
+ {
237
+ $ result = [
238
+ 'status ' => OperationInterface::STATUS_TYPE_COMPLETE ,
239
+ 'error_code ' => null ,
240
+ 'messages ' => []
241
+ ];
242
+ try {
243
+ call_user_func_array ($ callback , $ entityParams );
244
+ $ messages [] = sprintf ('Service execution success %s::%s ' , get_class ($ callback [0 ]), $ callback [1 ]);
245
+ } catch (\Zend_Db_Adapter_Exception $ e ) {
246
+ $ this ->logger ->critical ($ e ->getMessage ());
247
+ if ($ e instanceof LockWaitException
248
+ || $ e instanceof DeadlockException
249
+ || $ e instanceof ConnectionException
250
+ ) {
251
+ $ result ['status ' ] = OperationInterface::STATUS_TYPE_RETRIABLY_FAILED ;
252
+ $ result ['error_code ' ] = $ e ->getCode ();
253
+ $ result ['messages ' ][] = __ ($ e ->getMessage ());
254
+ } else {
255
+ $ result ['status ' ] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
256
+ $ result ['error_code ' ] = $ e ->getCode ();
257
+ $ result ['messages ' ][] =
258
+ __ ('Sorry, something went wrong during product prices update. Please see log for details. ' );
259
+ }
260
+ } catch (NoSuchEntityException $ e ) {
261
+ $ this ->logger ->error ($ e ->getMessage ());
262
+ $ result ['status ' ] = ($ e instanceof TemporaryStateExceptionInterface) ?
263
+ OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED :
264
+ OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
265
+ $ result ['error_code ' ] = $ e ->getCode ();
266
+ $ result ['messages ' ][] = $ e ->getMessage ();
267
+ } catch (LocalizedException $ e ) {
268
+ $ this ->logger ->error ($ e ->getMessage ());
269
+ $ result ['status ' ] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
270
+ $ result ['error_code ' ] = $ e ->getCode ();
271
+ $ result ['messages ' ][] = $ e ->getMessage ();
272
+ } catch (\Exception $ e ) {
273
+ $ this ->logger ->error ($ e ->getMessage ());
274
+ $ result ['status ' ] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
275
+ $ result ['error_code ' ] = $ e ->getCode ();
276
+ $ result ['messages ' ][] = $ e ->getMessage ();
277
+ }
278
+ return $ result ;
279
+ }
264
280
}
0 commit comments