From f80e67dd7333191dcb370563e352314ab94fe780 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Tue, 1 Oct 2024 13:25:07 +0200 Subject: [PATCH 1/6] state: Extract base store implementation from RocksDBStore --- quixstreams/dataframe/dataframe.py | 2 +- quixstreams/state/__init__.py | 1 + quixstreams/state/base/__init__.py | 5 + quixstreams/state/base/partition.py | 130 ++++++ quixstreams/state/{ => base}/state.py | 58 ++- quixstreams/state/base/store.py | 135 ++++++ quixstreams/state/base/transaction.py | 386 ++++++++++++++++ quixstreams/state/exceptions.py | 15 + quixstreams/state/manager.py | 2 +- quixstreams/state/metadata.py | 12 + quixstreams/state/recovery.py | 2 +- quixstreams/state/rocksdb/exceptions.py | 21 +- quixstreams/state/rocksdb/metadata.py | 4 - quixstreams/state/rocksdb/options.py | 2 +- quixstreams/state/rocksdb/partition.py | 139 +++--- quixstreams/state/rocksdb/store.py | 133 +----- quixstreams/state/rocksdb/transaction.py | 427 +----------------- quixstreams/state/rocksdb/types.py | 2 +- .../state/rocksdb/windowed/partition.py | 31 +- .../state/rocksdb/windowed/serialization.py | 7 +- quixstreams/state/rocksdb/windowed/store.py | 29 +- .../state/rocksdb/windowed/transaction.py | 32 +- .../state/{rocksdb => }/serialization.py | 6 +- quixstreams/state/types.py | 315 +------------ tests/test_quixstreams/test_state/fixtures.py | 2 +- .../test_recovery/test_recovery_manager.py | 2 +- .../test_state/test_rocksdb/test_partition.py | 4 +- .../test_rocksdb/test_transaction.py | 32 +- .../test_windowed/test_partition.py | 2 +- .../test_windowed/test_serialization.py | 2 +- .../test_windowed/test_transaction.py | 2 +- 31 files changed, 923 insertions(+), 1019 deletions(-) create mode 100644 quixstreams/state/base/__init__.py create mode 100644 quixstreams/state/base/partition.py rename quixstreams/state/{ => base}/state.py (51%) create mode 100644 quixstreams/state/base/store.py create mode 100644 quixstreams/state/base/transaction.py create mode 100644 quixstreams/state/metadata.py rename quixstreams/state/{rocksdb => }/serialization.py (89%) diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index e1a2faf7d..750db8c93 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -45,7 +45,7 @@ from quixstreams.models.serializers import SerializerType, DeserializerType from quixstreams.processing import ProcessingContext from quixstreams.sinks import BaseSink -from quixstreams.state.types import State +from quixstreams.state.base import State from .base import BaseStreaming from .exceptions import InvalidOperation from .registry import DataframeRegistry diff --git a/quixstreams/state/__init__.py b/quixstreams/state/__init__.py index 6618dcb01..de715e215 100644 --- a/quixstreams/state/__init__.py +++ b/quixstreams/state/__init__.py @@ -2,3 +2,4 @@ from .manager import * from .recovery import * from .types import * +from .base import * diff --git a/quixstreams/state/base/__init__.py b/quixstreams/state/base/__init__.py new file mode 100644 index 000000000..d40a3163d --- /dev/null +++ b/quixstreams/state/base/__init__.py @@ -0,0 +1,5 @@ +# ruff: noqa: F403 +from .store import * +from .partition import * +from .transaction import * +from .state import * diff --git a/quixstreams/state/base/partition.py b/quixstreams/state/base/partition.py new file mode 100644 index 000000000..ce651e6c1 --- /dev/null +++ b/quixstreams/state/base/partition.py @@ -0,0 +1,130 @@ +import logging + +from abc import ABC, abstractmethod 
+from typing import Optional
+
+from quixstreams.models import ConfluentKafkaMessageProto
+from quixstreams.state.exceptions import ColumnFamilyHeaderMissing
+from quixstreams.state.metadata import (
+    CHANGELOG_CF_MESSAGE_HEADER,
+    CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER,
+)
+from quixstreams.utils.json import loads as json_loads
+
+from .transaction import PartitionTransaction, CACHE_TYPE
+
+__all__ = ("StorePartition",)
+
+logger = logging.getLogger(__name__)
+
+
+class StorePartition(ABC):
+    """
+    A base class to access state in the underlying storage.
+    It represents a single instance of some storage (e.g. a single database for
+    persistent storage).
+    """
+
+    def __init__(self) -> None:
+        super().__init__()
+
+    @abstractmethod
+    def begin(self) -> PartitionTransaction:
+        """
+        Start a new `PartitionTransaction`
+        """
+
+    @abstractmethod
+    def close(self): ...
+
+    @abstractmethod
+    def _recover_from_changelog_message(
+        self,
+        changelog_message: ConfluentKafkaMessageProto,
+        cf_name: str,
+        processed_offset: Optional[int],
+        committed_offset: int,
+    ): ...
+
+    @abstractmethod
+    def get_processed_offset(self) -> Optional[int]:
+        """
+        Get last processed offset for the given partition
+        :return: offset or `None` if there's no processed offset yet
+        """
+        ...
+
+    @abstractmethod
+    def get_changelog_offset(self) -> Optional[int]:
+        """
+        Get offset that the changelog is up-to-date with.
+        :return: offset or `None` if there's no changelog offset yet
+        """
+        ...
+
+    @abstractmethod
+    def write(
+        self,
+        data: CACHE_TYPE,
+        processed_offset: Optional[int],
+        changelog_offset: Optional[int],
+    ):
+        """
+        Update the state with data
+
+        :param data: The modified data
+        :param processed_offset: The offset processed to generate the data.
+        :param changelog_offset: The changelog message offset of the data.
+        """
+
+    def recover_from_changelog_message(
+        self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int
+    ):
+        """
+        Updates state from a given changelog message.
+
+        :param changelog_message: A raw Confluent message read from a changelog topic.
+        :param committed_offset: latest committed offset for the partition
+        """
+        headers = dict(changelog_message.headers() or ())
+        # Parse the column family name from message headers
+        cf_name = headers.get(CHANGELOG_CF_MESSAGE_HEADER, b"").decode()
+        if not cf_name:
+            raise ColumnFamilyHeaderMissing(
+                f"Header '{CHANGELOG_CF_MESSAGE_HEADER}' missing from changelog message"
+            )
+
+        # Parse the processed topic-partition-offset info from the changelog message
+        # headers to determine whether the update should be applied or skipped.
+        # It can be empty if the message was produced by the older version of the lib.
+        processed_offset = json_loads(
+            headers.get(CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, b"null")
+        )
+
+        self._recover_from_changelog_message(
+            changelog_message,
+            cf_name,
+            processed_offset,
+            committed_offset,
+        )
+
+    def _should_apply_changelog(
+        self, processed_offset: Optional[int], committed_offset: int
+    ) -> bool:
+        """
+        Determine whether the changelog update should be applied or skipped.
+
+        :param processed_offset: processed offset parsed from the changelog message headers
+        :param committed_offset: latest committed offset of the source topic partition
+        :return: True if update should be applied, else False.
+        """
+        if processed_offset is not None:
+            # Skip recovering from the message if its processed offset is ahead of the
+            # current committed offset.
+            # This way it will recover to a consistent state if the checkpointing code
+            # produced the changelog messages but failed to commit
+            # the source topic offset. 
+ return processed_offset < committed_offset + return True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() diff --git a/quixstreams/state/state.py b/quixstreams/state/base/state.py similarity index 51% rename from quixstreams/state/state.py rename to quixstreams/state/base/state.py index 3e06d9882..5167b8b9c 100644 --- a/quixstreams/state/state.py +++ b/quixstreams/state/base/state.py @@ -1,6 +1,58 @@ -from typing import Any, Optional +import logging +from abc import ABC, abstractmethod +from typing import Any, Optional, TYPE_CHECKING -from .types import State, PartitionTransaction +if TYPE_CHECKING: + from quixstreams.state.base import PartitionTransaction + +__all__ = ("State", "TransactionState") + +logger = logging.getLogger(__name__) + + +class State(ABC): + """ + Primary interface for working with key-value state data from `StreamingDataFrame` + """ + + @abstractmethod + def get(self, key: Any, default: Any = None) -> Optional[Any]: + """ + Get the value for key if key is present in the state, else default + + :param key: key + :param default: default value to return if the key is not found + :return: value or None if the key is not found and `default` is not provided + """ + ... + + @abstractmethod + def set(self, key: Any, value: Any): + """ + Set value for the key. + :param key: key + :param value: value + """ + ... + + @abstractmethod + def delete(self, key: Any): + """ + Delete value for the key. + + This function always returns `None`, even if value is not found. + :param key: key + """ + ... + + @abstractmethod + def exists(self, key: Any) -> bool: + """ + Check if the key exists in state. + :param key: key + :return: True if key exists, False otherwise + """ + ... class TransactionState(State): @@ -9,7 +61,7 @@ class TransactionState(State): "_prefix", ) - def __init__(self, prefix: bytes, transaction: PartitionTransaction): + def __init__(self, prefix: bytes, transaction: "PartitionTransaction"): """ Simple key-value state to be provided into `StreamingDataFrame` functions diff --git a/quixstreams/state/base/store.py b/quixstreams/state/base/store.py new file mode 100644 index 000000000..6e86e1eab --- /dev/null +++ b/quixstreams/state/base/store.py @@ -0,0 +1,135 @@ +import logging + +from typing import Dict, ClassVar +from abc import ABC, abstractmethod + +from quixstreams.state.exceptions import PartitionNotAssignedError + +from .partition import StorePartition +from .transaction import PartitionTransaction + +__all__ = ("Store",) + +logger = logging.getLogger(__name__) + + +class Store(ABC): + """ + Abstract state store. + + It keeps track of individual store partitions and provides access to the + partitions' transactions. 
+ """ + + options_type: ClassVar[object] + + def __init__(self, name: str, topic: str) -> None: + super().__init__() + + self._name = name + self._topic = topic + self._partitions: Dict[int, StorePartition] = {} + + @abstractmethod + def create_new_partition(self, partition: int) -> StorePartition: + pass + + @property + def topic(self) -> str: + """ + Topic name + """ + return self._topic + + @property + def name(self) -> str: + """ + Store name + """ + return self._name + + @property + def partitions(self) -> Dict[int, StorePartition]: + """ + Mapping of assigned store partitions + :return: dict of "{partition: }" + """ + return self._partitions + + def assign_partition(self, partition: int) -> StorePartition: + """ + Assign new store partition + + :param partition: partition number + :return: instance of `StorePartition` + """ + store_partition = self._partitions.get(partition) + if store_partition is not None: + logger.debug( + f'Partition "{partition}" for store "{self._name}" ' + f'(topic "{self._topic}") ' + f"is already assigned" + ) + return store_partition + + store_partition = self.create_new_partition(partition) + + self._partitions[partition] = store_partition + logger.debug( + 'Assigned store partition "%s[%s]" (topic "%s")', + self._name, + partition, + self._topic, + ) + return store_partition + + def revoke_partition(self, partition: int): + """ + Revoke assigned store partition + + :param partition: partition number + """ + store_partition = self._partitions.pop(partition, None) + if store_partition is None: + return + + store_partition.close() + logger.debug( + 'Revoked store partition "%s[%s]" topic("%s")', + self._name, + partition, + self._topic, + ) + + def start_partition_transaction(self, partition: int) -> PartitionTransaction: + """ + Start a new partition transaction. + + `PartitionTransaction` is the primary interface for working with data in Stores. + :param partition: partition number + :return: instance of `PartitionTransaction` + """ + store_partition = self._partitions.get(partition) + if store_partition is None: + # Requested partition has not been assigned. 
Something went completely wrong
+            raise PartitionNotAssignedError(
+                f'Store partition "{self._name}[{partition}]" '
+                f'(topic "{self._topic}") is not assigned'
+            )
+
+        return store_partition.begin()
+
+    def close(self):
+        """
+        Close store and revoke all store partitions
+        """
+        logger.debug(f'Closing store "{self.name}" (topic "{self.topic}")')
+        for partition in list(self._partitions.keys()):
+            self.revoke_partition(partition)
+        logger.debug(f'Closed store "{self.name}" (topic "{self.topic}")')
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
diff --git a/quixstreams/state/base/transaction.py b/quixstreams/state/base/transaction.py
new file mode 100644
index 000000000..fc3f73b3d
--- /dev/null
+++ b/quixstreams/state/base/transaction.py
@@ -0,0 +1,386 @@
+import enum
+import logging
+import functools
+from typing import (
+    Any,
+    Optional,
+    Dict,
+    Tuple,
+    Union,
+    TYPE_CHECKING,
+)
+
+from abc import ABC, abstractmethod
+
+from quixstreams.state.exceptions import (
+    StateTransactionError,
+    InvalidChangelogOffset,
+)
+from quixstreams.state.metadata import (
+    CHANGELOG_CF_MESSAGE_HEADER,
+    CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER,
+    DELETED,
+    PREFIX_SEPARATOR,
+    UNDEFINED,
+    Undefined,
+    DEFAULT_PREFIX,
+)
+from quixstreams.state.serialization import (
+    serialize,
+    deserialize,
+    LoadsFunc,
+    DumpsFunc,
+)
+from quixstreams.utils.json import dumps as json_dumps
+
+from .state import State, TransactionState
+
+if TYPE_CHECKING:
+    from .partition import StorePartition
+    from ..recovery import ChangelogProducer
+
+__all__ = ("PartitionTransactionStatus", "PartitionTransaction", "CACHE_TYPE")
+
+logger = logging.getLogger(__name__)
+CACHE_TYPE = Dict[str, Dict[bytes, Dict[bytes, Union[bytes, Undefined]]]]
+
+
+class PartitionTransactionStatus(enum.Enum):
+    STARTED = 1  # Transaction is started and accepts updates
+
+    PREPARED = 2  # Transaction is prepared, it can no longer receive updates
+    # and can only be flushed
+
+    COMPLETE = 3  # Transaction is fully completed, it cannot be used anymore
+
+    FAILED = 4  # Transaction is failed, it cannot be used anymore
+
+
+def validate_transaction_status(*allowed: PartitionTransactionStatus):
+    """
+    Check that the status of `PartitionTransaction` is valid before calling a method
+    """
+
+    def wrapper(func):
+        @functools.wraps(func)
+        def _wrapper(tx: "PartitionTransaction", *args, **kwargs):
+            if tx.status not in allowed:
+                raise StateTransactionError(
+                    f"Invalid transaction status {tx.status}, allowed: {allowed}"
+                )
+
+            return func(tx, *args, **kwargs)
+
+        return _wrapper
+
+    return wrapper
+
+
+class PartitionTransaction(ABC):
+    """
+    A transaction class to perform simple key-value operations like
+    "get", "set", "delete" and "exists" on a single storage partition.
+    """
+
+    def __init__(
+        self,
+        partition: "StorePartition",
+        dumps: DumpsFunc,
+        loads: LoadsFunc,
+        changelog_producer: Optional["ChangelogProducer"] = None,
+    ) -> None:
+        super().__init__()
+        self._changelog_producer = changelog_producer
+        self._status = PartitionTransactionStatus.STARTED
+
+        self._dumps = dumps
+        self._loads = loads
+        self._partition = partition
+
+        self._update_cache: CACHE_TYPE = {}
+
+    @abstractmethod
+    def _get(
+        self, key_serialized: bytes, default: Any = None, cf_name: str = "default"
+    ) -> Optional[Any]: ...
+
+    @abstractmethod
+    def _exists(self, key_serialized: bytes, cf_name: str) -> bool: ... 
+
+    @property
+    def changelog_producer(self) -> Optional["ChangelogProducer"]:
+        return self._changelog_producer
+
+    @property
+    def status(self) -> PartitionTransactionStatus:
+        return self._status
+
+    @property
+    def failed(self) -> bool:
+        """
+        Return `True` if transaction failed to update data at some point.
+
+        Failed transactions cannot be re-used.
+        :return: bool
+        """
+        return self._status == PartitionTransactionStatus.FAILED
+
+    @property
+    def completed(self) -> bool:
+        """
+        Return `True` if transaction is successfully completed.
+
+        Completed transactions cannot be re-used.
+        :return: bool
+        """
+        return self._status == PartitionTransactionStatus.COMPLETE
+
+    @property
+    def prepared(self) -> bool:
+        """
+        Return `True` if transaction is prepared.
+
+        Prepared transactions cannot receive new updates, but can be flushed.
+        :return: bool
+        """
+        return self._status == PartitionTransactionStatus.PREPARED
+
+    @property
+    def changelog_topic_partition(self) -> Optional[Tuple[str, int]]:
+        """
+        Return the changelog topic-partition for the StorePartition of this transaction.
+
+        Returns `None` if changelog_producer is not provided.
+
+        :return: (topic, partition) or None
+        """
+        if self.changelog_producer is not None:
+            return (
+                self.changelog_producer.changelog_name,
+                self.changelog_producer.partition,
+            )
+
+    def _serialize_value(self, value: Any) -> bytes:
+        return serialize(value, dumps=self._dumps)
+
+    def _deserialize_value(self, value: bytes) -> Any:
+        return deserialize(value, loads=self._loads)
+
+    def _serialize_key(self, key: Any, prefix: bytes) -> bytes:
+        key_bytes = serialize(key, dumps=self._dumps)
+        prefix = prefix + PREFIX_SEPARATOR if prefix else b""
+        return prefix + key_bytes
+
+    def as_state(self, prefix: Any = DEFAULT_PREFIX) -> State:
+        """
+        Create an instance implementing the `State` protocol to be provided
+        to `StreamingDataFrame` functions.
+        All operations called on this State object will be prefixed with
+        the supplied `prefix`.
+
+        :param prefix: a prefix to be used for all keys
+        :return: an instance implementing the `State` protocol
+        """
+        return TransactionState(
+            transaction=self,
+            prefix=(
+                prefix
+                if isinstance(prefix, bytes)
+                else serialize(prefix, dumps=self._dumps)
+            ),
+        )
+
+    @validate_transaction_status(PartitionTransactionStatus.STARTED)
+    def get(
+        self,
+        key: Any,
+        prefix: bytes,
+        default: Any = None,
+        cf_name: str = "default",
+    ) -> Optional[Any]:
+        """
+        Get the value for key if key is present in the state, else default
+
+        :param key: key
+        :param prefix: a key prefix
+        :param default: default value to return if the key is not found
+        :param cf_name: column family name, defaults to "default"
+        :return: value or None if the key is not found and `default` is not provided
+        """
+        key_serialized = self._serialize_key(key, prefix=prefix)
+
+        cached = (
+            self._update_cache.get(cf_name, {})
+            .get(prefix, {})
+            .get(key_serialized, UNDEFINED)
+        )
+        if cached is DELETED:
+            return default
+
+        if cached is not UNDEFINED:
+            return self._deserialize_value(cached)
+
+        return self._get(key_serialized, default, cf_name)
+
+    @validate_transaction_status(PartitionTransactionStatus.STARTED)
+    def set(self, key: Any, value: Any, prefix: bytes, cf_name: str = "default"):
+        """
+        Set value for the key. 
+        :param key: key
+        :param value: value
+        :param prefix: a key prefix
+        :param cf_name: column family name, defaults to "default"
+        """
+
+        try:
+            key_serialized = self._serialize_key(key, prefix=prefix)
+            value_serialized = self._serialize_value(value)
+            self._update_cache.setdefault(cf_name, {}).setdefault(prefix, {})[
+                key_serialized
+            ] = value_serialized
+        except Exception:
+            self._status = PartitionTransactionStatus.FAILED
+            raise
+
+    @validate_transaction_status(PartitionTransactionStatus.STARTED)
+    def delete(self, key: Any, prefix: bytes, cf_name: str = "default"):
+        """
+        Delete value for the key.
+
+        This function always returns `None`, even if value is not found.
+        :param key: key
+        :param prefix: a key prefix
+        :param cf_name: column family name, defaults to "default"
+        """
+        try:
+            key_serialized = self._serialize_key(key, prefix=prefix)
+            self._update_cache.setdefault(cf_name, {}).setdefault(prefix, {})[
+                key_serialized
+            ] = DELETED
+        except Exception:
+            self._status = PartitionTransactionStatus.FAILED
+            raise
+
+    @validate_transaction_status(PartitionTransactionStatus.STARTED)
+    def exists(self, key: Any, prefix: bytes, cf_name: str = "default") -> bool:
+        """
+        Check if the key exists in state.
+        :param key: key
+        :param prefix: a key prefix
+        :param cf_name: column family name, defaults to "default"
+        :return: True if key exists, False otherwise
+        """
+        key_serialized = self._serialize_key(key, prefix=prefix)
+        cached = (
+            self._update_cache.get(cf_name, {})
+            .get(prefix, {})
+            .get(key_serialized, UNDEFINED)
+        )
+        if cached is DELETED:
+            return False
+
+        if cached is not UNDEFINED:
+            return True
+
+        return self._exists(key_serialized, cf_name)
+
+    @validate_transaction_status(PartitionTransactionStatus.STARTED)
+    def prepare(self, processed_offset: int):
+        """
+        Produce changelog messages to the changelog topic for all changes accumulated
+        in this transaction and prepare the transaction to flush its state to the state
+        store.
+
+        After successful `prepare()`, the transaction status is changed to PREPARED,
+        and it cannot receive updates anymore.
+
+        If changelog is disabled for this application, no updates will be produced
+        to the changelog topic.
+
+        :param processed_offset: the offset of the latest processed message
+        """
+
+        try:
+            self._prepare(processed_offset=processed_offset)
+            self._status = PartitionTransactionStatus.PREPARED
+        except Exception:
+            self._status = PartitionTransactionStatus.FAILED
+            raise
+
+    def _prepare(self, processed_offset: int):
+        if self._changelog_producer is None:
+            return
+
+        logger.debug(
+            f"Flushing state changes to the changelog topic "
+            f'topic_name="{self._changelog_producer.changelog_name}" '
+            f"partition={self._changelog_producer.partition} "
+            f"processed_offset={processed_offset}"
+        )
+        for cf_name, cf_update_cache in self._update_cache.items():
+            source_tp_offset_header = json_dumps(processed_offset)
+            headers = {
+                CHANGELOG_CF_MESSAGE_HEADER: cf_name,
+                CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER: source_tp_offset_header,
+            }
+            for _, prefix_update_cache in cf_update_cache.items():
+                for key, value in prefix_update_cache.items():
+                    # Produce changes to the changelog topic
+                    self._changelog_producer.produce(
+                        key=key,
+                        value=value if value is not DELETED else None,
+                        headers=headers,
+                    )
+
+    @validate_transaction_status(
+        PartitionTransactionStatus.STARTED, PartitionTransactionStatus.PREPARED
+    )
+    def flush(
+        self,
+        processed_offset: Optional[int] = None,
+        changelog_offset: Optional[int] = None,
+    ):
+        """
+        Flush the recent updates to the database.
+        It writes the accumulated updates to the underlying store partition and marks itself as finished. 
+ + If writing fails, the transaction is marked as failed and + cannot be used anymore. + + >***NOTE:*** If no keys have been modified during the transaction + (i.e. no "set" or "delete" have been called at least once), it will + not flush ANY data to the database including the offset to optimize + I/O. + + :param processed_offset: offset of the last processed message, optional. + :param changelog_offset: offset of the last produced changelog message, + optional. + """ + try: + self._flush(processed_offset, changelog_offset) + self._status = PartitionTransactionStatus.COMPLETE + except Exception: + self._status = PartitionTransactionStatus.FAILED + raise + + def _flush(self, processed_offset: Optional[int], changelog_offset: Optional[int]): + if not self._update_cache: + return + + if changelog_offset is not None: + current_changelog_offset = self._partition.get_changelog_offset() + if ( + current_changelog_offset is not None + and changelog_offset < current_changelog_offset + ): + raise InvalidChangelogOffset( + "Cannot set changelog offset lower than already saved one" + ) + + self._partition.write( + data=self._update_cache, + processed_offset=processed_offset, + changelog_offset=changelog_offset, + ) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_val is None and not self.failed: + self.flush() diff --git a/quixstreams/state/exceptions.py b/quixstreams/state/exceptions.py index 5d758d520..f5463e797 100644 --- a/quixstreams/state/exceptions.py +++ b/quixstreams/state/exceptions.py @@ -20,3 +20,18 @@ class StoreTransactionFailed(QuixException): ... class InvalidStoreChangelogOffset(QuixException): ... + + +class StateError(QuixException): ... + + +class StateSerializationError(StateError): ... + + +class StateTransactionError(StateError): ... + + +class ColumnFamilyHeaderMissing(StateError): ... + + +class InvalidChangelogOffset(StateError): ... 
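
For reference, the lifecycle that the extracted base transaction enforces can be
sketched as follows. This is an illustrative snippet rather than part of the patch:
`store` is assumed to be any `Store` implementation with partition 0 assigned, and
the offsets are invented.

    # Status flow: STARTED -> PREPARED -> COMPLETE (or FAILED on any error)
    tx = store.start_partition_transaction(partition=0)
    tx.set(key="total", value=42, prefix=b"user-1")
    # Reads are served from the in-memory update cache before falling back to storage
    assert tx.get(key="total", prefix=b"user-1") == 42
    tx.prepare(processed_offset=100)  # produce changelog messages, status=PREPARED
    tx.flush(processed_offset=100, changelog_offset=5)  # persist, status=COMPLETE
    tx.set(key="x", value=1, prefix=b"user-1")  # raises StateTransactionError
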
diff --git a/quixstreams/state/manager.py b/quixstreams/state/manager.py index dbe6cd54d..2d370317a 100644 --- a/quixstreams/state/manager.py +++ b/quixstreams/state/manager.py @@ -12,7 +12,7 @@ from .recovery import RecoveryManager, ChangelogProducerFactory from .rocksdb import RocksDBStore, RocksDBOptionsType from .rocksdb.windowed.store import WindowedRocksDBStore -from .types import Store, StorePartition +from .base import Store, StorePartition __all__ = ("StateStoreManager", "DEFAULT_STATE_STORE_NAME") diff --git a/quixstreams/state/metadata.py b/quixstreams/state/metadata.py new file mode 100644 index 000000000..cd91295ab --- /dev/null +++ b/quixstreams/state/metadata.py @@ -0,0 +1,12 @@ +from typing import NewType + +PREFIX_SEPARATOR = b"|" + +CHANGELOG_CF_MESSAGE_HEADER = "__column_family__" +CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER = "__processed_tp_offset__" + +Undefined = NewType("Undefined", object) +UNDEFINED = Undefined(object()) +DELETED = Undefined(object()) + +DEFAULT_PREFIX = b"" diff --git a/quixstreams/state/recovery.py b/quixstreams/state/recovery.py index 750bb2407..bf0c35331 100644 --- a/quixstreams/state/recovery.py +++ b/quixstreams/state/recovery.py @@ -8,7 +8,7 @@ from quixstreams.models.topics import TopicManager from quixstreams.models.types import MessageHeadersMapping from quixstreams.rowproducer import RowProducer -from quixstreams.state.types import StorePartition +from quixstreams.state.base import StorePartition from quixstreams.utils.dicts import dict_values from .exceptions import InvalidStoreChangelogOffset diff --git a/quixstreams/state/rocksdb/exceptions.py b/quixstreams/state/rocksdb/exceptions.py index 008f44b6c..23e8f996b 100644 --- a/quixstreams/state/rocksdb/exceptions.py +++ b/quixstreams/state/rocksdb/exceptions.py @@ -1,31 +1,12 @@ -from quixstreams.exceptions import QuixException +from quixstreams.state.exceptions import StateError __all__ = ( - "StateSerializationError", - "StateTransactionError", "ColumnFamilyDoesNotExist", "ColumnFamilyAlreadyExists", - "ColumnFamilyHeaderMissing", - "InvalidChangelogOffset", ) -class StateError(QuixException): ... - - -class StateSerializationError(StateError): ... - - -class StateTransactionError(StateError): ... - - class ColumnFamilyDoesNotExist(StateError): ... class ColumnFamilyAlreadyExists(StateError): ... - - -class ColumnFamilyHeaderMissing(StateError): ... - - -class InvalidChangelogOffset(StateError): ... 
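
To illustrate what this extraction enables (again illustrative, not part of the
patch): a new storage backend now mostly supplies the abstract hooks, while
partition tracking, assignment, and transaction startup stay in the base classes.
`MemoryStore` and `MemoryStorePartition` are hypothetical names.

    from quixstreams.state.base import Store, StorePartition

    class MemoryStore(Store):
        # The base Store implements assign_partition(), revoke_partition(),
        # start_partition_transaction() and close(); a backend only needs
        # to provide the partition factory.
        def create_new_partition(self, partition: int) -> StorePartition:
            # Hypothetical StorePartition subclass that would implement begin(),
            # write(), close(), get_processed_offset(), get_changelog_offset(),
            # and _recover_from_changelog_message()
            return MemoryStorePartition()
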
diff --git a/quixstreams/state/rocksdb/metadata.py b/quixstreams/state/rocksdb/metadata.py index 948469f0d..719eea079 100644 --- a/quixstreams/state/rocksdb/metadata.py +++ b/quixstreams/state/rocksdb/metadata.py @@ -1,9 +1,5 @@ -PREFIX_SEPARATOR = b"|" PROCESSED_OFFSET_KEY = b"__topic_offset__" CHANGELOG_OFFSET_KEY = b"__changelog_offset__" LATEST_TIMESTAMP_KEY = b"__topic_latest_timestamp__" METADATA_CF_NAME = "__metadata__" - -CHANGELOG_CF_MESSAGE_HEADER = "__column_family__" -CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER = "__processed_tp_offset__" diff --git a/quixstreams/state/rocksdb/options.py b/quixstreams/state/rocksdb/options.py index 7dba7b32c..565724203 100644 --- a/quixstreams/state/rocksdb/options.py +++ b/quixstreams/state/rocksdb/options.py @@ -4,7 +4,7 @@ import rocksdict from rocksdict import DBCompressionType -from quixstreams.state.types import DumpsFunc, LoadsFunc +from quixstreams.state.serialization import DumpsFunc, LoadsFunc from quixstreams.utils.json import dumps, loads from .types import RocksDBOptionsType, CompressionType diff --git a/quixstreams/state/rocksdb/partition.py b/quixstreams/state/rocksdb/partition.py index 7ea5afe6b..f97e83e07 100644 --- a/quixstreams/state/rocksdb/partition.py +++ b/quixstreams/state/rocksdb/partition.py @@ -5,28 +5,23 @@ from rocksdict import WriteBatch, Rdict, ColumnFamily, AccessType from quixstreams.models import ConfluentKafkaMessageProto -from quixstreams.utils.json import loads as json_loads +from quixstreams.state.metadata import DELETED from quixstreams.state.recovery import ChangelogProducer -from quixstreams.state.types import ( - StorePartition, +from quixstreams.state.base import StorePartition, CACHE_TYPE +from quixstreams.state.serialization import ( + int_from_int64_bytes, + int_to_int64_bytes, ) from .exceptions import ( ColumnFamilyAlreadyExists, ColumnFamilyDoesNotExist, - ColumnFamilyHeaderMissing, ) from .metadata import ( METADATA_CF_NAME, PROCESSED_OFFSET_KEY, CHANGELOG_OFFSET_KEY, - CHANGELOG_CF_MESSAGE_HEADER, - CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, ) from .options import RocksDBOptions -from .serialization import ( - int_from_int64_bytes, - int_to_int64_bytes, -) from .transaction import ( RocksDBPartitionTransaction, ) @@ -60,6 +55,7 @@ def __init__( options: Optional[RocksDBOptionsType] = None, changelog_producer: Optional[ChangelogProducer] = None, ): + super().__init__() self._path = path self._options = options or RocksDBOptions() self._rocksdb_options = self._options.to_options() @@ -76,10 +72,10 @@ def begin( self, ) -> RocksDBPartitionTransaction: """ - Create a new `RocksDBTransaction` object. - Using `RocksDBTransaction` is a recommended way for accessing the data. + Create a new `RocksDBPartitionTransaction` object. + Using `RocksDBPartitionTransaction` is a recommended way for accessing the data. - :return: an instance of `RocksDBTransaction` + :return: an instance of `RocksDBPartitionTransaction` """ return RocksDBPartitionTransaction( partition=self, @@ -97,36 +93,14 @@ def _changelog_recover_flush(self, changelog_offset: int, batch: WriteBatch): int_to_int64_bytes(changelog_offset), self.get_column_family_handle(METADATA_CF_NAME), ) - self.write(batch) - - def _should_apply_changelog( - self, headers: Dict[str, bytes], committed_offset: int - ) -> bool: - """ - Determine whether the changelog update should be skipped. 
+ self._write(batch) - :param headers: changelog message headers - :param committed_offset: latest committed offset of the source topic partition - :return: True if update should be applied, else False. - """ - # Parse the processed topic-partition-offset info from the changelog message - # headers to determine whether the update should be applied or skipped. - # It can be empty if the message was produced by the older version of the lib. - processed_offset_header = headers.get( - CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, b"null" - ) - processed_offset = json_loads(processed_offset_header) - if processed_offset is not None: - # Skip recovering from the message if its processed offset is ahead of the - # current committed offset. - # This way it will recover to a consistent state if the checkpointing code - # produced the changelog messages but failed to commit - # the source topic offset. - return processed_offset < committed_offset - return True - - def recover_from_changelog_message( - self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int + def _recover_from_changelog_message( + self, + changelog_message: ConfluentKafkaMessageProto, + cf_name: str, + processed_offset: Optional[int], + committed_offset: int, ): """ Updates state from a given changelog message. @@ -143,21 +117,11 @@ def recover_from_changelog_message( :param changelog_message: A raw Confluent message read from a changelog topic. :param committed_offset: latest committed offset for the partition """ - headers = dict(changelog_message.headers() or ()) - # Parse the column family name from message headers - cf_name = headers.get(CHANGELOG_CF_MESSAGE_HEADER, b"").decode() - if not cf_name: - raise ColumnFamilyHeaderMissing( - f"Header '{CHANGELOG_CF_MESSAGE_HEADER}' missing from changelog message" - ) - cf_handle = self.get_column_family_handle(cf_name) - batch = WriteBatch(raw_mode=True) - # Determine whether the update should be applied or skipped based on the - # latest committed offset and processed offset from the changelog message header - if self._should_apply_changelog( - headers=headers, committed_offset=committed_offset - ): + if self._should_apply_changelog(processed_offset, committed_offset): + cf_handle = self.get_column_family_handle(cf_name) + # Determine whether the update should be applied or skipped based on the + # latest committed offset and processed offset from the changelog message header key = changelog_message.key() if value := changelog_message.value(): batch.put(key, value, cf_handle) @@ -166,17 +130,60 @@ def recover_from_changelog_message( self._changelog_recover_flush(changelog_message.offset(), batch) - def set_changelog_offset(self, changelog_offset: int): + def write( + self, + data: CACHE_TYPE, + processed_offset: Optional[int], + changelog_offset: Optional[int], + batch: Optional[WriteBatch] = None, + ): """ - Set the changelog offset based on a message (usually an "offset-only" message). + Write data to RocksDB - Used during recovery. - - :param changelog_offset: A changelog offset + :param data: The modified data + :param processed_offset: The offset processed to generate the data. + :param changelog_offset: The changelog message offset of the data. 
""" - self._changelog_recover_flush(changelog_offset, WriteBatch(raw_mode=True)) + if batch is None: + batch = WriteBatch(raw_mode=True) + + meta_cf_handle = self.get_column_family_handle(METADATA_CF_NAME) + # Iterate over the transaction update cache + for cf_name, cf_update_cache in data.items(): + cf_handle = self.get_column_family_handle(cf_name) + for _prefix, prefix_update_cache in cf_update_cache.items(): + for key, value in prefix_update_cache.items(): + # Apply changes to the Writebatch + if value is DELETED: + batch.delete(key, cf_handle) + else: + batch.put(key, value, cf_handle) + + # Save the latest processed input topic offset + if processed_offset is not None: + batch.put( + PROCESSED_OFFSET_KEY, + int_to_int64_bytes(processed_offset), + meta_cf_handle, + ) + # Save the latest changelog topic offset to know where to recover from + # It may be None if changelog topics are disabled + if changelog_offset is not None: + batch.put( + CHANGELOG_OFFSET_KEY, + int_to_int64_bytes(changelog_offset), + meta_cf_handle, + ) + logger.debug( + f"Flushing state changes to the disk " + f'path="{self.path}" ' + f"processed_offset={processed_offset} " + f"changelog_offset={changelog_offset}" + ) + + self._write(batch) - def write(self, batch: WriteBatch): + def _write(self, batch: WriteBatch): """ Write `WriteBatch` to RocksDB :param batch: an instance of `rocksdict.WriteBatch` @@ -376,9 +383,3 @@ def _init_rocksdb(self) -> Rdict: attempt += 1 time.sleep(self._open_retry_backoff) - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() diff --git a/quixstreams/state/rocksdb/store.py b/quixstreams/state/rocksdb/store.py index 62a36ff57..70d257cf5 100644 --- a/quixstreams/state/rocksdb/store.py +++ b/quixstreams/state/rocksdb/store.py @@ -2,12 +2,10 @@ from pathlib import Path from typing import Dict, Optional -from quixstreams.state.exceptions import PartitionNotAssignedError from quixstreams.state.recovery import ChangelogProducer, ChangelogProducerFactory -from quixstreams.state.types import Store +from quixstreams.state.base import Store from .partition import ( RocksDBStorePartition, - RocksDBPartitionTransaction, ) from .types import RocksDBOptionsType @@ -42,133 +40,24 @@ def __init__( if using changelogs :param options: RocksDB options. If `None`, the default options will be used. """ - self._name = name - self._topic = topic + super().__init__(name, topic) self._partitions_dir = Path(base_dir).absolute() / self._name / self._topic self._partitions: Dict[int, RocksDBStorePartition] = {} self._changelog_producer_factory = changelog_producer_factory self._options = options - @property - def topic(self) -> str: - """ - Store topic name - """ - return self._topic - - @property - def name(self) -> str: - """ - Store name - """ - return self._name - - @property - def partitions(self) -> Dict[int, RocksDBStorePartition]: - """ - Mapping of assigned store partitions - """ - return self._partitions - def create_new_partition( - self, path: str, changelog_producer: Optional[ChangelogProducer] = None + self, + partition: int, ) -> RocksDBStorePartition: - return RocksDBStorePartition( - path=path, options=self._options, changelog_producer=changelog_producer - ) - - def assign_partition(self, partition: int) -> RocksDBStorePartition: - """ - Open and assign store partition. - - If the partition is already assigned, it will not re-open it and return - the existing partition instead. 
- - :param partition: partition number - :return: instance of`RocksDBStorePartition` - """ - if partition in self._partitions: - logger.debug( - f'Partition "{partition}" for store "{self._name}" ' - f'(topic "{self._topic}") ' - f"is already assigned" - ) - return self._partitions[partition] - path = str((self._partitions_dir / str(partition)).absolute()) - store_partition = self.create_new_partition( - path, - ( - self._changelog_producer_factory.get_partition_producer(partition) - if self._changelog_producer_factory - else None - ), - ) - self._partitions[partition] = store_partition - logger.debug( - 'Assigned store partition "%s[%s]" (topic "%s")', - self._name, - partition, - self._topic, - ) - return store_partition - - def revoke_partition(self, partition: int): - """ - Revoke and close the assigned store partition. - - If the partition is not assigned, it will log the message and return. - - :param partition: partition number - """ - store_partition = self._partitions.get(partition) - if store_partition is None: - return - - store_partition.close() - self._partitions.pop(partition) - logger.debug( - 'Revoked store partition "%s[%s]" topic("%s")', - self._name, - partition, - self._topic, - ) - - def start_partition_transaction( - self, partition: int - ) -> RocksDBPartitionTransaction: - """ - Start a new partition transaction. - - `RocksDBPartitionTransaction` is the primary interface for working with data in - the underlying RocksDB. - - :param partition: partition number - :return: instance of `RocksDBPartitionTransaction` - """ - if partition not in self._partitions: - # Requested partition has not been assigned. Something went completely wrong - raise PartitionNotAssignedError( - f'Store partition "{self._name}[{partition}]" ' - f'(topic "{self._topic}") is not assigned' + changelog_producer: Optional[ChangelogProducer] = None + if self._changelog_producer_factory: + changelog_producer = ( + self._changelog_producer_factory.get_partition_producer(partition) ) - store_partition = self._partitions[partition] - return store_partition.begin() - - def close(self): - """ - Close the store and revoke all assigned partitions - """ - logger.debug(f'Closing store "{self._name}" (topic "{self._topic}")') - partitions = list(self._partitions.keys()) - for partition in partitions: - self.revoke_partition(partition) - logger.debug(f'Closed store "{self._name}" (topic "{self._topic}")') - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() + return RocksDBStorePartition( + path=path, options=self._options, changelog_producer=changelog_producer + ) diff --git a/quixstreams/state/rocksdb/transaction.py b/quixstreams/state/rocksdb/transaction.py index 935ad6183..299c11720 100644 --- a/quixstreams/state/rocksdb/transaction.py +++ b/quixstreams/state/rocksdb/transaction.py @@ -1,62 +1,13 @@ -import functools import logging -from typing import Any, Union, Optional, Dict, NewType, TYPE_CHECKING, Tuple +from typing import Any, Optional -from rocksdict import WriteBatch -from quixstreams.utils.json import dumps as json_dumps -from quixstreams.state.recovery import ChangelogProducer -from quixstreams.state.types import ( - DumpsFunc, - LoadsFunc, - PartitionTransaction, - PartitionTransactionStatus, -) -from .exceptions import StateTransactionError, InvalidChangelogOffset -from .metadata import ( - METADATA_CF_NAME, - PROCESSED_OFFSET_KEY, - CHANGELOG_OFFSET_KEY, - PREFIX_SEPARATOR, - CHANGELOG_CF_MESSAGE_HEADER, - 
CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, -) -from .serialization import serialize, deserialize, int_to_int64_bytes -from ..state import TransactionState +from quixstreams.state.metadata import UNDEFINED +from quixstreams.state.base import PartitionTransaction -if TYPE_CHECKING: - from .partition import RocksDBStorePartition - -__all__ = ("RocksDBPartitionTransaction", "DEFAULT_PREFIX", "DELETED") +__all__ = ("RocksDBPartitionTransaction",) logger = logging.getLogger(__name__) -Undefined = NewType("Undefined", object) - -UNDEFINED = Undefined(object()) -DELETED = Undefined(object()) - -DEFAULT_PREFIX = b"" - - -def _validate_transaction_status(*allowed: PartitionTransactionStatus): - """ - Check that the status of `RocksDBTransaction` is valid before calling a method - """ - - def wrapper(func): - @functools.wraps(func) - def _wrapper(tx: "RocksDBPartitionTransaction", *args, **kwargs): - if tx.status not in allowed: - raise StateTransactionError( - f"Invalid transaction status {tx.status}, " f"allowed: {allowed}" - ) - - return func(tx, *args, **kwargs) - - return _wrapper - - return wrapper - class RocksDBPartitionTransaction(PartitionTransaction): """ @@ -88,386 +39,32 @@ class RocksDBPartitionTransaction(PartitionTransaction): `RocksDBTransaction` can be used only once. """ - __slots__ = ( - "_partition", - "_update_cache", - "_batch", - "_dumps", - "_loads", - "_status", - ) - - def __init__( - self, - partition: "RocksDBStorePartition", - dumps: DumpsFunc, - loads: LoadsFunc, - changelog_producer: Optional[ChangelogProducer] = None, - ): - """ - :param partition: instance of `RocksDBStatePartition` to be used for accessing - the underlying RocksDB - :param dumps: a function to serialize data to bytes. - :param loads: a function to deserialize data from bytes. - """ - self._partition = partition - self._update_cache: Dict[ - str, Dict[bytes, Dict[bytes, Union[bytes, Undefined]]] - ] = {"default": {}} - self._batch = WriteBatch(raw_mode=True) - self._dumps = dumps - self._loads = loads - self._status = PartitionTransactionStatus.STARTED - self._changelog_producer = changelog_producer - - @_validate_transaction_status(PartitionTransactionStatus.STARTED) - def get( - self, key: Any, prefix: bytes, default: Any = None, cf_name: str = "default" + def _get( + self, key_serialized: bytes, default: Any = None, cf_name: str = "default" ) -> Optional[Any]: """ Get a key from the store. - It first looks up the key in the update cache in case it has been updated - but not flushed yet. - It returns `None` if the key is not found and `default` is not provided. - :param key: a key to get from DB - :param prefix: a key prefix + :param key_serialized: a key to get from DB :param default: value to return if the key is not present in the state. It can be of any type. :param cf_name: rocksdb column family name. 
Default - "default" :return: value or `default` """ - - # First, check the update cache in case the value was previously written - # Use _undefined sentinel as default because the actual value can be "None" - key_serialized = self._serialize_key(key, prefix=prefix) - cached = ( - self._update_cache.get(cf_name, {}) - .get(prefix, {}) - .get(key_serialized, UNDEFINED) - ) - if cached is DELETED: - return default - - if cached is not UNDEFINED: - return self._deserialize_value(cached) - # The value is not found in cache, check the db stored = self._partition.get(key_serialized, UNDEFINED, cf_name=cf_name) - if stored is not UNDEFINED: - return self._deserialize_value(stored) - return default - - @_validate_transaction_status(PartitionTransactionStatus.STARTED) - def set(self, key: Any, value: Any, prefix: bytes, cf_name: str = "default"): - """ - Set a key to the store. - - It first updates the key in the update cache. - - :param key: key to store in DB - :param prefix: a key prefix - :param value: value to store in DB - :param cf_name: rocksdb column family name. Default - "default" - """ - - try: - key_serialized = self._serialize_key(key, prefix=prefix) - value_serialized = self._serialize_value(value) - self._update_cache.setdefault(cf_name, {}).setdefault(prefix, {})[ - key_serialized - ] = value_serialized - except Exception: - self._status = PartitionTransactionStatus.FAILED - raise - - @_validate_transaction_status(PartitionTransactionStatus.STARTED) - def delete(self, key: Any, prefix: bytes, cf_name: str = "default"): - """ - Delete a key from the store. - - It first deletes the key from the update cache. - - :param key: a key to delete from DB - :param prefix: a key prefix - :param cf_name: rocksdb column family name. Default - "default" - """ - try: - key_serialized = self._serialize_key(key, prefix=prefix) - self._update_cache.setdefault(cf_name, {}).setdefault(prefix, {})[ - key_serialized - ] = DELETED - - except Exception: - self._status = PartitionTransactionStatus.FAILED - raise + if stored is UNDEFINED: + return default + return self._deserialize_value(stored) - @_validate_transaction_status(PartitionTransactionStatus.STARTED) - def exists(self, key: Any, prefix: bytes, cf_name: str = "default") -> bool: + def _exists(self, key_serialized: bytes, cf_name: str = "default") -> bool: """ Check if a key exists in the store. - It first looks up the key in the update cache. - - :param key: a key to check in DB - :param prefix: a key prefix + :param key_serialized: a key to get from DB :param cf_name: rocksdb column family name. Default - "default" :return: `True` if the key exists, `False` otherwise. """ - - key_serialized = self._serialize_key(key, prefix=prefix) - cached = ( - self._update_cache.get(cf_name, {}) - .get(prefix, {}) - .get(key_serialized, UNDEFINED) - ) - if cached is DELETED: - return False - - if cached is not UNDEFINED: - return True - return self._partition.exists(key_serialized, cf_name=cf_name) - - @_validate_transaction_status(PartitionTransactionStatus.STARTED) - def prepare(self, processed_offset: int): - """ - Produce changelog messages to the changelog topic for all changes accumulated - in this transaction and prepare transaction to flush its state to the state - store. - - After successful `prepare()`, the transaction status is changed to PREPARED, - and it cannot receive updates anymore. - - If changelog is disabled for this application, no updates will be produced - to the changelog topic. 
- - :param processed_offset: the offset of the latest processed message - """ - try: - self._produce_changelog(processed_offset=processed_offset) - self._status = PartitionTransactionStatus.PREPARED - except Exception: - self._status = PartitionTransactionStatus.FAILED - raise - - @_validate_transaction_status( - PartitionTransactionStatus.STARTED, PartitionTransactionStatus.PREPARED - ) - def flush( - self, - processed_offset: Optional[int] = None, - changelog_offset: Optional[int] = None, - ): - """ - Flush the recent updates to the database. - It writes the WriteBatch to RocksDB and marks itself as finished. - - If writing fails, the transaction is marked as failed and - cannot be used anymore. - - >***NOTE:*** If no keys have been modified during the transaction - (i.e. no "set" or "delete" have been called at least once), it will - not flush ANY data to the database including the offset to optimize - I/O. - - :param processed_offset: offset of the last processed message, optional. - :param changelog_offset: offset of the last produced changelog message, - optional. - """ - try: - self._flush_state( - processed_offset=processed_offset, changelog_offset=changelog_offset - ) - self._status = PartitionTransactionStatus.COMPLETE - except Exception: - self._status = PartitionTransactionStatus.FAILED - raise - - @property - def status(self) -> PartitionTransactionStatus: - return self._status - - @property - def completed(self) -> bool: - """ - Check if the transaction is completed. - - It doesn't indicate whether transaction is successful or not. - Use `RocksDBTransaction.failed` for that. - - The completed transaction should not be re-used. - - :return: `True` if transaction is completed, `False` otherwise. - """ - return self._status == PartitionTransactionStatus.COMPLETE - - @property - def prepared(self) -> bool: - """ - Check if the transaction is in PREPARED status. - - Prepared transaction successfully flushed its changelog and cannot receive - updates anymore, but its state is not yet flushed to the disk - - :return: `True` if transaction is prepared, `False` otherwise. - """ - return self._status == PartitionTransactionStatus.PREPARED - - @property - def failed(self) -> bool: - """ - Check if the transaction has failed. - - The failed transaction should not be re-used because the update cache - and - - :return: `True` if transaction is failed, `False` otherwise. - """ - return self._status == PartitionTransactionStatus.FAILED - - @property - def changelog_topic_partition(self) -> Optional[Tuple[str, int]]: - """ - Return the changelog topic-partition for the StorePartition of this transaction. - - Returns `None` if changelog_producer is not provided. - - :return: (topic, partition) or None - """ - if self._changelog_producer is not None: - return ( - self._changelog_producer.changelog_name, - self._changelog_producer.partition, - ) - - def as_state(self, prefix: Any = DEFAULT_PREFIX) -> TransactionState: - """ - Create a one-time use `TransactionState` object with a limited CRUD interface - to be provided to `StreamingDataFrame` operations. - - The `TransactionState` will prefix all the keys with the supplied `prefix` - for all underlying operations. 
- - :param prefix: a prefix to be used for all keys - :return: an instance of `TransactionState` - """ - return TransactionState( - transaction=self, - prefix=( - prefix - if isinstance(prefix, bytes) - else serialize(prefix, dumps=self._dumps) - ), - ) - - def _produce_changelog(self, processed_offset: Optional[int] = None): - changelog_producer = self._changelog_producer - if changelog_producer is None: - return - - changelog_topic, partition = ( - changelog_producer.changelog_name, - changelog_producer.partition, - ) - logger.debug( - f"Flushing state changes to the changelog topic " - f'topic_name="{changelog_topic}" ' - f"partition={partition} " - f"processed_offset={processed_offset}" - ) - # Iterate over the transaction update cache - for cf_name, cf_update_cache in self._update_cache.items(): - source_tp_offset_header = json_dumps(processed_offset) - headers = { - CHANGELOG_CF_MESSAGE_HEADER: cf_name, - CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER: source_tp_offset_header, - } - for _prefix, prefix_update_cache in cf_update_cache.items(): - for key, value in prefix_update_cache.items(): - # Produce changes to the changelog topic - self._changelog_producer.produce( - key=key, - value=value if value is not DELETED else None, - headers=headers, - ) - - def _flush_state( - self, - processed_offset: Optional[int] = None, - changelog_offset: Optional[int] = None, - ): - meta_cf_handle = self._partition.get_column_family_handle(METADATA_CF_NAME) - # Iterate over the transaction update cache - for cf_name, cf_update_cache in self._update_cache.items(): - cf_handle = self._partition.get_column_family_handle(cf_name) - for _prefix, prefix_update_cache in cf_update_cache.items(): - for key, value in prefix_update_cache.items(): - # Apply changes to the Writebatch - if value is DELETED: - self._batch.delete(key, cf_handle) - else: - self._batch.put(key, value, cf_handle) - - if not len(self._batch): - # Exit early if transaction doesn't update anything - return - - # Save the latest processed input topic offset - if processed_offset is not None: - self._batch.put( - PROCESSED_OFFSET_KEY, - int_to_int64_bytes(processed_offset), - meta_cf_handle, - ) - # Save the latest changelog topic offset to know where to recover from - # It may be None if changelog topics are disabled - if changelog_offset is not None: - current_changelog_offset = self._partition.get_changelog_offset() - if ( - current_changelog_offset is not None - and changelog_offset < current_changelog_offset - ): - raise InvalidChangelogOffset( - "Cannot set changelog offset lower than already saved one" - ) - self._batch.put( - CHANGELOG_OFFSET_KEY, - int_to_int64_bytes(changelog_offset), - meta_cf_handle, - ) - logger.debug( - f"Flushing state changes to the disk " - f'path="{self._partition.path}" ' - f"processed_offset={processed_offset} " - f"changelog_offset={changelog_offset}" - ) - self._partition.write(self._batch) - - def _serialize_value(self, value: Any) -> bytes: - return serialize(value, dumps=self._dumps) - - def _deserialize_value(self, value: bytes) -> Any: - return deserialize(value, loads=self._loads) - - def _serialize_key(self, key: Any, prefix: bytes) -> bytes: - key_bytes = serialize(key, dumps=self._dumps) - prefix = prefix + PREFIX_SEPARATOR if prefix else b"" - return prefix + key_bytes - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """ - Note: with state transactions, context manager interface is meant - to be used mostly in unit tests. 
- - Normally, the Checkpoint class is responsible for managing and flushing - the transactions. - """ - - if exc_val is None and not self.failed: - self.flush() diff --git a/quixstreams/state/rocksdb/types.py b/quixstreams/state/rocksdb/types.py index 44641751d..1995f3374 100644 --- a/quixstreams/state/rocksdb/types.py +++ b/quixstreams/state/rocksdb/types.py @@ -3,7 +3,7 @@ import rocksdict -from quixstreams.state.types import DumpsFunc, LoadsFunc +from quixstreams.state.serialization import DumpsFunc, LoadsFunc CompressionType = Literal["none", "snappy", "zlib", "bz2", "lz4", "lz4hc", "zstd"] diff --git a/quixstreams/state/rocksdb/windowed/partition.py b/quixstreams/state/rocksdb/windowed/partition.py index 579292e88..ecde8d9d9 100644 --- a/quixstreams/state/rocksdb/windowed/partition.py +++ b/quixstreams/state/rocksdb/windowed/partition.py @@ -1,7 +1,10 @@ import logging -from typing import Optional +from typing import Optional, Dict -from rocksdict import ReadOptions, RdictItems # type: ignore +from rocksdict import WriteBatch, ReadOptions, RdictItems # type: ignore + +from quixstreams.state.serialization import int_from_int64_bytes, int_to_int64_bytes +from quixstreams.state.recovery import ChangelogProducer from .metadata import LATEST_EXPIRED_WINDOW_CF_NAME from .transaction import WindowedRocksDBPartitionTransaction @@ -13,11 +16,8 @@ from ..partition import ( RocksDBStorePartition, ) -from ..serialization import int_from_int64_bytes from ..types import RocksDBOptionsType -from ...recovery import ChangelogProducer - logger = logging.getLogger(__name__) @@ -74,3 +74,24 @@ def _ensure_column_family(self, cf_name: str): self.get_column_family(cf_name) except ColumnFamilyDoesNotExist: self.create_column_family(cf_name) + + def write( + self, + data: Dict, + processed_offset: Optional[int], + changelog_offset: Optional[int], + ): + batch = WriteBatch(raw_mode=True) + + cf_handle = self.get_column_family_handle(METADATA_CF_NAME) + batch.put( + LATEST_TIMESTAMP_KEY, + int_to_int64_bytes(self._latest_timestamp_ms), + cf_handle, + ) + super().write( + data=data, + processed_offset=processed_offset, + changelog_offset=changelog_offset, + batch=batch, + ) diff --git a/quixstreams/state/rocksdb/windowed/serialization.py b/quixstreams/state/rocksdb/windowed/serialization.py index 610bac4cc..8a6e1d058 100644 --- a/quixstreams/state/rocksdb/windowed/serialization.py +++ b/quixstreams/state/rocksdb/windowed/serialization.py @@ -1,7 +1,10 @@ from typing import Tuple -from ..metadata import PREFIX_SEPARATOR -from ..serialization import int_from_int64_bytes, int_to_int64_bytes +from quixstreams.state.metadata import PREFIX_SEPARATOR +from quixstreams.state.serialization import ( + int_from_int64_bytes, + int_to_int64_bytes, +) __all__ = ("parse_window_key", "encode_window_key", "encode_window_prefix") diff --git a/quixstreams/state/rocksdb/windowed/store.py b/quixstreams/state/rocksdb/windowed/store.py index ec3e96516..e469cd69f 100644 --- a/quixstreams/state/rocksdb/windowed/store.py +++ b/quixstreams/state/rocksdb/windowed/store.py @@ -1,7 +1,6 @@ -from typing import Optional, cast +from typing import Optional from .partition import WindowedRocksDBStorePartition -from .transaction import WindowedRocksDBPartitionTransaction from ..store import RocksDBStore from ..types import RocksDBOptionsType from ...recovery import ChangelogProducerFactory, ChangelogProducer @@ -39,23 +38,15 @@ def __init__( options=options, ) - def create_new_partition( - self, path: str, changelog_producer: 
Optional[ChangelogProducer] = None - ) -> WindowedRocksDBStorePartition: - db_partition = WindowedRocksDBStorePartition( - path=path, options=self._options, changelog_producer=changelog_producer - ) - return db_partition + def create_new_partition(self, partition) -> WindowedRocksDBStorePartition: + path = str((self._partitions_dir / str(partition)).absolute()) - def assign_partition(self, partition: int) -> WindowedRocksDBStorePartition: - return cast( - WindowedRocksDBStorePartition, super().assign_partition(partition=partition) - ) + changelog_producer: Optional[ChangelogProducer] = None + if self._changelog_producer_factory: + changelog_producer = ( + self._changelog_producer_factory.get_partition_producer(partition) + ) - def start_partition_transaction( - self, partition: int - ) -> WindowedRocksDBPartitionTransaction: - return cast( - WindowedRocksDBPartitionTransaction, - super().start_partition_transaction(partition=partition), + return WindowedRocksDBStorePartition( + path=path, options=self._options, changelog_producer=changelog_producer ) diff --git a/quixstreams/state/rocksdb/windowed/transaction.py b/quixstreams/state/rocksdb/windowed/transaction.py index 445332a86..473d3cdc1 100644 --- a/quixstreams/state/rocksdb/windowed/transaction.py +++ b/quixstreams/state/rocksdb/windowed/transaction.py @@ -2,18 +2,18 @@ from rocksdict import ReadOptions +from quixstreams.state.metadata import DELETED, PREFIX_SEPARATOR, DEFAULT_PREFIX from quixstreams.state.recovery import ChangelogProducer +from quixstreams.state.serialization import ( + serialize, + LoadsFunc, + DumpsFunc, +) + from .metadata import LATEST_EXPIRED_WINDOW_TIMESTAMP_KEY, LATEST_EXPIRED_WINDOW_CF_NAME from .serialization import encode_window_key, encode_window_prefix, parse_window_key from .state import WindowedTransactionState -from ..metadata import ( - METADATA_CF_NAME, - LATEST_TIMESTAMP_KEY, - PREFIX_SEPARATOR, -) -from ..serialization import int_to_int64_bytes, serialize -from ..transaction import RocksDBPartitionTransaction, DELETED, DEFAULT_PREFIX -from ..types import LoadsFunc, DumpsFunc +from ..transaction import RocksDBPartitionTransaction if TYPE_CHECKING: from .partition import WindowedRocksDBStorePartition @@ -86,21 +86,9 @@ def delete_window(self, start_ms: int, end_ms: int, prefix: bytes): key = encode_window_key(start_ms, end_ms) self.delete(key=key, prefix=prefix) - def flush( - self, - processed_offset: Optional[int] = None, - changelog_offset: Optional[int] = None, - ): - cf_handle = self._partition.get_column_family_handle(METADATA_CF_NAME) - self._batch.put( - LATEST_TIMESTAMP_KEY, - int_to_int64_bytes(self._latest_timestamp_ms), - cf_handle, - ) - super().flush( - processed_offset=processed_offset, changelog_offset=changelog_offset - ) + def _flush(self, processed_offset: Optional[int], changelog_offset: Optional[int]): self._partition.set_latest_timestamp(self._latest_timestamp_ms) + super()._flush(processed_offset, changelog_offset) def expire_windows( self, duration_ms: int, prefix: bytes, grace_ms: int = 0 diff --git a/quixstreams/state/rocksdb/serialization.py b/quixstreams/state/serialization.py similarity index 89% rename from quixstreams/state/rocksdb/serialization.py rename to quixstreams/state/serialization.py index 5f25fe4f3..421adc376 100644 --- a/quixstreams/state/rocksdb/serialization.py +++ b/quixstreams/state/serialization.py @@ -1,7 +1,6 @@ import struct -from typing import Any +from typing import Any, Callable -from quixstreams.state.types import DumpsFunc, LoadsFunc from 
.exceptions import StateSerializationError __all__ = ( @@ -15,6 +14,9 @@ _int_pack = _int_packer.pack _int_unpack = _int_packer.unpack +DumpsFunc = Callable[[Any], bytes] +LoadsFunc = Callable[[bytes], Any] + def serialize(value: Any, dumps: DumpsFunc) -> bytes: try: diff --git a/quixstreams/state/types.py b/quixstreams/state/types.py index 00621ae83..9f33a071b 100644 --- a/quixstreams/state/types.py +++ b/quixstreams/state/types.py @@ -1,305 +1,7 @@ -import enum -from typing import Protocol, Any, Optional, Callable, Dict, ClassVar, Tuple, List +import logging +from typing import Protocol, Any, Optional, Tuple, List -from quixstreams.models import ConfluentKafkaMessageProto - -DumpsFunc = Callable[[Any], bytes] -LoadsFunc = Callable[[bytes], Any] - - -class Store(Protocol): - """ - Abstract state store. - - It keeps track of individual store partitions and provides access to the - partitions' transactions. - """ - - options_type: ClassVar[object] - - @property - def topic(self) -> str: - """ - Topic name - """ - ... - - @property - def name(self) -> str: - """ - Store name - """ - ... - - @property - def partitions(self) -> Dict[int, "StorePartition"]: - """ - Mapping of assigned store partitions - :return: dict of "{partition: }" - """ - ... - - def assign_partition(self, partition: int) -> "StorePartition": - """ - Assign new store partition - - :param partition: partition number - :return: instance of `StorePartition` - """ - ... - - def revoke_partition(self, partition: int): - """ - Revoke assigned store partition - - :param partition: partition number - """ - ... - - def start_partition_transaction(self, partition: int) -> "PartitionTransaction": - """ - Start a new partition transaction. - - `PartitionTransaction` is the primary interface for working with data in Stores. - :param partition: partition number - :return: instance of `PartitionTransaction` - """ - - def close(self): - """ - Close store and revoke all store partitions - """ - ... - - def __enter__(self): ... - - def __exit__(self, exc_type, exc_val, exc_tb): ... - - -class StorePartition(Protocol): - """ - A base class to access state in the underlying storage. - It represents a single instance of some storage (e.g. a single database for - the persistent storage). - - """ - - @property - def path(self) -> str: - """ - Absolute path to RocksDB database folder - """ - ... - - def begin(self) -> "PartitionTransaction": - """ - State new `PartitionTransaction` - """ - - def recover_from_changelog_message( - self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int - ): - """ - Updates state from a given changelog message. - - :param changelog_message: A raw Confluent message read from a changelog topic. - :param committed_offset: latest committed offset for the partition - """ - ... - - def get_processed_offset(self) -> Optional[int]: - """ - Get last processed offset for the given partition - :return: offset or `None` if there's no processed offset yet - """ - ... - - def get_changelog_offset(self) -> Optional[int]: - """ - Get offset that the changelog is up-to-date with. - :return: offset or `None` if there's no processed offset yet - """ - ... - - def set_changelog_offset(self, changelog_offset: int): - """ - Set the changelog offset based on a message (usually an "offset-only" message). - - Used during recovery. - - :param changelog_offset: A changelog offset - """ - ... 
- - -class State(Protocol): - """ - Primary interface for working with key-value state data from `StreamingDataFrame` - """ - - def get(self, key: Any, default: Any = None) -> Optional[Any]: - """ - Get the value for key if key is present in the state, else default - - :param key: key - :param default: default value to return if the key is not found - :return: value or None if the key is not found and `default` is not provided - """ - ... - - def set(self, key: Any, value: Any): - """ - Set value for the key. - :param key: key - :param value: value - """ - ... - - def delete(self, key: Any): - """ - Delete value for the key. - - This function always returns `None`, even if value is not found. - :param key: key - """ - ... - - def exists(self, key: Any) -> bool: - """ - Check if the key exists in state. - :param key: key - :return: True if key exists, False otherwise - """ - ... - - -class PartitionTransaction(Protocol): - """ - A transaction class to perform simple key-value operations like - "get", "set", "delete" and "exists" on a single storage partition. - """ - - def as_state(self, prefix: Any) -> State: - """ - Create an instance implementing the `State` protocol to be provided - to `StreamingDataFrame` functions. - All operations called on this State object will be prefixed with - the supplied `prefix`. - - :return: an instance implementing the `State` protocol - """ - ... - - def get(self, key: Any, prefix: bytes, default: Any = None) -> Optional[Any]: - """ - Get the value for key if key is present in the state, else default - - :param key: key - :param prefix: a key prefix - :param default: default value to return if the key is not found - :return: value or None if the key is not found and `default` is not provided - """ - ... - - def set(self, key: Any, prefix: bytes, value: Any): - """ - Set value for the key. - :param key: key - :param prefix: a key prefix - :param value: value - """ - ... - - def delete(self, key: Any, prefix: bytes): - """ - Delete value for the key. - - This function always returns `None`, even if value is not found. - :param key: key - :param prefix: a key prefix - """ - ... - - def exists(self, key: Any, prefix: bytes) -> bool: - """ - Check if the key exists in state. - :param key: key - :param prefix: a key prefix - :return: True if key exists, False otherwise - """ - ... - - @property - def failed(self) -> bool: - """ - Return `True` if transaction failed to update data at some point. - - Failed transactions cannot be re-used. - :return: bool - """ - ... - - @property - def completed(self) -> bool: - """ - Return `True` if transaction is successfully completed. - - Completed transactions cannot be re-used. - :return: bool - """ - ... - - @property - def prepared(self) -> bool: - """ - Return `True` if transaction is prepared completed. - - Prepared transactions cannot receive new updates, but can be flushed. - :return: bool - """ - ... - - def prepare(self, processed_offset: int): - """ - Produce changelog messages to the changelog topic for all changes accumulated - in this transaction and prepare transcation to flush its state to the state - store. - - After successful `prepare()`, the transaction status is changed to PREPARED, - and it cannot receive updates anymore. - - If changelog is disabled for this application, no updates will be produced - to the changelog topic. 
- - :param processed_offset: the offset of the latest processed message - """ - - @property - def changelog_topic_partition(self) -> Optional[Tuple[str, int]]: - """ - Return the changelog topic-partition for the StorePartition of this transaction. - - Returns `None` if changelog_producer is not provided. - - :return: (topic, partition) or None - """ - - def flush( - self, - processed_offset: Optional[int] = None, - changelog_offset: Optional[int] = None, - ): - """ - Flush the recent updates to the storage. - - :param processed_offset: offset of the last processed message, optional. - :param changelog_offset: offset of the last produced changelog message, - optional. - """ - - def __enter__(self): ... - - def __exit__(self, exc_type, exc_val, exc_tb): ... +logger = logging.getLogger(__name__) class WindowedState(Protocol): @@ -513,14 +215,3 @@ def flush(self): Flush the recovery update to the storage. """ ... - - -class PartitionTransactionStatus(enum.Enum): - STARTED = 1 # Transaction is started and accepts updates - - PREPARED = 2 # Transaction is prepared, it can no longer receive updates - # and can only be flushed - - COMPLETE = 3 # Transaction is fully completed, it cannot be used anymore - - FAILED = 4 # Transaction is failed, it cannot be used anymore diff --git a/tests/test_quixstreams/test_state/fixtures.py b/tests/test_quixstreams/test_state/fixtures.py index 20184a187..acee5c8f0 100644 --- a/tests/test_quixstreams/test_state/fixtures.py +++ b/tests/test_quixstreams/test_state/fixtures.py @@ -7,7 +7,7 @@ from quixstreams.kafka import Consumer from quixstreams.models import TopicManager from quixstreams.state.recovery import RecoveryPartition, RecoveryManager -from quixstreams.state.types import StorePartition +from quixstreams.state.base import StorePartition @pytest.fixture() diff --git a/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py b/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py index 339b32c55..c4cf72fc3 100644 --- a/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py +++ b/tests/test_quixstreams/test_state/test_recovery/test_recovery_manager.py @@ -6,8 +6,8 @@ from quixstreams.kafka import Consumer from quixstreams.models import TopicManager, TopicConfig from quixstreams.state.exceptions import InvalidStoreChangelogOffset +from quixstreams.state.metadata import CHANGELOG_CF_MESSAGE_HEADER from quixstreams.state.rocksdb import RocksDBStorePartition -from quixstreams.state.rocksdb.metadata import CHANGELOG_CF_MESSAGE_HEADER from tests.utils import ConfluentKafkaMessageStub diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_partition.py b/tests/test_quixstreams/test_state/test_rocksdb/test_partition.py index 3da6649f5..95e37d39c 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_partition.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_partition.py @@ -5,14 +5,14 @@ import pytest from rocksdict import Rdict +from quixstreams.state.exceptions import ColumnFamilyHeaderMissing from quixstreams.state.rocksdb import ( RocksDBStorePartition, RocksDBOptions, ColumnFamilyAlreadyExists, ColumnFamilyDoesNotExist, - ColumnFamilyHeaderMissing, ) -from quixstreams.state.rocksdb.metadata import ( +from quixstreams.state.metadata import ( CHANGELOG_CF_MESSAGE_HEADER, PREFIX_SEPARATOR, CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py b/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py 
index 9d5cf2d81..6f8778cfc 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py @@ -4,21 +4,22 @@ from unittest.mock import patch import pytest -import rocksdict -from quixstreams.state.rocksdb import ( +from quixstreams.state.exceptions import ( StateSerializationError, StateTransactionError, + InvalidChangelogOffset, +) +from quixstreams.state.rocksdb import ( RocksDBStorePartition, RocksDBOptions, RocksDBPartitionTransaction, - InvalidChangelogOffset, ) -from quixstreams.state.rocksdb.metadata import ( +from quixstreams.state.metadata import ( CHANGELOG_CF_MESSAGE_HEADER, CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, ) -from quixstreams.state.rocksdb.serialization import serialize +from quixstreams.state.serialization import serialize from quixstreams.utils.json import dumps TEST_KEYS = [ @@ -204,13 +205,20 @@ def test_delete_serialization_error(self, key, rocksdb_partition): def test_get_deserialization_error(self, rocksdb_partition): bytes_ = secrets.token_bytes(10) string_ = "string" - - batch = rocksdict.WriteBatch(raw_mode=True) - # Set non-deserializable key and valid value - batch.put(bytes_, serialize(string_, dumps=dumps)) - # Set valid key and non-deserializable value - batch.put(serialize(string_, dumps=dumps), bytes_) - rocksdb_partition.write(batch) + rocksdb_partition.write( + data={ + "default": { + "": { + # Set non-deserializable key and valid value + bytes_: serialize(string_, dumps=dumps), + # Set valid key and non-deserializable value + serialize(string_, dumps=dumps): bytes_, + } + } + }, + processed_offset=None, + changelog_offset=None, + ) with rocksdb_partition.begin() as tx: with pytest.raises(StateSerializationError): diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_partition.py b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_partition.py index 0e7110302..a1758ba2b 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_partition.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_partition.py @@ -1,6 +1,6 @@ import pytest -from quixstreams.state.rocksdb.metadata import ( +from quixstreams.state.metadata import ( CHANGELOG_CF_MESSAGE_HEADER, PREFIX_SEPARATOR, ) diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_serialization.py b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_serialization.py index 093ac460b..2d3a50387 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_serialization.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_serialization.py @@ -1,6 +1,6 @@ import pytest -from quixstreams.state.rocksdb.metadata import PREFIX_SEPARATOR +from quixstreams.state.metadata import PREFIX_SEPARATOR from quixstreams.state.rocksdb.windowed.serialization import ( encode_window_key, parse_window_key, diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py index 2a664c3cc..9343a08fe 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py @@ -1,6 +1,6 @@ import pytest -from quixstreams.state.rocksdb.metadata import ( +from quixstreams.state.metadata import ( CHANGELOG_CF_MESSAGE_HEADER, CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, ) From 
a632e9bbfa475cd3c845e57372763d6d9613f9a7 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Wed, 2 Oct 2024 11:29:59 +0200 Subject: [PATCH 2/6] state: refactor to base transaction --- quixstreams/state/base/partition.py | 59 +++++++++++++--- quixstreams/state/base/state.py | 2 +- quixstreams/state/base/transaction.py | 24 +++---- quixstreams/state/rocksdb/__init__.py | 1 - quixstreams/state/rocksdb/partition.py | 29 ++------ quixstreams/state/rocksdb/transaction.py | 70 ------------------- .../state/rocksdb/windowed/transaction.py | 4 +- tests/test_quixstreams/test_checkpointing.py | 4 +- .../{test_rocksdb => }/test_transaction.py | 17 ++--- 9 files changed, 78 insertions(+), 132 deletions(-) delete mode 100644 quixstreams/state/rocksdb/transaction.py rename tests/test_quixstreams/test_state/{test_rocksdb => }/test_transaction.py (97%) diff --git a/quixstreams/state/base/partition.py b/quixstreams/state/base/partition.py index ce651e6c1..08cfaaafa 100644 --- a/quixstreams/state/base/partition.py +++ b/quixstreams/state/base/partition.py @@ -1,7 +1,7 @@ import logging from abc import ABC, abstractmethod -from typing import Optional +from typing import Optional, Union, Any, TYPE_CHECKING from quixstreams.models import ConfluentKafkaMessageProto from quixstreams.state.exceptions import ColumnFamilyHeaderMissing @@ -9,10 +9,15 @@ CHANGELOG_CF_MESSAGE_HEADER, CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, ) +from quixstreams.state.serialization import DumpsFunc, LoadsFunc from quixstreams.utils.json import loads as json_loads from .transaction import PartitionTransaction, CACHE_TYPE +if TYPE_CHECKING: + from quixstreams.state.recovery import ChangelogProducer + + __all__ = ("StorePartition",) logger = logging.getLogger(__name__) @@ -25,14 +30,16 @@ class StorePartition(ABC): the persistent storage). """ - def __init__(self) -> None: + def __init__( + self, + dumps: DumpsFunc, + loads: LoadsFunc, + changelog_producer: "ChangelogProducer", + ) -> None: super().__init__() - - @abstractmethod - def begin(self) -> PartitionTransaction: - """ - Start new `PartitionTransaction` - """ + self._dumps = dumps + self._loads = loads + self._changelog_producer = changelog_producer @abstractmethod def close(self): ... @@ -73,6 +80,42 @@ def write( :param changelog_offset: The changelog message offset of the data. """ + @abstractmethod + def get( + self, key: bytes, default: Any = None, cf_name: str = "default" + ) -> Union[None, bytes, Any]: + """ + Get a key from the store + + :param key: a key encoded to `bytes` + :param default: a default value to return if the key is not found. + :param cf_name: rocksdb column family name. Default - "default" + :return: a value if the key is present in the store. Otherwise, `default` + """ + + @abstractmethod + def exists(self, key: bytes, cf_name: str = "default") -> bool: + """ + Check if a key is present in the store. + + :param key: a key encoded to `bytes`. + :param cf_name: rocksdb column family name. Default - "default" + :return: `True` if the key is present, `False` otherwise. + """ + + def begin(self) -> PartitionTransaction: + """ + Start a new `PartitionTransaction` + + Using `PartitionTransaction` is a recommended way for accessing the data. 
+ """ + return PartitionTransaction( + partition=self, + dumps=self._dumps, + loads=self._loads, + changelog_producer=self._changelog_producer, + ) + def recover_from_changelog_message( self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int ): diff --git a/quixstreams/state/base/state.py b/quixstreams/state/base/state.py index 5167b8b9c..1a4431497 100644 --- a/quixstreams/state/base/state.py +++ b/quixstreams/state/base/state.py @@ -3,7 +3,7 @@ from typing import Any, Optional, TYPE_CHECKING if TYPE_CHECKING: - from quixstreams.state.base import PartitionTransaction + from .transaction import PartitionTransaction __all__ = ("State", "TransactionState") diff --git a/quixstreams/state/base/transaction.py b/quixstreams/state/base/transaction.py index fc3f73b3d..2f2fc1af6 100644 --- a/quixstreams/state/base/transaction.py +++ b/quixstreams/state/base/transaction.py @@ -10,7 +10,7 @@ TYPE_CHECKING, ) -from abc import ABC, abstractmethod +from abc import ABC from quixstreams.state.exceptions import ( StateTransactionError, @@ -36,8 +36,8 @@ from .state import State, TransactionState if TYPE_CHECKING: + from quixstreams.state.recovery import ChangelogProducer from .partition import StorePartition - from ..recovery import ChangelogProducer __all__ = ("PartitionTransactionStatus", "PartitionTransaction", "CACHE_TYPE") @@ -99,14 +99,6 @@ def __init__( self._update_cache: CACHE_TYPE = {} - @abstractmethod - def _get( - self, key_serialized: bytes, default: Any = None, cf_name: str = "default" - ) -> Optional[Any]: ... - - @abstractmethod - def _exists(self, key_serialized: bytes, cf_name: str) -> bool: ... - @property def changelog_producer(self) -> Optional["ChangelogProducer"]: return self._changelog_producer @@ -198,7 +190,9 @@ def get( cf_name: str = "default", ) -> Optional[Any]: """ - Get the value for key if key is present in the state, else default + Get a key from the store. + + It returns `None` if the key is not found and `default` is not provided. 
:param key: key :param prefix: a key prefix @@ -218,7 +212,11 @@ def get( if cached is not UNDEFINED: return self._deserialize_value(cached) - return self._get(key_serialized, default, cf_name) + stored = self._partition.get(key_serialized, UNDEFINED, cf_name) + if stored is UNDEFINED: + return default + + return self._deserialize_value(stored) @validate_transaction_status(PartitionTransactionStatus.STARTED) def set(self, key: Any, value: Any, prefix: bytes, cf_name: str = "default"): @@ -277,7 +275,7 @@ def exists(self, key: Any, prefix: bytes, cf_name: str = "default") -> bool: if cached is not UNDEFINED: return True - return self._exists(key_serialized, cf_name) + return self._partition.exists(key_serialized, cf_name=cf_name) @validate_transaction_status(PartitionTransactionStatus.STARTED) def prepare(self, processed_offset: int): diff --git a/quixstreams/state/rocksdb/__init__.py b/quixstreams/state/rocksdb/__init__.py index 7e4416821..0ff35eb8d 100644 --- a/quixstreams/state/rocksdb/__init__.py +++ b/quixstreams/state/rocksdb/__init__.py @@ -3,5 +3,4 @@ from .options import * from .partition import * from .store import * -from .transaction import * from .types import * diff --git a/quixstreams/state/rocksdb/partition.py b/quixstreams/state/rocksdb/partition.py index f97e83e07..d4c96025b 100644 --- a/quixstreams/state/rocksdb/partition.py +++ b/quixstreams/state/rocksdb/partition.py @@ -22,9 +22,6 @@ CHANGELOG_OFFSET_KEY, ) from .options import RocksDBOptions -from .transaction import ( - RocksDBPartitionTransaction, -) from .types import RocksDBOptionsType __all__ = ("RocksDBStorePartition",) @@ -55,34 +52,18 @@ def __init__( options: Optional[RocksDBOptionsType] = None, changelog_producer: Optional[ChangelogProducer] = None, ): - super().__init__() + if not options: + options = RocksDBOptions() + + super().__init__(options.dumps, options.loads, changelog_producer) self._path = path - self._options = options or RocksDBOptions() + self._options = options self._rocksdb_options = self._options.to_options() - self._dumps = self._options.dumps - self._loads = self._options.loads self._open_max_retries = self._options.open_max_retries self._open_retry_backoff = self._options.open_retry_backoff self._db = self._init_rocksdb() self._cf_cache: Dict[str, Rdict] = {} self._cf_handle_cache: Dict[str, ColumnFamily] = {} - self._changelog_producer = changelog_producer - - def begin( - self, - ) -> RocksDBPartitionTransaction: - """ - Create a new `RocksDBPartitionTransaction` object. - Using `RocksDBPartitionTransaction` is a recommended way for accessing the data. 
- - :return: an instance of `RocksDBPartitionTransaction` - """ - return RocksDBPartitionTransaction( - partition=self, - dumps=self._dumps, - loads=self._loads, - changelog_producer=self._changelog_producer, - ) def _changelog_recover_flush(self, changelog_offset: int, batch: WriteBatch): """ diff --git a/quixstreams/state/rocksdb/transaction.py b/quixstreams/state/rocksdb/transaction.py deleted file mode 100644 index 299c11720..000000000 --- a/quixstreams/state/rocksdb/transaction.py +++ /dev/null @@ -1,70 +0,0 @@ -import logging -from typing import Any, Optional - -from quixstreams.state.metadata import UNDEFINED -from quixstreams.state.base import PartitionTransaction - -__all__ = ("RocksDBPartitionTransaction",) - -logger = logging.getLogger(__name__) - - -class RocksDBPartitionTransaction(PartitionTransaction): - """ - A transaction class to perform simple key-value operations like - "get", "set", "delete" and "exists" on a single RocksDB partition. - - Serialization - ************* - `RocksDBTransaction` automatically serializes keys and values to bytes. - - Prefixing - ********* - Methods `get()`, `set()`, `delete()` and `exists()` methods require prefixes for - the keys. - Normally, the Kafka message keys are supposed to be used as prefixes. - - Transactional properties - ************************ - `RocksDBTransaction` uses a combination of in-memory update cache - and RocksDB's WriteBatch in order to accumulate all the state mutations - in a single batch, flush them atomically, and allow the updates be visible - within the transaction before it's flushed (aka "read-your-own-writes" problem). - - If any mutation fails during the transaction - (e.g., failed to write the updates to the RocksDB), the whole transaction - will be marked as failed and cannot be used anymore. - In this case, a new `RocksDBTransaction` should be created. - - `RocksDBTransaction` can be used only once. - """ - - def _get( - self, key_serialized: bytes, default: Any = None, cf_name: str = "default" - ) -> Optional[Any]: - """ - Get a key from the store. - - It returns `None` if the key is not found and `default` is not provided. - - :param key_serialized: a key to get from DB - :param default: value to return if the key is not present in the state. - It can be of any type. - :param cf_name: rocksdb column family name. Default - "default" - :return: value or `default` - """ - # The value is not found in cache, check the db - stored = self._partition.get(key_serialized, UNDEFINED, cf_name=cf_name) - if stored is UNDEFINED: - return default - return self._deserialize_value(stored) - - def _exists(self, key_serialized: bytes, cf_name: str = "default") -> bool: - """ - Check if a key exists in the store. - - :param key_serialized: a key to get from DB - :param cf_name: rocksdb column family name. Default - "default" - :return: `True` if the key exists, `False` otherwise. 
- """ - return self._partition.exists(key_serialized, cf_name=cf_name) diff --git a/quixstreams/state/rocksdb/windowed/transaction.py b/quixstreams/state/rocksdb/windowed/transaction.py index 473d3cdc1..b9287b980 100644 --- a/quixstreams/state/rocksdb/windowed/transaction.py +++ b/quixstreams/state/rocksdb/windowed/transaction.py @@ -9,17 +9,17 @@ LoadsFunc, DumpsFunc, ) +from quixstreams.state.base.transaction import PartitionTransaction from .metadata import LATEST_EXPIRED_WINDOW_TIMESTAMP_KEY, LATEST_EXPIRED_WINDOW_CF_NAME from .serialization import encode_window_key, encode_window_prefix, parse_window_key from .state import WindowedTransactionState -from ..transaction import RocksDBPartitionTransaction if TYPE_CHECKING: from .partition import WindowedRocksDBStorePartition -class WindowedRocksDBPartitionTransaction(RocksDBPartitionTransaction): +class WindowedRocksDBPartitionTransaction(PartitionTransaction): __slots__ = ("_latest_timestamp_ms",) def __init__( diff --git a/tests/test_quixstreams/test_checkpointing.py b/tests/test_quixstreams/test_checkpointing.py index 1b33f8610..1eb05cb64 100644 --- a/tests/test_quixstreams/test_checkpointing.py +++ b/tests/test_quixstreams/test_checkpointing.py @@ -16,8 +16,8 @@ from quixstreams.sinks import SinkManager, BatchingSink, SinkBackpressureError from quixstreams.sinks.base import SinkBatch from quixstreams.state import StateStoreManager +from quixstreams.state.base import PartitionTransaction from quixstreams.state.exceptions import StoreNotRegisteredError, StoreTransactionFailed -from quixstreams.state.rocksdb import RocksDBPartitionTransaction from tests.utils import DummySink @@ -309,7 +309,7 @@ def test_commit_has_failed_transactions_fails( with ( contextlib.suppress(ValueError), patch.object( - RocksDBPartitionTransaction, + PartitionTransaction, "_serialize_key", side_effect=ValueError("test"), ), diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py b/tests/test_quixstreams/test_state/test_transaction.py similarity index 97% rename from tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py rename to tests/test_quixstreams/test_state/test_transaction.py index 6f8778cfc..425cbb080 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_transaction.py +++ b/tests/test_quixstreams/test_state/test_transaction.py @@ -5,16 +5,13 @@ import pytest +from quixstreams.state.base import PartitionTransaction from quixstreams.state.exceptions import ( StateSerializationError, StateTransactionError, InvalidChangelogOffset, ) -from quixstreams.state.rocksdb import ( - RocksDBStorePartition, - RocksDBOptions, - RocksDBPartitionTransaction, -) +from quixstreams.state.rocksdb import RocksDBOptions from quixstreams.state.metadata import ( CHANGELOG_CF_MESSAGE_HEADER, CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, @@ -48,7 +45,7 @@ ] -class TestRocksDBPartitionTransaction: +class TestPartitionTransaction: def test_transaction_complete(self, rocksdb_partition): with rocksdb_partition.begin() as tx: ... 
@@ -68,7 +65,7 @@ def test_transaction_doesnt_write_empty_batch( with rocksdb_partition_factory( changelog_producer=changelog_producer_mock ) as partition: - with patch.object(RocksDBStorePartition, "write") as mocked: + with patch.object(partition, "write") as mocked: with partition.begin() as tx: tx.get("key", prefix=prefix) @@ -259,7 +256,7 @@ def test_update_key_failed_transaction_failed(self, operation, rocksdb_partition prefix = b"__key__" with patch.object( - RocksDBPartitionTransaction, + PartitionTransaction, "_serialize_key", side_effect=ValueError("test"), ): @@ -395,9 +392,7 @@ def test_flush_failed_transaction_failed(self, rocksdb_partition): """ prefix = b"__key__" - with patch.object( - RocksDBStorePartition, "write", side_effect=ValueError("test") - ): + with patch.object(rocksdb_partition, "write", side_effect=ValueError("test")): with rocksdb_partition.begin() as tx: tx.set("key", "value", prefix=prefix) From 9dd97edc49444e068f934603e6637ced442b7ba0 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Thu, 3 Oct 2024 11:04:12 +0200 Subject: [PATCH 3/6] fix typing --- quixstreams/state/base/partition.py | 8 +++++-- quixstreams/state/base/transaction.py | 3 +-- .../state/rocksdb/windowed/partition.py | 18 ++++++++++----- .../state/rocksdb/windowed/transaction.py | 22 +++++++++++++++++-- 4 files changed, 39 insertions(+), 12 deletions(-) diff --git a/quixstreams/state/base/partition.py b/quixstreams/state/base/partition.py index 08cfaaafa..99795027f 100644 --- a/quixstreams/state/base/partition.py +++ b/quixstreams/state/base/partition.py @@ -34,7 +34,7 @@ def __init__( self, dumps: DumpsFunc, loads: LoadsFunc, - changelog_producer: "ChangelogProducer", + changelog_producer: Optional["ChangelogProducer"], ) -> None: super().__init__() self._dumps = dumps @@ -46,7 +46,11 @@ def close(self): ... @abstractmethod def _recover_from_changelog_message( - self, changelog_message: ConfluentKafkaMessageProto, committed_offset: int + self, + changelog_message: ConfluentKafkaMessageProto, + cf_name: str, + processed_offset: Optional[int], + committed_offset: int, ): ... 
@abstractmethod diff --git a/quixstreams/state/base/transaction.py b/quixstreams/state/base/transaction.py index 2f2fc1af6..56840b3da 100644 --- a/quixstreams/state/base/transaction.py +++ b/quixstreams/state/base/transaction.py @@ -22,7 +22,6 @@ DELETED, PREFIX_SEPARATOR, UNDEFINED, - Undefined, DEFAULT_PREFIX, ) from quixstreams.state.serialization import ( @@ -42,7 +41,7 @@ __all__ = ("PartitionTransactionStatus", "PartitionTransaction", "CACHE_TYPE") logger = logging.getLogger(__name__) -CACHE_TYPE = Dict[str, Dict[bytes, Dict[bytes, Union[bytes, Undefined]]]] +CACHE_TYPE = Dict[str, Dict[bytes, Dict[bytes, Union[bytes, DELETED]]]] class PartitionTransactionStatus(enum.Enum): diff --git a/quixstreams/state/rocksdb/windowed/partition.py b/quixstreams/state/rocksdb/windowed/partition.py index ecde8d9d9..52a19ac37 100644 --- a/quixstreams/state/rocksdb/windowed/partition.py +++ b/quixstreams/state/rocksdb/windowed/partition.py @@ -80,18 +80,24 @@ def write( data: Dict, processed_offset: Optional[int], changelog_offset: Optional[int], + batch: Optional[WriteBatch] = None, + latest_timestamp_ms: Optional[int] = None, ): batch = WriteBatch(raw_mode=True) - cf_handle = self.get_column_family_handle(METADATA_CF_NAME) - batch.put( - LATEST_TIMESTAMP_KEY, - int_to_int64_bytes(self._latest_timestamp_ms), - cf_handle, - ) + if latest_timestamp_ms: + cf_handle = self.get_column_family_handle(METADATA_CF_NAME) + batch.put( + LATEST_TIMESTAMP_KEY, + int_to_int64_bytes(latest_timestamp_ms), + cf_handle, + ) super().write( data=data, processed_offset=processed_offset, changelog_offset=changelog_offset, batch=batch, ) + + if latest_timestamp_ms: + self.set_latest_timestamp(latest_timestamp_ms) diff --git a/quixstreams/state/rocksdb/windowed/transaction.py b/quixstreams/state/rocksdb/windowed/transaction.py index b9287b980..fb52fe98d 100644 --- a/quixstreams/state/rocksdb/windowed/transaction.py +++ b/quixstreams/state/rocksdb/windowed/transaction.py @@ -10,6 +10,7 @@ DumpsFunc, ) from quixstreams.state.base.transaction import PartitionTransaction +from quixstreams.state.exceptions import InvalidChangelogOffset from .metadata import LATEST_EXPIRED_WINDOW_TIMESTAMP_KEY, LATEST_EXPIRED_WINDOW_CF_NAME from .serialization import encode_window_key, encode_window_prefix, parse_window_key @@ -87,8 +88,25 @@ def delete_window(self, start_ms: int, end_ms: int, prefix: bytes): self.delete(key=key, prefix=prefix) def _flush(self, processed_offset: Optional[int], changelog_offset: Optional[int]): - self._partition.set_latest_timestamp(self._latest_timestamp_ms) - super()._flush(processed_offset, changelog_offset) + if not self._update_cache: + return + + if changelog_offset is not None: + current_changelog_offset = self._partition.get_changelog_offset() + if ( + current_changelog_offset is not None + and changelog_offset < current_changelog_offset + ): + raise InvalidChangelogOffset( + "Cannot set changelog offset lower than already saved one" + ) + + self._partition.write( + data=self._update_cache, + processed_offset=processed_offset, + changelog_offset=changelog_offset, + latest_timestamp_ms=self._latest_timestamp_ms, + ) def expire_windows( self, duration_ms: int, prefix: bytes, grace_ms: int = 0 From 8f6c53ebc6ef0972a691c09235113cc456e0fc4b Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Thu, 3 Oct 2024 11:14:10 +0200 Subject: [PATCH 4/6] fix cache type --- quixstreams/state/base/transaction.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/quixstreams/state/base/transaction.py 
b/quixstreams/state/base/transaction.py index 56840b3da..2f2fc1af6 100644 --- a/quixstreams/state/base/transaction.py +++ b/quixstreams/state/base/transaction.py @@ -22,6 +22,7 @@ DELETED, PREFIX_SEPARATOR, UNDEFINED, + Undefined, DEFAULT_PREFIX, ) from quixstreams.state.serialization import ( @@ -41,7 +42,7 @@ __all__ = ("PartitionTransactionStatus", "PartitionTransaction", "CACHE_TYPE") logger = logging.getLogger(__name__) -CACHE_TYPE = Dict[str, Dict[bytes, Dict[bytes, Union[bytes, DELETED]]]] +CACHE_TYPE = Dict[str, Dict[bytes, Dict[bytes, Union[bytes, Undefined]]]] class PartitionTransactionStatus(enum.Enum): From dfe71cbddb039ea2872890164bf3b3acb1c96616 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Thu, 3 Oct 2024 11:54:47 +0200 Subject: [PATCH 5/6] review comments --- quixstreams/state/rocksdb/metadata.py | 2 -- quixstreams/state/rocksdb/windowed/metadata.py | 2 ++ quixstreams/state/rocksdb/windowed/partition.py | 15 +++++---------- quixstreams/state/serialization.py | 2 ++ 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/quixstreams/state/rocksdb/metadata.py b/quixstreams/state/rocksdb/metadata.py index 719eea079..ab549ed1a 100644 --- a/quixstreams/state/rocksdb/metadata.py +++ b/quixstreams/state/rocksdb/metadata.py @@ -1,5 +1,3 @@ PROCESSED_OFFSET_KEY = b"__topic_offset__" CHANGELOG_OFFSET_KEY = b"__changelog_offset__" -LATEST_TIMESTAMP_KEY = b"__topic_latest_timestamp__" - METADATA_CF_NAME = "__metadata__" diff --git a/quixstreams/state/rocksdb/windowed/metadata.py b/quixstreams/state/rocksdb/windowed/metadata.py index c21669245..1f7a84f7e 100644 --- a/quixstreams/state/rocksdb/windowed/metadata.py +++ b/quixstreams/state/rocksdb/windowed/metadata.py @@ -1,3 +1,5 @@ LATEST_EXPIRED_WINDOW_TIMESTAMP_KEY = b"__expired_start_gt__" LATEST_EXPIRED_WINDOW_CF_NAME = "__expiration-index__" + +LATEST_TIMESTAMP_KEY = b"__topic_latest_timestamp__" diff --git a/quixstreams/state/rocksdb/windowed/partition.py b/quixstreams/state/rocksdb/windowed/partition.py index 52a19ac37..b9865d3ad 100644 --- a/quixstreams/state/rocksdb/windowed/partition.py +++ b/quixstreams/state/rocksdb/windowed/partition.py @@ -6,16 +6,11 @@ from quixstreams.state.serialization import int_from_int64_bytes, int_to_int64_bytes from quixstreams.state.recovery import ChangelogProducer -from .metadata import LATEST_EXPIRED_WINDOW_CF_NAME +from .metadata import LATEST_EXPIRED_WINDOW_CF_NAME, LATEST_TIMESTAMP_KEY from .transaction import WindowedRocksDBPartitionTransaction from .. 
import ColumnFamilyDoesNotExist -from ..metadata import ( - METADATA_CF_NAME, - LATEST_TIMESTAMP_KEY, -) -from ..partition import ( - RocksDBStorePartition, -) +from ..metadata import METADATA_CF_NAME +from ..partition import RocksDBStorePartition from ..types import RocksDBOptionsType logger = logging.getLogger(__name__) @@ -85,7 +80,7 @@ def write( ): batch = WriteBatch(raw_mode=True) - if latest_timestamp_ms: + if latest_timestamp_ms is not None: cf_handle = self.get_column_family_handle(METADATA_CF_NAME) batch.put( LATEST_TIMESTAMP_KEY, @@ -99,5 +94,5 @@ def write( batch=batch, ) - if latest_timestamp_ms: + if latest_timestamp_ms is not None: self.set_latest_timestamp(latest_timestamp_ms) diff --git a/quixstreams/state/serialization.py b/quixstreams/state/serialization.py index 421adc376..04cedcb80 100644 --- a/quixstreams/state/serialization.py +++ b/quixstreams/state/serialization.py @@ -4,6 +4,8 @@ from .exceptions import StateSerializationError __all__ = ( + "DumpsFunc", + "LoadsFunc", "serialize", "deserialize", "int_to_int64_bytes", From be50769e00baff8291acce3586ed8b681ee3d079 Mon Sep 17 00:00:00 2001 From: Quentin Dawans Date: Thu, 3 Oct 2024 11:58:08 +0200 Subject: [PATCH 6/6] docstrings --- quixstreams/state/base/partition.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quixstreams/state/base/partition.py b/quixstreams/state/base/partition.py index 99795027f..019568241 100644 --- a/quixstreams/state/base/partition.py +++ b/quixstreams/state/base/partition.py @@ -157,7 +157,7 @@ def _should_apply_changelog( """ Determine whether the changelog update should be skipped. - :param headers: changelog message headers + :param processed_offset: changelog message processed offset. :param committed_offset: latest committed offset of the source topic partition :return: True if update should be applied, else False. """
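
To make the shape of the extracted base layer concrete: after this series, a
storage backend only implements the primitive hooks on `StorePartition`
(`get`, `exists`, `write`, the offset getters, and changelog recovery), while
`begin()` and the whole transaction lifecycle live in `quixstreams.state.base`.
The sketch below is a minimal in-memory partition built against that API. It
is illustrative only: `MemoryStorePartition` is a hypothetical name, recovery
is stubbed out, and nothing is durable.

from typing import Any, Dict, Optional

from quixstreams.state.base import StorePartition
from quixstreams.state.metadata import DELETED
from quixstreams.utils.json import dumps, loads


class MemoryStorePartition(StorePartition):
    """Keeps each column family in a plain dict; for illustration only."""

    def __init__(self) -> None:
        super().__init__(dumps=dumps, loads=loads, changelog_producer=None)
        self._cfs: Dict[str, Dict[bytes, bytes]] = {}
        self._processed_offset: Optional[int] = None
        self._changelog_offset: Optional[int] = None

    def close(self): ...

    def _recover_from_changelog_message(
        self, changelog_message, cf_name, processed_offset, committed_offset
    ):
        raise NotImplementedError("recovery is out of scope for this sketch")

    def get_processed_offset(self) -> Optional[int]:
        return self._processed_offset

    def get_changelog_offset(self) -> Optional[int]:
        return self._changelog_offset

    def write(
        self,
        data: Dict,
        processed_offset: Optional[int],
        changelog_offset: Optional[int],
    ):
        # `data` is the transaction's update cache, shaped as
        # {cf_name: {prefix: {serialized_key: value_bytes or DELETED}}}
        for cf_name, prefixes in data.items():
            cf = self._cfs.setdefault(cf_name, {})
            for updates in prefixes.values():
                for key, value in updates.items():
                    if value is DELETED:
                        cf.pop(key, None)
                    else:
                        cf[key] = value
        self._processed_offset = processed_offset
        self._changelog_offset = changelog_offset

    def get(
        self, key: bytes, default: Any = None, cf_name: str = "default"
    ) -> Any:
        return self._cfs.get(cf_name, {}).get(key, default)

    def exists(self, key: bytes, cf_name: str = "default") -> bool:
        return key in self._cfs.get(cf_name, {})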
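
And a lifecycle sketch for the shared `PartitionTransaction`, driving the
in-memory partition above. The offsets are made up, and with
`changelog_producer=None` the `prepare()` step is expected to skip producing
changelog messages:

# Illustrative lifecycle of the base PartitionTransaction; assumes the
# hypothetical MemoryStorePartition sketch above. Offset values are made up.
partition = MemoryStorePartition()

tx = partition.begin()
tx.set("key", {"count": 1}, prefix=b"message-key")
# Reads see the transaction's own update cache before anything is flushed
assert tx.get("key", prefix=b"message-key") == {"count": 1}
assert tx.exists("key", prefix=b"message-key")

tx.prepare(processed_offset=10)  # no-op changelog step without a producer
assert tx.prepared

tx.flush(processed_offset=10, changelog_offset=None)
assert tx.completed
assert partition.get_processed_offset() == 10

This is the same flow the checkpointing code drives in production; the point
of the series is that none of it is RocksDB-specific anymore.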