Skip to content

Commit dbef117

Browse files
[Messenger] fix Redis support on 32b arch
1 parent 79bc146 commit dbef117

File tree

2 files changed

+73
-32
lines changed

2 files changed

+73
-32
lines changed

Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public function testGetAfterReject()
220220
{
221221
$redis = new \Redis();
222222
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
223+
$connection->cleanup();
223224

224225
$connection->add('1', []);
225226
$connection->add('2', []);
@@ -230,20 +231,38 @@ public function testGetAfterReject()
230231
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
231232
$this->assertNotNull($connection->get());
232233

233-
$redis->del('messenger-rejectthenget');
234+
$connection->cleanup();
234235
}
235236

236237
public function testGetNonBlocking()
237238
{
238239
$redis = new \Redis();
239240

240241
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
242+
$connection->cleanup();
241243

242244
$this->assertNull($connection->get()); // no message, should return null immediately
243245
$connection->add('1', []);
244246
$this->assertNotEmpty($message = $connection->get());
245247
$connection->reject($message['id']);
246-
$redis->del('messenger-getnonblocking');
248+
249+
$connection->cleanup();
250+
}
251+
252+
public function testGetDelayed()
253+
{
254+
$redis = new \Redis();
255+
256+
$connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis);
257+
$connection->cleanup();
258+
259+
$connection->add('1', [], 100);
260+
$this->assertNull($connection->get());
261+
usleep(300000);
262+
$this->assertNotEmpty($message = $connection->get());
263+
$connection->reject($message['id']);
264+
265+
$connection->cleanup();
247266
}
248267

249268
public function testJsonError()

Transport/RedisExt/Connection.php

Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -141,33 +141,29 @@ public function get(): ?array
141141
if ($this->autoSetup) {
142142
$this->setup();
143143
}
144+
$now = microtime();
145+
$now = substr($now, 11).substr($now, 2, 3);
144146

145-
try {
146-
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
147-
} catch (\RedisException $e) {
148-
throw new TransportException($e->getMessage(), 0, $e);
149-
}
147+
$queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now);
150148

151-
if ($queuedMessageCount) {
152-
for ($i = 0; $i < $queuedMessageCount; ++$i) {
153-
try {
154-
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
155-
} catch (\RedisException $e) {
156-
throw new TransportException($e->getMessage(), 0, $e);
157-
}
149+
while ($queuedMessageCount--) {
150+
if (![$queuedMessage, $expiry] = $this->rawCommand('ZPOPMIN', 1)) {
151+
break;
152+
}
153+
154+
if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
155+
// if a future-placed message is popped because of a race condition with
156+
// another running consumer, the message is readded to the queue
158157

159-
foreach ($queuedMessages as $queuedMessage => $time) {
160-
$queuedMessage = json_decode($queuedMessage, true);
161-
// if a futured placed message is actually popped because of a race condition with
162-
// another running message consumer, the message is readded to the queue by add function
163-
// else its just added stream and will be available for all stream consumers
164-
$this->add(
165-
$queuedMessage['body'],
166-
$queuedMessage['headers'],
167-
$time - $this->getCurrentTimeInMilliseconds()
168-
);
158+
if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
159+
throw new TransportException('Could not add a message to the redis stream.');
169160
}
161+
162+
break;
170163
}
164+
165+
$queuedMessage = json_decode($queuedMessage, true);
166+
$this->add($queuedMessage['body'], $queuedMessage['headers'], 0);
171167
}
172168

173169
$messageId = '>'; // will receive new messages
@@ -255,7 +251,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255251
}
256252

257253
try {
258-
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
254+
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
259255
$message = json_encode([
260256
'body' => $body,
261257
'headers' => $headers,
@@ -267,8 +263,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267263
throw new TransportException(json_last_error_msg());
268264
}
269265

270-
$score = $this->getCurrentTimeInMilliseconds() + $delayInMs;
271-
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
266+
$now = explode(' ', microtime(), 2);
267+
$now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
268+
if (3 < \strlen($now[0])) {
269+
$now[1] += substr($now[0], 0, -3);
270+
$now[0] = substr($now[0], -3);
271+
272+
if (\is_float($now[1])) {
273+
throw new TransportException("Message delay is too big: {$delayInMs}ms.");
274+
}
275+
}
276+
277+
$added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
272278
} else {
273279
$message = json_encode([
274280
'body' => $body,
@@ -316,14 +322,30 @@ public function setup(): void
316322
$this->autoSetup = false;
317323
}
318324

319-
private function getCurrentTimeInMilliseconds(): int
320-
{
321-
return (int) (microtime(true) * 1000);
322-
}
323-
324325
public function cleanup(): void
325326
{
326327
$this->connection->del($this->stream);
327328
$this->connection->del($this->queue);
328329
}
330+
331+
/**
332+
* @return mixed
333+
*/
334+
private function rawCommand(string $command, ...$arguments)
335+
{
336+
try {
337+
$result = $this->connection->rawCommand($command, $this->queue, ...$arguments);
338+
} catch (\RedisException $e) {
339+
throw new TransportException($e->getMessage(), 0, $e);
340+
}
341+
342+
if (false === $result) {
343+
if ($error = $this->connection->getLastError() ?: null) {
344+
$this->connection->clearLastError();
345+
}
346+
throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command));
347+
}
348+
349+
return $result;
350+
}
329351
}

0 commit comments

Comments
 (0)