Skip to content

Commit 3208766

Browse files
committed
Revert get_updates_for_prefix - reduce the amount of new code
1 parent be6af5b commit 3208766

File tree

3 files changed

+5
-46
lines changed

3 files changed

+5
-46
lines changed

quixstreams/state/base/transaction.py

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import (
77
TYPE_CHECKING,
88
Any,
9+
Dict,
910
Generic,
1011
Literal,
1112
Optional,
@@ -139,29 +140,15 @@ def get_column_families(self) -> Set[str]:
139140
"""
140141
return set(self._updated.keys()) | set(self._deleted.keys())
141142

142-
def get_updates(self, cf_name: str = "default") -> dict[bytes, dict[bytes, bytes]]:
143+
def get_updates(self, cf_name: str = "default") -> Dict[bytes, Dict[bytes, bytes]]:
143144
"""
144145
Get all updated keys (excluding deleted)
145-
in the format "{<prefix>: {<key>: <value>, ...}, ...}".
146+
in the format "{<prefix>: {<key>: <value>}}".
146147
147148
:param: cf_name: column family name
148149
"""
149150
return self._updated.get(cf_name, {})
150151

151-
def get_updates_for_prefix(
152-
self,
153-
prefix: bytes,
154-
cf_name: str = "default",
155-
) -> dict[bytes, bytes]:
156-
"""
157-
Get all updated keys (excluding deleted)
158-
in the format "{<key>: <value>, ...}".
159-
160-
:param: prefix: key prefix
161-
:param: cf_name: column family name
162-
"""
163-
return self._updated.get(cf_name, {}).get(prefix, {})
164-
165152
def get_deletes(self, cf_name: str = "default") -> Set[bytes]:
166153
"""
167154
Get all deleted keys (excluding updated) as a set.

quixstreams/state/rocksdb/timestamped.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,7 @@ def get_last(
9999
value: Optional[bytes] = None
100100

101101
deletes = self._update_cache.get_deletes(cf_name=cf_name)
102-
updates = self._update_cache.get_updates_for_prefix(
103-
prefix=prefix,
104-
cf_name=cf_name,
105-
)
102+
updates = self._update_cache.get_updates(cf_name=cf_name).get(prefix, {})
106103

107104
cached = sorted(updates.items(), reverse=True)
108105
for cached_key, cached_value in cached:
@@ -181,9 +178,7 @@ def _expire(
181178

182179
key = self._serialize_key(max(timestamp - retention_ms, 0), prefix)
183180

184-
cached = self._update_cache.get_updates_for_prefix(
185-
prefix=prefix, cf_name=cf_name
186-
)
181+
cached = self._update_cache.get_updates(cf_name=cf_name).get(prefix, {})
187182
# Cast to list to avoid RuntimeError: dictionary changed size during iteration
188183
for cached_key in list(cached):
189184
if cached_key < key:

tests/test_quixstreams/test_state/test_transaction.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -581,29 +581,6 @@ def test_get_updates_after_delete(self, cache: PartitionTransactionCache):
581581
cache.delete(key=b"key", prefix=b"prefix", cf_name="cf_name")
582582
assert cache.get_updates(cf_name="cf_name") == {b"prefix": {}}
583583

584-
def test_get_updates_for_prefix_empty(self, cache: PartitionTransactionCache):
585-
assert cache.get_updates_for_prefix(prefix=b"prefix", cf_name="cf_name") == {}
586-
587-
# Delete an item and make sure it's not in "updates"
588-
cache.delete(key=b"key", prefix=b"prefix", cf_name="cf_name")
589-
assert cache.get_updates_for_prefix(prefix=b"prefix", cf_name="cf_name") == {}
590-
591-
def test_get_updates_for_prefix_present(self, cache: PartitionTransactionCache):
592-
cache.set(key=b"key", value=b"value", prefix=b"prefix", cf_name="cf_name")
593-
cache.set(key=b"key", value=b"value", prefix=b"other_prefix", cf_name="cf_name")
594-
cache.set(key=b"key", value=b"value", prefix=b"other", cf_name="other_cf_name")
595-
596-
assert cache.get_updates_for_prefix(prefix=b"prefix", cf_name="cf_name") == {
597-
b"key": b"value"
598-
}
599-
600-
def test_get_updates_for_prefix_after_delete(
601-
self, cache: PartitionTransactionCache
602-
):
603-
cache.set(key=b"key", value=b"value", prefix=b"prefix", cf_name="cf_name")
604-
cache.delete(key=b"key", prefix=b"prefix", cf_name="cf_name")
605-
assert cache.get_updates_for_prefix(prefix=b"prefix", cf_name="cf_name") == {}
606-
607584
def test_get_deletes_empty(self, cache: PartitionTransactionCache):
608585
assert cache.get_deletes(cf_name="cf_name") == set()
609586

0 commit comments

Comments
 (0)