Skip to content

Commit bb2bd2a

Browse files
committed
Optimize TimestampedPartitionTransaction.get_last and fix undefined variable warning
- Check the boundaries of the cache before iterating over it
1 parent fc92e8e commit bb2bd2a

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

quixstreams/state/rocksdb/timestamped.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,20 +89,29 @@ def get_last(self, timestamp: int, prefix: Any) -> Optional[Any]:
8989
prefix = self._ensure_bytes(prefix)
9090
min_eligible_timestamp = self._get_min_eligible_timestamp(prefix)
9191

92+
if timestamp < min_eligible_timestamp:
93+
return None
94+
9295
lower_bound = self._serialize_key(min_eligible_timestamp, prefix)
9396
# +1 because upper bound is exclusive
9497
upper_bound = self._serialize_key(timestamp + 1, prefix)
9598

96-
value: Optional[bytes] = None
97-
9899
deletes = self._update_cache.get_deletes()
99100
updates = self._update_cache.get_updates().get(prefix, {})
100101

101102
cached = sorted(updates.items(), reverse=True)
102-
for cached_key, cached_value in cached:
103-
if lower_bound <= cached_key < upper_bound and cached_key not in deletes:
104-
value = cached_value
105-
break
103+
value: Optional[bytes] = None
104+
cached_key: Optional[bytes] = None
105+
# First check the boundaries to skip the iteration if the cached values
106+
# are outside the search boundaries
107+
if cached and cached[0][0] >= lower_bound and cached[-1][0] <= upper_bound:
108+
for cached_key, cached_value in cached:
109+
if (
110+
lower_bound <= cached_key < upper_bound
111+
and cached_key not in deletes
112+
):
113+
value = cached_value
114+
break
106115

107116
stored = self._partition.iter_items(
108117
lower_bound=lower_bound,
@@ -113,7 +122,7 @@ def get_last(self, timestamp: int, prefix: Any) -> Optional[Any]:
113122
if stored_key in deletes:
114123
continue
115124

116-
if value is None or cached_key < stored_key:
125+
if value is None or (cached_key and cached_key < stored_key):
117126
value = stored_value
118127

119128
# We only care about the first not deleted item when
@@ -198,7 +207,7 @@ def _serialize_key(self, key: Union[int, bytes], prefix: bytes) -> bytes:
198207
return prefix + SEPARATOR + int_to_int64_bytes(key)
199208
elif isinstance(key, bytes):
200209
return prefix + SEPARATOR + key
201-
raise ValueError(f"Invalid key type: {type(key)}")
210+
raise TypeError(f"Invalid key type: {type(key)}")
202211

203212
def _get_min_eligible_timestamp(self, prefix: bytes) -> int:
204213
"""

0 commit comments

Comments
 (0)