Skip to content

Commit fd1fa0e

Browse files
committed
Expire once per transaction
1 parent ea4c611 commit fd1fa0e

File tree

1 file changed

+19
-19
lines changed

1 file changed

+19
-19
lines changed

quixstreams/state/rocksdb/timestamped.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -143,32 +143,34 @@ def set_for_timestamp(
143143
timestamp - retention_ms,
144144
)
145145
self._set_min_eligible_timestamp(prefix, min_eligible_timestamp)
146-
self._expire(prefix=prefix)
147146

148-
def _expire(self, prefix: bytes) -> None:
147+
def _flush(self, changelog_offset: Optional[int]):
148+
self._expire()
149+
super()._flush(changelog_offset=changelog_offset)
150+
151+
def _expire(self) -> None:
149152
"""
150-
Delete all entries for a given prefix with timestamps less than the
151-
provided timestamp.
153+
Delete all entries with timestamps less than the minimum
154+
eligible timestamp for the given prefix.
152155
153156
This applies to both the in-memory update cache and the underlying
154157
RocksDB store within the current transaction.
155-
156-
:param prefix: The key prefix.
157158
"""
158159

159-
min_eligible_timestamp = self._get_min_eligible_timestamp(prefix)
160+
updates = self._update_cache.get_updates()
161+
for prefix, cached in updates.items():
162+
min_eligible_timestamp = self._get_min_eligible_timestamp(prefix)
160163

161-
key = self._serialize_key(min_eligible_timestamp, prefix)
164+
key = self._serialize_key(min_eligible_timestamp, prefix)
162165

163-
cached = self._update_cache.get_updates().get(prefix, {})
164-
# Cast to list to avoid RuntimeError: dictionary changed size during iteration
165-
for cached_key in list(cached):
166-
if cached_key < key:
167-
self._update_cache.delete(cached_key, prefix)
166+
# Cast to list to avoid RuntimeError: dictionary changed size during iteration
167+
for cached_key in list(cached):
168+
if cached_key < key:
169+
self._update_cache.delete(cached_key, prefix)
168170

169-
stored = self._partition.iter_items(lower_bound=prefix, upper_bound=key)
170-
for stored_key, _ in stored:
171-
self._update_cache.delete(stored_key, prefix)
171+
stored = self._partition.iter_items(lower_bound=prefix, upper_bound=key)
172+
for stored_key, _ in stored:
173+
self._update_cache.delete(stored_key, prefix)
172174

173175
def _ensure_bytes(self, prefix: Any) -> bytes:
174176
if isinstance(prefix, bytes):
@@ -185,9 +187,7 @@ def _serialize_key(self, key: Union[int, bytes], prefix: bytes) -> bytes:
185187
def _get_min_eligible_timestamp(self, prefix: bytes) -> int:
186188
cache = self._min_eligible_timestamps
187189
return (
188-
cache.timestamps.get(prefix)
189-
or self.get(key=cache.key, prefix=prefix, cf_name=cache.cf_name)
190-
or 0
190+
cache.timestamps.get(prefix) or self.get(key=cache.key, prefix=prefix) or 0
191191
)
192192

193193
def _set_min_eligible_timestamp(self, prefix: bytes, timestamp: int) -> None:

0 commit comments

Comments
 (0)