Skip to content

Commit 3e686d0

Browse files
authored
Refactor recovery to support Stores belonging to multiple topics (#774)
1 parent 0115ebb commit 3e686d0

23 files changed

+468
-645
lines changed

quixstreams/app.py

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import signal
66
import time
77
import warnings
8+
from collections import defaultdict
89
from pathlib import Path
910
from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union
1011

@@ -923,40 +924,26 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
923924
non_changelog_tps = [
924925
tp for tp in topic_partitions if tp.topic in non_changelog_topics
925926
]
927+
committed_tps = self._consumer.committed(
928+
partitions=non_changelog_tps, timeout=30
929+
)
930+
committed_offsets: dict[int, dict[str, int]] = defaultdict(dict)
931+
for tp in committed_tps:
932+
if tp.error:
933+
raise RuntimeError(
934+
f"Failed to get committed offsets for "
935+
f'"{tp.topic}[{tp.partition}]" from the broker: {tp.error}'
936+
)
937+
committed_offsets[tp.partition][tp.topic] = tp.offset
938+
926939
for tp in non_changelog_tps:
927-
# Get the latest committed offset for the assigned topic partition
928-
tp_committed = self._consumer.committed([tp], timeout=30)[0]
929940
# Assign store partitions
930-
store_partitions = self._state_manager.on_partition_assign(
941+
self._state_manager.on_partition_assign(
931942
topic=tp.topic,
932943
partition=tp.partition,
933-
committed_offset=tp_committed.offset,
944+
committed_offsets=committed_offsets[tp.partition],
934945
)
935946

936-
# Check if the latest committed offset >= stored offset
937-
# Otherwise, the re-processed messages might use already updated
938-
# state, which can lead to inconsistent outputs
939-
stored_offsets = [
940-
offset
941-
for offset in (
942-
store_tp.get_processed_offset()
943-
for store_tp in store_partitions.values()
944-
)
945-
if offset is not None
946-
]
947-
min_stored_offset = min(stored_offsets) + 1 if stored_offsets else None
948-
if (
949-
min_stored_offset is not None
950-
and min_stored_offset > tp_committed.offset
951-
):
952-
logger.warning(
953-
f'Warning: offset "{tp_committed.offset}" '
954-
f"for topic partition "
955-
f'"{tp_committed.topic}[{tp_committed.partition}]" '
956-
f'is behind the stored offset "{min_stored_offset}". '
957-
f"It may lead to distortions in produced data."
958-
)
959-
960947
def _on_revoke(self, _, topic_partitions: List[TopicPartition]):
961948
"""
962949
Revoke partitions from consumer and state

quixstreams/checkpointing/checkpoint.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ def commit(self):
199199
f'Detected a failed transaction for store "{store_name}", '
200200
f"the checkpoint is aborted"
201201
)
202-
transaction.prepare(processed_offset=offset)
202+
transaction.prepare(processed_offsets={topic: offset})
203203

204204
# Step 2. Flush producer to trigger all delivery callbacks and ensure that
205205
# all messages are produced
@@ -299,6 +299,4 @@ def commit(self):
299299
changelog_offset = (
300300
produced_offsets.get(changelog_tp) if changelog_tp is not None else None
301301
)
302-
transaction.flush(
303-
processed_offset=offset, changelog_offset=changelog_offset
304-
)
302+
transaction.flush(changelog_offset=changelog_offset)

quixstreams/sources/base/manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition:
160160
store_partitions = state_manager.on_partition_assign(
161161
topic=None,
162162
partition=source.assigned_store_partition,
163-
committed_offset=OFFSET_BEGINNING,
163+
committed_offsets={},
164164
)
165165

166166
if state_manager.recovery_required:

quixstreams/state/base/partition.py

Lines changed: 22 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,10 @@
22
from abc import ABC, abstractmethod
33
from typing import TYPE_CHECKING, Literal, Optional, Union
44

5-
from quixstreams.models import SuccessfulConfluentKafkaMessageProto
6-
from quixstreams.state.exceptions import ColumnFamilyHeaderMissing
75
from quixstreams.state.metadata import (
8-
CHANGELOG_CF_MESSAGE_HEADER,
9-
CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER,
106
Marker,
117
)
128
from quixstreams.state.serialization import DumpsFunc, LoadsFunc
13-
from quixstreams.utils.json import loads as json_loads
149

1510
from .transaction import PartitionTransaction, PartitionTransactionCache
1611

@@ -45,42 +40,34 @@ def __init__(
4540
def close(self): ...
4641

4742
@abstractmethod
48-
def _recover_from_changelog_message(
49-
self,
50-
changelog_message: SuccessfulConfluentKafkaMessageProto,
51-
cf_name: str,
52-
processed_offset: Optional[int],
53-
committed_offset: int,
54-
): ...
55-
56-
@abstractmethod
57-
def get_processed_offset(self) -> Optional[int]:
43+
def get_changelog_offset(self) -> Optional[int]:
5844
"""
59-
Get last processed offset for the given partition
45+
Get the changelog offset that the state is up-to-date with.
6046
:return: offset or `None` if there's no changelog offset yet
6147
"""
6248
...
6349

6450
@abstractmethod
65-
def get_changelog_offset(self) -> Optional[int]:
51+
def write_changelog_offset(self, offset: int):
6652
"""
67-
Get offset that the changelog is up-to-date with.
68-
:return: offset or `None` if there's no processed offset yet
53+
Write a new changelog offset to the db.
54+
55+
To be used when we simply need to update the changelog offset without touching
56+
the actual data.
57+
58+
:param offset: new changelog offset
6959
"""
70-
...
7160

7261
@abstractmethod
7362
def write(
7463
self,
7564
cache: PartitionTransactionCache,
76-
processed_offset: Optional[int],
7765
changelog_offset: Optional[int],
7866
):
7967
"""
8068
Update the state with data from the update cache
8169
8270
:param cache: The modified data
83-
:param processed_offset: The offset processed to generate the data.
8471
:param changelog_offset: The changelog message offset of the data.
8572
"""
8673

@@ -92,7 +79,6 @@ def get(
9279
Get a key from the store
9380
9481
:param key: a key encoded to `bytes`
95-
:param default: a default value to return if the key is not found.
9682
:param cf_name: rocksdb column family name. Default - "default"
9783
:return: a value if the key is present in the store. Otherwise, `default`
9884
"""
@@ -107,6 +93,19 @@ def exists(self, key: bytes, cf_name: str = "default") -> bool:
10793
:return: `True` if the key is present, `False` otherwise.
10894
"""
10995

96+
@abstractmethod
97+
def recover_from_changelog_message(
98+
self, key: bytes, value: Optional[bytes], cf_name: str, offset: int
99+
):
100+
"""
101+
Updates state from a given changelog message.
102+
103+
:param key: changelog message key
104+
:param value: changelog message value
105+
:param cf_name: column family name
106+
:param offset: changelog message offset
107+
"""
108+
110109
def begin(self) -> PartitionTransaction:
111110
"""
112111
Start a new `PartitionTransaction`
@@ -120,58 +119,6 @@ def begin(self) -> PartitionTransaction:
120119
changelog_producer=self._changelog_producer,
121120
)
122121

123-
def recover_from_changelog_message(
124-
self,
125-
changelog_message: SuccessfulConfluentKafkaMessageProto,
126-
committed_offset: int,
127-
) -> None:
128-
"""
129-
Updates state from a given changelog message.
130-
131-
:param changelog_message: A raw Confluent message read from a changelog topic.
132-
:param committed_offset: latest committed offset for the partition
133-
"""
134-
headers = dict(changelog_message.headers() or ())
135-
# Parse the column family name from message headers
136-
cf_name = headers.get(CHANGELOG_CF_MESSAGE_HEADER, b"").decode()
137-
if not cf_name:
138-
raise ColumnFamilyHeaderMissing(
139-
f"Header '{CHANGELOG_CF_MESSAGE_HEADER}' missing from changelog message"
140-
)
141-
142-
# Parse the processed topic-partition-offset info from the changelog message
143-
# headers to determine whether the update should be applied or skipped.
144-
# It can be empty if the message was produced by the older version of the lib.
145-
processed_offset = json_loads(
146-
headers.get(CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER, b"null")
147-
)
148-
149-
self._recover_from_changelog_message(
150-
changelog_message,
151-
cf_name,
152-
processed_offset,
153-
committed_offset,
154-
)
155-
156-
def _should_apply_changelog(
157-
self, processed_offset: Optional[int], committed_offset: int
158-
) -> bool:
159-
"""
160-
Determine whether the changelog update should be skipped.
161-
162-
:param processed_offset: changelog message processed offset.
163-
:param committed_offset: latest committed offset of the source topic partition
164-
:return: True if update should be applied, else False.
165-
"""
166-
if processed_offset is not None:
167-
# Skip recovering from the message if its processed offset is ahead of the
168-
# current committed offset.
169-
# This way it will recover to a consistent state if the checkpointing code
170-
# produced the changelog messages but failed to commit
171-
# the source topic offset.
172-
return processed_offset < committed_offset
173-
return True
174-
175122
def __enter__(self):
176123
return self
177124

quixstreams/state/base/transaction.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from quixstreams.state.exceptions import InvalidChangelogOffset, StateTransactionError
2121
from quixstreams.state.metadata import (
2222
CHANGELOG_CF_MESSAGE_HEADER,
23-
CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER,
23+
CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER,
2424
DEFAULT_PREFIX,
2525
SEPARATOR,
2626
Marker,
@@ -398,7 +398,7 @@ def exists(self, key: K, prefix: bytes, cf_name: str = "default") -> bool:
398398
return self._partition.exists(key_serialized, cf_name=cf_name)
399399

400400
@validate_transaction_status(PartitionTransactionStatus.STARTED)
401-
def prepare(self, processed_offset: Optional[int]):
401+
def prepare(self, processed_offsets: Optional[dict[str, int]]):
402402
"""
403403
Produce changelog messages to the changelog topic for all changes accumulated
404404
in this transaction and prepare transaction to flush its state to the state
@@ -410,33 +410,32 @@ def prepare(self, processed_offset: Optional[int]):
410410
If changelog is disabled for this application, no updates will be produced
411411
to the changelog topic.
412412
413-
:param processed_offset: the offset of the latest processed message
413+
:param processed_offsets: a dict mapping topic names to the offsets of the latest processed messages
414414
"""
415415

416416
try:
417-
self._prepare(processed_offset=processed_offset)
417+
self._prepare(processed_offsets=processed_offsets)
418418
self._status = PartitionTransactionStatus.PREPARED
419419
except Exception:
420420
self._status = PartitionTransactionStatus.FAILED
421421
raise
422422

423-
def _prepare(self, processed_offset: Optional[int]):
423+
def _prepare(self, processed_offsets: Optional[dict[str, int]]):
424424
if self._changelog_producer is None:
425425
return
426426

427427
logger.debug(
428428
f"Flushing state changes to the changelog topic "
429429
f'topic_name="{self._changelog_producer.changelog_name}" '
430-
f"partition={self._changelog_producer.partition} "
431-
f"processed_offset={processed_offset}"
430+
f"partition={self._changelog_producer.partition}"
432431
)
433-
source_tp_offset_header = json_dumps(processed_offset)
432+
source_tp_offset_header = json_dumps(processed_offsets)
434433
column_families = self._update_cache.get_column_families()
435434

436435
for cf_name in column_families:
437436
headers: Headers = {
438437
CHANGELOG_CF_MESSAGE_HEADER: cf_name,
439-
CHANGELOG_PROCESSED_OFFSET_MESSAGE_HEADER: source_tp_offset_header,
438+
CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER: source_tp_offset_header,
440439
}
441440

442441
updates = self._update_cache.get_updates(cf_name=cf_name)
@@ -461,7 +460,6 @@ def _prepare(self, processed_offset: Optional[int]):
461460
)
462461
def flush(
463462
self,
464-
processed_offset: Optional[int] = None,
465463
changelog_offset: Optional[int] = None,
466464
):
467465
"""
@@ -476,18 +474,17 @@ def flush(
476474
not flush ANY data to the database including the offset to optimize
477475
I/O.
478476
479-
:param processed_offset: offset of the last processed message, optional.
480477
:param changelog_offset: offset of the last produced changelog message,
481478
optional.
482479
"""
483480
try:
484-
self._flush(processed_offset, changelog_offset)
481+
self._flush(changelog_offset)
485482
self._status = PartitionTransactionStatus.COMPLETE
486483
except Exception:
487484
self._status = PartitionTransactionStatus.FAILED
488485
raise
489486

490-
def _flush(self, processed_offset: Optional[int], changelog_offset: Optional[int]):
487+
def _flush(self, changelog_offset: Optional[int]):
491488
if self._update_cache.is_empty():
492489
return
493490

@@ -503,7 +500,6 @@ def _flush(self, processed_offset: Optional[int], changelog_offset: Optional[int
503500

504501
self._partition.write(
505502
cache=self._update_cache,
506-
processed_offset=processed_offset,
507503
changelog_offset=changelog_offset,
508504
)
509505

quixstreams/state/manager.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,15 +248,19 @@ def clear_stores(self) -> None:
248248
shutil.rmtree(self._state_dir, ignore_errors=True)
249249

250250
def on_partition_assign(
251-
self, topic: Optional[str], partition: int, committed_offset: int
251+
self,
252+
topic: Optional[str],
253+
partition: int,
254+
committed_offsets: dict[str, int],
252255
) -> Dict[str, StorePartition]:
253256
"""
254257
Assign store partitions for each registered store for the given `TopicPartition`
255258
and return a list of assigned `StorePartition` objects.
256259
257260
:param topic: Kafka topic name
258261
:param partition: Kafka topic partition
259-
:param committed_offset: latest committed offset for the partition
262+
:param committed_offsets: a dict with latest committed offsets
263+
of all assigned topics for this partition number.
260264
:return: list of assigned `StorePartition`
261265
"""
262266

@@ -268,7 +272,7 @@ def on_partition_assign(
268272
self._recovery_manager.assign_partition(
269273
topic=topic,
270274
partition=partition,
271-
committed_offset=committed_offset,
275+
committed_offsets=committed_offsets,
272276
store_partitions=store_partitions,
273277
)
274278
return store_partitions

0 commit comments

Comments
 (0)