1
- from typing import Any , Optional , List , Tuple , TYPE_CHECKING , cast
1
+ from itertools import chain
2
+ from typing import Any , Optional , TYPE_CHECKING , cast
2
3
3
4
from rocksdict import ReadOptions
4
5
@@ -110,26 +111,29 @@ def _flush(self, processed_offset: Optional[int], changelog_offset: Optional[int
110
111
111
112
def expire_windows (
112
113
self , duration_ms : int , prefix : bytes , grace_ms : int = 0
113
- ) -> List [ Tuple [ Tuple [int , int ], Any ]]:
114
+ ) -> list [ tuple [ tuple [int , int ], Any ]]:
114
115
"""
115
- Get a list of expired windows from RocksDB considering latest timestamp,
116
- window size and grace period.
117
- It marks the latest found window as expired in the expiration index, so
118
- calling this method multiple times will yield different results for the same
119
- "latest timestamp".
116
+ Get all expired windows from RocksDB based on the latest timestamp,
117
+ window duration, and an optional grace period.
118
+
119
+ This method marks the latest found window as expired in the expiration index,
120
+ so consecutive calls may yield different results for the same "latest timestamp".
120
121
121
122
How it works:
122
- - First, it looks for the start time of the last expired window for the current
123
- prefix using expiration cache. If it's found, it will be used to reduce
124
- the search space and to avoid returning already expired windows.
125
- - Then it goes over window segments and fetches the windows
126
- that should be expired.
127
- - At last, it updates the expiration cache with the start time of the latest
128
- found windows
129
-
130
- :return: sorted list of tuples in format `((start, end), value)`
123
+ - First, it checks the expiration cache for the start time of the last expired
124
+ window for the current prefix. If found, this value helps reduce the search
125
+ space and prevents returning previously expired windows.
126
+ - Next, it iterates over window segments and identifies the windows that should
127
+ be marked as expired.
128
+ - Finally, it updates the expiration cache with the start time of the latest
129
+ windows found.
130
+
131
+ :param duration_ms: The duration of each window in milliseconds.
132
+ :param prefix: The key prefix for filtering windows.
133
+ :param grace_ms: An optional grace period in milliseconds to delay expiration.
134
+ Defaults to 0, meaning no grace period is applied.
135
+ :return: A sorted list of tuples in the format `((start, end), value)`.
131
136
"""
132
-
133
137
latest_timestamp = self ._latest_timestamp_ms
134
138
start_to = latest_timestamp - duration_ms - grace_ms
135
139
start_from = - 1
@@ -145,10 +149,12 @@ def expire_windows(
145
149
146
150
# Use the latest expired timestamp to limit the iteration over
147
151
# only those windows that have not been expired before
148
- expired_windows = self ._get_windows (
149
- start_from_ms = start_from ,
150
- start_to_ms = start_to ,
151
- prefix = prefix ,
152
+ expired_windows = list (
153
+ self .get_windows (
154
+ start_from_ms = start_from ,
155
+ start_to_ms = start_to ,
156
+ prefix = prefix ,
157
+ )
152
158
)
153
159
if expired_windows :
154
160
# Save the start of the latest expired window to the expiration index
@@ -170,52 +176,65 @@ def _serialize_key(self, key: Any, prefix: bytes) -> bytes:
170
176
key_bytes = key if isinstance (key , bytes ) else serialize (key , dumps = self ._dumps )
171
177
return prefix + PREFIX_SEPARATOR + key_bytes
172
178
173
def get_windows(
    self,
    start_from_ms: int,
    start_to_ms: int,
    prefix: bytes,
    backwards: bool = False,
) -> list[tuple[tuple[int, int], Any]]:
    """
    Get all windows that start between "start_from_ms" and "start_to_ms"
    within the specified prefix.

    This function also checks the update cache for any updates not yet
    committed to RocksDB, so uncommitted writes and deletions are reflected
    in the result.

    :param start_from_ms: The minimal window start time, exclusive.
    :param start_to_ms: The maximum window start time, inclusive.
    :param prefix: The key prefix for filtering windows.
    :param backwards: If True, returns windows in reverse (descending) order.
        Defaults to False (ascending order).
    :return: A list of tuples in the format `((start, end), value)`, sorted
        by window key (descending when `backwards=True`).
    """
    # Clamp the lower bound at 0: negative timestamps cannot occur in the
    # encoded keys, and a sentinel like start_from_ms=-1 means "no lower bound".
    seek_from = max(start_from_ms, 0)
    seek_from_key = encode_window_prefix(prefix=prefix, start_ms=seek_from)

    # Add +1 to make the upper bound inclusive
    # (RocksDB's iterate_upper_bound is exclusive).
    seek_to = start_to_ms + 1
    seek_to_key = encode_window_prefix(prefix=prefix, start_ms=seek_to)

    # Set iterator bounds to reduce IO by limiting the range of keys fetched
    read_opt = ReadOptions()
    read_opt.set_iterate_lower_bound(seek_from_key)
    read_opt.set_iterate_upper_bound(seek_to_key)

    # Create an iterator over the state store (lazy; consumed by the merge
    # loop below).
    db_windows = self._partition.iter_items(
        read_opt=read_opt, from_key=seek_from_key
    )

    # Get cached updates with matching keys.
    # Keys are compared as bytes; this works because the key encoding is
    # presumably ordered by start timestamp — TODO confirm against
    # encode_window_key/parse_window_key.
    cached_windows = [
        (k, v)
        for k, v in self._update_cache.get("default", {}).get(prefix, {}).items()
        if seek_from_key < k <= seek_to_key
    ]

    # Iterate over stored and cached windows (cached come first) and
    # merge them in a single dict. Because cached entries are seen first,
    # an uncommitted update wins over the stored value for the same key,
    # and an uncommitted DELETED marker suppresses the stored value.
    deleted_windows = set()
    merged_windows = {}
    for key, value in chain(cached_windows, db_windows):
        if value is DELETED:
            deleted_windows.add(key)
        elif key not in merged_windows and key not in deleted_windows:
            merged_windows[key] = value

    # Deserialize the surviving windows in key order and re-check the
    # (exclusive, inclusive] start-time bounds, since the byte-range seek
    # above is only a coarse filter.
    final_windows = []
    for key in sorted(merged_windows, reverse=backwards):
        _, start, end = parse_window_key(key)
        if start_from_ms < start <= start_to_ms:
            value = self._deserialize_value(merged_windows[key])
            final_windows.append(((start, end), value))

    return final_windows
0 commit comments