Skip to content

Commit a769fdc

Browse files
committed
Refactor
1 parent a7c5bce commit a769fdc

File tree

2 files changed

+17
-53
lines changed

2 files changed

+17
-53
lines changed

quixstreams/state/rocksdb/timestamped.py

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ def get_last(
6565
self,
6666
timestamp: int,
6767
prefix: Any,
68-
retention_ms: int = DAYS_7,
6968
cf_name: str = "default",
7069
) -> Optional[Any]:
7170
"""Get the latest value for a prefix up to a given timestamp.
@@ -79,15 +78,12 @@ def get_last(
7978
8079
:param timestamp: The upper bound timestamp (inclusive) in milliseconds.
8180
:param prefix: The key prefix.
82-
:param retention_ms: The retention period in milliseconds.
8381
:param cf_name: The column family name.
8482
:return: The deserialized value if found, otherwise None.
8583
"""
8684

8785
prefix = self._ensure_bytes(prefix)
88-
min_eligible_timestamp = self._get_min_eligible_timestamp(
89-
prefix, timestamp, retention_ms
90-
)
86+
min_eligible_timestamp = self._get_min_eligible_timestamp(prefix)
9187

9288
lower_bound = self._serialize_key(min_eligible_timestamp, prefix)
9389
# +1 because upper bound is exclusive
@@ -147,33 +143,26 @@ def set_for_timestamp(
147143
"""
148144
prefix = self._ensure_bytes(prefix)
149145
super().set(timestamp, value, prefix, cf_name=cf_name)
150-
self._expire(
151-
timestamp=timestamp,
152-
prefix=prefix,
153-
retention_ms=retention_ms,
154-
cf_name=cf_name,
146+
min_eligible_timestamp = max(
147+
self._get_min_eligible_timestamp(prefix),
148+
timestamp - retention_ms,
155149
)
150+
self._set_min_eligible_timestamp(prefix, min_eligible_timestamp)
151+
self._expire(prefix=prefix, cf_name=cf_name)
156152

157-
def _expire(
158-
self, timestamp: int, prefix: bytes, retention_ms: int, cf_name: str = "default"
159-
) -> None:
153+
def _expire(self, prefix: bytes, cf_name: str = "default") -> None:
160154
"""
161155
Delete all entries for a given prefix with timestamps less than the
162156
minimum eligible timestamp currently tracked for that prefix.
163157
164158
This applies to both the in-memory update cache and the underlying
165159
RocksDB store within the current transaction.
166160
167-
:param timestamp: The upper bound timestamp (exclusive) in milliseconds.
168-
Entries with timestamps strictly less than this will be deleted.
169161
:param prefix: The key prefix.
170162
:param cf_name: Column family name.
171163
"""
172164

173-
min_eligible_timestamp = self._get_min_eligible_timestamp(
174-
prefix, timestamp, retention_ms
175-
)
176-
self._set_min_eligible_timestamp(prefix, min_eligible_timestamp)
165+
min_eligible_timestamp = self._get_min_eligible_timestamp(prefix)
177166

178167
key = self._serialize_key(min_eligible_timestamp, prefix)
179168

@@ -203,16 +192,13 @@ def _serialize_key(self, key: Union[int, bytes], prefix: bytes) -> bytes:
203192
return prefix + SEPARATOR + key
204193
raise ValueError(f"Invalid key type: {type(key)}")
205194

206-
def _get_min_eligible_timestamp(
207-
self, prefix: bytes, timestamp: int, retention_ms: int
208-
) -> Any:
195+
def _get_min_eligible_timestamp(self, prefix: bytes) -> int:
209196
cache = self._min_eligible_timestamps
210-
ts = (
197+
return (
211198
cache.timestamps.get(prefix)
212199
or self.get(key=cache.key, prefix=prefix, cf_name=cache.cf_name)
213200
or 0
214201
)
215-
return max(ts, timestamp - retention_ms)
216202

217203
def _set_min_eligible_timestamp(self, prefix: bytes, timestamp: int) -> None:
218204
cache = self._min_eligible_timestamps

tests/test_quixstreams/test_state/test_rocksdb/test_timestamped.py

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -101,44 +101,22 @@ def test_get_last_prefix_not_bytes(transaction: TimestampedPartitionTransaction)
101101
assert tx.get_last(timestamp=10, prefix=b'"key"') == "value"
102102

103103

104-
def test_get_last_from_cache_with_retention(
105-
transaction: TimestampedPartitionTransaction,
106-
):
107-
with transaction() as tx:
108-
tx.set_for_timestamp(timestamp=5, value="value", prefix=b"key")
109-
assert tx.get_last(timestamp=10, prefix=b"key") == "value"
110-
assert tx.get_last(timestamp=10, prefix=b"key", retention_ms=5) == "value"
111-
assert tx.get_last(timestamp=10, prefix=b"key", retention_ms=4) == None
112-
113-
114-
def test_get_last_from_store_with_retention(
115-
transaction: TimestampedPartitionTransaction,
116-
):
117-
with transaction() as tx:
118-
tx.set_for_timestamp(timestamp=5, value="value", prefix=b"key")
119-
120-
with transaction() as tx:
121-
assert tx.get_last(timestamp=10, prefix=b"key") == "value"
122-
assert tx.get_last(timestamp=10, prefix=b"key", retention_ms=5) == "value"
123-
assert tx.get_last(timestamp=10, prefix=b"key", retention_ms=4) == None
124-
125-
126104
def test_get_last_for_out_of_order_timestamp(
127105
transaction: TimestampedPartitionTransaction,
128106
):
129107
with transaction() as tx:
130108
tx.set_for_timestamp(
131109
timestamp=10, value="value10", prefix=b"key", retention_ms=5
132110
)
133-
assert tx.get_last(timestamp=10, prefix=b"key", retention_ms=5) == "value10"
111+
assert tx.get_last(timestamp=10, prefix=b"key") == "value10"
134112
tx.set_for_timestamp(timestamp=5, value="value5", prefix=b"key", retention_ms=5)
135113
tx.set_for_timestamp(timestamp=4, value="value4", prefix=b"key", retention_ms=5)
136114

137115
with transaction() as tx:
138-
assert tx.get_last(timestamp=5, prefix=b"key", retention_ms=5) == "value5"
116+
assert tx.get_last(timestamp=5, prefix=b"key") == "value5"
139117

140118
# Retention watermark is 10 - 5 = 5 so everything lower is ignored
141-
assert tx.get_last(timestamp=4, prefix=b"key", retention_ms=5) is None
119+
assert tx.get_last(timestamp=4, prefix=b"key") is None
142120

143121

144122
def test_set_for_timestamp_with_prefix_not_bytes(
@@ -156,8 +134,8 @@ def test_set_for_timestamp_with_retention_cached(
156134
with transaction() as tx:
157135
tx.set_for_timestamp(timestamp=2, value="v2", prefix=b"key", retention_ms=2)
158136
tx.set_for_timestamp(timestamp=5, value="v5", prefix=b"key", retention_ms=2)
159-
assert tx.get_last(timestamp=2, prefix=b"key", retention_ms=3) == None
160-
assert tx.get_last(timestamp=5, prefix=b"key", retention_ms=3) == "v5"
137+
assert tx.get_last(timestamp=2, prefix=b"key") == None
138+
assert tx.get_last(timestamp=5, prefix=b"key") == "v5"
161139

162140

163141
def test_set_for_timestamp_with_retention_stored(
@@ -168,5 +146,5 @@ def test_set_for_timestamp_with_retention_stored(
168146
tx.set_for_timestamp(timestamp=5, value="v5", prefix=b"key", retention_ms=2)
169147

170148
with transaction() as tx:
171-
assert tx.get_last(timestamp=2, prefix=b"key", retention_ms=3) == None
172-
assert tx.get_last(timestamp=5, prefix=b"key", retention_ms=3) == "v5"
149+
assert tx.get_last(timestamp=2, prefix=b"key") == None
150+
assert tx.get_last(timestamp=5, prefix=b"key") == "v5"

0 commit comments

Comments
 (0)