
state: Extract base store implementation from RocksDBStore #533

Merged · 6 commits · Oct 3, 2024
2 changes: 1 addition & 1 deletion quixstreams/dataframe/dataframe.py
@@ -45,7 +45,7 @@
from quixstreams.models.serializers import SerializerType, DeserializerType
from quixstreams.processing import ProcessingContext
from quixstreams.sinks import BaseSink
-from quixstreams.state.types import State
+from quixstreams.state.base import State
from .base import BaseStreaming
from .exceptions import InvalidOperation
from .registry import DataframeRegistry
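For context, `State` is the interface that stateful `StreamingDataFrame` callbacks receive; this PR only changes where it is defined. A minimal usage sketch (broker address, topic, and field names are hypothetical):

from quixstreams import Application
from quixstreams.state.base import State

app = Application(broker_address="localhost:9092")
sdf = app.dataframe(app.topic("events"))

def count_events(value: dict, state: State) -> dict:
    # Keep a running total per message key in the state store
    total = state.get("total", default=0) + 1
    state.set("total", total)
    return {**value, "total": total}

sdf = sdf.apply(count_events, stateful=True)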
1 change: 1 addition & 0 deletions quixstreams/state/__init__.py
@@ -2,3 +2,4 @@
from .manager import *
from .recovery import *
from .types import *
+from .base import *
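Because the new `base` package is star-imported here, the public import path keeps resolving to the same class, so this should hold (illustrative check):

from quixstreams.state import State
from quixstreams.state.base import State as BaseState

assert State is BaseState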
5 changes: 5 additions & 0 deletions quixstreams/state/base/__init__.py
@@ -0,0 +1,5 @@
# ruff: noqa: F403
from .store import *
from .partition import *
from .transaction import *
from .state import *
177 changes: 177 additions & 0 deletions quixstreams/state/base/partition.py
@@ -0,0 +1,177 @@
import logging

from abc import ABC, abstractmethod
from typing import Optional, Union, Any, TYPE_CHECKING

from quixstreams.models import ConfluentKafkaMessageProto
from quixstreams.state.exceptions import ColumnFamilyHeaderMissing
from quixstreams.state.metadata import (
    CHANGELOG_CF_MESSAGE_HEADER,
    CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER,
)
from quixstreams.state.serialization import DumpsFunc, LoadsFunc
from quixstreams.utils.json import loads as json_loads

from .transaction import PartitionTransaction, CACHE_TYPE

if TYPE_CHECKING:
    from quixstreams.state.recovery import ChangelogProducer


__all__ = ("StorePartition",)

logger = logging.getLogger(__name__)


class StorePartition(ABC):
"""
A base class to access state in the underlying storage.
It represents a single instance of some storage (e.g. a single database for
the persistent storage).
"""

def __init__(
self,
dumps: DumpsFunc,
loads: LoadsFunc,
changelog_producer: Optional["ChangelogProducer"],
) -> None:
super().__init__()
self._dumps = dumps
self._loads = loads
self._changelog_producer = changelog_producer

@abstractmethod
def close(self): ...

@abstractmethod
def _recover_from_changelog_message(
self,
changelog_message: ConfluentKafkaMessageProto,
cf_name: str,
processed_offset: Optional[int],
committed_offset: int,
): ...

    @abstractmethod
    def get_processed_offset(self) -> Optional[int]:
        """
        Get the last processed offset for the given partition.

        :return: offset or `None` if there's no processed offset yet
        """
        ...

    @abstractmethod
    def get_changelog_offset(self) -> Optional[int]:
        """
        Get the offset that the changelog is up-to-date with.

        :return: offset or `None` if there's no changelog offset yet
        """
        ...

    @abstractmethod
    def write(
        self,
        data: CACHE_TYPE,
        processed_offset: Optional[int],
        changelog_offset: Optional[int],
    ):
        """
        Update the state with data.

        :param data: The modified data
        :param processed_offset: The offset processed to generate the data.
        :param changelog_offset: The changelog message offset of the data.
        """

    @abstractmethod
    def get(
        self, key: bytes, default: Any = None, cf_name: str = "default"
    ) -> Union[None, bytes, Any]:
        """
        Get a key from the store.

        :param key: a key encoded to `bytes`
        :param default: a default value to return if the key is not found.
        :param cf_name: rocksdb column family name. Default - "default"
        :return: a value if the key is present in the store. Otherwise, `default`
        """

    @abstractmethod
    def exists(self, key: bytes, cf_name: str = "default") -> bool:
        """
        Check if a key is present in the store.

        :param key: a key encoded to `bytes`.
        :param cf_name: rocksdb column family name. Default - "default"
        :return: `True` if the key is present, `False` otherwise.
        """

    def begin(self) -> PartitionTransaction:
        """
        Start a new `PartitionTransaction`.

        Using `PartitionTransaction` is the recommended way of accessing the data.
        """
        return PartitionTransaction(
            partition=self,
            dumps=self._dumps,
            loads=self._loads,
            changelog_producer=self._changelog_producer,
        )
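    # Illustrative usage; the exact `PartitionTransaction` API is assumed from
    # the rest of this PR rather than shown in this excerpt:
    #
    #   tx = partition.begin()
    #   tx.set(key="total", value=1, prefix=b"message-key")
    #   total = tx.get(key="total", prefix=b"message-key", default=0)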

    def recover_from_changelog_message(
        self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int
    ):
        """
        Updates state from a given changelog message.

        :param changelog_message: A raw Confluent message read from a changelog topic.
        :param committed_offset: latest committed offset for the partition
        """
        headers = dict(changelog_message.headers() or ())
        # Parse the column family name from message headers
        cf_name = headers.get(CHANGELOG_CF_MESSAGE_HEADER, b"").decode()
        if not cf_name:
            raise ColumnFamilyHeaderMissing(
                f"Header '{CHANGELOG_CF_MESSAGE_HEADER}' missing from changelog message"
            )

        # Parse the processed topic-partition-offset info from the changelog message
        # headers to determine whether the update should be applied or skipped.
        # It can be empty if the message was produced by an older version of the lib.
        processed_offset = json_loads(
            headers.get(CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, b"null")
        )

        self._recover_from_changelog_message(
            changelog_message,
            cf_name,
            processed_offset,
            committed_offset,
        )

    def _should_apply_changelog(
        self, processed_offset: Optional[int], committed_offset: int
    ) -> bool:
        """
        Determine whether the changelog update should be applied or skipped.

        :param processed_offset: changelog message processed offset.
        :param committed_offset: latest committed offset of the source topic partition
        :return: True if the update should be applied, else False.
        """
        if processed_offset is not None:
            # Skip recovering from the message if its processed offset is ahead of
            # the current committed offset.
            # This way the state will recover to a consistent point if the
            # checkpointing code produced the changelog messages but failed to
            # commit the source topic offset.
            return processed_offset < committed_offset
        return True
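    # Worked example (illustrative): a checkpoint produced a changelog message
    # with processed_offset=105 but crashed before committing the source topic
    # offset, so committed_offset is still 100 on restart. Since 105 < 100 is
    # False, the stale update is skipped and the state stays consistent with
    # the offsets that were actually committed.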

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
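To make the contract concrete, here is a minimal, hypothetical in-memory `StorePartition`; it is not part of this PR, and the shape of `CACHE_TYPE` (a per-column-family mapping of key bytes to value bytes) is an assumption:

from typing import Any, Dict, Optional, Union

class InMemoryStorePartition(StorePartition):
    """Illustrative only: keeps all state in process memory."""

    def __init__(self, dumps, loads, changelog_producer=None):
        super().__init__(dumps=dumps, loads=loads, changelog_producer=changelog_producer)
        self._cfs: Dict[str, Dict[bytes, bytes]] = {"default": {}}
        self._processed_offset: Optional[int] = None
        self._changelog_offset: Optional[int] = None

    def close(self):
        self._cfs.clear()

    def get_processed_offset(self) -> Optional[int]:
        return self._processed_offset

    def get_changelog_offset(self) -> Optional[int]:
        return self._changelog_offset

    def write(self, data, processed_offset, changelog_offset):
        # Assumed CACHE_TYPE shape: {cf_name: {key: value}}
        for cf_name, items in data.items():
            self._cfs.setdefault(cf_name, {}).update(items)
        self._processed_offset = processed_offset
        self._changelog_offset = changelog_offset

    def get(
        self, key: bytes, default: Any = None, cf_name: str = "default"
    ) -> Union[None, bytes, Any]:
        return self._cfs.get(cf_name, {}).get(key, default)

    def exists(self, key: bytes, cf_name: str = "default") -> bool:
        return key in self._cfs.get(cf_name, {})

    def _recover_from_changelog_message(
        self, changelog_message, cf_name, processed_offset, committed_offset
    ):
        if self._should_apply_changelog(processed_offset, committed_offset):
            value = changelog_message.value()
            if value is None:
                # A None value is treated as a delete (tombstone) here
                self._cfs.setdefault(cf_name, {}).pop(changelog_message.key(), None)
            else:
                self._cfs.setdefault(cf_name, {})[changelog_message.key()] = value
        # Assumption: track the next changelog offset to read on restart
        self._changelog_offset = changelog_message.offset() + 1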
58 changes: 55 additions & 3 deletions quixstreams/state/state.py → quixstreams/state/base/state.py
@@ -1,6 +1,58 @@
-from typing import Any, Optional
+import logging
+from abc import ABC, abstractmethod
+from typing import Any, Optional, TYPE_CHECKING

-from .types import State, PartitionTransaction
+if TYPE_CHECKING:
+    from .transaction import PartitionTransaction

+__all__ = ("State", "TransactionState")

+logger = logging.getLogger(__name__)


class State(ABC):
    """
    Primary interface for working with key-value state data from `StreamingDataFrame`
    """

    @abstractmethod
    def get(self, key: Any, default: Any = None) -> Optional[Any]:
        """
        Get the value for key if key is present in the state, else default.

        :param key: key
        :param default: default value to return if the key is not found
        :return: value or None if the key is not found and `default` is not provided
        """
        ...

    @abstractmethod
    def set(self, key: Any, value: Any):
        """
        Set value for the key.

        :param key: key
        :param value: value
        """
        ...

    @abstractmethod
    def delete(self, key: Any):
        """
        Delete value for the key.

        This function always returns `None`, even if the value is not found.

        :param key: key
        """
        ...

    @abstractmethod
    def exists(self, key: Any) -> bool:
        """
        Check if the key exists in state.

        :param key: key
        :return: True if key exists, False otherwise
        """
        ...


class TransactionState(State):
@@ -9,7 +61,7 @@ class TransactionState(State):
        "_prefix",
    )

-    def __init__(self, prefix: bytes, transaction: PartitionTransaction):
+    def __init__(self, prefix: bytes, transaction: "PartitionTransaction"):
"""
Simple key-value state to be provided into `StreamingDataFrame` functions

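Finally, a short sketch of how `TransactionState` is wired up; the delegation shown follows from the class's role in this excerpt, and `partition` stands for any concrete `StorePartition`:

tx = partition.begin()
state = TransactionState(prefix=b"message-key", transaction=tx)

# Each call is scoped by the prefix and delegated to the transaction
state.set("count", 42)
assert state.get("count") == 42
assert state.exists("count")
state.delete("count")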