|
| 1 | +from collections import deque |
1 | 2 | from typing import Any, Optional, Union, cast
|
2 | 3 |
|
3 | 4 | from quixstreams.state.base.transaction import (
|
@@ -183,19 +184,25 @@ def _expire(self) -> None:
|
183 | 184 | RocksDB store within the current transaction.
|
184 | 185 | """
|
185 | 186 | updates = self._update_cache.get_updates()
|
| 187 | + # Accumulate the expired keys separately to avoid |
| 188 | + # mutating the update cache during iteration |
| 189 | + keys_to_delete: deque[tuple[bytes, bytes]] = deque() |
| 190 | + |
186 | 191 | for prefix, cached in updates.items():
|
187 | 192 | min_eligible_timestamp = self._get_min_eligible_timestamp(prefix)
|
188 | 193 |
|
189 | 194 | key = self._serialize_key(min_eligible_timestamp, prefix)
|
190 |
| - |
191 |
| - # Cast to list to avoid RuntimeError: dictionary changed size during iteration |
192 |
| - for cached_key in list(cached): |
| 195 | + for cached_key in cached: |
193 | 196 | if cached_key < key:
|
194 |
| - self._update_cache.delete(cached_key, prefix) |
| 197 | + keys_to_delete.append((cached_key, prefix)) |
195 | 198 |
|
196 | 199 | stored = self._partition.iter_items(lower_bound=prefix, upper_bound=key)
|
197 | 200 | for stored_key, _ in stored:
|
198 |
| - self._update_cache.delete(stored_key, prefix) |
| 201 | + keys_to_delete.append((stored_key, prefix)) |
| 202 | + |
| 203 | + # Mark the expired keys as deleted in the update cache |
| 204 | + for key, prefix in keys_to_delete: |
| 205 | + self._update_cache.delete(key, prefix) |
199 | 206 |
|
200 | 207 | def _ensure_bytes(self, prefix: Any) -> bytes:
|
201 | 208 | if isinstance(prefix, bytes):
|
|
0 commit comments