Skip to content

Commit 3085a7f

Browse files
ofekshenawapetyaslavova
authored andcommitted
Add new stream commands (#3711)
* Add new stream commands * optimization changes
1 parent ff21998 commit 3085a7f

File tree

3 files changed

+351
-0
lines changed

3 files changed

+351
-0
lines changed

redis/commands/core.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3484,6 +3484,28 @@ def xack(self, name: KeyT, groupname: GroupT, *ids: StreamIdT) -> ResponseT:
34843484
"""
34853485
return self.execute_command("XACK", name, groupname, *ids)
34863486

3487+
def xackdel(
3488+
self,
3489+
name: KeyT,
3490+
groupname: GroupT,
3491+
*ids: StreamIdT,
3492+
ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF",
3493+
) -> ResponseT:
3494+
"""
3495+
Combines the functionality of XACK and XDEL. Acknowledges the specified
3496+
message IDs in the given consumer group and simultaneously attempts to
3497+
delete the corresponding entries from the stream.
3498+
"""
3499+
if not ids:
3500+
raise DataError("XACKDEL requires at least one message ID")
3501+
3502+
if ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
3503+
raise DataError("XACKDEL ref_policy must be one of: KEEPREF, DELREF, ACKED")
3504+
3505+
pieces = [name, groupname, ref_policy, "IDS", len(ids)]
3506+
pieces.extend(ids)
3507+
return self.execute_command("XACKDEL", *pieces)
3508+
34873509
def xadd(
34883510
self,
34893511
name: KeyT,
@@ -3494,6 +3516,7 @@ def xadd(
34943516
nomkstream: bool = False,
34953517
minid: Union[StreamIdT, None] = None,
34963518
limit: Optional[int] = None,
3519+
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
34973520
) -> ResponseT:
34983521
"""
34993522
Add to a stream.
@@ -3507,13 +3530,20 @@ def xadd(
35073530
minid: the minimum id in the stream to query.
35083531
Can't be specified with maxlen.
35093532
limit: specifies the maximum number of entries to retrieve
3533+
ref_policy: optional reference policy for consumer groups when trimming:
3534+
- KEEPREF (default): When trimming, preserves references in consumer groups' PEL
3535+
- DELREF: When trimming, removes all references from consumer groups' PEL
3536+
- ACKED: When trimming, only removes entries acknowledged by all consumer groups
35103537
35113538
For more information see https://redis.io/commands/xadd
35123539
"""
35133540
pieces: list[EncodableT] = []
35143541
if maxlen is not None and minid is not None:
35153542
raise DataError("Only one of ```maxlen``` or ```minid``` may be specified")
35163543

3544+
if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
3545+
raise DataError("XADD ref_policy must be one of: KEEPREF, DELREF, ACKED")
3546+
35173547
if maxlen is not None:
35183548
if not isinstance(maxlen, int) or maxlen < 0:
35193549
raise DataError("XADD maxlen must be non-negative integer")
@@ -3530,6 +3560,8 @@ def xadd(
35303560
pieces.extend([b"LIMIT", limit])
35313561
if nomkstream:
35323562
pieces.append(b"NOMKSTREAM")
3563+
if ref_policy is not None:
3564+
pieces.append(ref_policy)
35333565
pieces.append(id)
35343566
if not isinstance(fields, dict) or len(fields) == 0:
35353567
raise DataError("XADD fields must be a non-empty dict")
@@ -3683,6 +3715,26 @@ def xdel(self, name: KeyT, *ids: StreamIdT) -> ResponseT:
36833715
"""
36843716
return self.execute_command("XDEL", name, *ids)
36853717

3718+
def xdelex(
3719+
self,
3720+
name: KeyT,
3721+
*ids: StreamIdT,
3722+
ref_policy: Literal["KEEPREF", "DELREF", "ACKED"] = "KEEPREF",
3723+
) -> ResponseT:
3724+
"""
3725+
Extended version of XDEL that provides more control over how message entries
3726+
are deleted concerning consumer groups.
3727+
"""
3728+
if not ids:
3729+
raise DataError("XDELEX requires at least one message ID")
3730+
3731+
if ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
3732+
raise DataError("XDELEX ref_policy must be one of: KEEPREF, DELREF, ACKED")
3733+
3734+
pieces = [name, ref_policy, "IDS", len(ids)]
3735+
pieces.extend(ids)
3736+
return self.execute_command("XDELEX", *pieces)
3737+
36863738
def xgroup_create(
36873739
self,
36883740
name: KeyT,
@@ -4034,6 +4086,7 @@ def xtrim(
40344086
approximate: bool = True,
40354087
minid: Union[StreamIdT, None] = None,
40364088
limit: Optional[int] = None,
4089+
ref_policy: Optional[Literal["KEEPREF", "DELREF", "ACKED"]] = None,
40374090
) -> ResponseT:
40384091
"""
40394092
Trims old messages from a stream.
@@ -4044,6 +4097,10 @@ def xtrim(
40444097
minid: the minimum id in the stream to query
40454098
Can't be specified with maxlen.
40464099
limit: specifies the maximum number of entries to retrieve
4100+
ref_policy: optional reference policy for consumer groups:
4101+
- KEEPREF (default): Trims entries but preserves references in consumer groups' PEL
4102+
- DELREF: Trims entries and removes all references from consumer groups' PEL
4103+
- ACKED: Only trims entries that were read and acknowledged by all consumer groups
40474104
40484105
For more information see https://redis.io/commands/xtrim
40494106
"""
@@ -4054,6 +4111,9 @@ def xtrim(
40544111
if maxlen is None and minid is None:
40554112
raise DataError("One of ``maxlen`` or ``minid`` must be specified")
40564113

4114+
if ref_policy is not None and ref_policy not in {"KEEPREF", "DELREF", "ACKED"}:
4115+
raise DataError("XTRIM ref_policy must be one of: KEEPREF, DELREF, ACKED")
4116+
40574117
if maxlen is not None:
40584118
pieces.append(b"MAXLEN")
40594119
if minid is not None:
@@ -4067,6 +4127,8 @@ def xtrim(
40674127
if limit is not None:
40684128
pieces.append(b"LIMIT")
40694129
pieces.append(limit)
4130+
if ref_policy is not None:
4131+
pieces.append(ref_policy)
40704132

40714133
return self.execute_command("XTRIM", name, *pieces)
40724134

tests/test_asyncio/test_commands.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3465,6 +3465,156 @@ async def test_xtrim(self, r: redis.Redis):
34653465
# 1 message is trimmed
34663466
assert await r.xtrim(stream, 3, approximate=False) == 1
34673467

3468+
@skip_if_server_version_lt("8.1.224")
3469+
async def test_xdelex(self, r: redis.Redis):
3470+
stream = "stream"
3471+
3472+
m1 = await r.xadd(stream, {"foo": "bar"})
3473+
m2 = await r.xadd(stream, {"foo": "bar"})
3474+
m3 = await r.xadd(stream, {"foo": "bar"})
3475+
m4 = await r.xadd(stream, {"foo": "bar"})
3476+
3477+
# Test XDELEX with default ref_policy (KEEPREF)
3478+
result = await r.xdelex(stream, m1)
3479+
assert result == [1]
3480+
3481+
# Test XDELEX with explicit KEEPREF
3482+
result = await r.xdelex(stream, m2, ref_policy="KEEPREF")
3483+
assert result == [1]
3484+
3485+
# Test XDELEX with DELREF
3486+
result = await r.xdelex(stream, m3, ref_policy="DELREF")
3487+
assert result == [1]
3488+
3489+
# Test XDELEX with ACKED
3490+
result = await r.xdelex(stream, m4, ref_policy="ACKED")
3491+
assert result == [1]
3492+
3493+
# Test with non-existent ID
3494+
result = await r.xdelex(stream, "999999-0", ref_policy="KEEPREF")
3495+
assert result == [-1]
3496+
3497+
# Test with multiple IDs
3498+
m5 = await r.xadd(stream, {"foo": "bar"})
3499+
m6 = await r.xadd(stream, {"foo": "bar"})
3500+
result = await r.xdelex(stream, m5, m6, ref_policy="KEEPREF")
3501+
assert result == [1, 1]
3502+
3503+
# Test error cases
3504+
with pytest.raises(redis.DataError):
3505+
await r.xdelex(stream, "123-0", ref_policy="INVALID")
3506+
3507+
with pytest.raises(redis.DataError):
3508+
await r.xdelex(stream) # No IDs provided
3509+
3510+
@skip_if_server_version_lt("8.1.224")
3511+
async def test_xackdel(self, r: redis.Redis):
3512+
stream = "stream"
3513+
group = "group"
3514+
consumer = "consumer"
3515+
3516+
m1 = await r.xadd(stream, {"foo": "bar"})
3517+
m2 = await r.xadd(stream, {"foo": "bar"})
3518+
m3 = await r.xadd(stream, {"foo": "bar"})
3519+
m4 = await r.xadd(stream, {"foo": "bar"})
3520+
await r.xgroup_create(stream, group, 0)
3521+
3522+
await r.xreadgroup(group, consumer, streams={stream: ">"})
3523+
3524+
# Test XACKDEL with default ref_policy (KEEPREF)
3525+
result = await r.xackdel(stream, group, m1)
3526+
assert result == [1]
3527+
3528+
# Test XACKDEL with explicit KEEPREF
3529+
result = await r.xackdel(stream, group, m2, ref_policy="KEEPREF")
3530+
assert result == [1]
3531+
3532+
# Test XACKDEL with DELREF
3533+
result = await r.xackdel(stream, group, m3, ref_policy="DELREF")
3534+
assert result == [1]
3535+
3536+
# Test XACKDEL with ACKED
3537+
result = await r.xackdel(stream, group, m4, ref_policy="ACKED")
3538+
assert result == [1]
3539+
3540+
# Test with non-existent ID
3541+
result = await r.xackdel(stream, group, "999999-0", ref_policy="KEEPREF")
3542+
assert result == [-1]
3543+
3544+
# Test error cases
3545+
with pytest.raises(redis.DataError):
3546+
await r.xackdel(stream, group, m1, ref_policy="INVALID")
3547+
3548+
with pytest.raises(redis.DataError):
3549+
await r.xackdel(stream, group) # No IDs provided
3550+
3551+
@skip_if_server_version_lt("8.1.224")
3552+
async def test_xtrim_with_options(self, r: redis.Redis):
3553+
stream = "stream"
3554+
3555+
await r.xadd(stream, {"foo": "bar"})
3556+
await r.xadd(stream, {"foo": "bar"})
3557+
await r.xadd(stream, {"foo": "bar"})
3558+
await r.xadd(stream, {"foo": "bar"})
3559+
3560+
# Test XTRIM with KEEPREF ref_policy
3561+
assert (
3562+
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="KEEPREF")
3563+
== 2
3564+
)
3565+
3566+
await r.xadd(stream, {"foo": "bar"})
3567+
await r.xadd(stream, {"foo": "bar"})
3568+
3569+
# Test XTRIM with DELREF ref_policy
3570+
assert (
3571+
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="DELREF") == 2
3572+
)
3573+
3574+
await r.xadd(stream, {"foo": "bar"})
3575+
await r.xadd(stream, {"foo": "bar"})
3576+
3577+
# Test XTRIM with ACKED ref_policy
3578+
assert (
3579+
await r.xtrim(stream, maxlen=2, approximate=False, ref_policy="ACKED") == 2
3580+
)
3581+
3582+
# Test error case
3583+
with pytest.raises(redis.DataError):
3584+
await r.xtrim(stream, maxlen=2, ref_policy="INVALID")
3585+
3586+
@skip_if_server_version_lt("8.1.224")
3587+
async def test_xadd_with_options(self, r: redis.Redis):
3588+
stream = "stream"
3589+
3590+
# Test XADD with KEEPREF ref_policy
3591+
await r.xadd(
3592+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
3593+
)
3594+
await r.xadd(
3595+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
3596+
)
3597+
await r.xadd(
3598+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
3599+
)
3600+
assert await r.xlen(stream) == 2
3601+
3602+
# Test XADD with DELREF ref_policy
3603+
await r.xadd(
3604+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="DELREF"
3605+
)
3606+
assert await r.xlen(stream) == 2
3607+
3608+
# Test XADD with ACKED ref_policy
3609+
await r.xadd(
3610+
stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="ACKED"
3611+
)
3612+
assert await r.xlen(stream) == 2
3613+
3614+
# Test error case
3615+
with pytest.raises(redis.DataError):
3616+
await r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID")
3617+
34683618
@pytest.mark.onlynoncluster
34693619
async def test_bitfield_operations(self, r: redis.Redis):
34703620
# comments show affected bits

0 commit comments

Comments
 (0)