Skip to content

Commit a888c37

Browse files
bobvandevijverfabpot
authored andcommitted
[Messenger] Add simple transport based rate limiter to Messenger
1 parent ea3bc64 commit a888c37

File tree

10 files changed

+37
-1
lines changed

10 files changed

+37
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ CHANGELOG
1515
* Tag all workflows services with `workflow`, those with type=workflow are
1616
tagged with `workflow.workflow`, and those with type=state_machine with
1717
`workflow.state_machine`
18+
* Add `rate_limiter` configuration option to `messenger.transport` to allow rate limited transports using the RateLimiter component
1819

1920
6.1
2021
---

DependencyInjection/Configuration.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,6 +1492,10 @@ function ($a) {
14921492
->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end()
14931493
->end()
14941494
->end()
1495+
->scalarNode('rate_limiter')
1496+
->defaultNull()
1497+
->info('Rate limiter name to use when processing messages')
1498+
->end()
14951499
->end()
14961500
->end()
14971501
->end()

DependencyInjection/FrameworkExtension.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2102,6 +2102,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21022102

21032103
$senderAliases = [];
21042104
$transportRetryReferences = [];
2105+
$transportRateLimiterReferences = [];
21052106
foreach ($config['transports'] as $name => $transport) {
21062107
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
21072108
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2130,6 +2131,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21302131

21312132
$transportRetryReferences[$name] = new Reference($retryServiceId);
21322133
}
2134+
2135+
if ($transport['rate_limiter']) {
2136+
if (!interface_exists(LimiterInterface::class)) {
2137+
throw new LogicException('Rate limiter cannot be used within Messenger as the RateLimiter component is not installed. Try running "composer require symfony/rate-limiter".');
2138+
}
2139+
2140+
$transportRateLimiterReferences[$name] = new Reference('limiter.'.$transport['rate_limiter']);
2141+
}
21332142
}
21342143

21352144
$senderReferences = [];
@@ -2184,6 +2193,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21842193
$container->getDefinition('messenger.retry_strategy_locator')
21852194
->replaceArgument(0, $transportRetryReferences);
21862195

2196+
if (!$transportRateLimiterReferences) {
2197+
$container->removeDefinition('messenger.rate_limiter_locator');
2198+
} else {
2199+
$container->getDefinition('messenger.rate_limiter_locator')
2200+
->replaceArgument(0, $transportRateLimiterReferences);
2201+
}
2202+
21872203
if (\count($failureTransports) > 0) {
21882204
$container->getDefinition('console.command.messenger_failed_messages_retry')
21892205
->replaceArgument(0, $config['failure_transport']);

Resources/config/console.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@
160160
[], // Receiver names
161161
service('messenger.listener.reset_services')->nullOnInvalid(),
162162
[], // Bus names
163+
service('messenger.rate_limiter_locator')->nullOnInvalid(),
163164
])
164165
->tag('console.command')
165166
->tag('monolog.logger', ['channel' => 'messenger'])

Resources/config/messenger.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@
159159
abstract_arg('max delay ms'),
160160
])
161161

162+
// rate limiter
163+
->set('messenger.rate_limiter_locator', ServiceLocator::class)
164+
->args([[]])
165+
->tag('container.service_locator')
166+
162167
// worker event listener
163168
->set('messenger.retry.send_failed_message_for_retry_listener', SendFailedMessageForRetryListener::class)
164169
->args([

Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,7 @@
554554
<xsd:attribute name="serializer" type="xsd:string" />
555555
<xsd:attribute name="dsn" type="xsd:string" />
556556
<xsd:attribute name="failure-transport" type="xsd:string" />
557+
<xsd:attribute name="rate-limiter" type="xsd:string" />
557558
</xsd:complexType>
558559

559560
<xsd:complexType name="messenger_retry_strategy">

Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
'multiplier' => 3,
2121
'max_delay' => 100,
2222
],
23+
'rate_limiter' => 'customised_worker'
2324
],
2425
'failed' => 'in-memory:///',
2526
'redis' => 'redis://127.0.0.1:6379/messages',

Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<framework:messenger failure-transport="failed">
1111
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
1212
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
13-
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
13+
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" rate-limiter="customised_worker">
1414
<framework:options>
1515
<framework:queue>
1616
<framework:name>Queue</framework:name>

Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ framework:
1818
delay: 7
1919
multiplier: 3
2020
max_delay: 100
21+
rate_limiter: customised_worker
2122
failed: 'in-memory:///'
2223
redis: 'redis://127.0.0.1:6379/messages'
2324
beanstalkd: 'beanstalkd://127.0.0.1:11300'

Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,12 @@ public function testMessengerTransports()
949949
return array_shift($values);
950950
}, $failureTransports);
951951
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
952+
953+
$rateLimitedTransports = $container->getDefinition('messenger.rate_limiter_locator')->getArgument(0);
954+
$expectedRateLimitersByRateLimitedTransports = [
955+
'customised' => new Reference('limiter.customised_worker'),
956+
];
957+
$this->assertEquals($expectedRateLimitersByRateLimitedTransports, $rateLimitedTransports);
952958
}
953959

954960
public function testMessengerRouting()

0 commit comments

Comments
 (0)