12
12
use Throwable ;
13
13
use iamfarhad \LaravelRabbitMQ \RabbitQueue ;
14
14
15
- class Consumer extends Worker
15
+ final class Consumer extends Worker
16
16
{
17
- protected Container $ container ;
17
+ private Container $ container ;
18
18
19
- protected string $ consumerTag ;
19
+ private string $ consumerTag ;
20
20
21
- protected int $ prefetchSize ;
21
+ private int $ prefetchSize ;
22
22
23
- protected int $ maxPriority ;
23
+ private int $ maxPriority ;
24
24
25
- protected int $ prefetchCount ;
25
+ private int $ prefetchCount ;
26
26
27
- protected AMQPChannel $ channel ;
27
+ private AMQPChannel $ amqpChannel ;
28
28
29
- private $ currentJob ;
29
+ private ? object $ currentJob = null ;
30
30
31
- public function setContainer (Container $ value ): void
31
+ public function setContainer (Container $ container ): void
32
32
{
33
- $ this ->container = $ value ;
33
+ $ this ->container = $ container ;
34
34
}
35
35
36
36
public function setConsumerTag (string $ value ): void
@@ -58,62 +58,60 @@ public function setPrefetchCount(int $value): void
58
58
*
59
59
* @param string $connectionName
60
60
* @param string $queue
61
- * @param WorkerOptions $options
62
61
* @return int
63
- *
64
62
* @throws Throwable
65
63
*/
66
- public function daemon ($ connectionName , $ queue , WorkerOptions $ options )
64
+ public function daemon ($ connectionName , $ queue , WorkerOptions $ workerOptions )
67
65
{
68
66
if ($ this ->supportsAsyncSignals ()) {
69
67
$ this ->listenForSignals ();
70
68
}
71
69
72
- $ lastRestart = $ this ->getTimestampOfLastQueueRestart ();
73
-
74
- [ $ startTime , $ jobsProcessed] = [ hrtime ( true ) / 1e9 , 0 ] ;
70
+ $ timestampOfLastQueueRestart = $ this ->getTimestampOfLastQueueRestart ();
71
+ $ startTime = hrtime ( true ) / 1e9 ;
72
+ $ jobsProcessed = 0 ;
75
73
76
74
$ connection = $ this ->manager ->connection ($ connectionName );
77
75
78
- $ this ->channel = $ connection ->getChannel ();
76
+ $ this ->amqpChannel = $ connection ->getChannel ();
79
77
80
- $ this ->channel ->basic_qos (
78
+ $ this ->amqpChannel ->basic_qos (
81
79
$ this ->prefetchSize ,
82
80
$ this ->prefetchCount ,
83
81
null
84
82
);
85
83
86
84
$ jobClass = $ connection ->getJobClass ();
87
85
$ arguments = [];
88
- if ($ this ->maxPriority ) {
86
+ if ($ this ->maxPriority !== 0 ) {
89
87
$ arguments ['priority ' ] = ['I ' , $ this ->maxPriority ];
90
88
}
91
89
92
- $ this ->channel ->basic_consume (
90
+ $ this ->amqpChannel ->basic_consume (
93
91
$ queue ,
94
92
$ this ->consumerTag ,
95
93
false ,
96
94
false ,
97
95
false ,
98
96
false ,
99
- function (AMQPMessage $ message ) use ($ connection , $ options , $ connectionName , $ queue , $ jobClass , &$ jobsProcessed ): void {
97
+ function (AMQPMessage $ amqpMessage ) use ($ connection , $ workerOptions , $ connectionName , $ queue , $ jobClass , &$ jobsProcessed ): void {
100
98
$ job = new $ jobClass (
101
99
$ this ->container ,
102
100
$ connection ,
103
- $ message ,
101
+ $ amqpMessage ,
104
102
$ connectionName ,
105
103
$ queue
106
104
);
107
105
108
106
$ this ->currentJob = $ job ;
109
107
110
108
if ($ this ->supportsAsyncSignals ()) {
111
- $ this ->registerTimeoutHandler ($ job , $ options );
109
+ $ this ->registerTimeoutHandler ($ job , $ workerOptions );
112
110
}
113
111
114
- $ jobsProcessed ++ ;
112
+ ++ $ jobsProcessed ;
115
113
116
- $ this ->runJob ($ job , $ connectionName , $ options );
114
+ $ this ->runJob ($ job , $ connectionName , $ workerOptions );
117
115
118
116
if ($ this ->supportsAsyncSignals ()) {
119
117
$ this ->resetTimeoutHandler ();
@@ -123,21 +121,21 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
123
121
$ arguments
124
122
);
125
123
126
- while ($ this ->channel ->is_consuming ()) {
124
+ while ($ this ->amqpChannel ->is_consuming ()) {
127
125
// Before reserving any jobs, we will make sure this queue is not paused and
128
126
// if it is we will just pause this worker for a given amount of time and
129
127
// make sure we do not need to kill this worker process off completely.
130
- if (! $ this ->daemonShouldRun ($ options , $ connectionName , $ queue )) {
131
- $ this ->pauseWorker ($ options , $ lastRestart );
128
+ if (! $ this ->daemonShouldRun ($ workerOptions , $ connectionName , $ queue )) {
129
+ $ this ->pauseWorker ($ workerOptions , $ timestampOfLastQueueRestart );
132
130
133
131
continue ;
134
132
}
135
133
136
134
// If the daemon should run (not in maintenance mode, etc.), then we can wait for a job.
137
135
try {
138
- $ this ->channel ->wait (null , true , (int ) $ options ->timeout );
139
- } catch (AMQPRuntimeException $ exception ) {
140
- $ this ->exceptions ->report ($ exception );
136
+ $ this ->amqpChannel ->wait (null , true , (int ) $ workerOptions ->timeout );
137
+ } catch (AMQPRuntimeException $ amqpRuntimeException ) {
138
+ $ this ->exceptions ->report ($ amqpRuntimeException );
141
139
142
140
$ this ->kill (1 );
143
141
} catch (Exception | Throwable $ exception ) {
@@ -148,15 +146,15 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
148
146
149
147
// If no job is got off the queue, we will need to sleep the worker.
150
148
if ($ this ->currentJob === null ) {
151
- $ this ->sleep ($ options ->sleep );
149
+ $ this ->sleep ($ workerOptions ->sleep );
152
150
}
153
151
154
152
// Finally, we will check to see if we have exceeded our memory limits or if
155
153
// the queue should restart based on other indications. If so, we'll stop
156
154
// this worker and let whatever is "monitoring" it restart the process.
157
155
$ status = $ this ->stopIfNecessary (
158
- $ options ,
159
- $ lastRestart ,
156
+ $ workerOptions ,
157
+ $ timestampOfLastQueueRestart ,
160
158
$ startTime ,
161
159
$ jobsProcessed ,
162
160
$ this ->currentJob
@@ -173,21 +171,19 @@ function (AMQPMessage $message) use ($connection, $options, $connectionName, $qu
173
171
/**
174
172
* Determine if the daemon should process on this iteration.
175
173
*
176
- * @param WorkerOptions $options
177
174
* @param string $connectionName
178
175
* @param string $queue
179
- * @return bool
180
176
*/
181
- protected function daemonShouldRun (WorkerOptions $ options , $ connectionName , $ queue ): bool
177
+ protected function daemonShouldRun (WorkerOptions $ workerOptions , $ connectionName , $ queue ): bool
182
178
{
183
- return ! ((( $ this ->isDownForMaintenance )() && ! $ options ->force ) || $ this ->paused ) ;
179
+ return !(( $ this ->isDownForMaintenance )() && ! $ workerOptions ->force ) && ! $ this ->paused ;
184
180
}
185
181
186
182
public function stop ($ status = 0 , $ options = []): int
187
183
{
188
184
// Tell the server you are going to stop consuming.
189
185
// It will finish up the last message and not send you anymore.
190
- $ this ->channel ->basic_cancel ($ this ->consumerTag , false , true );
186
+ $ this ->amqpChannel ->basic_cancel ($ this ->consumerTag , false , true );
191
187
192
188
return parent ::stop ($ status );
193
189
}
0 commit comments