Skip to content

Commit 34f130d

Browse files
authored
Merge pull request #400 from keithgunning/xautoclaim-cursor-fix
Fix XAUTOCLAIM skipping the initial start message.
2 parents f29ff4e + 4075e65 commit 34f130d

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

cmd_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,7 +1536,7 @@ func xautoclaim(
15361536
return nextCallId, nil
15371537
}
15381538

1539-
msgs := g.pendingAfter(start)
1539+
msgs := g.pendingAfterOrEqual(start)
15401540
var res []StreamEntry
15411541
for i, p := range msgs {
15421542
if minIdleTime > 0 && now.Before(p.lastDelivery.Add(minIdleTime)) {

cmd_stream_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,6 +1322,21 @@ func TestStreamAutoClaim(t *testing.T) {
13221322
proto.Array(),
13231323
),
13241324
)
1325+
1326+
// read again using the ID returned from the last XAUTOCLAIM call as 'start'.
1327+
// the results include that starting message. unlike XREADGROUP, the results of XAUTOCLAIM
1328+
// are INCLUSIVE of the start ID.
1329+
mustDo(t, c,
1330+
"XAUTOCLAIM", "planets", "processing", "bob", "15000", "0-2", "COUNT", "1",
1331+
proto.Array(
1332+
proto.String("0-0"),
1333+
proto.Array(
1334+
proto.Array(proto.String("0-2"), proto.Strings("name", "Venus")),
1335+
),
1336+
proto.Array(),
1337+
),
1338+
)
1339+
13251340
mustDo(t, c,
13261341
"XINFO", "CONSUMERS", "planets", "processing",
13271342
proto.Array(
@@ -1350,8 +1365,8 @@ func TestStreamAutoClaim(t *testing.T) {
13501365
proto.Array(
13511366
proto.String("0-2"),
13521367
proto.String("bob"),
1353-
proto.Int(20000),
1354-
proto.Int(4),
1368+
proto.Int(0),
1369+
proto.Int(5),
13551370
),
13561371
),
13571372
)

stream.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,13 @@ func (s *streamKey) delete(ids []string) (int, error) {
440440
return count, nil
441441
}
442442

443+
func (g *streamGroup) pendingAfterOrEqual(id string) []pendingEntry {
444+
pos := sort.Search(len(g.pending), func(i int) bool {
445+
return streamCmp(id, g.pending[i].id) <= 0
446+
})
447+
return g.pending[pos:]
448+
}
449+
443450
func (g *streamGroup) pendingAfter(id string) []pendingEntry {
444451
pos := sort.Search(len(g.pending), func(i int) bool {
445452
return streamCmp(id, g.pending[i].id) < 0

0 commit comments

Comments
 (0)