Skip to content

Commit a7c5bce

Browse files
committed
Set min eligible timestamp instead of the latest timestamp
1 parent bb1821d commit a7c5bce

File tree

5 files changed

+29
-35
lines changed

5 files changed

+29
-35
lines changed

quixstreams/state/rocksdb/metadata.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,2 @@
11
PROCESSED_OFFSET_KEY = b"__topic_offset__"
22
CHANGELOG_OFFSET_KEY = b"__changelog_offset__"
3-
4-
LATEST_TIMESTAMPS_CF_NAME = "__latest-timestamps__"
5-
LATEST_TIMESTAMP_KEY = b"__latest_timestamp__"

quixstreams/state/rocksdb/timestamped.py

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88
from quixstreams.state.metadata import SEPARATOR
99
from quixstreams.state.recovery import ChangelogProducer
1010
from quixstreams.state.rocksdb.cache import TimestampsCache
11-
from quixstreams.state.rocksdb.metadata import (
12-
LATEST_TIMESTAMP_KEY,
13-
LATEST_TIMESTAMPS_CF_NAME,
14-
)
1511
from quixstreams.state.serialization import (
1612
DumpsFunc,
1713
LoadsFunc,
@@ -30,6 +26,9 @@
3026

3127
DAYS_7 = 7 * 24 * 60 * 60 * 1000
3228

29+
MIN_ELIGIBLE_TIMESTAMPS_CF_NAME = "__min-eligible-timestamps__"
30+
MIN_ELIGIBLE_TIMESTAMPS_KEY = b"__min_eligible_timestamps__"
31+
3332

3433
class TimestampedPartitionTransaction(PartitionTransaction):
3534
"""
@@ -56,9 +55,9 @@ def __init__(
5655
self._partition: TimestampedStorePartition = cast(
5756
"TimestampedStorePartition", self._partition
5857
)
59-
self._latest_timestamps: TimestampsCache = TimestampsCache(
60-
key=LATEST_TIMESTAMP_KEY,
61-
cf_name=LATEST_TIMESTAMPS_CF_NAME,
58+
self._min_eligible_timestamps: TimestampsCache = TimestampsCache(
59+
key=MIN_ELIGIBLE_TIMESTAMPS_KEY,
60+
cf_name=MIN_ELIGIBLE_TIMESTAMPS_CF_NAME,
6261
)
6362

6463
@validate_transaction_status(PartitionTransactionStatus.STARTED)
@@ -86,12 +85,11 @@ def get_last(
8685
"""
8786

8887
prefix = self._ensure_bytes(prefix)
89-
latest_timestamp = self._get_latest_timestamp(prefix, timestamp)
90-
91-
# Negative retention is not allowed
92-
lower_bound = self._serialize_key(
93-
max(latest_timestamp - retention_ms, 0), prefix
88+
min_eligible_timestamp = self._get_min_eligible_timestamp(
89+
prefix, timestamp, retention_ms
9490
)
91+
92+
lower_bound = self._serialize_key(min_eligible_timestamp, prefix)
9593
# +1 because upper bound is exclusive
9694
upper_bound = self._serialize_key(timestamp + 1, prefix)
9795

@@ -172,10 +170,12 @@ def _expire(
172170
:param cf_name: Column family name.
173171
"""
174172

175-
latest_timestamp = self._get_latest_timestamp(prefix, timestamp)
176-
self._set_timestamp(prefix, latest_timestamp)
173+
min_eligible_timestamp = self._get_min_eligible_timestamp(
174+
prefix, timestamp, retention_ms
175+
)
176+
self._set_min_eligible_timestamp(prefix, min_eligible_timestamp)
177177

178-
key = self._serialize_key(max(timestamp - retention_ms, 0), prefix)
178+
key = self._serialize_key(min_eligible_timestamp, prefix)
179179

180180
cached = self._update_cache.get_updates(cf_name=cf_name).get(prefix, {})
181181
# Cast to list to avoid RuntimeError: dictionary changed size during iteration
@@ -203,23 +203,19 @@ def _serialize_key(self, key: Union[int, bytes], prefix: bytes) -> bytes:
203203
return prefix + SEPARATOR + key
204204
raise ValueError(f"Invalid key type: {type(key)}")
205205

206-
def _get_latest_timestamp(self, prefix: bytes, timestamp: int) -> Any:
207-
"""
208-
Get the latest timestamp for a given prefix.
209-
210-
If the timestamp is not found in the cache, it is fetched from the store.
211-
"""
212-
cache = self._latest_timestamps
206+
def _get_min_eligible_timestamp(
207+
self, prefix: bytes, timestamp: int, retention_ms: int
208+
) -> Any:
209+
cache = self._min_eligible_timestamps
213210
ts = (
214211
cache.timestamps.get(prefix)
215212
or self.get(key=cache.key, prefix=prefix, cf_name=cache.cf_name)
216213
or 0
217214
)
218-
cache.timestamps[prefix] = latest_timestamp = max(ts, timestamp)
219-
return latest_timestamp
215+
return max(ts, timestamp - retention_ms)
220216

221-
def _set_timestamp(self, prefix: bytes, timestamp: int) -> None:
222-
cache = self._latest_timestamps
217+
def _set_min_eligible_timestamp(self, prefix: bytes, timestamp: int) -> None:
218+
cache = self._min_eligible_timestamps
223219
cache.timestamps[prefix] = timestamp
224220
self.set(key=cache.key, value=timestamp, prefix=prefix, cf_name=cache.cf_name)
225221

@@ -233,7 +229,7 @@ class TimestampedStorePartition(RocksDBStorePartition):
233229
"""
234230

235231
partition_transaction_class = TimestampedPartitionTransaction
236-
additional_column_families = (LATEST_TIMESTAMPS_CF_NAME,)
232+
additional_column_families = (MIN_ELIGIBLE_TIMESTAMPS_CF_NAME,)
237233

238234

239235
class TimestampedStore(RocksDBStore):

quixstreams/state/rocksdb/windowed/metadata.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
LATEST_DELETED_VALUE_CF_NAME = "__value-deletion-index__"
88
LATEST_DELETED_VALUE_TIMESTAMP_KEY = b"__value_deleted_start_gt__"
99

10+
LATEST_TIMESTAMPS_CF_NAME = "__latest-timestamps__"
11+
LATEST_TIMESTAMP_KEY = b"__latest_timestamp__"
12+
1013
GLOBAL_COUNTER_CF_NAME = "__global-counter__"
1114
GLOBAL_COUNTER_KEY = b"__global_counter__"
1215

quixstreams/state/rocksdb/windowed/partition.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import logging
22
from typing import Iterator, cast
33

4-
from ..metadata import LATEST_TIMESTAMPS_CF_NAME
54
from ..partition import RocksDBStorePartition
65
from .metadata import (
76
GLOBAL_COUNTER_CF_NAME,
87
LATEST_DELETED_VALUE_CF_NAME,
98
LATEST_DELETED_WINDOW_CF_NAME,
109
LATEST_EXPIRED_WINDOW_CF_NAME,
10+
LATEST_TIMESTAMPS_CF_NAME,
1111
VALUES_CF_NAME,
1212
)
1313
from .transaction import WindowedRocksDBPartitionTransaction

quixstreams/state/rocksdb/windowed/transaction.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@
99
from quixstreams.state.metadata import DEFAULT_PREFIX, SEPARATOR
1010
from quixstreams.state.recovery import ChangelogProducer
1111
from quixstreams.state.rocksdb.cache import CounterCache, TimestampsCache
12-
from quixstreams.state.rocksdb.metadata import (
13-
LATEST_TIMESTAMP_KEY,
14-
LATEST_TIMESTAMPS_CF_NAME,
15-
)
1612
from quixstreams.state.serialization import (
1713
DumpsFunc,
1814
LoadsFunc,
@@ -29,6 +25,8 @@
2925
LATEST_DELETED_WINDOW_TIMESTAMP_KEY,
3026
LATEST_EXPIRED_WINDOW_CF_NAME,
3127
LATEST_EXPIRED_WINDOW_TIMESTAMP_KEY,
28+
LATEST_TIMESTAMP_KEY,
29+
LATEST_TIMESTAMPS_CF_NAME,
3230
VALUES_CF_NAME,
3331
)
3432
from .serialization import (

0 commit comments

Comments
 (0)