File tree Expand file tree Collapse file tree 1 file changed +22
-1
lines changed Expand file tree Collapse file tree 1 file changed +22
-1
lines changed Original file line number Diff line number Diff line change @@ -29,6 +29,7 @@ final class ConsumeCommand extends WorkCommand
29
29
{--consumer-tag}
30
30
{--prefetch-size=0}
31
31
{--prefetch-count=1000}
32
+ {--num-processes=2 : Number of processes to run in parallel}
32
33
' ;
33
34
34
35
protected $ description = 'Consume messages ' ;
@@ -37,15 +38,35 @@ final class ConsumeCommand extends WorkCommand
37
38
public function handle (): int |null
38
39
{
39
40
$ consumer = $ this ->worker ;
41
+ $ numProcesses = $ this ->option ('num-processes ' );
40
42
41
43
$ consumer ->setContainer ($ this ->laravel );
42
44
$ consumer ->setName ($ this ->option ('name ' ));
43
45
$ consumer ->setConsumerTag ($ this ->consumerTag ());
44
46
$ consumer ->setMaxPriority ((int ) $ this ->option ('max-priority ' ));
45
47
$ consumer ->setPrefetchSize ((int ) $ this ->option ('prefetch-size ' ));
46
48
$ consumer ->setPrefetchCount ((int ) $ this ->option ('prefetch-count ' ));
49
+
50
+ for ($ i = 0 ; $ i < $ numProcesses ; $ i ++) {
51
+ $ pid = pcntl_fork ();
47
52
48
- parent ::handle ();
53
+ if ($ pid === -1 ) {
54
+ // Error handling
55
+ echo "Could not fork process \n" ;
56
+ exit (1 );
57
+ } elseif ($ pid === 0 ) {
58
+ // This is the child process
59
+ $ this ->consume ();
60
+ exit (0 );
61
+ }
62
+ }
63
+
64
+ // Wait for all child processes to finish
65
+ while (pcntl_waitpid (0 , $ status ) !== -1 ) {
66
+ // Handle exit status if needed
67
+ }
68
+
69
+ return 0 ;;
49
70
}
50
71
51
72
private function consumerTag (): string
You can’t perform that action at this time.
0 commit comments