Skip to content

Commit f8f3cab

Browse files
authored
Log recovery progress (#941)
* Log recovery progress * Log recovery status from RecoveryManager * Use correct changelog offset
1 parent bbcccde commit f8f3cab

File tree

1 file changed

+28
-3
lines changed

1 file changed

+28
-3
lines changed

quixstreams/state/recovery.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import time
23
from typing import Dict, List, Optional
34

45
from confluent_kafka import OFFSET_BEGINNING
@@ -71,6 +72,10 @@ def __repr__(self):
7172
def changelog_name(self) -> str:
7273
return self._changelog_name
7374

75+
@property
76+
def changelog_highwater(self) -> int:
77+
return self._changelog_highwater
78+
7479
@property
7580
def partition_num(self) -> int:
7681
return self._partition_num
@@ -315,6 +320,7 @@ def __init__(self, consumer: BaseConsumer, topic_manager: TopicManager):
315320
self._consumer = consumer
316321
self._topic_manager = topic_manager
317322
self._recovery_partitions: Dict[int, Dict[str, RecoveryPartition]] = {}
323+
self._last_progress_logged_time = time.monotonic()
318324

319325
@property
320326
def partitions(self) -> Dict[int, Dict[str, RecoveryPartition]]:
@@ -529,9 +535,7 @@ def revoke_partition(self, partition_num: int):
529535
def _update_recovery_status(self):
530536
rp_revokes = []
531537
for rp in dict_values(self._recovery_partitions):
532-
position = self._consumer.position(
533-
[ConfluentPartition(rp.changelog_name, rp.partition_num)]
534-
)[0].offset
538+
position = self._get_changelog_offset(rp)
535539
rp.set_recovery_consume_position(position)
536540
if rp.finished_recovery_check:
537541
rp_revokes.append(rp)
@@ -549,12 +553,33 @@ def _recovery_loop(self) -> None:
549553
A RecoveryPartition is unassigned immediately once fully updated.
550554
"""
551555
while self.recovering:
556+
self._log_recovery_progress()
552557
if (msg := self._consumer.poll(1)) is None:
553558
self._update_recovery_status()
554559
else:
555560
msg = raise_for_msg_error(msg)
556561
rp = self._recovery_partitions[msg.partition()][msg.topic()]
557562
rp.recover_from_changelog_message(changelog_message=msg)
558563

564+
def _log_recovery_progress(self) -> None:
565+
"""
566+
Periodically log the recovery progress of all RecoveryPartitions.
567+
"""
568+
if self._last_progress_logged_time < time.monotonic() - 10:
569+
for rp in dict_values(self._recovery_partitions):
570+
last_consumed_offset = self._get_changelog_offset(rp) - 1
571+
logger.info(
572+
f"Recovery progress for {rp}: {last_consumed_offset} / {rp.changelog_highwater}"
573+
)
574+
self._last_progress_logged_time = time.monotonic()
575+
576+
def _get_changelog_offset(self, rp: RecoveryPartition) -> int:
577+
"""
578+
Get the current offset of the changelog partition.
579+
"""
580+
return self._consumer.position(
581+
[ConfluentPartition(rp.changelog_name, rp.partition_num)]
582+
)[0].offset
583+
559584
def stop_recovery(self):
560585
self._running = False

0 commit comments

Comments
 (0)