Commit bb2686e

state: Extract base store implementation from RocksDBStore (#533)

1 parent 3bc502a · commit bb2686e

34 files changed: +994 -1119 lines

quixstreams/dataframe/dataframe.py

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@
 from quixstreams.models.serializers import SerializerType, DeserializerType
 from quixstreams.processing import ProcessingContext
 from quixstreams.sinks import BaseSink
-from quixstreams.state.types import State
+from quixstreams.state.base import State
 from .base import BaseStreaming
 from .exceptions import InvalidOperation
 from .registry import DataframeRegistry

quixstreams/state/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -2,3 +2,4 @@
 from .manager import *
 from .recovery import *
 from .types import *
+from .base import *
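
Because each package re-exports its public names with star imports, the flat import path stays valid after the move. A minimal sketch of that guarantee, assuming the `__all__` declarations shown elsewhere in this commit:

# quixstreams.state star-imports quixstreams.state.base, which in turn
# star-imports .state, so both paths resolve to the same class.
from quixstreams.state import State as flat_state
from quixstreams.state.base import State as base_state

assert flat_state is base_state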

quixstreams/state/base/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+# ruff: noqa: F403
+from .store import *
+from .partition import *
+from .transaction import *
+from .state import *
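
The `# ruff: noqa: F403` directive suppresses Ruff's F403 warning (`from module import *` used; unable to detect undefined names) for the whole file: the package deliberately re-exports whatever its submodules declare in `__all__`.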

quixstreams/state/base/partition.py

Lines changed: 177 additions & 0 deletions
@@ -0,0 +1,177 @@
+import logging
+
+from abc import ABC, abstractmethod
+from typing import Optional, Union, Any, TYPE_CHECKING
+
+from quixstreams.models import ConfluentKafkaMessageProto
+from quixstreams.state.exceptions import ColumnFamilyHeaderMissing
+from quixstreams.state.metadata import (
+    CHANGELOG_CF_MESSAGE_HEADER,
+    CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER,
+)
+from quixstreams.state.serialization import DumpsFunc, LoadsFunc
+from quixstreams.utils.json import loads as json_loads
+
+from .transaction import PartitionTransaction, CACHE_TYPE
+
+if TYPE_CHECKING:
+    from quixstreams.state.recovery import ChangelogProducer
+
+
+__all__ = ("StorePartition",)
+
+logger = logging.getLogger(__name__)
+
+
+class StorePartition(ABC):
+    """
+    A base class to access state in the underlying storage.
+    It represents a single instance of some storage (e.g. a single database for
+    the persistent storage).
+    """
+
+    def __init__(
+        self,
+        dumps: DumpsFunc,
+        loads: LoadsFunc,
+        changelog_producer: Optional["ChangelogProducer"],
+    ) -> None:
+        super().__init__()
+        self._dumps = dumps
+        self._loads = loads
+        self._changelog_producer = changelog_producer
+
+    @abstractmethod
+    def close(self): ...
+
+    @abstractmethod
+    def _recover_from_changelog_message(
+        self,
+        changelog_message: ConfluentKafkaMessageProto,
+        cf_name: str,
+        processed_offset: Optional[int],
+        committed_offset: int,
+    ): ...
+
+    @abstractmethod
+    def get_processed_offset(self) -> Optional[int]:
+        """
+        Get the last processed offset for the given partition.
+        :return: offset or `None` if there's no processed offset yet
+        """
+        ...
+
+    @abstractmethod
+    def get_changelog_offset(self) -> Optional[int]:
+        """
+        Get the offset that the changelog is up-to-date with.
+        :return: offset or `None` if there's no changelog offset yet
+        """
+        ...
+
+    @abstractmethod
+    def write(
+        self,
+        data: CACHE_TYPE,
+        processed_offset: Optional[int],
+        changelog_offset: Optional[int],
+    ):
+        """
+        Update the state with data.
+
+        :param data: The modified data
+        :param processed_offset: The offset processed to generate the data.
+        :param changelog_offset: The changelog message offset of the data.
+        """
+
+    @abstractmethod
+    def get(
+        self, key: bytes, default: Any = None, cf_name: str = "default"
+    ) -> Union[None, bytes, Any]:
+        """
+        Get a key from the store.
+
+        :param key: a key encoded to `bytes`
+        :param default: a default value to return if the key is not found.
+        :param cf_name: RocksDB column family name. Default - "default"
+        :return: a value if the key is present in the store. Otherwise, `default`
+        """
+
+    @abstractmethod
+    def exists(self, key: bytes, cf_name: str = "default") -> bool:
+        """
+        Check if a key is present in the store.
+
+        :param key: a key encoded to `bytes`.
+        :param cf_name: RocksDB column family name. Default - "default"
+        :return: `True` if the key is present, `False` otherwise.
+        """
+
+    def begin(self) -> PartitionTransaction:
+        """
+        Start a new `PartitionTransaction`.
+
+        Using `PartitionTransaction` is the recommended way of accessing the data.
+        """
+        return PartitionTransaction(
+            partition=self,
+            dumps=self._dumps,
+            loads=self._loads,
+            changelog_producer=self._changelog_producer,
+        )
+
+    def recover_from_changelog_message(
+        self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int
+    ):
+        """
+        Update state from a given changelog message.
+
+        :param changelog_message: A raw Confluent message read from a changelog topic.
+        :param committed_offset: latest committed offset for the partition
+        """
+        headers = dict(changelog_message.headers() or ())
+        # Parse the column family name from message headers
+        cf_name = headers.get(CHANGELOG_CF_MESSAGE_HEADER, b"").decode()
+        if not cf_name:
+            raise ColumnFamilyHeaderMissing(
+                f"Header '{CHANGELOG_CF_MESSAGE_HEADER}' missing from changelog message"
+            )
+
+        # Parse the processed topic-partition-offset info from the changelog message
+        # headers to determine whether the update should be applied or skipped.
+        # It can be empty if the message was produced by an older version of the lib.
+        processed_offset = json_loads(
+            headers.get(CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, b"null")
+        )
+
+        self._recover_from_changelog_message(
+            changelog_message,
+            cf_name,
+            processed_offset,
+            committed_offset,
+        )
+
+    def _should_apply_changelog(
+        self, processed_offset: Optional[int], committed_offset: int
+    ) -> bool:
+        """
+        Determine whether the changelog update should be applied or skipped.
+
+        :param processed_offset: changelog message processed offset.
+        :param committed_offset: latest committed offset of the source topic partition
+        :return: True if the update should be applied, else False.
+        """
+        if processed_offset is not None:
+            # Skip recovering from the message if its processed offset is ahead of
+            # the current committed offset.
+            # This way it will recover to a consistent state if the checkpointing
+            # code produced the changelog messages but failed to commit
+            # the source topic offset.
+            return processed_offset < committed_offset
+        return True
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
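
To make the abstract contract concrete, here is a hypothetical in-memory subclass (not part of this commit): it implements every abstract method with plain dicts, and `write()` is left schematic because the exact shape of `CACHE_TYPE` lives in `.transaction` and is not shown in this diff.

import json
from typing import Any, Optional, Union

from quixstreams.state.base import StorePartition


class InMemoryStorePartition(StorePartition):
    """Illustrative partition keeping each column family in a plain dict."""

    def __init__(self, dumps, loads, changelog_producer=None):
        super().__init__(dumps=dumps, loads=loads, changelog_producer=changelog_producer)
        self._cfs = {"default": {}}  # cf_name -> {key: value}
        self._processed_offset: Optional[int] = None
        self._changelog_offset: Optional[int] = None

    def close(self):
        self._cfs.clear()

    def get(self, key: bytes, default: Any = None, cf_name: str = "default") -> Union[None, bytes, Any]:
        return self._cfs.get(cf_name, {}).get(key, default)

    def exists(self, key: bytes, cf_name: str = "default") -> bool:
        return key in self._cfs.get(cf_name, {})

    def get_processed_offset(self) -> Optional[int]:
        return self._processed_offset

    def get_changelog_offset(self) -> Optional[int]:
        return self._changelog_offset

    def write(self, data, processed_offset, changelog_offset):
        # A real backend applies the transaction's cached updates/deletes
        # (CACHE_TYPE) per column family here; only offsets are tracked below.
        self._processed_offset = processed_offset
        self._changelog_offset = changelog_offset

    def _recover_from_changelog_message(
        self, changelog_message, cf_name, processed_offset, committed_offset
    ):
        # Reuse the base class's offset guard before applying the update.
        if self._should_apply_changelog(processed_offset, committed_offset):
            self._cfs.setdefault(cf_name, {})[changelog_message.key()] = changelog_message.value()


# `begin()` is inherited: it wires the partition, serializers, and changelog
# producer into a PartitionTransaction.
partition = InMemoryStorePartition(
    dumps=lambda obj: json.dumps(obj).encode(),
    loads=lambda raw: json.loads(raw),
    changelog_producer=None,
)
tx = partition.begin()

The inherited `_should_apply_changelog` guard is what keeps recovery consistent: a message whose `processed_offset` is 10 is applied only if the source partition's committed offset is greater than 10; otherwise the checkpoint that produced it never committed its source offset, and the message is skipped.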

quixstreams/state/state.py renamed to quixstreams/state/base/state.py

Lines changed: 55 additions & 3 deletions
@@ -1,6 +1,58 @@
-from typing import Any, Optional
+import logging
+from abc import ABC, abstractmethod
+from typing import Any, Optional, TYPE_CHECKING
 
-from .types import State, PartitionTransaction
+if TYPE_CHECKING:
+    from .transaction import PartitionTransaction
+
+__all__ = ("State", "TransactionState")
+
+logger = logging.getLogger(__name__)
+
+
+class State(ABC):
+    """
+    Primary interface for working with key-value state data from `StreamingDataFrame`
+    """
+
+    @abstractmethod
+    def get(self, key: Any, default: Any = None) -> Optional[Any]:
+        """
+        Get the value for key if key is present in the state, else default
+
+        :param key: key
+        :param default: default value to return if the key is not found
+        :return: value or None if the key is not found and `default` is not provided
+        """
+        ...
+
+    @abstractmethod
+    def set(self, key: Any, value: Any):
+        """
+        Set value for the key.
+        :param key: key
+        :param value: value
+        """
+        ...
+
+    @abstractmethod
+    def delete(self, key: Any):
+        """
+        Delete value for the key.
+
+        This function always returns `None`, even if value is not found.
+        :param key: key
+        """
+        ...
+
+    @abstractmethod
+    def exists(self, key: Any) -> bool:
+        """
+        Check if the key exists in state.
+        :param key: key
+        :return: True if key exists, False otherwise
+        """
+        ...
 
 
 class TransactionState(State):
@@ -9,7 +61,7 @@ class TransactionState(State):
         "_prefix",
     )
 
-    def __init__(self, prefix: bytes, transaction: PartitionTransaction):
+    def __init__(self, prefix: bytes, transaction: "PartitionTransaction"):
         """
         Simple key-value state to be provided into `StreamingDataFrame` functions
 
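
For context, a short sketch of how this `State` interface surfaces in user code, following the usual quixstreams stateful-callback pattern (the broker address and topic name below are placeholders, not from this commit):

from quixstreams import Application
from quixstreams.state.base import State

app = Application(broker_address="localhost:9092")  # placeholder address
sdf = app.dataframe(app.topic("events"))  # placeholder topic name

def count_events(value: dict, state: State) -> dict:
    # State.get/set act as a per-key store persisted by the StorePartition.
    total = state.get("count", default=0) + 1
    state.set("count", total)
    return {**value, "count": total}

sdf = sdf.apply(count_events, stateful=True)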
