Skip to content

Commit 9e39547

Browse files
[12.x] Standardize size() behavior and add extended queue metrics support (#56010)
* Add methods for metrics on queues for; pending size, delayed size, reserved size and oldest job where available. Add the new metrics to the monitor command output. Update the QueueFake driver to support testing delayed jobs and reserved jobs. * Add new line to make cli output more readable * Fix redis oldestPending * Update comments * Add pending jobs to CLI * Fix SQS driver The size method now follows the convention of all other drivers where size() returns the total number of jobs and not just pending jobs * Add comments on sqs methods * Update beanstalkd size() method to return the total size of the queue Remove redundant int casts * Lint Remove unused --metrics flag * Update SQS test case to support updated size() method * Add phpdoc for Queue interface * formatting * formatting * Formatting * remove test --------- Co-authored-by: Taylor Otwell <taylor@laravel.com>
1 parent 829394f commit 9e39547

File tree

10 files changed

+404
-6
lines changed

10 files changed

+404
-6
lines changed

src/Illuminate/Contracts/Queue/Queue.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
namespace Illuminate\Contracts\Queue;
44

5+
/**
6+
* @method int pendingSize(string|null $queue = null)
7+
* @method int delayedSize(string|null $queue = null)
8+
* @method int reservedSize(string|null $queue = null)
9+
* @method int|null creationTimeOfOldestPendingJob(string|null $queue = null)
10+
*/
511
interface Queue
612
{
713
/**
@@ -37,7 +43,6 @@ public function pushOn($queue, $job, $data = '');
3743
*
3844
* @param string $payload
3945
* @param string|null $queue
40-
* @param array $options
4146
* @return mixed
4247
*/
4348
public function pushRaw($payload, $queue = null, array $options = []);

src/Illuminate/Queue/BeanstalkdQueue.php

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,56 @@ public function __construct(
7171
*/
7272
public function size($queue = null)
7373
{
74-
return (int) $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReady;
74+
$stats = $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)));
75+
76+
return $stats->currentJobsReady
77+
+ $stats->currentJobsDelayed
78+
+ $stats->currentJobsReserved;
79+
}
80+
81+
/**
82+
* Get the number of pending jobs.
83+
*
84+
* @param string|null $queue
85+
* @return int
86+
*/
87+
public function pendingSize($queue = null)
88+
{
89+
return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReady;
90+
}
91+
92+
/**
93+
* Get the number of delayed jobs.
94+
*
95+
* @param string|null $queue
96+
* @return int
97+
*/
98+
public function delayedSize($queue = null)
99+
{
100+
return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsDelayed;
101+
}
102+
103+
/**
104+
* Get the number of reserved jobs.
105+
*
106+
* @param string|null $queue
107+
* @return int
108+
*/
109+
public function reservedSize($queue = null)
110+
{
111+
return $this->pheanstalk->statsTube(new TubeName($this->getQueue($queue)))->currentJobsReserved;
112+
}
113+
114+
/**
115+
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
116+
*
117+
* @param string|null $queue
118+
* @return int|null
119+
*/
120+
public function creationTimeOfOldestPendingJob($queue = null)
121+
{
122+
// Not supported by Beanstalkd...
123+
return null;
75124
}
76125

77126
/**

src/Illuminate/Queue/Console/MonitorCommand.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,18 @@ protected function parseQueues($queues)
9999
'connection' => $connection,
100100
'queue' => $queue,
101101
'size' => $size = $this->manager->connection($connection)->size($queue),
102+
'pending' => method_exists($this->manager->connection($connection), 'pendingSize')
103+
? $this->manager->connection($connection)->pendingSize($queue)
104+
: null,
105+
'delayed' => method_exists($this->manager->connection($connection), 'delayedSize')
106+
? $this->manager->connection($connection)->delayedSize($queue)
107+
: null,
108+
'reserved' => method_exists($this->manager->connection($connection), 'reservedSize')
109+
? $this->manager->connection($connection)->reservedSize($queue)
110+
: null,
111+
'oldest_pending' => method_exists($this->manager->connection($connection), 'oldestPending')
112+
? $this->manager->connection($connection)->creationTimeOfOldestPendingJob($queue)
113+
: null,
102114
'status' => $size >= $this->option('max') ? '<fg=yellow;options=bold>ALERT</>' : '<fg=green;options=bold>OK</>',
103115
];
104116
});
@@ -121,6 +133,10 @@ protected function displaySizes(Collection $queues)
121133
$status = '['.$queue['size'].'] '.$queue['status'];
122134

123135
$this->components->twoColumnDetail($name, $status);
136+
$this->components->twoColumnDetail('Pending jobs', $queue['pending'] ?? 'N/A');
137+
$this->components->twoColumnDetail('Delayed jobs', $queue['delayed'] ?? 'N/A');
138+
$this->components->twoColumnDetail('Reserved jobs', $queue['reserved'] ?? 'N/A');
139+
$this->line('');
124140
});
125141

126142
$this->newLine();

src/Illuminate/Queue/DatabaseQueue.php

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,66 @@ public function size($queue = null)
7979
->count();
8080
}
8181

82+
/**
83+
* Get the number of pending jobs.
84+
*
85+
* @param string|null $queue
86+
* @return int
87+
*/
88+
public function pendingSize($queue = null)
89+
{
90+
return $this->database->table($this->table)
91+
->where('queue', $this->getQueue($queue))
92+
->whereNull('reserved_at')
93+
->where('available_at', '<=', $this->currentTime())
94+
->count();
95+
}
96+
97+
/**
98+
* Get the number of delayed jobs.
99+
*
100+
* @param string|null $queue
101+
* @return int
102+
*/
103+
public function delayedSize($queue = null)
104+
{
105+
return $this->database->table($this->table)
106+
->where('queue', $this->getQueue($queue))
107+
->whereNull('reserved_at')
108+
->where('available_at', '>', $this->currentTime())
109+
->count();
110+
}
111+
112+
/**
113+
* Get the number of reserved jobs.
114+
*
115+
* @param string|null $queue
116+
* @return int
117+
*/
118+
public function reservedSize($queue = null)
119+
{
120+
return $this->database->table($this->table)
121+
->where('queue', $this->getQueue($queue))
122+
->whereNotNull('reserved_at')
123+
->count();
124+
}
125+
126+
/**
127+
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
128+
*
129+
* @param string|null $queue
130+
* @return int|null
131+
*/
132+
public function creationTimeOfOldestPendingJob($queue = null)
133+
{
134+
return $this->database->table($this->table)
135+
->where('queue', $this->getQueue($queue))
136+
->whereNull('reserved_at')
137+
->where('available_at', '<=', $this->currentTime())
138+
->oldest('available_at')
139+
->value('available_at');
140+
}
141+
82142
/**
83143
* Push a new job onto the queue.
84144
*

src/Illuminate/Queue/NullQueue.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,50 @@ public function size($queue = null)
1717
return 0;
1818
}
1919

20+
/**
21+
* Get the number of pending jobs.
22+
*
23+
* @param string|null $queue
24+
* @return int
25+
*/
26+
public function pendingSize($queue = null)
27+
{
28+
return 0;
29+
}
30+
31+
/**
32+
* Get the number of delayed jobs.
33+
*
34+
* @param string|null $queue
35+
* @return int
36+
*/
37+
public function delayedSize($queue = null)
38+
{
39+
return 0;
40+
}
41+
42+
/**
43+
* Get the number of reserved jobs.
44+
*
45+
* @param string|null $queue
46+
* @return int
47+
*/
48+
public function reservedSize($queue = null)
49+
{
50+
return 0;
51+
}
52+
53+
/**
54+
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
55+
*
56+
* @param string|null $queue
57+
* @return int|null
58+
*/
59+
public function creationTimeOfOldestPendingJob($queue = null)
60+
{
61+
return null;
62+
}
63+
2064
/**
2165
* Push a new job onto the queue.
2266
*

src/Illuminate/Queue/RedisQueue.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,58 @@ public function size($queue = null)
109109
);
110110
}
111111

112+
/**
113+
* Get the number of pending jobs.
114+
*
115+
* @param string|null $queue
116+
* @return int
117+
*/
118+
public function pendingSize($queue = null)
119+
{
120+
return $this->getConnection()->llen($this->getQueue($queue));
121+
}
122+
123+
/**
124+
* Get the number of delayed jobs.
125+
*
126+
* @param string|null $queue
127+
* @return int
128+
*/
129+
public function delayedSize($queue = null)
130+
{
131+
return $this->getConnection()->zcard($this->getQueue($queue).':delayed');
132+
}
133+
134+
/**
135+
* Get the number of reserved jobs.
136+
*
137+
* @param string|null $queue
138+
* @return int
139+
*/
140+
public function reservedSize($queue = null)
141+
{
142+
return $this->getConnection()->zcard($this->getQueue($queue).':reserved');
143+
}
144+
145+
/**
146+
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
147+
*
148+
* @param string|null $queue
149+
* @return int|null
150+
*/
151+
public function creationTimeOfOldestPendingJob($queue = null)
152+
{
153+
$payload = $this->getConnection()->lindex($this->getQueue($queue), 0);
154+
155+
if (! $payload) {
156+
return null;
157+
}
158+
159+
$data = json_decode($payload, true);
160+
161+
return $data['createdAt'] ?? null;
162+
}
163+
112164
/**
113165
* Push an array of jobs onto the queue.
114166
*

src/Illuminate/Queue/SqsQueue.php

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,83 @@ public function __construct(
6868
* @return int
6969
*/
7070
public function size($queue = null)
71+
{
72+
$response = $this->sqs->getQueueAttributes([
73+
'QueueUrl' => $this->getQueue($queue),
74+
'AttributeNames' => [
75+
'ApproximateNumberOfMessages',
76+
'ApproximateNumberOfMessagesDelayed',
77+
'ApproximateNumberOfMessagesNotVisible',
78+
],
79+
]);
80+
81+
$a = $response['Attributes'];
82+
83+
return (int) $a['ApproximateNumberOfMessages']
84+
+ (int) $a['ApproximateNumberOfMessagesDelayed']
85+
+ (int) $a['ApproximateNumberOfMessagesNotVisible'];
86+
}
87+
88+
/**
89+
* Get the number of pending jobs.
90+
*
91+
* @param string|null $queue
92+
* @return int
93+
*/
94+
public function pendingSize($queue = null)
7195
{
7296
$response = $this->sqs->getQueueAttributes([
7397
'QueueUrl' => $this->getQueue($queue),
7498
'AttributeNames' => ['ApproximateNumberOfMessages'],
7599
]);
76100

77-
$attributes = $response->get('Attributes');
101+
return (int) $response['Attributes']['ApproximateNumberOfMessages'] ?? 0;
102+
}
103+
104+
/**
105+
* Get the number of delayed jobs.
106+
*
107+
* @param string|null $queue
108+
* @return int
109+
*/
110+
public function delayedSize($queue = null)
111+
{
112+
$response = $this->sqs->getQueueAttributes([
113+
'QueueUrl' => $this->getQueue($queue),
114+
'AttributeNames' => ['ApproximateNumberOfMessagesDelayed'],
115+
]);
116+
117+
return (int) $response['Attributes']['ApproximateNumberOfMessagesDelayed'] ?? 0;
118+
}
119+
120+
/**
121+
* Get the number of reserved jobs.
122+
*
123+
* @param string|null $queue
124+
* @return int
125+
*/
126+
public function reservedSize($queue = null)
127+
{
128+
$response = $this->sqs->getQueueAttributes([
129+
'QueueUrl' => $this->getQueue($queue),
130+
'AttributeNames' => ['ApproximateNumberOfMessagesNotVisible'],
131+
]);
132+
133+
return (int) $response['Attributes']['ApproximateNumberOfMessagesNotVisible'] ?? 0;
134+
}
78135

79-
return (int) $attributes['ApproximateNumberOfMessages'];
136+
/**
137+
* Get the creation timestamp of the oldest pending job, excluding delayed jobs.
138+
*
139+
* Not supported by SQS, returns null.
140+
*
141+
* @param string|null $queue
142+
* @return int|null
143+
*/
144+
public function creationTimeOfOldestPendingJob($queue = null)
145+
{
146+
// Not supported by SQS...
147+
return null;
80148
}
81149

82150
/**

0 commit comments

Comments
 (0)