Skip to content

Commit ea4c611

Browse files
committed
Do not pass around default cf name
1 parent a769fdc commit ea4c611

File tree

1 file changed

+9
-19
lines changed

1 file changed

+9
-19
lines changed

quixstreams/state/rocksdb/timestamped.py

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ def get_last(
6565
self,
6666
timestamp: int,
6767
prefix: Any,
68-
cf_name: str = "default",
6968
) -> Optional[Any]:
7069
"""Get the latest value for a prefix up to a given timestamp.
7170
@@ -78,7 +77,6 @@ def get_last(
7877
7978
:param timestamp: The upper bound timestamp (inclusive) in milliseconds.
8079
:param prefix: The key prefix.
81-
:param cf_name: The column family name.
8280
:return: The deserialized value if found, otherwise None.
8381
"""
8482

@@ -91,8 +89,8 @@ def get_last(
9189

9290
value: Optional[bytes] = None
9391

94-
deletes = self._update_cache.get_deletes(cf_name=cf_name)
95-
updates = self._update_cache.get_updates(cf_name=cf_name).get(prefix, {})
92+
deletes = self._update_cache.get_deletes()
93+
updates = self._update_cache.get_updates().get(prefix, {})
9694

9795
cached = sorted(updates.items(), reverse=True)
9896
for cached_key, cached_value in cached:
@@ -104,7 +102,6 @@ def get_last(
104102
lower_bound=lower_bound,
105103
upper_bound=upper_bound,
106104
backwards=True,
107-
cf_name=cf_name,
108105
)
109106
for stored_key, stored_value in stored:
110107
if stored_key in deletes:
@@ -126,7 +123,6 @@ def set_for_timestamp(
126123
value: Any,
127124
prefix: Any,
128125
retention_ms: int = DAYS_7,
129-
cf_name: str = "default",
130126
) -> None:
131127
"""Set a value for the timestamp.
132128
@@ -139,18 +135,17 @@ def set_for_timestamp(
139135
:param timestamp: Timestamp associated with the value in milliseconds.
140136
:param value: The value to store.
141137
:param prefix: The key prefix.
142-
:param cf_name: Column family name.
143138
"""
144139
prefix = self._ensure_bytes(prefix)
145-
super().set(timestamp, value, prefix, cf_name=cf_name)
140+
super().set(timestamp, value, prefix)
146141
min_eligible_timestamp = max(
147142
self._get_min_eligible_timestamp(prefix),
148143
timestamp - retention_ms,
149144
)
150145
self._set_min_eligible_timestamp(prefix, min_eligible_timestamp)
151-
self._expire(prefix=prefix, cf_name=cf_name)
146+
self._expire(prefix=prefix)
152147

153-
def _expire(self, prefix: bytes, cf_name: str = "default") -> None:
148+
def _expire(self, prefix: bytes) -> None:
154149
"""
155150
Delete all entries for a given prefix with timestamps less than the
156151
provided timestamp.
@@ -159,26 +154,21 @@ def _expire(self, prefix: bytes, cf_name: str = "default") -> None:
159154
RocksDB store within the current transaction.
160155
161156
:param prefix: The key prefix.
162-
:param cf_name: Column family name.
163157
"""
164158

165159
min_eligible_timestamp = self._get_min_eligible_timestamp(prefix)
166160

167161
key = self._serialize_key(min_eligible_timestamp, prefix)
168162

169-
cached = self._update_cache.get_updates(cf_name=cf_name).get(prefix, {})
163+
cached = self._update_cache.get_updates().get(prefix, {})
170164
# Cast to list to avoid RuntimeError: dictionary changed size during iteration
171165
for cached_key in list(cached):
172166
if cached_key < key:
173-
self._update_cache.delete(cached_key, prefix, cf_name=cf_name)
167+
self._update_cache.delete(cached_key, prefix)
174168

175-
stored = self._partition.iter_items(
176-
lower_bound=prefix,
177-
upper_bound=key,
178-
cf_name=cf_name,
179-
)
169+
stored = self._partition.iter_items(lower_bound=prefix, upper_bound=key)
180170
for stored_key, _ in stored:
181-
self._update_cache.delete(stored_key, prefix, cf_name=cf_name)
171+
self._update_cache.delete(stored_key, prefix)
182172

183173
def _ensure_bytes(self, prefix: Any) -> bytes:
184174
if isinstance(prefix, bytes):

0 commit comments

Comments
 (0)