Skip to content

Commit 647d769

Browse files
committed
refactor
1 parent f854afa commit 647d769

File tree

1 file changed

+22
-45
lines changed

1 file changed

+22
-45
lines changed

quixstreams/state/rocksdb/timestamped.py

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def __init__(
4747
dumps: DumpsFunc,
4848
loads: LoadsFunc,
4949
changelog_producer: Optional[ChangelogProducer] = None,
50-
):
50+
) -> None:
5151
super().__init__(
5252
partition=partition,
5353
dumps=dumps,
@@ -87,13 +87,7 @@ def get_last(
8787
"""
8888

8989
prefix = self._ensure_bytes(prefix)
90-
91-
latest_timestamp = max(
92-
self._get_timestamp(
93-
prefix=prefix, cache=self._latest_timestamps, default=0
94-
),
95-
timestamp,
96-
)
90+
latest_timestamp = self._get_latest_timestamp(prefix, timestamp)
9791

9892
# Negative retention is not allowed
9993
lower_bound = self._serialize_key(
@@ -143,7 +137,7 @@ def set_for_timestamp(
143137
prefix: Any,
144138
retention_ms: int = DAYS_7,
145139
cf_name: str = "default",
146-
):
140+
) -> None:
147141
"""Set a value for the timestamp.
148142
149143
This method acts as a proxy, passing the provided `timestamp` and `prefix`
@@ -166,7 +160,7 @@ def set_for_timestamp(
166160

167161
def _expire(
168162
self, timestamp: int, prefix: bytes, retention_ms: int, cf_name: str = "default"
169-
):
163+
) -> None:
170164
"""
171165
Delete all entries for a given prefix with timestamps less than the
172166
provided timestamp.
@@ -180,17 +174,8 @@ def _expire(
180174
:param cf_name: Column family name.
181175
"""
182176

183-
latest_timestamp = max(
184-
self._get_timestamp(
185-
prefix=prefix, cache=self._latest_timestamps, default=0
186-
),
187-
timestamp,
188-
)
189-
self._set_timestamp(
190-
cache=self._latest_timestamps,
191-
prefix=prefix,
192-
timestamp_ms=latest_timestamp,
193-
)
177+
latest_timestamp = self._get_latest_timestamp(prefix, timestamp)
178+
self._set_timestamp(prefix, latest_timestamp)
194179

195180
key = self._serialize_key(max(timestamp - retention_ms, 0), prefix)
196181

@@ -224,33 +209,25 @@ def _serialize_key(self, key: Union[int, bytes], prefix: bytes) -> bytes:
224209
case _:
225210
raise ValueError(f"Invalid key type: {type(key)}")
226211

227-
def _get_timestamp(
228-
self, cache: TimestampsCache, prefix: bytes, default: Any = None
229-
) -> Any:
230-
cached_ts = cache.timestamps.get(prefix)
231-
if cached_ts is not None:
232-
return cached_ts
212+
def _get_latest_timestamp(self, prefix: bytes, timestamp: int) -> Any:
213+
"""
214+
Get the latest timestamp for a given prefix.
233215
234-
stored_ts = self.get(
235-
key=cache.key,
236-
prefix=prefix,
237-
cf_name=cache.cf_name,
238-
default=default,
216+
If the timestamp is not found in the cache, it is fetched from the store.
217+
"""
218+
cache = self._latest_timestamps
219+
ts = (
220+
cache.timestamps.get(prefix)
221+
or self.get(key=cache.key, prefix=prefix, cf_name=cache.cf_name)
222+
or 0
239223
)
240-
if stored_ts is not None and not isinstance(stored_ts, int):
241-
raise ValueError(f"invalid timestamp {stored_ts}")
224+
cache.timestamps[prefix] = latest_timestamp = max(ts, timestamp)
225+
return latest_timestamp
242226

243-
cache.timestamps[prefix] = stored_ts or default
244-
return stored_ts
245-
246-
def _set_timestamp(self, cache: TimestampsCache, prefix: bytes, timestamp_ms: int):
247-
cache.timestamps[prefix] = timestamp_ms
248-
self.set(
249-
key=cache.key,
250-
value=timestamp_ms,
251-
prefix=prefix,
252-
cf_name=cache.cf_name,
253-
)
227+
def _set_timestamp(self, prefix: bytes, timestamp: int) -> None:
228+
cache = self._latest_timestamps
229+
cache.timestamps[prefix] = timestamp
230+
self.set(key=cache.key, value=timestamp, prefix=prefix, cf_name=cache.cf_name)
254231

255232

256233
class TimestampedStorePartition(RocksDBStorePartition):

0 commit comments

Comments
 (0)