Skip to content

Commit 664557e

Browse files
authored
Increase producer poll timeout and retries (#771)
1 parent d4846b7 commit 664557e

File tree

4 files changed

+20
-11
lines changed

4 files changed

+20
-11
lines changed

quixstreams/kafka/producer.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
KafkaError._DESTROY, # noqa: SLF001
3131
)
3232

33+
PRODUCER_POLL_TIMEOUT = 30.0
34+
PRODUCER_ON_ERROR_RETRIES = 10
35+
3336

3437
def _default_error_cb(error: KafkaError):
3538
error_code = error.code()
@@ -86,8 +89,8 @@ def produce(
8689
headers: Optional[Headers] = None,
8790
partition: Optional[int] = None,
8891
timestamp: Optional[int] = None,
89-
poll_timeout: float = 5.0,
90-
buffer_error_max_tries: int = 3,
92+
poll_timeout: float = PRODUCER_POLL_TIMEOUT,
93+
buffer_error_max_tries: int = PRODUCER_ON_ERROR_RETRIES,
9194
on_delivery: Optional[DeliveryCallback] = None,
9295
):
9396
"""

quixstreams/rowproducer.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from .error_callbacks import ProducerErrorCallback, default_on_producer_error
1212
from .kafka.configuration import ConnectionConfig
1313
from .kafka.exceptions import KafkaProducerDeliveryError
14-
from .kafka.producer import Producer, TransactionalProducer
14+
from .kafka.producer import (
15+
PRODUCER_ON_ERROR_RETRIES,
16+
PRODUCER_POLL_TIMEOUT,
17+
Producer,
18+
TransactionalProducer,
19+
)
1520
from .models import Headers, Row, Topic
1621

1722
logger = logging.getLogger(__name__)
@@ -180,8 +185,8 @@ def produce(
180185
headers: Optional[Headers] = None,
181186
partition: Optional[int] = None,
182187
timestamp: Optional[int] = None,
183-
poll_timeout: float = 5.0,
184-
buffer_error_max_tries: int = 3,
188+
poll_timeout: float = PRODUCER_POLL_TIMEOUT,
189+
buffer_error_max_tries: int = PRODUCER_ON_ERROR_RETRIES,
185190
):
186191
self._raise_for_error()
187192

quixstreams/sources/base/source.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Callable, Optional, Union
44

55
from quixstreams.checkpointing.exceptions import CheckpointProducerTimeout
6+
from quixstreams.kafka.producer import PRODUCER_ON_ERROR_RETRIES, PRODUCER_POLL_TIMEOUT
67
from quixstreams.models.messages import KafkaMessage
78
from quixstreams.models.topics import Topic
89
from quixstreams.models.types import Headers
@@ -322,8 +323,8 @@ def produce(
322323
headers: Optional[Headers] = None,
323324
partition: Optional[int] = None,
324325
timestamp: Optional[int] = None,
325-
poll_timeout: float = 5.0,
326-
buffer_error_max_tries: int = 3,
326+
poll_timeout: float = PRODUCER_POLL_TIMEOUT,
327+
buffer_error_max_tries: int = PRODUCER_ON_ERROR_RETRIES,
327328
) -> None:
328329
"""
329330
Produce a message to the configured source topic in Kafka.

tests/test_quixstreams/test_sources/test_core/test_csv.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ def test_read(self, tmp_path, producer):
4444
assert producer.produce.called
4545
assert producer.produce.call_count == 5
4646
assert producer.produce.call_args.kwargs == {
47-
"buffer_error_max_tries": 3,
47+
"buffer_error_max_tries": 10,
4848
"headers": None,
4949
"key": b"key5",
5050
"partition": None,
51-
"poll_timeout": 5.0,
51+
"poll_timeout": 30.0,
5252
"timestamp": 5,
5353
"topic": name,
5454
"value": b'{"key":"key5","field":"value5","timestamp":"5"}',
@@ -79,11 +79,11 @@ def test_read_no_extractors(self, tmp_path, producer):
7979
assert producer.produce.called
8080
assert producer.produce.call_count == 5
8181
assert producer.produce.call_args.kwargs == {
82-
"buffer_error_max_tries": 3,
82+
"buffer_error_max_tries": 10,
8383
"headers": None,
8484
"key": None,
8585
"partition": None,
86-
"poll_timeout": 5.0,
86+
"poll_timeout": 30.0,
8787
"timestamp": None,
8888
"topic": name,
8989
"value": b'{"key":"key5","field":"value5","timestamp":"5"}',

0 commit comments

Comments
 (0)