10
10
11
11
use Magento \Framework \App \ResourceConnection ;
12
12
use Psr \Log \LoggerInterface ;
13
- use Magento \Framework \Exception \LocalizedException ;
14
- use Magento \Framework \Exception \TemporaryStateExceptionInterface ;
15
- use Magento \Framework \DB \Adapter \ConnectionException ;
16
- use Magento \Framework \DB \Adapter \DeadlockException ;
17
- use Magento \Framework \DB \Adapter \LockWaitException ;
18
13
use Magento \Framework \MessageQueue \MessageLockException ;
19
14
use Magento \Framework \MessageQueue \ConnectionLostException ;
20
15
use Magento \Framework \Exception \NotFoundException ;
21
- use Magento \Framework \Exception \NoSuchEntityException ;
22
16
use Magento \Framework \MessageQueue \CallbackInvoker ;
23
- use Magento \Framework \MessageQueue \MessageValidator ;
24
- use Magento \Framework \MessageQueue \MessageEncoder ;
25
17
use Magento \Framework \MessageQueue \ConsumerConfigurationInterface ;
26
18
use Magento \Framework \MessageQueue \EnvelopeInterface ;
27
19
use Magento \Framework \MessageQueue \QueueInterface ;
28
20
use Magento \Framework \MessageQueue \LockInterface ;
29
21
use Magento \Framework \MessageQueue \MessageController ;
30
22
use Magento \Framework \MessageQueue \ConsumerInterface ;
31
- use Magento \Framework \Serialize \Serializer \Json ;
32
- use Magento \AsynchronousOperations \Api \Data \OperationInterface ;
33
- use Magento \Framework \Bulk \OperationManagementInterface ;
34
- use Magento \AsynchronousOperations \Model \ConfigInterface as AsyncConfig ;
23
+ use Magento \AsynchronousOperations \Model \OperationProcessorFactory ;
35
24
36
25
/**
37
26
* Class Consumer used to process OperationInterface messages.
38
27
*/
39
28
class MassConsumer implements ConsumerInterface
40
29
{
41
-
42
30
/**
43
31
* @var \Magento\Framework\MessageQueue\CallbackInvoker
44
32
*/
45
33
private $ invoker ;
46
34
47
- /**
48
- * @var \Magento\Framework\MessageQueue\MessageEncoder
49
- */
50
- private $ messageEncoder ;
51
-
52
- /**
53
- * @var \Magento\Framework\MessageQueue\MessageValidator
54
- */
55
- private $ messageValidator ;
56
-
57
35
/**
58
36
* @var \Magento\Framework\App\ResourceConnection
59
37
*/
@@ -64,16 +42,6 @@ class MassConsumer implements ConsumerInterface
64
42
*/
65
43
private $ configuration ;
66
44
67
- /**
68
- * @var \Magento\Framework\Serialize\Serializer\Json
69
- */
70
- private $ jsonHelper ;
71
-
72
- /**
73
- * @var \Magento\Framework\Bulk\OperationManagementInterface
74
- */
75
- private $ operationManagement ;
76
-
77
45
/**
78
46
* @var \Magento\Framework\MessageQueue\MessageController
79
47
*/
@@ -84,39 +52,37 @@ class MassConsumer implements ConsumerInterface
84
52
*/
85
53
private $ logger ;
86
54
55
+ /**
56
+ * @var OperationProcessor
57
+ */
58
+ private $ operationProcessor ;
59
+
87
60
/**
88
61
* Initialize dependencies.
89
62
*
90
- * @param \Magento\Framework\MessageQueue\CallbackInvoker $invoker
91
- * @param \Magento\Framework\MessageQueue\MessageValidator $messageValidator
92
- * @param \Magento\Framework\MessageQueue\MessageEncoder $messageEncoder
93
- * @param \Magento\Framework\App\ResourceConnection $resource
94
- * @param \Magento\Framework\MessageQueue\ConsumerConfigurationInterface $configuration
95
- * @param \Magento\Framework\Serialize\Serializer\Json $jsonHelper
96
- * @param \Magento\Framework\Bulk\OperationManagementInterface $operationManagement
97
- * @param \Magento\Framework\MessageQueue\MessageController $messageController
98
- * @param \Psr\Log\LoggerInterface|null $logger
63
+ * @param CallbackInvoker $invoker
64
+ * @param ResourceConnection $resource
65
+ * @param MessageController $messageController
66
+ * @param ConsumerConfigurationInterface $configuration
67
+ * @param OperationProcessorFactory $operationProcessorFactory
68
+ * @param LoggerInterface $logger
99
69
*/
100
70
public function __construct (
101
71
CallbackInvoker $ invoker ,
102
- MessageValidator $ messageValidator ,
103
- MessageEncoder $ messageEncoder ,
104
72
ResourceConnection $ resource ,
105
- ConsumerConfigurationInterface $ configuration ,
106
- Json $ jsonHelper ,
107
- OperationManagementInterface $ operationManagement ,
108
73
MessageController $ messageController ,
109
- LoggerInterface $ logger = null
74
+ ConsumerConfigurationInterface $ configuration ,
75
+ OperationProcessorFactory $ operationProcessorFactory ,
76
+ LoggerInterface $ logger
110
77
) {
111
78
$ this ->invoker = $ invoker ;
112
- $ this ->messageValidator = $ messageValidator ;
113
- $ this ->messageEncoder = $ messageEncoder ;
114
79
$ this ->resource = $ resource ;
115
- $ this ->configuration = $ configuration ;
116
- $ this ->jsonHelper = $ jsonHelper ;
117
- $ this ->operationManagement = $ operationManagement ;
118
80
$ this ->messageController = $ messageController ;
119
- $ this ->logger = $ logger ? : \Magento \Framework \App \ObjectManager::getInstance ()->get (LoggerInterface::class);
81
+ $ this ->configuration = $ configuration ;
82
+ $ this ->operationProcessor = $ operationProcessorFactory ->create ([
83
+ 'configuration ' => $ configuration
84
+ ]);
85
+ $ this ->logger = $ logger ;
120
86
}
121
87
122
88
/**
@@ -150,12 +116,11 @@ private function getTransactionCallback(QueueInterface $queue)
150
116
151
117
$ allowedTopics = $ this ->configuration ->getTopicNames ();
152
118
if (in_array ($ topicName , $ allowedTopics )) {
153
- $ this ->dispatchMessage ($ message );
119
+ $ this ->operationProcessor -> process ($ message-> getBody () );
154
120
} else {
155
121
$ queue ->reject ($ message );
156
122
return ;
157
123
}
158
-
159
124
$ queue ->acknowledge ($ message );
160
125
} catch (MessageLockException $ exception ) {
161
126
$ queue ->acknowledge ($ message );
@@ -176,105 +141,4 @@ private function getTransactionCallback(QueueInterface $queue)
176
141
}
177
142
};
178
143
}
179
-
180
- /**
181
- * Decode OperationInterface message and process them.
182
- * Invokes service contract handler with the input params.
183
- * Updates the status of the mass operation.
184
- *
185
- * @param EnvelopeInterface $message
186
- * @throws LocalizedException
187
- */
188
- private function dispatchMessage (EnvelopeInterface $ message )
189
- {
190
- $ operation = $ this ->messageEncoder ->decode (AsyncConfig::SYSTEM_TOPIC_NAME , $ message ->getBody ());
191
- $ this ->messageValidator ->validate (AsyncConfig::SYSTEM_TOPIC_NAME , $ operation );
192
-
193
- $ status = OperationInterface::STATUS_TYPE_COMPLETE ;
194
- $ errorCode = null ;
195
- $ messages = [];
196
- $ topicName = $ operation ->getTopicName ();
197
- $ handlers = $ this ->configuration ->getHandlers ($ topicName );
198
- try {
199
- $ data = $ this ->jsonHelper ->unserialize ($ operation ->getSerializedData ());
200
- $ entityParams = $ this ->messageEncoder ->decode ($ topicName , $ data ['meta_information ' ]);
201
- $ this ->messageValidator ->validate ($ topicName , $ entityParams );
202
- } catch (\Exception $ e ) {
203
- $ this ->logger ->error ($ e ->getMessage ());
204
- $ status = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
205
- $ errorCode = $ e ->getCode ();
206
- $ messages [] = $ e ->getMessage ();
207
- }
208
-
209
- if ($ errorCode === null ) {
210
- foreach ($ handlers as $ callback ) {
211
- $ result = $ this ->executeHandler ($ callback , $ entityParams );
212
- $ status = $ result ['status ' ];
213
- $ errorCode = $ result ['error_code ' ];
214
- $ messages = array_merge ($ messages , $ result ['messages ' ]);
215
- }
216
- }
217
-
218
- $ serializedData = (isset ($ errorCode )) ? $ operation ->getSerializedData () : null ;
219
- $ this ->operationManagement ->changeOperationStatus (
220
- $ operation ->getId (),
221
- $ status ,
222
- $ errorCode ,
223
- implode ('; ' , $ messages ),
224
- $ serializedData
225
- );
226
- }
227
-
228
- /**
229
- * Execute topic handler
230
- *
231
- * @param $callback
232
- * @param $entityParams
233
- * @return array
234
- */
235
- private function executeHandler ($ callback , $ entityParams )
236
- {
237
- $ result = [
238
- 'status ' => OperationInterface::STATUS_TYPE_COMPLETE ,
239
- 'error_code ' => null ,
240
- 'messages ' => []
241
- ];
242
- try {
243
- call_user_func_array ($ callback , $ entityParams );
244
- $ messages [] = sprintf ('Service execution success %s::%s ' , get_class ($ callback [0 ]), $ callback [1 ]);
245
- } catch (\Zend_Db_Adapter_Exception $ e ) {
246
- $ this ->logger ->critical ($ e ->getMessage ());
247
- if ($ e instanceof LockWaitException
248
- || $ e instanceof DeadlockException
249
- || $ e instanceof ConnectionException
250
- ) {
251
- $ result ['status ' ] = OperationInterface::STATUS_TYPE_RETRIABLY_FAILED ;
252
- $ result ['error_code ' ] = $ e ->getCode ();
253
- $ result ['messages ' ][] = __ ($ e ->getMessage ());
254
- } else {
255
- $ result ['status ' ] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
256
- $ result ['error_code ' ] = $ e ->getCode ();
257
- $ result ['messages ' ][] =
258
- __ ('Sorry, something went wrong during product prices update. Please see log for details. ' );
259
- }
260
- } catch (NoSuchEntityException $ e ) {
261
- $ this ->logger ->error ($ e ->getMessage ());
262
- $ result ['status ' ] = ($ e instanceof TemporaryStateExceptionInterface) ?
263
- OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED :
264
- OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
265
- $ result ['error_code ' ] = $ e ->getCode ();
266
- $ result ['messages ' ][] = $ e ->getMessage ();
267
- } catch (LocalizedException $ e ) {
268
- $ this ->logger ->error ($ e ->getMessage ());
269
- $ result ['status ' ] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
270
- $ result ['error_code ' ] = $ e ->getCode ();
271
- $ result ['messages ' ][] = $ e ->getMessage ();
272
- } catch (\Exception $ e ) {
273
- $ this ->logger ->error ($ e ->getMessage ());
274
- $ result ['status ' ] = OperationInterface::STATUS_TYPE_NOT_RETRIABLY_FAILED ;
275
- $ result ['error_code ' ] = $ e ->getCode ();
276
- $ result ['messages ' ][] = $ e ->getMessage ();
277
- }
278
- return $ result ;
279
- }
280
144
}
0 commit comments