Skip to content

Commit 661bda1

Browse files
committed
feature symfony#58552 [Console][Messenger] Add $seconds to keepalive() methods (valtzu)
This PR was merged into the 7.2 branch. Discussion ---------- [Console][Messenger] Add `$seconds` to `keepalive()` methods | Q | A | ------------- | --- | Branch? | 7.2 | Bug fix? | no | New feature? | yes | Deprecations? | no | License | MIT Make the transport aware for how long (at minimum) the message should be kept alive. F.e. when extending SQS visibility timeout, you need to pass the visibility timeout (seconds) as parameter. If you pass a value which is less than alarm interval, SQS would resend the message too early. By making the transport aware of when the next keepalive call will happen we'll be able to do some assertions/clamping to improve DX. This is a prerequisite for symfony#58483. Commits ------- e3beb18 Add `$seconds` to `keepalive` methods
2 parents 4348890 + e3beb18 commit 661bda1

File tree

9 files changed

+22
-11
lines changed

9 files changed

+22
-11
lines changed

src/Symfony/Component/Console/Application.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ public function setAlarmInterval(?int $seconds): void
139139
$this->scheduleAlarm();
140140
}
141141

142+
/**
143+
* Gets the interval in seconds on which a SIGALRM signal is dispatched.
144+
*/
145+
public function getAlarmInterval(): ?int
146+
{
147+
return $this->alarmInterval;
148+
}
149+
142150
private function scheduleAlarm(): void
143151
{
144152
if (null !== $this->alarmInterval) {

src/Symfony/Component/Console/Tests/ApplicationTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2305,6 +2305,7 @@ public function testAlarmDispatchWithoutEventDispatcher()
23052305
$application = $this->createSignalableApplication($command, null);
23062306

23072307
$this->assertSame(1, $application->run(new ArrayInput(['alarm'])));
2308+
$this->assertSame(1, $application->getAlarmInterval());
23082309
$this->assertTrue($command->signaled);
23092310
}
23102311

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public function reject(Envelope $envelope): void
6565
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
6666
}
6767

68-
public function keepalive(Envelope $envelope): void
68+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
6969
{
7070
$this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId());
7171
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ public function reject(Envelope $envelope): void
4949
$this->getReceiver()->reject($envelope);
5050
}
5151

52-
public function keepalive(Envelope $envelope): void
52+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
5353
{
54-
$this->getReceiver()->keepalive($envelope);
54+
$this->getReceiver()->keepalive($envelope, $seconds);
5555
}
5656

5757
public function getMessageCount(): int

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
286286
if (\SIGALRM === $signal) {
287287
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
288288

289-
$this->worker->keepalive();
289+
$this->worker->keepalive($this->getApplication()->getAlarmInterval());
290290

291291
return false;
292292
}

src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
156156
if (\SIGALRM === $signal) {
157157
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
158158

159-
$this->worker->keepalive();
159+
$this->worker->keepalive($this->getApplication()->getAlarmInterval());
160160

161161
return false;
162162
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ public function testKeepalive()
641641

642642
try {
643643
$oldAsync = pcntl_async_signals(true);
644-
pcntl_signal(\SIGALRM, fn () => $worker->keepalive());
644+
pcntl_signal(\SIGALRM, fn () => $worker->keepalive(2));
645645
pcntl_alarm(2);
646646

647647
$worker->run();
@@ -654,7 +654,7 @@ public function testKeepalive()
654654
$this->assertSame($expectedEnvelopes, $receiver->keepaliveEnvelopes);
655655

656656
$receiver->keepaliveEnvelopes = [];
657-
$worker->keepalive();
657+
$worker->keepalive(2);
658658

659659
$this->assertCount(0, $receiver->keepaliveEnvelopes);
660660
}
@@ -672,7 +672,7 @@ class DummyKeepaliveReceiver extends DummyReceiver implements KeepaliveReceiverI
672672
{
673673
public array $keepaliveEnvelopes = [];
674674

675-
public function keepalive(Envelope $envelope): void
675+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
676676
{
677677
$this->keepaliveEnvelopes[] = $envelope;
678678
}

src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ interface KeepaliveReceiverInterface extends ReceiverInterface
1919
/**
2020
* Informs the transport that the message is still being processed to avoid a timeout on the transport's side.
2121
*
22+
* @param int|null $seconds The minimum duration the message should be kept alive
23+
*
2224
* @throws TransportException If there is an issue communicating with the transport
2325
*/
24-
public function keepalive(Envelope $envelope): void;
26+
public function keepalive(Envelope $envelope, ?int $seconds = null): void;
2527
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public function stop(): void
290290
$this->shouldStop = true;
291291
}
292292

293-
public function keepalive(): void
293+
public function keepalive(?int $seconds): void
294294
{
295295
foreach ($this->keepalives as $message) {
296296
[$transportName, $envelope] = $this->keepalives[$message];
@@ -303,7 +303,7 @@ public function keepalive(): void
303303
'transport' => $transportName,
304304
'message_id' => $envelope->last(TransportMessageIdStamp::class)?->getId(),
305305
]);
306-
$this->receivers[$transportName]->keepalive($envelope);
306+
$this->receivers[$transportName]->keepalive($envelope, $seconds);
307307
}
308308
}
309309

0 commit comments

Comments
 (0)