Skip to content

Commit 8acaf75

Browse files
authored
Configure the producer flush timeout with the max.poll.interval.ms (#382)
1 parent 6c6305d commit 8acaf75

File tree

6 files changed

+81
-29
lines changed

6 files changed

+81
-29
lines changed

quixstreams/app.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
# Enforce idempotent producing for the internal RowProducer
5151
_default_producer_extra_config = {"enable.idempotence": True}
5252

53+
_default_max_poll_interval_ms = 300000
54+
5355

5456
class Application:
5557
"""
@@ -260,6 +262,10 @@ def __init__(
260262
broker_address=broker_address,
261263
extra_config=producer_extra_config,
262264
on_error=on_producer_error,
265+
flush_timeout=consumer_extra_config.get(
266+
"max.poll.interval.ms", _default_max_poll_interval_ms
267+
)
268+
/ 1000, # convert to seconds
263269
)
264270
self._consumer_poll_timeout = consumer_poll_timeout
265271
self._producer_poll_timeout = producer_poll_timeout

quixstreams/checkpointing/checkpoint.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
DEFAULT_STATE_STORE_NAME,
1313
)
1414
from quixstreams.state.exceptions import StoreTransactionFailed
15-
from .exceptions import InvalidStoredOffset
15+
from .exceptions import InvalidStoredOffset, CheckpointProducerTimeout
1616

1717
logger = logging.getLogger(__name__)
1818

@@ -129,7 +129,13 @@ def commit(self):
129129

130130
# Step 2. Flush producer to trigger all delivery callbacks and ensure that
131131
# all messages are produced
132-
self._producer.flush()
132+
logger.debug("Checkpoint: flushing producer")
133+
unproduced_msg_count = self._producer.flush()
134+
if unproduced_msg_count > 0:
135+
raise CheckpointProducerTimeout(
136+
f"'{unproduced_msg_count}' messages failed to be produced before the producer flush timeout"
137+
)
138+
133139
# Get produced offsets after flushing the producer
134140
produced_offsets = self._producer.offsets
135141

@@ -138,6 +144,7 @@ def commit(self):
138144
TopicPartition(topic=topic, partition=partition, offset=offset + 1)
139145
for (topic, partition), offset in self._tp_offsets.items()
140146
]
147+
logger.debug("Checkpoint: commiting consumer")
141148
self._consumer.commit(offsets=offsets, asynchronous=False)
142149

143150
# Step 4. Flush state store partitions to the disk together with changelog

quixstreams/checkpointing/exceptions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22

33

44
class InvalidStoredOffset(QuixException): ...
5+
6+
7+
class CheckpointProducerTimeout(QuixException): ...

quixstreams/kafka/producer.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def __init__(
4848
logger: logging.Logger = logger,
4949
error_callback: Callable[[KafkaError], None] = _default_error_cb,
5050
extra_config: Optional[dict] = None,
51+
flush_timeout: Optional[int] = None,
5152
):
5253
"""
5354
A wrapper around `confluent_kafka.Producer`.
@@ -64,6 +65,7 @@ def __init__(
6465
:param extra_config: A dictionary with additional options that
6566
will be passed to `confluent_kafka.Producer` as is.
6667
Note: values passed as arguments override values in `extra_config`.
68+
:param flush_timeout: The time the producer is waiting for all messages to be delivered.
6769
"""
6870
if isinstance(broker_address, str):
6971
broker_address = ConnectionConfig(bootstrap_servers=broker_address)
@@ -76,6 +78,7 @@ def __init__(
7678
**{"logger": logger, "error_cb": error_callback},
7779
}
7880
self._inner_producer: Optional[ConfluentProducer] = None
81+
self._flush_timeout = flush_timeout or -1
7982

8083
def produce(
8184
self,
@@ -151,11 +154,13 @@ def flush(self, timeout: Optional[float] = None) -> int:
151154
Wait for all messages in the Producer queue to be delivered.
152155
153156
:param float timeout: time to attempt flushing (seconds).
154-
None or -1 is infinite. Default: None
157+
None use producer default or -1 is infinite. Default: None
155158
156159
:return: number of messages remaining to flush
157160
"""
158-
return self._producer.flush(timeout=timeout if timeout is not None else -1)
161+
return self._producer.flush(
162+
timeout=timeout if timeout is not None else self._flush_timeout
163+
)
159164

160165
@property
161166
def _producer(self) -> ConfluentProducer:

quixstreams/rowproducer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,20 @@ class RowProducer:
3131
If producer fails and the callback returns `True`, the exception
3232
will be logged but not propagated.
3333
The default callback logs an exception and returns `False`.
34+
:param flush_timeout: The time the producer is waiting for all messages to be delivered.
3435
"""
3536

3637
def __init__(
3738
self,
3839
broker_address: Union[str, ConnectionConfig],
3940
extra_config: dict = None,
4041
on_error: Optional[ProducerErrorCallback] = None,
42+
flush_timeout: Optional[int] = None,
4143
):
4244
self._producer = Producer(
4345
broker_address=broker_address,
4446
extra_config=extra_config,
47+
flush_timeout=flush_timeout,
4548
)
4649

4750
self._on_error: Optional[ProducerErrorCallback] = (

tests/test_quixstreams/test_checkpointing.py

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from confluent_kafka import TopicPartition
77

88
from quixstreams.checkpointing import Checkpoint, InvalidStoredOffset
9+
from quixstreams.checkpointing.exceptions import CheckpointProducerTimeout
910
from quixstreams.kafka import Consumer
1011
from quixstreams.rowproducer import RowProducer
1112
from quixstreams.state import StateStoreManager
@@ -31,6 +32,13 @@ def factory(
3132
return factory
3233

3334

35+
@pytest.fixture()
36+
def rowproducer_mock(request):
37+
p = MagicMock(spec_set=RowProducer)
38+
p.flush.return_value = getattr(request, "param", 0)
39+
return p
40+
41+
3442
class TestCheckpoint:
3543
def test_empty_true(self, checkpoint_factory):
3644
checkpoint = checkpoint_factory()
@@ -68,13 +76,18 @@ def test_commit_no_state_success(
6876
assert tp.offset == processed_offset + 1
6977

7078
def test_commit_with_state_no_changelog_success(
71-
self, checkpoint_factory, consumer, state_manager_factory, topic_factory
79+
self,
80+
checkpoint_factory,
81+
consumer,
82+
state_manager_factory,
83+
topic_factory,
84+
rowproducer_mock,
7285
):
7386
topic_name, _ = topic_factory()
74-
producer_mock = MagicMock(spec_set=RowProducer)
75-
state_manager = state_manager_factory(producer=producer_mock)
87+
88+
state_manager = state_manager_factory(producer=rowproducer_mock)
7689
checkpoint = checkpoint_factory(
77-
consumer_=consumer, state_manager_=state_manager, producer_=producer_mock
90+
consumer_=consumer, state_manager_=state_manager, producer_=rowproducer_mock
7891
)
7992
processed_offset = 999
8093
key, value, prefix = "key", "value", b"__key__"
@@ -95,7 +108,7 @@ def test_commit_with_state_no_changelog_success(
95108
assert tp.offset == processed_offset + 1
96109

97110
# Check the producer is flushed
98-
assert producer_mock.flush.call_count == 1
111+
assert rowproducer_mock.flush.call_count == 1
99112

100113
# Check the state is flushed
101114
assert tx.completed
@@ -186,34 +199,32 @@ def test_commit_with_state_and_changelog_no_updates_success(
186199
assert not store_partition.get_processed_offset()
187200

188201
def test_commit_no_offsets_stored_noop(
189-
self, checkpoint_factory, state_manager_factory, topic_factory
202+
self, checkpoint_factory, state_manager_factory, topic_factory, rowproducer_mock
190203
):
191204
topic_name, _ = topic_factory()
192-
producer_mock = MagicMock(spec_set=RowProducer)
193205
consumer_mock = MagicMock(spec_set=Consumer)
194-
state_manager = state_manager_factory(producer=producer_mock)
206+
state_manager = state_manager_factory(producer=rowproducer_mock)
195207
checkpoint = checkpoint_factory(
196208
consumer_=consumer_mock,
197209
state_manager_=state_manager,
198-
producer_=producer_mock,
210+
producer_=rowproducer_mock,
199211
)
200212
# Commit the checkpoint without processing any messages
201213
checkpoint.commit()
202214

203215
# Check nothing is committed
204216
assert not consumer_mock.commit.call_count
205-
assert not producer_mock.flush.call_count
217+
assert not rowproducer_mock.flush.call_count
206218

207219
def test_commit_has_failed_transactions_fails(
208-
self, checkpoint_factory, state_manager_factory, topic_factory
220+
self, checkpoint_factory, state_manager_factory, topic_factory, rowproducer_mock
209221
):
210-
producer_mock = MagicMock(spec_set=RowProducer)
211222
consumer_mock = MagicMock(spec_set=Consumer)
212-
state_manager = state_manager_factory(producer=producer_mock)
223+
state_manager = state_manager_factory(producer=rowproducer_mock)
213224
checkpoint = checkpoint_factory(
214225
consumer_=consumer_mock,
215226
state_manager_=state_manager,
216-
producer_=producer_mock,
227+
producer_=rowproducer_mock,
217228
)
218229
processed_offset = 999
219230
key, value, prefix = "key", "value", b"__key__"
@@ -240,20 +251,19 @@ def test_commit_has_failed_transactions_fails(
240251
checkpoint.commit()
241252

242253
# The producer should not flush
243-
assert not producer_mock.flush.call_count
254+
assert not rowproducer_mock.flush.call_count
244255
# Consumer should not commit
245256
assert not consumer_mock.commit.call_count
246257

247258
def test_commit_producer_flush_fails(
248-
self, checkpoint_factory, state_manager_factory, topic_factory
259+
self, checkpoint_factory, state_manager_factory, topic_factory, rowproducer_mock
249260
):
250-
producer_mock = MagicMock(spec_set=RowProducer)
251261
consumer_mock = MagicMock(spec_set=Consumer)
252-
state_manager = state_manager_factory(producer=producer_mock)
262+
state_manager = state_manager_factory(producer=rowproducer_mock)
253263
checkpoint = checkpoint_factory(
254264
consumer_=consumer_mock,
255265
state_manager_=state_manager,
256-
producer_=producer_mock,
266+
producer_=rowproducer_mock,
257267
)
258268
processed_offset = 999
259269
key, value, prefix = "key", "value", b"__key__"
@@ -266,7 +276,7 @@ def test_commit_producer_flush_fails(
266276
tx.set(key=key, value=value, prefix=prefix)
267277
checkpoint.store_offset("topic", 0, processed_offset)
268278

269-
producer_mock.flush.side_effect = ValueError("Flush failure")
279+
rowproducer_mock.flush.side_effect = ValueError("Flush failure")
270280
# Checkpoint commit should fail if producer failed to flush
271281
with pytest.raises(ValueError):
272282
checkpoint.commit()
@@ -278,15 +288,14 @@ def test_commit_producer_flush_fails(
278288
assert not tx.completed
279289

280290
def test_commit_consumer_commit_fails(
281-
self, checkpoint_factory, state_manager_factory, topic_factory
291+
self, checkpoint_factory, state_manager_factory, topic_factory, rowproducer_mock
282292
):
283-
producer_mock = MagicMock(spec_set=RowProducer)
284293
consumer_mock = MagicMock(spec_set=Consumer)
285-
state_manager = state_manager_factory(producer=producer_mock)
294+
state_manager = state_manager_factory(producer=rowproducer_mock)
286295
checkpoint = checkpoint_factory(
287296
consumer_=consumer_mock,
288297
state_manager_=state_manager,
289-
producer_=producer_mock,
298+
producer_=rowproducer_mock,
290299
)
291300
processed_offset = 999
292301
key, value, prefix = "key", "value", b"__key__"
@@ -305,7 +314,7 @@ def test_commit_consumer_commit_fails(
305314
checkpoint.commit()
306315

307316
# Producer should flush
308-
assert producer_mock.flush.call_count
317+
assert rowproducer_mock.flush.call_count
309318
# The transaction should remain prepared, but not completed
310319
assert tx.prepared
311320
assert not tx.completed
@@ -326,3 +335,22 @@ def test_get_store_transaction_success(self, checkpoint_factory, state_manager):
326335
assert tx
327336
tx2 = checkpoint.get_store_transaction("topic", 0, "default")
328337
assert tx2 is tx
338+
339+
@pytest.mark.parametrize("rowproducer_mock", [1], indirect=True)
340+
def test_incomplete_flush(
341+
self, checkpoint_factory, consumer, state_manager_factory, rowproducer_mock
342+
):
343+
344+
state_manager = state_manager_factory(producer=rowproducer_mock)
345+
checkpoint = checkpoint_factory(
346+
consumer_=consumer, state_manager_=state_manager, producer_=rowproducer_mock
347+
)
348+
checkpoint.store_offset("topic", 0, 0)
349+
350+
with pytest.raises(CheckpointProducerTimeout) as err:
351+
checkpoint.commit()
352+
353+
assert (
354+
str(err.value)
355+
== "'1' messages failed to be produced before the producer flush timeout"
356+
)

0 commit comments

Comments
 (0)