Skip to content

Commit 1ae078b

Browse files
committed
Check partitions status after committing in checkpoint
1 parent 8acaf75 commit 1ae078b

File tree

3 files changed

+70
-5
lines changed

3 files changed

+70
-5
lines changed

quixstreams/checkpointing/checkpoint.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import time
33
from typing import Dict, Tuple
44

5-
from confluent_kafka import TopicPartition
5+
from confluent_kafka import TopicPartition, KafkaException
66

77
from quixstreams.kafka import Consumer
88
from quixstreams.rowproducer import RowProducer
@@ -12,7 +12,11 @@
1212
DEFAULT_STATE_STORE_NAME,
1313
)
1414
from quixstreams.state.exceptions import StoreTransactionFailed
15-
from .exceptions import InvalidStoredOffset, CheckpointProducerTimeout
15+
from .exceptions import (
16+
InvalidStoredOffset,
17+
CheckpointProducerTimeout,
18+
CheckpointConsumerCommitError,
19+
)
1620

1721
logger = logging.getLogger(__name__)
1822

@@ -145,7 +149,14 @@ def commit(self):
145149
for (topic, partition), offset in self._tp_offsets.items()
146150
]
147151
logger.debug("Checkpoint: commiting consumer")
148-
self._consumer.commit(offsets=offsets, asynchronous=False)
152+
try:
153+
partitions = self._consumer.commit(offsets=offsets, asynchronous=False)
154+
except KafkaException as e:
155+
raise CheckpointConsumerCommitError(e.args[0]) from None
156+
157+
for partition in partitions:
158+
if partition.error:
159+
raise CheckpointConsumerCommitError(partition.error)
149160

150161
# Step 4. Flush state store partitions to the disk together with changelog
151162
# offsets
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
from quixstreams.exceptions import QuixException
2+
from quixstreams.kafka.exceptions import KafkaConsumerException
23

34

45
class InvalidStoredOffset(QuixException): ...
56

67

78
class CheckpointProducerTimeout(QuixException): ...
9+
10+
11+
class CheckpointConsumerCommitError(KafkaConsumerException): ...

tests/test_quixstreams/test_checkpointing.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
from unittest.mock import patch, MagicMock
44

55
import pytest
6-
from confluent_kafka import TopicPartition
6+
from confluent_kafka import TopicPartition, KafkaException, KafkaError
77

88
from quixstreams.checkpointing import Checkpoint, InvalidStoredOffset
9-
from quixstreams.checkpointing.exceptions import CheckpointProducerTimeout
9+
from quixstreams.checkpointing.exceptions import (
10+
CheckpointProducerTimeout,
11+
CheckpointConsumerCommitError,
12+
)
1013
from quixstreams.kafka import Consumer
1114
from quixstreams.rowproducer import RowProducer
1215
from quixstreams.state import StateStoreManager
@@ -354,3 +357,50 @@ def test_incomplete_flush(
354357
str(err.value)
355358
== "'1' messages failed to be produced before the producer flush timeout"
356359
)
360+
361+
def test_failed_commit(
362+
self, checkpoint_factory, state_manager_factory, rowproducer_mock
363+
):
364+
consumer_mock = MagicMock(spec_set=Consumer)
365+
consumer_mock.commit.side_effect = KafkaException(KafkaError(1, "test error"))
366+
367+
state_manager = state_manager_factory(producer=rowproducer_mock)
368+
checkpoint = checkpoint_factory(
369+
consumer_=consumer_mock,
370+
state_manager_=state_manager,
371+
producer_=rowproducer_mock,
372+
)
373+
checkpoint.store_offset("topic", 0, 0)
374+
375+
with pytest.raises(CheckpointConsumerCommitError) as err:
376+
checkpoint.commit()
377+
378+
assert (
379+
str(err.value)
380+
== '<CheckpointConsumerCommitError code="1" description="test error">'
381+
)
382+
383+
def test_failed_commit_partition(
384+
self, checkpoint_factory, state_manager_factory, rowproducer_mock
385+
):
386+
consumer_mock = MagicMock(spec_set=Consumer)
387+
388+
topic_partition = MagicMock(spec=TopicPartition)
389+
topic_partition.error = KafkaError(1, "test error")
390+
391+
consumer_mock.commit.return_value = [topic_partition]
392+
state_manager = state_manager_factory(producer=rowproducer_mock)
393+
checkpoint = checkpoint_factory(
394+
consumer_=consumer_mock,
395+
state_manager_=state_manager,
396+
producer_=rowproducer_mock,
397+
)
398+
checkpoint.store_offset("topic", 0, 0)
399+
400+
with pytest.raises(CheckpointConsumerCommitError) as err:
401+
checkpoint.commit()
402+
403+
assert (
404+
str(err.value)
405+
== '<CheckpointConsumerCommitError code="1" description="test error">'
406+
)

0 commit comments

Comments
 (0)