5
5
*/
6
6
namespace Magento \Framework \MessageQueue ;
7
7
8
+ use Exception ;
9
+ use Magento \Framework \App \ObjectManager ;
8
10
use Magento \Framework \App \ResourceConnection ;
11
+ use Magento \Framework \Communication \ConfigInterface as CommunicationConfig ;
9
12
use Magento \Framework \Exception \LocalizedException ;
10
- use Magento \Framework \Phrase ;
13
+ use Magento \Framework \Exception \ NotFoundException ;
11
14
use Magento \Framework \MessageQueue \Consumer \ConfigInterface as ConsumerConfig ;
12
- use Magento \Framework \Communication \ConfigInterface as CommunicationConfig ;
13
- use Magento \Framework \MessageQueue \QueueRepository ;
15
+ use Magento \Framework \Phrase ;
14
16
use Psr \Log \LoggerInterface ;
15
17
16
18
/**
@@ -84,7 +86,13 @@ class Consumer implements ConsumerInterface
84
86
* @param MessageEncoder $messageEncoder
85
87
* @param ResourceConnection $resource
86
88
* @param ConsumerConfigurationInterface $configuration
87
- * @param LoggerInterface $logger
89
+ * @param LoggerInterface|null $logger
90
+ * @param ConsumerConfig|null $consumerConfig
91
+ * @param CommunicationConfig|null $communicationConfig
92
+ * @param QueueRepository|null $queueRepository
93
+ * @param MessageController|null $messageController
94
+ * @param MessageValidator|null $messageValidator
95
+ * @param EnvelopeFactory|null $envelopeFactory
88
96
*
89
97
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
90
98
*/
@@ -93,13 +101,26 @@ public function __construct(
93
101
MessageEncoder $ messageEncoder ,
94
102
ResourceConnection $ resource ,
95
103
ConsumerConfigurationInterface $ configuration ,
96
- LoggerInterface $ logger = null
104
+ LoggerInterface $ logger = null ,
105
+ ConsumerConfig $ consumerConfig = null ,
106
+ CommunicationConfig $ communicationConfig = null ,
107
+ QueueRepository $ queueRepository = null ,
108
+ MessageController $ messageController = null ,
109
+ MessageValidator $ messageValidator = null ,
110
+ EnvelopeFactory $ envelopeFactory = null
97
111
) {
98
112
$ this ->invoker = $ invoker ;
99
113
$ this ->messageEncoder = $ messageEncoder ;
100
114
$ this ->resource = $ resource ;
101
115
$ this ->configuration = $ configuration ;
102
- $ this ->logger = $ logger ?: \Magento \Framework \App \ObjectManager::getInstance ()->get (LoggerInterface::class);
116
+ $ this ->logger = $ logger ?: ObjectManager::getInstance ()->get (LoggerInterface::class);
117
+ $ this ->consumerConfig = $ consumerConfig ?: ObjectManager::getInstance ()->get (ConsumerConfig::class);
118
+ $ this ->communicationConfig = $ communicationConfig
119
+ ?: ObjectManager::getInstance ()->get (CommunicationConfig::class);
120
+ $ this ->queueRepository = $ queueRepository ?: ObjectManager::getInstance ()->get (QueueRepository::class);
121
+ $ this ->messageController = $ messageController ?: ObjectManager::getInstance ()->get (MessageController::class);
122
+ $ this ->messageValidator = $ messageValidator ?: ObjectManager::getInstance ()->get (MessageValidator::class);
123
+ $ this ->envelopeFactory = $ envelopeFactory ?: ObjectManager::getInstance ()->get (EnvelopeFactory::class);
103
124
}
104
125
105
126
/**
@@ -142,7 +163,8 @@ private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
142
163
$ messageSchemaType = $ this ->configuration ->getMessageSchemaType ($ topicName );
143
164
if ($ messageSchemaType == CommunicationConfig::TOPIC_REQUEST_TYPE_METHOD ) {
144
165
foreach ($ handlers as $ callback ) {
145
- $ result = call_user_func_array ($ callback , $ decodedMessage );
166
+ // The `array_values` is a workaround to ensure the same behavior in PHP 7 and 8.
167
+ $ result = call_user_func_array ($ callback , array_values ($ decodedMessage ));
146
168
return $ this ->processSyncResponse ($ topicName , $ result );
147
169
}
148
170
} else {
@@ -168,7 +190,7 @@ private function dispatchMessage(EnvelopeInterface $message, $isSync = false)
168
190
private function processSyncResponse ($ topicName , $ result )
169
191
{
170
192
if (isset ($ result )) {
171
- $ this ->getMessageValidator () ->validate ($ topicName , $ result , false );
193
+ $ this ->messageValidator ->validate ($ topicName , $ result , false );
172
194
return $ this ->messageEncoder ->encode ($ topicName , $ result , false );
173
195
} else {
174
196
throw new LocalizedException (new Phrase ('No reply message resulted in RPC. ' ));
@@ -179,14 +201,15 @@ private function processSyncResponse($topicName, $result)
179
201
* Send RPC response message.
180
202
*
181
203
* @param EnvelopeInterface $envelope
204
+ *
182
205
* @return void
206
+ * @throws LocalizedException
183
207
*/
184
208
private function sendResponse (EnvelopeInterface $ envelope )
185
209
{
186
210
$ messageProperties = $ envelope ->getProperties ();
187
- $ connectionName = $ this ->getConsumerConfig ()
188
- ->getConsumer ($ this ->configuration ->getConsumerName ())->getConnection ();
189
- $ queue = $ this ->getQueueRepository ()->get ($ connectionName , $ messageProperties ['reply_to ' ]);
211
+ $ connectionName = $ this ->consumerConfig ->getConsumer ($ this ->configuration ->getConsumerName ())->getConnection ();
212
+ $ queue = $ this ->queueRepository ->get ($ connectionName , $ messageProperties ['reply_to ' ]);
190
213
$ queue ->push ($ envelope );
191
214
}
192
215
@@ -203,12 +226,12 @@ private function getTransactionCallback(QueueInterface $queue)
203
226
$ lock = null ;
204
227
try {
205
228
$ topicName = $ message ->getProperties ()['topic_name ' ];
206
- $ topicConfig = $ this ->getCommunicationConfig () ->getTopic ($ topicName );
207
- $ lock = $ this ->getMessageController () ->lock ($ message , $ this ->configuration ->getConsumerName ());
229
+ $ topicConfig = $ this ->communicationConfig ->getTopic ($ topicName );
230
+ $ lock = $ this ->messageController ->lock ($ message , $ this ->configuration ->getConsumerName ());
208
231
209
232
if ($ topicConfig [CommunicationConfig::TOPIC_IS_SYNCHRONOUS ]) {
210
233
$ responseBody = $ this ->dispatchMessage ($ message , true );
211
- $ responseMessage = $ this ->getEnvelopeFactory () ->create (
234
+ $ responseMessage = $ this ->envelopeFactory ->create (
212
235
['body ' => $ responseBody , 'properties ' => $ message ->getProperties ()]
213
236
);
214
237
$ this ->sendResponse ($ responseMessage );
@@ -224,15 +247,15 @@ private function getTransactionCallback(QueueInterface $queue)
224
247
$ queue ->acknowledge ($ message );
225
248
} catch (MessageLockException $ exception ) {
226
249
$ queue ->acknowledge ($ message );
227
- } catch (\ Magento \ Framework \ MessageQueue \ ConnectionLostException $ e ) {
250
+ } catch (ConnectionLostException $ e ) {
228
251
if ($ lock ) {
229
252
$ this ->resource ->getConnection ()
230
253
->delete ($ this ->resource ->getTableName ('queue_lock ' ), ['id = ? ' => $ lock ->getId ()]);
231
254
}
232
- } catch (\ Magento \ Framework \ Exception \ NotFoundException $ e ) {
255
+ } catch (NotFoundException $ e ) {
233
256
$ queue ->acknowledge ($ message );
234
257
$ this ->logger ->warning ($ e ->getMessage ());
235
- } catch (\ Exception $ e ) {
258
+ } catch (Exception $ e ) {
236
259
$ queue ->reject ($ message , false , $ e ->getMessage ());
237
260
if ($ lock ) {
238
261
$ this ->resource ->getConnection ()
@@ -241,98 +264,4 @@ private function getTransactionCallback(QueueInterface $queue)
241
264
}
242
265
};
243
266
}
244
-
245
- /**
246
- * Get consumer config.
247
- *
248
- * @return ConsumerConfig
249
- *
250
- * @deprecated 103.0.0
251
- */
252
- private function getConsumerConfig ()
253
- {
254
- if ($ this ->consumerConfig === null ) {
255
- $ this ->consumerConfig = \Magento \Framework \App \ObjectManager::getInstance ()->get (ConsumerConfig::class);
256
- }
257
- return $ this ->consumerConfig ;
258
- }
259
-
260
- /**
261
- * Get communication config.
262
- *
263
- * @return CommunicationConfig
264
- *
265
- * @deprecated 103.0.0
266
- */
267
- private function getCommunicationConfig ()
268
- {
269
- if ($ this ->communicationConfig === null ) {
270
- $ this ->communicationConfig = \Magento \Framework \App \ObjectManager::getInstance ()
271
- ->get (CommunicationConfig::class);
272
- }
273
- return $ this ->communicationConfig ;
274
- }
275
-
276
- /**
277
- * Get queue repository.
278
- *
279
- * @return QueueRepository
280
- *
281
- * @deprecated 103.0.0
282
- */
283
- private function getQueueRepository ()
284
- {
285
- if ($ this ->queueRepository === null ) {
286
- $ this ->queueRepository = \Magento \Framework \App \ObjectManager::getInstance ()->get (QueueRepository::class);
287
- }
288
- return $ this ->queueRepository ;
289
- }
290
-
291
- /**
292
- * Get message controller.
293
- *
294
- * @return MessageController
295
- *
296
- * @deprecated 103.0.0
297
- */
298
- private function getMessageController ()
299
- {
300
- if ($ this ->messageController === null ) {
301
- $ this ->messageController = \Magento \Framework \App \ObjectManager::getInstance ()
302
- ->get (MessageController::class);
303
- }
304
- return $ this ->messageController ;
305
- }
306
-
307
- /**
308
- * Get message validator.
309
- *
310
- * @return MessageValidator
311
- *
312
- * @deprecated 103.0.0
313
- */
314
- private function getMessageValidator ()
315
- {
316
- if ($ this ->messageValidator === null ) {
317
- $ this ->messageValidator = \Magento \Framework \App \ObjectManager::getInstance ()
318
- ->get (MessageValidator::class);
319
- }
320
- return $ this ->messageValidator ;
321
- }
322
-
323
- /**
324
- * Get envelope factory.
325
- *
326
- * @return EnvelopeFactory
327
- *
328
- * @deprecated 103.0.0
329
- */
330
- private function getEnvelopeFactory ()
331
- {
332
- if ($ this ->envelopeFactory === null ) {
333
- $ this ->envelopeFactory = \Magento \Framework \App \ObjectManager::getInstance ()
334
- ->get (EnvelopeFactory::class);
335
- }
336
- return $ this ->envelopeFactory ;
337
- }
338
267
}
0 commit comments