Skip to content

Commit af9796d

Browse files
committed
feat: Make RabbitMQ basic_qos settings configurable
This commit introduces configurable basic_qos settings for the RabbitMQ consumer, allowing users to fine-tune message prefetching behavior. Key changes: - Moved basic_qos settings (prefetch_size, prefetch_count, global) to the configuration file. - Users can now adjust these settings to optimize performance based on their specific needs. - Updated documentation to reflect the new configuration options. This change provides greater flexibility and control over message consumption, improving overall performance and resource utilization.
1 parent 85162aa commit af9796d

File tree

5 files changed

+32
-39
lines changed

5 files changed

+32
-39
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ Add connection to config/queue.php:
6262
],
6363
'queue' => [
6464
'job' => \iamfarhad\LaravelRabbitMQ\Jobs\RabbitMQJob::class,
65+
'qos' => [
66+
'prefetch_size' => 0,
67+
'prefetch_count' => 10,
68+
'global' => false
69+
]
6570
],
6671
],
6772
]

config/RabbitMQConnectionConfig.php

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,34 @@
22

33
return [
44
'driver' => 'rabbitmq',
5-
'queue' => env('RABBITMQ_QUEUE', 'default'),
5+
'queue' => env('RABBITMQ_QUEUE', 'default'),
66

77
'hosts' => [
8-
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
9-
'port' => env('RABBITMQ_PORT', 5672),
10-
'user' => env('RABBITMQ_USER', 'guest'),
11-
'password' => env('RABBITMQ_PASSWORD', 'guest'),
12-
'vhost' => env('RABBITMQ_VHOST', '/'),
13-
'lazy' => env('RABBITMQ_LAZY_CONNECTION', true),
8+
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
9+
'port' => env('RABBITMQ_PORT', 5672),
10+
'user' => env('RABBITMQ_USER', 'guest'),
11+
'password' => env('RABBITMQ_PASSWORD', 'guest'),
12+
'vhost' => env('RABBITMQ_VHOST', '/'),
13+
'lazy' => env('RABBITMQ_LAZY_CONNECTION', true),
1414
'keepalive' => env('RABBITMQ_KEEPALIVE_CONNECTION', false),
1515
'heartbeat' => env('RABBITMQ_HEARTBEAT_CONNECTION', 0),
1616
],
1717

1818
'options' => [
1919
'ssl_options' => [
20-
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
21-
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
22-
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
20+
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
21+
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
22+
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
2323
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
24-
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
24+
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
2525
],
26-
'queue' => [
26+
'queue' => [
2727
'job' => \iamfarhad\LaravelRabbitMQ\Jobs\RabbitMQJob::class,
28+
'qos' => [
29+
'prefetch_size' => 0,
30+
'prefetch_count' => 10,
31+
'global' => false
32+
]
2833
],
2934
],
3035
];

src/Console/ConsumeCommand.php

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ final class ConsumeCommand extends WorkCommand
3535
{--rest=0 : Number of seconds to rest between jobs}
3636
{--max-priority=null : Maximum priority level to consume}
3737
{--consumer-tag}
38-
{--prefetch-size=0}
39-
{--prefetch-count=1000}
4038
{--num-processes=2 : Number of processes to run in parallel}
4139
';
4240

@@ -137,10 +135,6 @@ private function consume(): int
137135
$consumer->setName($this->option('name'));
138136
$consumer->setConsumerTag($this->generateConsumerTag());
139137

140-
// Initialize prefetch size and count first
141-
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
142-
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
143-
144138
// Only set max priority if it's provided and not null
145139
$maxPriority = $this->option('max-priority');
146140
if ($maxPriority !== null && $maxPriority !== '') {

src/Consumer.php

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@ class Consumer extends Worker
1717

1818
private string $consumerTag;
1919

20-
private int $prefetchSize;
21-
2220
private int $maxPriority;
2321

24-
private int $prefetchCount;
25-
2622
private AMQPChannel $amqpChannel;
2723

2824
private object|null $currentJob = null;
@@ -42,16 +38,6 @@ public function setMaxPriority(int $value): void
4238
$this->maxPriority = $value;
4339
}
4440

45-
public function setPrefetchSize(int $value): void
46-
{
47-
$this->prefetchSize = $value;
48-
}
49-
50-
public function setPrefetchCount(int $value): void
51-
{
52-
$this->prefetchCount = $value;
53-
}
54-
5541
/**
5642
* Listen to the given queue in a loop.
5743
*
@@ -75,12 +61,6 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
7561

7662
$this->amqpChannel = $connection->getChannel();
7763

78-
$this->amqpChannel->basic_qos(
79-
$this->prefetchSize,
80-
$this->prefetchCount,
81-
null
82-
);
83-
8464
$jobClass = $connection->getJobClass();
8565
$arguments = [];
8666
if ($this->maxPriority !== 0) {
@@ -90,6 +70,16 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
9070
];
9171
}
9272

73+
$prefetchSize = config('queue.connections.rabbitmq.queue.prefetch_size') ?? 0;
74+
$prefetchCount = config('queue.connections.rabbitmq.queue.prefetch_count') ?? 10;
75+
$global = config('queue.connections.rabbitmq.queue.global') ?? false;
76+
77+
$this->amqpChannel->basic_qos(
78+
$prefetchSize,
79+
$prefetchCount,
80+
$global
81+
);
82+
9383
$this->amqpChannel->basic_consume(
9484
$queue,
9585
$this->consumerTag,

src/LaravelRabbitQueueServiceProvider.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use iamfarhad\LaravelRabbitMQ\Connectors\RabbitMQConnector;
66
use iamfarhad\LaravelRabbitMQ\Console\ConsumeCommand;
7-
// Added missing import
87
use Illuminate\Contracts\Debug\ExceptionHandler;
98
use Illuminate\Support\ServiceProvider;
109

0 commit comments

Comments
 (0)