Skip to content

Commit 4495d0a

Browse files
committed
bug #31387 [Messenger] Fix Redis Connection::get() after reject() (chalasr)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Fix Redis Connection::get() after reject() | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? |no | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | n/a | License | MIT | Doc PR | n/a If a message is rejected, another consumer cannot read from the stream because the first subsequent call to `\Redis::xreadgroup()` returns false for some reason. Reproducer: https://github.com/chalasr/redis-transport-bug ping @alexander-schranz Commits ------- c05273f793 [Messenger] Fix Redis Connection::get() after reject()
2 parents 520c23f + 892cfc7 commit 4495d0a

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,22 @@ public function testUnexpectedRedisError()
112112
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
113113
$connection->get();
114114
}
115+
116+
public function testGetAfterReject()
117+
{
118+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
119+
try {
120+
$connection->setup();
121+
} catch (TransportException $e) {
122+
}
123+
124+
$connection->add('1', []);
125+
$connection->add('2', []);
126+
127+
$failing = $connection->get();
128+
$connection->reject($failing['id']);
129+
130+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
131+
$this->assertNotNull($connection->get());
132+
}
115133
}

Transport/RedisExt/Connection.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public function get(): ?array
8989
} catch (\RedisException $e) {
9090
}
9191

92-
if (false === $messages || $e) {
92+
if ($e || (false === $messages && !$this->couldHavePendingMessages)) {
9393
throw new TransportException(
9494
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
9595
);
@@ -123,7 +123,7 @@ public function ack(string $id): void
123123
} catch (\RedisException $e) {
124124
}
125125

126-
if (!$acknowledged || $e) {
126+
if ($e || !$acknowledged) {
127127
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
128128
}
129129
}
@@ -136,7 +136,7 @@ public function reject(string $id): void
136136
} catch (\RedisException $e) {
137137
}
138138

139-
if (!$deleted || $e) {
139+
if ($e || !$deleted) {
140140
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
141141
}
142142
}
@@ -151,7 +151,7 @@ public function add(string $body, array $headers)
151151
} catch (\RedisException $e) {
152152
}
153153

154-
if (!$added || $e) {
154+
if ($e || !$added) {
155155
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
156156
}
157157
}

0 commit comments

Comments
 (0)