Skip to content

Commit 6143767

Browse files
committed
fix some issues and support laravel 12
1 parent 6792e5f commit 6143767

File tree

7 files changed

+593
-181
lines changed

7 files changed

+593
-181
lines changed

composer.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
"require": {
77
"php": "^8.2",
88
"illuminate/queue": "^11.0|^12.0",
9-
"php-amqplib/php-amqplib": "^3.6.0",
9+
"php-amqplib/php-amqplib": "^v3.7.3",
1010
"illuminate/support": "^11.0|^12.0",
1111
"ext-pcntl": "*"
1212
},
1313
"require-dev": {
1414
"ext-json": "*",
15-
"phpunit/phpunit": "^10.0|^11.0",
16-
"orchestra/testbench": "^9.0|^10.0",
17-
"squizlabs/php_codesniffer": "^3",
15+
"phpunit/phpunit": "^10.0|^11.5.14",
16+
"orchestra/testbench": "^9.0|^10.1",
17+
"squizlabs/php_codesniffer": "^3.12",
1818
"mockery/mockery": "^1.7.0",
1919
"dg/bypass-finals": "dev-master",
2020
"rector/rector": "^0.15.25"

src/Connectors/RabbitMQConnector.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
use iamfarhad\LaravelRabbitMQ\RabbitQueue;
1111
use Illuminate\Contracts\Queue\Queue;
1212

13-
class RabbitMQConnector implements ConnectorInterface
13+
readonly class RabbitMQConnector implements ConnectorInterface
1414
{
15-
public function __construct(private readonly Dispatcher $dispatcher)
15+
public function __construct(private Dispatcher $dispatcher)
1616
{
1717
}
1818

src/Console/ConsumeCommand.php

Lines changed: 110 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,29 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace iamfarhad\LaravelRabbitMQ\Console;
46

57
use Symfony\Component\Console\Attribute\AsCommand;
68
use Illuminate\Queue\Console\WorkCommand;
79
use Illuminate\Support\Str;
10+
use RuntimeException;
811

912
#[AsCommand(name: 'rabbitmq:consume')]
1013
final class ConsumeCommand extends WorkCommand
1114
{
15+
/**
16+
* The console command signature.
17+
*
18+
* @var string
19+
*/
1220
protected $signature = 'rabbitmq:consume
1321
{connection? : The name of the queue connection to work}
1422
{--name=default : The name of the consumer}
1523
{--queue= : The names of the queues to work}
1624
{--once : Only process the next job on the queue}
1725
{--stop-when-empty : Stop when the queue is empty}
18-
{--delay=0 : The number of seconds to delay failed jobs (Deprecated)}
26+
{--delay=0 : The number of seconds to delay failed jobs}
1927
{--backoff=0 : The number of seconds to wait before retrying a job that encountered an uncaught exception}
2028
{--max-jobs=0 : The number of jobs to process before stopping}
2129
{--max-time=0 : The maximum number of seconds the worker should run}
@@ -25,69 +33,143 @@ final class ConsumeCommand extends WorkCommand
2533
{--timeout=60 : The number of seconds a child process can run}
2634
{--tries=1 : Number of times to attempt a job before logging it failed}
2735
{--rest=0 : Number of seconds to rest between jobs}
28-
{--max-priority=}
36+
{--max-priority=null : Maximum priority level to consume}
2937
{--consumer-tag}
3038
{--prefetch-size=0}
3139
{--prefetch-count=1000}
3240
{--num-processes=2 : Number of processes to run in parallel}
3341
';
3442

35-
protected $description = 'Consume messages';
36-
37-
public function handle(): int|null
43+
/**
44+
* The console command description.
45+
*
46+
* @var string
47+
*/
48+
protected $description = 'Consume messages from RabbitMQ queue';
49+
50+
/**
51+
* Execute the console command.
52+
*/
53+
public function handle(): int
3854
{
39-
$numProcesses = $this->option('num-processes');
55+
$numProcesses = (int) $this->option('num-processes');
56+
57+
if ($numProcesses < 1) {
58+
$this->error('Number of processes must be at least 1');
59+
return 1;
60+
}
61+
62+
// Skip forking if only one process is needed
63+
if ($numProcesses === 1) {
64+
return $this->consume();
65+
}
66+
67+
// Check if pcntl extension is available
68+
if (!extension_loaded('pcntl')) {
69+
$this->error('The pcntl extension is required for parallel processing');
70+
return 1;
71+
}
72+
73+
$childPids = [];
4074

4175
for ($i = 0; $i < $numProcesses; $i++) {
4276
$pid = pcntl_fork();
4377

4478
if ($pid === -1) {
45-
// Error handling
46-
echo "Could not fork process \n";
47-
exit(1);
79+
$this->error("Failed to fork process $i");
80+
continue;
4881
}
4982

5083
if ($pid === 0) {
51-
// This is the child process
52-
$this->consume();
53-
exit(0);
84+
// Child process
85+
exit($this->consume());
86+
} else {
87+
// Parent process
88+
$childPids[] = $pid;
89+
$this->info("Started worker process $pid");
5490
}
5591
}
5692

93+
// Set up signal handling for graceful termination
94+
if (function_exists('pcntl_signal')) {
95+
pcntl_signal(SIGTERM, function () use (&$childPids) {
96+
foreach ($childPids as $pid) {
97+
if (function_exists('posix_kill')) {
98+
posix_kill($pid, SIGTERM);
99+
}
100+
}
101+
});
102+
}
103+
57104
// Wait for all child processes to finish
58-
// while (pcntl_waitpid(0, $status) !== -1) {
59-
// Handle exit status if needed
60-
// }
105+
foreach ($childPids as $pid) {
106+
pcntl_waitpid($pid, $status);
107+
108+
if (pcntl_wifexited($status)) {
109+
$exitCode = pcntl_wexitstatus($status);
110+
if ($exitCode !== 0) {
111+
$this->warn("Process $pid exited with code $exitCode");
112+
}
113+
} else {
114+
$this->warn("Process $pid terminated abnormally");
115+
}
116+
}
61117

62118
return 0;
63119
}
64120

65-
private function consume(): void
121+
/**
122+
* Configure and run the consumer.
123+
*/
124+
private function consume(): int
66125
{
67-
$consumer = $this->worker;
126+
try {
127+
$consumer = $this->worker;
128+
129+
if (!$consumer) {
130+
throw new RuntimeException('Worker instance not initialized');
131+
}
68132

69-
$consumer->setContainer($this->laravel);
70-
$consumer->setName($this->option('name'));
71-
$consumer->setConsumerTag($this->consumerTag());
72-
$consumer->setMaxPriority((int) $this->option('max-priority'));
73-
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
74-
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
133+
$consumer->setContainer($this->laravel);
134+
$consumer->setName($this->option('name'));
135+
$consumer->setConsumerTag($this->generateConsumerTag());
75136

76-
parent::handle();
137+
// Initialize prefetch size and count first
138+
$consumer->setPrefetchSize((int) $this->option('prefetch-size'));
139+
$consumer->setPrefetchCount((int) $this->option('prefetch-count'));
140+
141+
// Only set max priority if it's provided and not null
142+
$maxPriority = $this->option('max-priority');
143+
if ($maxPriority !== null && $maxPriority !== '') {
144+
$consumer->setMaxPriority((int) $maxPriority);
145+
}
146+
147+
return parent::handle() ?? 0;
148+
} catch (\Throwable $e) {
149+
$this->error($e->getMessage());
150+
return 1;
151+
}
77152
}
78153

79-
private function consumerTag(): string
154+
/**
155+
* Generate a unique consumer tag.
156+
*/
157+
private function generateConsumerTag(): string
80158
{
81159
if ($consumerTag = $this->option('consumer-tag')) {
82160
return $consumerTag;
83161
}
84162

163+
$appName = config('app.name', 'laravel');
164+
$consumerName = $this->option('name');
165+
$uniqueId = md5(serialize($this->options()) . Str::random(16) . getmypid());
166+
85167
$consumerTag = implode(
86168
'_',
87169
[
88-
Str::slug(config('app.name', 'laravel')),
89-
Str::slug($this->option('name')),
90-
md5(serialize($this->options()) . Str::random(16) . getmypid()),
170+
Str::slug($appName),
171+
Str::slug($consumerName),
172+
$uniqueId,
91173
]
92174
);
93175

src/Consumer.php

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public function setPrefetchCount(int $value): void
6565
* @return int
6666
* @throws Throwable
6767
*/
68-
public function daemon($connectionName, $queue, WorkerOptions $workerOptions)
68+
public function daemon($connectionName, $queue, WorkerOptions $options)
6969
{
7070
if ($this->supportsAsyncSignals()) {
7171
$this->listenForSignals();
@@ -102,7 +102,7 @@ public function daemon($connectionName, $queue, WorkerOptions $workerOptions)
102102
false,
103103
false,
104104
false,
105-
function (AMQPMessage $amqpMessage) use ($connection, $workerOptions, $connectionName, $queue, $jobClass, &$jobsProcessed): void {
105+
function (AMQPMessage $amqpMessage) use ($connection, $options, $connectionName, $queue, $jobClass, &$jobsProcessed): void {
106106
$job = new $jobClass(
107107
$this->container,
108108
$connection,
@@ -114,12 +114,12 @@ function (AMQPMessage $amqpMessage) use ($connection, $workerOptions, $connectio
114114
$this->currentJob = $job;
115115

116116
if ($this->supportsAsyncSignals()) {
117-
$this->registerTimeoutHandler($job, $workerOptions);
117+
$this->registerTimeoutHandler($job, $options);
118118
}
119119

120120
++$jobsProcessed;
121121

122-
$this->runJob($job, $connectionName, $workerOptions);
122+
$this->runJob($job, $connectionName, $options);
123123

124124
if ($this->supportsAsyncSignals()) {
125125
$this->resetTimeoutHandler();
@@ -133,15 +133,15 @@ function (AMQPMessage $amqpMessage) use ($connection, $workerOptions, $connectio
133133
// Before reserving any jobs, we will make sure this queue is not paused and
134134
// if it is we will just pause this worker for a given amount of time and
135135
// make sure we do not need to kill this worker process off completely.
136-
if (! $this->daemonShouldRun($workerOptions, $connectionName, $queue)) {
137-
$this->pauseWorker($workerOptions, $timestampOfLastQueueRestart);
136+
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
137+
$this->pauseWorker($options, $timestampOfLastQueueRestart);
138138

139139
continue;
140140
}
141141

142142
// If the daemon should run (not in maintenance mode, etc.), then we can wait for a job.
143143
try {
144-
$this->amqpChannel->wait(null, true, (int) $workerOptions->timeout);
144+
$this->amqpChannel->wait(null, true, (int) $options->timeout);
145145
} catch (AMQPRuntimeException $amqpRuntimeException) {
146146
$this->exceptions->report($amqpRuntimeException);
147147

@@ -154,14 +154,14 @@ function (AMQPMessage $amqpMessage) use ($connection, $workerOptions, $connectio
154154

155155
// If no job is got off the queue, we will need to sleep the worker.
156156
if ($this->currentJob === null) {
157-
$this->sleep($workerOptions->sleep);
157+
$this->sleep($options->sleep);
158158
}
159159

160160
// Finally, we will check to see if we have exceeded our memory limits or if
161161
// the queue should restart based on other indications. If so, we'll stop
162162
// this worker and let whatever is "monitoring" it restart the process.
163163
$status = $this->stopIfNecessary(
164-
$workerOptions,
164+
$options,
165165
$timestampOfLastQueueRestart,
166166
$startTime,
167167
$jobsProcessed,
@@ -182,9 +182,9 @@ function (AMQPMessage $amqpMessage) use ($connection, $workerOptions, $connectio
182182
* @param string $connectionName
183183
* @param string $queue
184184
*/
185-
protected function daemonShouldRun(WorkerOptions $workerOptions, $connectionName, $queue): bool
185+
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue): bool
186186
{
187-
return !(($this->isDownForMaintenance)() && ! $workerOptions->force) && !$this->paused;
187+
return !(($this->isDownForMaintenance)() && ! $options->force) && !$this->paused;
188188
}
189189

190190

src/Jobs/RabbitMQJob.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public function attempts(): int
5959
}
6060

6161

62+
/**
63+
* @throws JsonException
64+
*/
6265
private function convertMessageToFailed(): void
6366
{
6467
if ($this->amqpMessage->getExchange() !== 'failed_messages') {
@@ -69,6 +72,7 @@ private function convertMessageToFailed(): void
6972

7073
/**
7174
* {@inheritdoc}
75+
* @throws JsonException
7276
*/
7377
public function markAsFailed(): void
7478
{

0 commit comments

Comments
 (0)