Skip to content

Commit c8aecc1

Browse files
gwaramadzedaniil-quixovv
authored
Feature: TimestampedStore (#841)
Co-authored-by: Daniil Gusev <daniil@quix.io> Co-authored-by: Quentin Dawans <quentin@quix.io>
1 parent d700dfb commit c8aecc1

File tree

15 files changed

+694
-101
lines changed

15 files changed

+694
-101
lines changed

quixstreams/sources/base/source.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ def flush(self, timeout: Optional[float] = None) -> None:
528528
:raises CheckpointProducerTimeout: if any message fails to produce before the timeout
529529
"""
530530
if self._store_transaction:
531-
self._store_transaction.prepare(None)
531+
self._store_transaction.prepare()
532532
self._store_transaction.flush()
533533
self._store_transaction = None
534534
self._store_state = None

quixstreams/state/base/partition.py

+2-6
Original file line numberDiff line numberDiff line change
@@ -106,18 +106,14 @@ def recover_from_changelog_message(
106106
:param offset: changelog message offset
107107
"""
108108

109+
@abstractmethod
109110
def begin(self) -> PartitionTransaction:
110111
"""
111112
Start a new `PartitionTransaction`
112113
113114
Using `PartitionTransaction` is a recommended way for accessing the data.
114115
"""
115-
return PartitionTransaction(
116-
partition=self,
117-
dumps=self._dumps,
118-
loads=self._loads,
119-
changelog_producer=self._changelog_producer,
120-
)
116+
...
121117

122118
def __enter__(self):
123119
return self

quixstreams/state/base/transaction.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"PartitionTransactionStatus",
4545
"PartitionTransaction",
4646
"PartitionTransactionCache",
47+
"validate_transaction_status",
4748
)
4849

4950
logger = logging.getLogger(__name__)
@@ -471,7 +472,7 @@ def exists(self, key: K, prefix: bytes, cf_name: str = "default") -> bool:
471472
return self._partition.exists(key_serialized, cf_name=cf_name)
472473

473474
@validate_transaction_status(PartitionTransactionStatus.STARTED)
474-
def prepare(self, processed_offsets: Optional[dict[str, int]]):
475+
def prepare(self, processed_offsets: Optional[dict[str, int]] = None) -> None:
475476
"""
476477
Produce changelog messages to the changelog topic for all changes accumulated
477478
in this transaction and prepare transaction to flush its state to the state
@@ -581,4 +582,5 @@ def __enter__(self):
581582

582583
def __exit__(self, exc_type, exc_val, exc_tb):
583584
if exc_val is None and not self.failed:
585+
self.prepare()
584586
self.flush()

quixstreams/state/manager.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from .memory import MemoryStore
1616
from .recovery import ChangelogProducerFactory, RecoveryManager
1717
from .rocksdb import RocksDBOptionsType, RocksDBStore
18+
from .rocksdb.timestamped import TimestampedStore
1819
from .rocksdb.windowed.store import WindowedRocksDBStore
1920

2021
__all__ = ("StateStoreManager", "DEFAULT_STATE_STORE_NAME", "StoreTypes")
@@ -24,7 +25,7 @@
2425

2526
DEFAULT_STATE_STORE_NAME = "default"
2627

27-
StoreTypes = Union[Type[RocksDBStore], Type[MemoryStore]]
28+
StoreTypes = Union[Type[RocksDBStore], Type[MemoryStore], Type[TimestampedStore]]
2829
SUPPORTED_STORES = [RocksDBStore, MemoryStore]
2930

3031

@@ -189,23 +190,31 @@ def register_store(
189190

190191
store_type = store_type or self.default_store_type
191192
if store_type == RocksDBStore:
192-
factory: Store = RocksDBStore(
193+
store: Store = RocksDBStore(
194+
name=store_name,
195+
stream_id=stream_id,
196+
base_dir=str(self._state_dir),
197+
changelog_producer_factory=changelog_producer_factory,
198+
options=self._rocksdb_options,
199+
)
200+
elif store_type == TimestampedStore:
201+
store = TimestampedStore(
193202
name=store_name,
194203
stream_id=stream_id,
195204
base_dir=str(self._state_dir),
196205
changelog_producer_factory=changelog_producer_factory,
197206
options=self._rocksdb_options,
198207
)
199208
elif store_type == MemoryStore:
200-
factory = MemoryStore(
209+
store = MemoryStore(
201210
name=store_name,
202211
stream_id=stream_id,
203212
changelog_producer_factory=changelog_producer_factory,
204213
)
205214
else:
206215
raise ValueError(f"invalid store type: {store_type}")
207216

208-
self._stores.setdefault(stream_id, {})[store_name] = factory
217+
self._stores.setdefault(stream_id, {})[store_name] = store
209218

210219
def register_windowed_store(
211220
self,

quixstreams/state/memory/partition.py

+9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
from typing import Any, Dict, Literal, Optional, Union
44

5+
from quixstreams.state import PartitionTransaction
56
from quixstreams.state.base import PartitionTransactionCache, StorePartition
67
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
78
from quixstreams.state.metadata import METADATA_CF_NAME, Marker
@@ -62,6 +63,14 @@ def closed(self) -> bool:
6263
def close(self) -> None:
6364
self._closed = True
6465

66+
def begin(self) -> PartitionTransaction:
67+
return PartitionTransaction(
68+
partition=self,
69+
dumps=self._dumps,
70+
loads=self._loads,
71+
changelog_producer=self._changelog_producer,
72+
)
73+
6574
@_validate_partition_state()
6675
def write(
6776
self,

quixstreams/state/rocksdb/cache.py

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from dataclasses import dataclass, field
2+
from typing import Optional
3+
4+
5+
@dataclass
6+
class TimestampsCache:
7+
key: bytes
8+
cf_name: str
9+
timestamps: dict[bytes, Optional[int]] = field(default_factory=dict)
10+
11+
12+
@dataclass
13+
class CounterCache:
14+
key: bytes
15+
cf_name: str
16+
counter: Optional[int] = None

quixstreams/state/rocksdb/partition.py

+82-7
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
import logging
22
import time
3-
from typing import Dict, List, Literal, Optional, Union, cast
3+
from typing import (
4+
Dict,
5+
Iterator,
6+
List,
7+
Literal,
8+
Optional,
9+
Union,
10+
cast,
11+
)
412

5-
from rocksdict import AccessType, ColumnFamily, Rdict, WriteBatch
13+
from rocksdict import AccessType, ColumnFamily, Rdict, ReadOptions, WriteBatch
614

7-
from quixstreams.state.base import PartitionTransactionCache, StorePartition
15+
from quixstreams.state.base import (
16+
PartitionTransaction,
17+
PartitionTransactionCache,
18+
StorePartition,
19+
)
820
from quixstreams.state.exceptions import ColumnFamilyDoesNotExist
921
from quixstreams.state.metadata import METADATA_CF_NAME, Marker
1022
from quixstreams.state.recovery import ChangelogProducer
11-
from quixstreams.state.serialization import (
12-
int_from_int64_bytes,
13-
int_to_int64_bytes,
14-
)
23+
from quixstreams.state.serialization import int_from_int64_bytes, int_to_int64_bytes
1524

1625
from .exceptions import ColumnFamilyAlreadyExists
1726
from .metadata import (
@@ -22,6 +31,7 @@
2231

2332
__all__ = ("RocksDBStorePartition",)
2433

34+
2535
logger = logging.getLogger(__name__)
2636

2737

@@ -42,6 +52,8 @@ class RocksDBStorePartition(StorePartition):
4252
:param options: RocksDB options. If `None`, the default options will be used.
4353
"""
4454

55+
additional_column_families: tuple[str, ...] = ()
56+
4557
def __init__(
4658
self,
4759
path: str,
@@ -60,6 +72,8 @@ def __init__(
6072
self._db = self._init_rocksdb()
6173
self._cf_cache: Dict[str, Rdict] = {}
6274
self._cf_handle_cache: Dict[str, ColumnFamily] = {}
75+
for cf_name in self.additional_column_families:
76+
self._ensure_column_family(cf_name)
6377

6478
def recover_from_changelog_message(
6579
self, key: bytes, value: Optional[bytes], cf_name: str, offset: int
@@ -139,6 +153,61 @@ def get(
139153
# RDict accept Any type as value but we only write bytes so we should only get bytes back.
140154
return cast(Union[bytes, Literal[Marker.UNDEFINED]], result)
141155

156+
def iter_items(
157+
self,
158+
lower_bound: bytes, # inclusive
159+
upper_bound: bytes, # exclusive
160+
backwards: bool = False,
161+
cf_name: str = "default",
162+
) -> Iterator[tuple[bytes, bytes]]:
163+
"""
164+
Iterate over key-value pairs within a specified range in a column family.
165+
166+
:param lower_bound: The lower bound key (inclusive) for the iteration range.
167+
:param upper_bound: The upper bound key (exclusive) for the iteration range.
168+
:param backwards: If `True`, iterate in reverse order (descending).
169+
Default is `False` (ascending).
170+
:param cf_name: The name of the column family to iterate over.
171+
Default is "default".
172+
:return: An iterator yielding (key, value) tuples.
173+
"""
174+
cf = self.get_column_family(cf_name=cf_name)
175+
176+
# Set iterator bounds to reduce IO by limiting the range of keys fetched
177+
read_opt = ReadOptions()
178+
read_opt.set_iterate_lower_bound(lower_bound)
179+
read_opt.set_iterate_upper_bound(upper_bound)
180+
181+
from_key = upper_bound if backwards else lower_bound
182+
183+
# RDict accepts Any type as value but we only write bytes so we should only get bytes back.
184+
items = cast(
185+
Iterator[tuple[bytes, bytes]],
186+
cf.items(from_key=from_key, read_opt=read_opt, backwards=backwards),
187+
)
188+
189+
if not backwards:
190+
# NOTE: Forward iteration respects bounds correctly.
191+
# Also, we need to use yield from notation to replace RdictItems
192+
# with Python-native generator or else garbage collection
193+
# will make the result unpredictable.
194+
yield from items
195+
else:
196+
# NOTE: When iterating backwards, the `read_opt` lower bound
197+
# is not respected by Rdict for some reason. We need to manually
198+
# filter it here.
199+
for key, value in items:
200+
if lower_bound <= key:
201+
yield key, value
202+
203+
def begin(self) -> PartitionTransaction:
204+
return PartitionTransaction(
205+
partition=self,
206+
dumps=self._dumps,
207+
loads=self._loads,
208+
changelog_producer=self._changelog_producer,
209+
)
210+
142211
def exists(self, key: bytes, cf_name: str = "default") -> bool:
143212
"""
144213
Check if a key is present in the DB.
@@ -328,3 +397,9 @@ def _update_changelog_offset(self, batch: WriteBatch, offset: int):
328397
int_to_int64_bytes(offset),
329398
self.get_column_family_handle(METADATA_CF_NAME),
330399
)
400+
401+
def _ensure_column_family(self, cf_name: str) -> None:
402+
try:
403+
self.get_column_family(cf_name)
404+
except ColumnFamilyDoesNotExist:
405+
self.create_column_family(cf_name)

quixstreams/state/rocksdb/store.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class RocksDBStore(Store):
2323
partitions' transactions.
2424
"""
2525

26+
store_partition_class = RocksDBStorePartition
27+
2628
def __init__(
2729
self,
2830
name: str,
@@ -61,6 +63,6 @@ def create_new_partition(
6163
self._changelog_producer_factory.get_partition_producer(partition)
6264
)
6365

64-
return RocksDBStorePartition(
66+
return self.store_partition_class(
6567
path=path, options=self._options, changelog_producer=changelog_producer
6668
)

0 commit comments

Comments
 (0)