Commit 2167b1b

Add commit_every configuration parameter (#416)
* Fix the validation in `Checkpoint.store_offset()`
* Add a `commit_every` config parameter
1 parent 8f126ab commit 2167b1b

File tree

6 files changed

+67
-11
lines changed

docs/advanced/checkpointing.md

Lines changed: 15 additions & 4 deletions
@@ -6,7 +6,14 @@ We call this process a “checkpointing”.
 
 The goal of checkpointing is to ensure that applications can recover after failures and reprocess records from Kafka producing the same results as if the failure never happened.
 
-Currently, Quix Streams provides *At-Least-Once* processing guarantees, which means that each incoming message will be processed, but it may happen multiple times and generate duplicated outputs.
+
+Quix Streams supports both *At-Least-Once* and *Exactly-Once* processing guarantees, which can be changed using the `processing_guarantee` parameter.
+
+- When At-Least-Once guarantee is enabled (the default), each incoming message is guaranteed to be processed, but it may happen multiple times and generate duplicated outputs in case of failure
+- If Exactly-Once guarantee is enabled, the outputs are guaranteed to be unique for every message at the cost of increased latency.
+
+See the [Configuration](../configuration.md#processing-guarantees) page to learn more about processing guarantees.
+
 
 ## Under the Hood
 
@@ -61,9 +68,9 @@ During recovery, the app will apply changelog updates to the local state stores.
 
 ## Configuring the Checkpointing
 
-Users may configure how often the checkpoints are committed by passing the `commit_interval` parameter to the `Application` class.
+Users may configure how often the checkpoints are committed by passing the `commit_interval` and `commit_every` parameters to the `Application` class.
 
-The default commit interval is 5 seconds.
+By default, the `commit_interval` is 5 seconds, and the `commit_every` is 0, and only the commit interval is taken into account.
 
 Changing the commit interval will have several implications for the application:
 
@@ -80,8 +87,12 @@ Changing the commit interval will have several implications for the application:
 However, it will reduce memory usage and limit the number of potentially reprocessed messages, reducing duplicates.
 
 - If `commit_interval` is set to `0`, the application will commit a checkpoint for every processed Kafka message.
+- If `commit_every` is set, the application will commit after processing N messages across all assigned partitions.
+- You may use `commit_interval` to get more granular control over the commit schedule.
+- For example, if `commit_every=1000` and `commit_interval=5.0`, the application will commit the checkpoint as soon as 1000 messages are processed or 5s interval is elapsed.
+
+When configuring the `commit_interval` and `commit_every`, take into account such factors as the number of unique keys in the input topics, hardware, and infrastructure.
 
-When configuring the commit interval, take into account such factors as the number of unique keys in the input topics, hardware, and infrastructure.
 
 ## Limitations

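The way `commit_interval` and `commit_every` combine in the documentation change above can be sketched as a single predicate. This is a minimal illustration rather than Quix Streams code: `commit_due` and its arguments are hypothetical names, and elapsed time is passed in explicitly so the behaviour is easy to check by hand.

```python
def commit_due(
    elapsed_s: float,
    processed: int,
    commit_interval: float = 5.0,
    commit_every: int = 0,
) -> bool:
    """Return True when a checkpoint should be committed (illustrative only)."""
    # The time-based trigger is always active; with commit_interval=0 it
    # fires after every message.
    interval_elapsed = elapsed_s >= commit_interval
    # The count-based trigger is active only when commit_every > 0.
    count_reached = 0 < commit_every <= processed
    return interval_elapsed or count_reached


# commit_every=1000 and commit_interval=5.0: whichever trigger fires first wins.
print(commit_due(elapsed_s=1.2, processed=1000, commit_every=1000))  # count reached
print(commit_due(elapsed_s=5.0, processed=10, commit_every=1000))    # interval elapsed
print(commit_due(elapsed_s=1.2, processed=999, commit_every=1000))   # neither yet
```

With the default `commit_every=0`, the count-based trigger never fires, so only the interval matters.
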
docs/configuration.md

Lines changed: 6 additions & 1 deletion
@@ -26,9 +26,14 @@ Consumer group is also used in the state directory path and as a prefix for chan
 
 - **`commit_interval`** - How often to commit the processed offsets and state in seconds.
 **Default** - `5.0`.
-See the [Checkpointing](advanced/checkpointing.md) page for more information about
+See the [Checkpointing](advanced/checkpointing.md#configuring-the-checkpointing) page for more information about
 the `commit_interval` parameter.
 
+- **`commit_every`** - Commit the checkpoint after processing N offsets.
+**Default** - `0`.
+See the [Checkpointing](advanced/checkpointing.md#configuring-the-checkpointing) page for more information about
+the `commit_every` parameter.
+
 - **`auto_offset_reset`** - Consumer `auto.offset.reset` setting.
 It determines where the consumer should start reading messages from.
 See more `auto.offset.reset` in this [article](https://www.quix.io/blog/kafka-auto-offset-reset-use-cases-and-pitfalls#the-auto-offset-reset-configuration).

quixstreams/app.py

Lines changed: 13 additions & 0 deletions
@@ -98,6 +98,7 @@ def __init__(
         consumer_group: Optional[str] = None,
         auto_offset_reset: AutoOffsetReset = "latest",
         commit_interval: float = 5.0,
+        commit_every: int = 0,
         consumer_extra_config: Optional[dict] = None,
         producer_extra_config: Optional[dict] = None,
         state_dir: str = "state",
@@ -138,6 +139,15 @@ def __init__(
             >***NOTE:*** Quix Applications will prefix it with the Quix workspace id.
         :param commit_interval: How often to commit the processed messages in seconds.
             Default - 5.0.
+        :param commit_every: Commit the checkpoint after processing N messages.
+            Use this parameter for more granular control of the commit schedule.
+            If the value is > 0, the application will commit the checkpoint after
+            processing the specified number of messages across all the assigned
+            partitions.
+            If the value is <= 0, only the `commit_interval` will be considered.
+            Default - 0.
+            >***NOTE:*** Only input offsets are counted, and the application
+            > may produce more results than the number of incoming messages.
         :param auto_offset_reset: Consumer `auto.offset.reset` setting
         :param consumer_extra_config: A dictionary with additional options that
             will be passed to `confluent_kafka.Consumer` as is.
@@ -256,6 +266,7 @@ def __init__(
         self._consumer_group = consumer_group
         self._auto_offset_reset = auto_offset_reset
         self._commit_interval = commit_interval
+        self._commit_every = commit_every
         self._producer_extra_config = producer_extra_config
         self._consumer_extra_config = consumer_extra_config
         self._processing_guarantee = processing_guarantee
@@ -312,6 +323,7 @@ def __init__(
         )
         self._processing_context = ProcessingContext(
             commit_interval=self._commit_interval,
+            commit_every=commit_every,
             producer=self._producer,
             consumer=self._consumer,
             state_manager=self._state_manager,
@@ -730,6 +742,7 @@ def run(
             f'consumer_group="{self._consumer_group}" '
             f'auto_offset_reset="{self._auto_offset_reset}" '
             f"commit_interval={self._commit_interval}s "
+            f"commit_every={self._commit_every} "
             f'processing_guarantee="{self._processing_guarantee}"'
         )
         if self.is_quix_app:

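The NOTE in the `commit_every` docstring above says that only input offsets are counted, so a checkpoint may cover more produced results than consumed messages. A small simulation can make that concrete. It is a sketch under assumptions: `simulate_commits` and `fan_out` are hypothetical names, the time-based trigger is ignored, and the counter is assumed to reset after each commit because a new checkpoint is initialized.

```python
from typing import List, Tuple


def simulate_commits(
    num_inputs: int, commit_every: int, fan_out: int
) -> List[Tuple[int, int]]:
    """
    Return one (inputs_processed, outputs_produced) pair per commit.

    Assumption: every input message yields `fan_out` output messages, and only
    the input count is compared against `commit_every`.
    """
    commits = []
    processed = 0  # input offsets stored in the current checkpoint
    produced = 0   # outputs produced while the current checkpoint was open
    for _ in range(num_inputs):
        produced += fan_out
        processed += 1
        if 0 < commit_every <= processed:
            commits.append((processed, produced))
            # A fresh checkpoint starts counting from zero again.
            processed = produced = 0
    return commits


# Five inputs, commit after every 2 inputs, 3 outputs per input:
# each commit covers 2 inputs but 6 outputs; the fifth input stays uncommitted
# until another trigger (for example, the commit interval) fires.
print(simulate_commits(num_inputs=5, commit_every=2, fan_out=3))
```
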
quixstreams/checkpointing/checkpoint.py

Lines changed: 11 additions & 3 deletions
@@ -34,6 +34,7 @@ def __init__(
         consumer: Consumer,
         state_manager: StateStoreManager,
         exactly_once: bool = False,
+        commit_every: int = 0,
     ):
         self._created_at = time.monotonic()
         # A mapping of <(topic, partition): processed offset>
@@ -47,15 +48,21 @@ def __init__(
         self._consumer = consumer
         self._producer = producer
         self._exactly_once = exactly_once
+        self._commit_every = commit_every
+        self._total_offsets_processed = 0
 
         if self._exactly_once:
             self._producer.begin_transaction()
 
     def expired(self) -> bool:
         """
-        Returns `True` if checkpoint deadline has expired.
+        Returns `True` if checkpoint deadline has expired OR
+        if the total number of processed offsets exceeded the "commit_every" limit
+        when it's defined.
         """
-        return (time.monotonic() - self._commit_interval) >= self._created_at
+        return (time.monotonic() - self._commit_interval) >= self._created_at or (
+            0 < self._commit_every <= self._total_offsets_processed
+        )
 
     def empty(self) -> bool:
         """
@@ -77,12 +84,13 @@ def store_offset(self, topic: str, partition: int, offset: int):
             # same checkpoint.
             # It shouldn't normally happen, but a lot of logic relies on it,
             # and it's better to be safe.
-            if offset < stored_offset:
+            if offset <= stored_offset:
                 raise InvalidStoredOffset(
                     f"Cannot store offset smaller or equal than already processed"
                     f" one: {offset} <= {stored_offset}"
                 )
         self._tp_offsets[(topic, partition)] = offset
+        self._total_offsets_processed += 1
 
     def get_store_transaction(
         self, topic: str, partition: int, store_name: str = DEFAULT_STATE_STORE_NAME

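To see the two changes in `checkpoint.py` working together (the stricter `<=` offset validation and the per-checkpoint offset counter), here is a stripped-down stand-in for the class. It is a sketch, not the real `Checkpoint`: `MiniCheckpoint` is a hypothetical name, the producer, consumer, and state manager are omitted, and treating an unseen partition as having no stored offset is an assumption, since that part of `store_offset()` is not visible in the diff.

```python
import time
from typing import Dict, Tuple


class InvalidStoredOffset(Exception):
    """Stand-in for the exception raised by the real Checkpoint."""


class MiniCheckpoint:
    def __init__(self, commit_interval: float, commit_every: int = 0):
        self._created_at = time.monotonic()
        self._commit_interval = commit_interval
        self._commit_every = commit_every
        # A mapping of <(topic, partition): processed offset>
        self._tp_offsets: Dict[Tuple[str, int], int] = {}
        self._total_offsets_processed = 0

    def expired(self) -> bool:
        # Same condition as in the diff: deadline passed OR count limit reached.
        return (time.monotonic() - self._commit_interval) >= self._created_at or (
            0 < self._commit_every <= self._total_offsets_processed
        )

    def store_offset(self, topic: str, partition: int, offset: int) -> None:
        # Assumption: a partition with no stored offset accepts any offset.
        stored_offset = self._tp_offsets.get((topic, partition))
        if stored_offset is not None and offset <= stored_offset:
            # Re-storing the same offset is rejected too; otherwise the counter
            # below would count one message twice.
            raise InvalidStoredOffset(
                f"Cannot store offset smaller or equal than already processed"
                f" one: {offset} <= {stored_offset}"
            )
        self._tp_offsets[(topic, partition)] = offset
        self._total_offsets_processed += 1


checkpoint = MiniCheckpoint(commit_interval=999, commit_every=2)
checkpoint.store_offset("topic", 0, 0)
print(checkpoint.expired())  # one offset stored, limit of 2 not reached
checkpoint.store_offset("topic", 0, 1)
print(checkpoint.expired())  # two offsets stored, limit reached
```

The counter is global to the checkpoint, so offsets from different partitions all count toward the same `commit_every` limit.
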
quixstreams/processing_context.py

Lines changed: 3 additions & 1 deletion
@@ -29,6 +29,7 @@ class ProcessingContext:
     consumer: RowConsumer
     state_manager: StateStoreManager
     exactly_once: bool = False
+    commit_every: int = 0
     _checkpoint: Optional[Checkpoint] = dataclasses.field(
         init=False, repr=False, default=None
     )
@@ -53,9 +54,10 @@ def init_checkpoint(self):
         """
         Initialize a new checkpoint
         """
-        logger.debug(f"Starting a checkpoint...")
+        logger.debug(f"Initializing a checkpoint...")
         self._checkpoint = Checkpoint(
             commit_interval=self.commit_interval,
+            commit_every=self.commit_every,
             state_manager=self.state_manager,
             producer=self.producer,
             consumer=self.consumer,

tests/test_quixstreams/test_checkpointing.py

Lines changed: 19 additions & 2 deletions
@@ -21,13 +21,15 @@
 def checkpoint_factory(state_manager, consumer, row_producer_factory):
     def factory(
         commit_interval: float = 1,
+        commit_every: int = 0,
         consumer_: Optional[Consumer] = None,
         producer_: Optional[RowProducer] = None,
         state_manager_: Optional[StateStoreManager] = None,
         exactly_once: bool = False,
     ):
         return Checkpoint(
             commit_interval=commit_interval,
+            commit_every=commit_every,
             producer=producer_ or row_producer_factory(transactional=exactly_once),
             consumer=consumer_ or consumer,
             state_manager=state_manager_ or state_manager,
@@ -60,15 +62,30 @@ def test_exactly_once_init(self, checkpoint_factory):
         mock_producer.begin_transaction.assert_called()
 
     @pytest.mark.parametrize("commit_interval, expired", [(0, True), (999, False)])
-    def test_expired(self, commit_interval, expired, checkpoint_factory):
+    def test_expired_with_commit_interval(
+        self, commit_interval, expired, checkpoint_factory
+    ):
         checkpoint = checkpoint_factory(commit_interval=commit_interval)
         assert checkpoint.expired() == expired
 
+    def test_expired_with_commit_every(self, checkpoint_factory):
+        checkpoint = checkpoint_factory(commit_interval=999, commit_every=2)
+        checkpoint.store_offset("topic", 0, 0)
+        assert not checkpoint.expired()
+
+        checkpoint.store_offset("topic", 0, 1)
+        assert checkpoint.expired()
+
+    def test_expired_with_commit_every_and_commit_interval(self, checkpoint_factory):
+        checkpoint = checkpoint_factory(commit_interval=0, commit_every=10)
+        checkpoint.store_offset("topic", 0, 0)
+        assert checkpoint.expired()
+
     def test_store_already_processed_offset_fails(self, checkpoint_factory):
         checkpoint = checkpoint_factory()
         checkpoint.store_offset("topic", 0, 10)
         with pytest.raises(InvalidStoredOffset):
-            checkpoint.store_offset("topic", 0, 9)
+            checkpoint.store_offset("topic", 0, 10)
 
     @pytest.mark.parametrize("exactly_once", [False, True])
     def test_commit_no_state_success(
