File tree Expand file tree Collapse file tree 2 files changed +40
-0
lines changed Expand file tree Collapse file tree 2 files changed +40
-0
lines changed Original file line number Diff line number Diff line change 40
40
use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
41
41
use Symfony \Component \Messenger \Worker ;
42
42
use Symfony \Contracts \EventDispatcher \EventDispatcherInterface ;
43
+ use Symfony \Contracts \Service \ResetInterface ;
43
44
44
45
/**
45
46
* @group time-sensitive
@@ -98,6 +99,19 @@ public function testHandlingErrorCausesReject()
98
99
$ this ->assertSame (0 , $ receiver ->getAcknowledgeCount ());
99
100
}
100
101
102
+ public function testWorkerResetsConnectionIfReceiverIsResettable ()
103
+ {
104
+ $ resettableReceiver = new ResettableDummyReceiver ([]);
105
+
106
+ $ bus = $ this ->createMock (MessageBusInterface::class);
107
+ $ dispatcher = new EventDispatcher ();
108
+
109
+ $ worker = new Worker ([$ resettableReceiver ], $ bus , $ dispatcher );
110
+ $ worker ->stop ();
111
+ $ worker ->run ();
112
+ $ this ->assertTrue ($ resettableReceiver ->hasBeenReset ());
113
+ }
114
+
101
115
public function testWorkerDoesNotSendNullMessagesToTheBus ()
102
116
{
103
117
$ receiver = new DummyReceiver ([
@@ -538,3 +552,18 @@ private function process(array $jobs): void
538
552
}
539
553
}
540
554
}
555
+
556
+ class ResettableDummyReceiver extends DummyReceiver implements ResetInterface
557
+ {
558
+ private $ hasBeenReset = false ;
559
+
560
+ public function reset ()
561
+ {
562
+ $ this ->hasBeenReset = true ;
563
+ }
564
+
565
+ public function hasBeenReset (): bool
566
+ {
567
+ return $ this ->hasBeenReset ;
568
+ }
569
+ }
Original file line number Diff line number Diff line change 29
29
use Symfony \Component \Messenger \Transport \Receiver \QueueReceiverInterface ;
30
30
use Symfony \Component \Messenger \Transport \Receiver \ReceiverInterface ;
31
31
use Symfony \Contracts \EventDispatcher \EventDispatcherInterface ;
32
+ use Symfony \Contracts \Service \ResetInterface ;
32
33
33
34
/**
34
35
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -133,6 +134,7 @@ public function run(array $options = []): void
133
134
134
135
$ this ->flush (true );
135
136
$ this ->dispatchEvent (new WorkerStoppedEvent ($ this ));
137
+ $ this ->resetReceiverConnections ();
136
138
}
137
139
138
140
private function handleMessage (Envelope $ envelope , string $ transportName ): void
@@ -256,6 +258,15 @@ public function getMetadata(): WorkerMetadata
256
258
return $ this ->metadata ;
257
259
}
258
260
261
+ private function resetReceiverConnections (): void
262
+ {
263
+ foreach ($ this ->receivers as $ receiver ) {
264
+ if ($ receiver instanceof ResetInterface) {
265
+ $ receiver ->reset ();
266
+ }
267
+ }
268
+ }
269
+
259
270
private function dispatchEvent (object $ event ): void
260
271
{
261
272
if (null === $ this ->eventDispatcher ) {
You can’t perform that action at this time.
0 commit comments