Skip to content

Commit 4e682e4

Browse files
committed
feature symfony#58483 [Messenger] Extend SQS visibility timeout for messages that are still being processed (valtzu)
This PR was merged into the 7.2 branch. Discussion ---------- [Messenger] Extend SQS visibility timeout for messages that are still being processed | Q | A | ------------- | --- | Branch? | 7.2 | Bug fix? | no | New feature? | yes | Deprecations? | no | License | MIT Now that symfony#53508 is merged, let's add keepalive implementation for SQS too. Commits ------- fa795c3 [Messenger] Extend SQS visibility timeout for messages that are still being processed
2 parents 861a84e + fa795c3 commit 4e682e4

File tree

9 files changed

+127
-5
lines changed

9 files changed

+127
-5
lines changed

src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.2
5+
---
6+
7+
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying SQS that the job is still being processed, in order to avoid timeouts
8+
49
6.4
510
---
611

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ public function testConnectionSendAndGet()
4141

4242
private function execute(string $dsn): void
4343
{
44-
$connection = Connection::fromDsn($dsn, []);
44+
$connection = Connection::fromDsn($dsn, ['visibility_timeout' => 1]);
4545
$connection->setup();
4646
$this->clearSqs($dsn);
4747

4848
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']);
49+
$messageSentAt = microtime(true);
4950
$this->assertSame(1, $connection->getMessageCount());
5051

5152
$wait = 0;
@@ -55,6 +56,20 @@ private function execute(string $dsn): void
5556

5657
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
5758
$this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']);
59+
60+
$this->waitUntilElapsed(seconds: 1.0, since: $messageSentAt);
61+
$connection->keepalive($encoded['id']);
62+
$this->waitUntilElapsed(seconds: 2.0, since: $messageSentAt);
63+
$this->assertSame(0, $connection->getMessageCount(), 'The queue should be empty since visibility timeout was extended');
64+
$connection->delete($encoded['id']);
65+
}
66+
67+
private function waitUntilElapsed(float $seconds, float $since): void
68+
{
69+
$waitTime = $seconds - (microtime(true) - $since);
70+
if ($waitTime > 0) {
71+
usleep((int) ($waitTime * 1e6));
72+
}
5873
}
5974

6075
private function clearSqs(string $dsn): void

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver;
1718
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
19+
use Symfony\Component\Messenger\Envelope;
1820
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1921
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2022
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -54,6 +56,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
5456
iterator_to_array($receiver->get());
5557
}
5658

59+
public function testKeepalive()
60+
{
61+
$serializer = $this->createSerializer();
62+
63+
$connection = $this->createMock(Connection::class);
64+
$connection->expects($this->once())->method('keepalive')->with('123', 10);
65+
66+
$receiver = new AmazonSqsReceiver($connection, $serializer);
67+
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]), 10);
68+
}
69+
5770
private function createSqsEnvelope()
5871
{
5972
return [

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use PHPUnit\Framework\MockObject\MockObject;
1717
use PHPUnit\Framework\TestCase;
1818
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
19+
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp;
1920
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver;
2021
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransport;
2122
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
@@ -151,6 +152,31 @@ public function testItConvertsHttpExceptionDuringResetIntoTransportException()
151152
$this->transport->reset();
152153
}
153154

155+
public function testKeepalive()
156+
{
157+
$transport = $this->getTransport(
158+
null,
159+
$connection = $this->createMock(Connection::class),
160+
);
161+
162+
$connection->expects($this->once())->method('keepalive')->with('123', 10);
163+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]), 10);
164+
}
165+
166+
public function testKeepaliveWhenASqsExceptionOccurs()
167+
{
168+
$transport = $this->getTransport(
169+
null,
170+
$connection = $this->createMock(Connection::class),
171+
);
172+
173+
$exception = $this->createHttpException();
174+
$connection->expects($this->once())->method('keepalive')->with('123')->willThrowException($exception);
175+
176+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
177+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]));
178+
}
179+
154180
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null)
155181
{
156182
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
use Symfony\Component\HttpClient\MockHttpClient;
2424
use Symfony\Component\HttpClient\Response\MockResponse;
2525
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
26+
use Symfony\Component\Messenger\Exception\TransportException;
2627
use Symfony\Contracts\HttpClient\HttpClientInterface;
2728

2829
class ConnectionTest extends TestCase
@@ -357,6 +358,33 @@ public function testLoggerWithDebugOption()
357358
$connection->get();
358359
}
359360

361+
public function testKeepalive()
362+
{
363+
$expectedParams = [
364+
'QueueUrl' => $queueUrl = 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
365+
'ReceiptHandle' => $id = 'abc',
366+
'VisibilityTimeout' => $visibilityTimeout = 30,
367+
];
368+
369+
$client = $this->createMock(SqsClient::class);
370+
$client->expects($this->once())->method('changeMessageVisibility')->with($expectedParams);
371+
372+
$connection = new Connection(['visibility_timeout' => $visibilityTimeout], $client, $queueUrl);
373+
$connection->keepalive($id);
374+
}
375+
376+
public function testKeepaliveWithTooSmallTtl()
377+
{
378+
$client = $this->createMock(SqsClient::class);
379+
$client->expects($this->never())->method($this->anything());
380+
381+
$connection = new Connection(['visibility_timeout' => 1], $client, 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue');
382+
383+
$this->expectException(TransportException::class);
384+
$this->expectExceptionMessage('SQS visibility_timeout (1s) cannot be smaller than the keepalive interval (2s).');
385+
$connection->keepalive('123', 2);
386+
}
387+
360388
private function getMockedQueueUrlResponse(): MockResponse
361389
{
362390
if ($this->isAsyncAwsSqsVersion2Installed()) {

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616
use Symfony\Component\Messenger\Exception\LogicException;
1717
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1818
use Symfony\Component\Messenger\Exception\TransportException;
19+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
20-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2121
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2222
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2323

2424
/**
2525
* @author Jérémy Derussé <jeremy@derusse.com>
2626
*/
27-
class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
27+
class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
2828
{
2929
private SerializerInterface $serializer;
3030

@@ -78,6 +78,15 @@ public function reject(Envelope $envelope): void
7878
}
7979
}
8080

81+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
82+
{
83+
try {
84+
$this->connection->keepalive($this->findSqsReceivedStamp($envelope)->getId(), $seconds);
85+
} catch (HttpException $e) {
86+
throw new TransportException($e->getMessage(), 0, $e);
87+
}
88+
}
89+
8190
public function getMessageCount(): int
8291
{
8392
try {

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use AsyncAws\Core\Exception\Http\HttpException;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Exception\TransportException;
17+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@@ -26,7 +27,7 @@
2627
/**
2728
* @author Jérémy Derussé <jeremy@derusse.com>
2829
*/
29-
class AmazonSqsTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
30+
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
3031
{
3132
private SerializerInterface $serializer;
3233

@@ -54,6 +55,14 @@ public function reject(Envelope $envelope): void
5455
$this->getReceiver()->reject($envelope);
5556
}
5657

58+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
59+
{
60+
$receiver = $this->getReceiver();
61+
if ($receiver instanceof KeepaliveReceiverInterface) {
62+
$receiver->keepalive($envelope, $seconds);
63+
}
64+
}
65+
5766
public function getMessageCount(): int
5867
{
5968
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,23 @@ public function delete(string $id): void
304304
]);
305305
}
306306

307+
/**
308+
* @param int|null $seconds the minimum duration the message should be kept alive
309+
*/
310+
public function keepalive(string $id, ?int $seconds = null): void
311+
{
312+
$visibilityTimeout = $this->configuration['visibility_timeout'];
313+
if (null !== $visibilityTimeout && null !== $seconds && $visibilityTimeout < $seconds) {
314+
throw new TransportException(\sprintf('SQS visibility_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $visibilityTimeout, $seconds));
315+
}
316+
317+
$this->client->changeMessageVisibility([
318+
'QueueUrl' => $this->getQueueUrl(),
319+
'ReceiptHandle' => $id,
320+
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
321+
]);
322+
}
323+
307324
public function getMessageCount(): int
308325
{
309326
$response = $this->client->getQueueAttributes([

src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"php": ">=8.2",
2020
"async-aws/core": "^1.7",
2121
"async-aws/sqs": "^1.0|^2.0",
22-
"symfony/messenger": "^6.4|^7.0",
22+
"symfony/messenger": "^7.2",
2323
"symfony/service-contracts": "^2.5|^3",
2424
"psr/log": "^1|^2|^3"
2525
},

0 commit comments

Comments
 (0)