Skip to content

Commit bb1821d

Browse files
committed
Ensure additional column families in RocksDBStorePartition base class
1 parent 3208766 commit bb1821d

File tree

3 files changed

+14
-33
lines changed

3 files changed

+14
-33
lines changed

quixstreams/state/rocksdb/partition.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class RocksDBStorePartition(StorePartition):
4242
:param options: RocksDB options. If `None`, the default options will be used.
4343
"""
4444

45+
additional_column_families: tuple[str, ...] = ()
46+
4547
def __init__(
4648
self,
4749
path: str,
@@ -60,6 +62,8 @@ def __init__(
6062
self._db = self._init_rocksdb()
6163
self._cf_cache: Dict[str, Rdict] = {}
6264
self._cf_handle_cache: Dict[str, ColumnFamily] = {}
65+
for cf_name in self.additional_column_families:
66+
self._ensure_column_family(cf_name)
6367

6468
def recover_from_changelog_message(
6569
self, key: bytes, value: Optional[bytes], cf_name: str, offset: int

quixstreams/state/rocksdb/timestamped.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
LATEST_TIMESTAMP_KEY,
1313
LATEST_TIMESTAMPS_CF_NAME,
1414
)
15-
from quixstreams.state.rocksdb.types import RocksDBOptionsType
1615
from quixstreams.state.serialization import (
1716
DumpsFunc,
1817
LoadsFunc,
@@ -234,15 +233,7 @@ class TimestampedStorePartition(RocksDBStorePartition):
234233
"""
235234

236235
partition_transaction_class = TimestampedPartitionTransaction
237-
238-
def __init__(
239-
self,
240-
path: str,
241-
options: Optional[RocksDBOptionsType] = None,
242-
changelog_producer: Optional[ChangelogProducer] = None,
243-
) -> None:
244-
super().__init__(path, options=options, changelog_producer=changelog_producer)
245-
self._ensure_column_family(LATEST_TIMESTAMPS_CF_NAME)
236+
additional_column_families = (LATEST_TIMESTAMPS_CF_NAME,)
246237

247238

248239
class TimestampedStore(RocksDBStore):

quixstreams/state/rocksdb/windowed/partition.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
import logging
2-
from typing import Iterator, Optional, cast
3-
4-
from quixstreams.state.recovery import ChangelogProducer
2+
from typing import Iterator, cast
53

64
from ..metadata import LATEST_TIMESTAMPS_CF_NAME
75
from ..partition import RocksDBStorePartition
8-
from ..types import RocksDBOptionsType
96
from .metadata import (
107
GLOBAL_COUNTER_CF_NAME,
118
LATEST_DELETED_VALUE_CF_NAME,
@@ -25,28 +22,17 @@ class WindowedRocksDBStorePartition(RocksDBStorePartition):
2522
2623
Besides the data, it keeps track of the latest observed timestamp and
2724
stores the expiration index to delete expired windows.
28-
29-
:param path: an absolute path to the RocksDB folder
30-
:param options: RocksDB options. If `None`, the default options will be used.
3125
"""
3226

3327
partition_transaction_class = WindowedRocksDBPartitionTransaction
34-
35-
def __init__(
36-
self,
37-
path: str,
38-
options: Optional[RocksDBOptionsType] = None,
39-
changelog_producer: Optional[ChangelogProducer] = None,
40-
):
41-
super().__init__(
42-
path=path, options=options, changelog_producer=changelog_producer
43-
)
44-
self._ensure_column_family(LATEST_DELETED_VALUE_CF_NAME)
45-
self._ensure_column_family(LATEST_EXPIRED_WINDOW_CF_NAME)
46-
self._ensure_column_family(LATEST_DELETED_WINDOW_CF_NAME)
47-
self._ensure_column_family(LATEST_TIMESTAMPS_CF_NAME)
48-
self._ensure_column_family(GLOBAL_COUNTER_CF_NAME)
49-
self._ensure_column_family(VALUES_CF_NAME)
28+
additional_column_families = (
29+
LATEST_DELETED_VALUE_CF_NAME,
30+
LATEST_EXPIRED_WINDOW_CF_NAME,
31+
LATEST_DELETED_WINDOW_CF_NAME,
32+
LATEST_TIMESTAMPS_CF_NAME,
33+
GLOBAL_COUNTER_CF_NAME,
34+
VALUES_CF_NAME,
35+
)
5036

5137
def iter_keys(self, cf_name: str = "default") -> Iterator[bytes]:
5238
"""

0 commit comments

Comments
 (0)