
Commit c02e759

Exactly Once Processing (#385)
1 parent 72f658a commit c02e759

File tree: 21 files changed, +1091 -256 lines changed


README.md

Lines changed: 2 additions & 2 deletions

```diff
@@ -18,7 +18,7 @@ Quix Streams has the following benefits:
 - Support for many serialization formats, including JSON (and Quix-specific).
 - Support for stateful operations using RocksDB.
 - Support for aggregations over tumbling and hopping time windows.
-- "At-least-once" Kafka processing guarantees.
+- "At-least-once" and "exactly-once" Kafka processing guarantees.
 - Designed to run and scale resiliently via container orchestration (like Kubernetes).
 - Easily runs locally and in Jupyter Notebook for convenient development and debugging.
 - Seamless integration with the fully managed [Quix Cloud](https://quix.io/product) platform.
@@ -160,9 +160,9 @@ Here are some of the planned improvements:
 - [x] [Windowed aggregations over Tumbling & Hopping windows](https://quix.io/docs/quix-streams/v2-0-latest/windowing.html)
 - [x] [State recovery based on Kafka changelog topics](https://quix.io/docs/quix-streams/advanced/stateful-processing.html#fault-tolerance-recovery)
 - [x] [Group-by operation](https://quix.io/docs/quix-streams/groupby.html)
+- [X] ["Exactly Once" delivery guarantees for Kafka message processing (AKA transactions)](https://quix.io/docs/quix-streams/configuration.html#processing-guarantees)
 - [ ] Joins
 - [ ] Windowed aggregations over Sliding windows
-- [ ] "Exactly Once" delivery guarantees for Kafka message processing (AKA transactions)
 - [ ] Support for Avro and Protobuf formats
 - [ ] Schema Registry support
```

docs/configuration.md

Lines changed: 75 additions & 0 deletions

```diff
@@ -35,6 +35,10 @@ See more `auto.offset.reset` in this [article](https://www.quix.io/blog/kafka-au
 **Options**: `"latest"`, `"earliest"`.
 **Default** - `"latest"`.

+- **`processing_guarantee`** - Use "at-least-once" or "exactly-once" processing guarantees.
+  See [Processing Guarantees](#processing-guarantees) for more information.
+  **Options**: `"at-least-once"` or `"exactly-once"`.
+  **Default** - `"at-least-once"`.

 ## Authentication

@@ -83,6 +87,77 @@ app = Application(
 `ConnectionConfig.from_librdkafka_dict(config, ignore_extras=True)` will additionally
 ignore irrelevant settings (but you will lose some validation checks).

+## Processing Guarantees
+
+This section concerns the `processing_guarantee` setting.
+
+### What are "processing guarantees"?
+
+Kafka broadly has three guarantee levels/semantics associated with handling messages.
+
+From weakest to strongest: `at-most-once`, `at-least-once`, and `exactly-once`.
+Stronger guarantees generally have larger overhead, and thus reduced speed.
+
+These guarantees can be read literally: when consuming Kafka messages, you can
+guarantee each will be processed `X` times:
+
+- `at-most-once` - a message will be processed not more than once.
+  If the processing fails, the message will not be processed again.
+- `at-least-once` - a message may be processed more than once.
+  This may lead to duplicates in the output topics, but the message won't be lost and is guaranteed to be processed.
+- `exactly-once` - a message will be processed exactly once.
+  The message is guaranteed to be processed without duplicated outputs, but at the cost of higher latency and complexity.
+
+### What options does Quix Streams offer?
+
+Currently, Quix Streams offers `at-least-once` and `exactly-once`.
+
+The default guarantee is `at-least-once`.
+
+### Performance comparison
+
+It is difficult to offer hard numbers for the speed difference between `exactly-once`
+and `at-least-once` because many factors are at play, though latency is perhaps
+the main contributor since `exactly-once` requires more communication with the broker.
+
+With default `Application` settings, `exactly-once` will likely perform at worst
+10% slower than `at-least-once`, but given the nature of infrastructure/architecture,
+even this is not a guarantee.
+
+You can minimize the impact of latency somewhat by adjusting the [`commit_interval` of
+the `Application`](#main-configuration-parameters), but be aware of [the
+performance considerations around `commit_interval`](advanced/checkpointing.md).
+
+### Exactly-Once producer buffering
+
+Because `exactly-once` messages do not fully commit or produce until the
+[checkpoint](advanced/checkpointing.md) successfully completes, any resulting produced
+messages are not immediately readable, as they are with `at-least-once`.
+
+In effect, the [`commit_interval` of an `Application`](#main-configuration-parameters)
+can be interpreted as the maximum amount of time before a produced message can be read
+(depending on how far into the checkpoint a message is processed). If you adjust it,
+be aware of other [performance considerations around `commit_interval`](advanced/checkpointing.md).
+
+> NOTE: `groupby` doubles this effect since it produces to another topic under the hood.
+
+### What processing guarantee is right for me?
+
+To pick the right guarantee, you need to weigh an increase in speed
+against the potential to double-process a result, which may require other infrastructural
+considerations to handle appropriately.
+
+In general, you may consider using `at-least-once` guarantees when:
+- Latency and processing speed are the primary concerns.
+- The downstream consumers can gracefully handle duplicated messages.
+
+You may consider using `exactly-once` instead when:
+- Consistency and correctness of the outputs are critical.
+- Downstream consumers of the output topics cannot handle duplicated data.
+  - Note: you may want to pick a smaller value for the `commit_interval` to commit checkpoints more often and reduce the latency.
+    For more information about tuning the `commit_interval`, see the ["Configuring the Checkpointing" page](advanced/checkpointing.md).

 ## State
 - **`state_dir`** - path to the application state directory.
```
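Taken together, the documentation above describes a single new `Application` parameter. Below is a minimal, hypothetical sketch of enabling it; the broker address, consumer group, topic name, and `commit_interval` value are placeholders, and the `apply` step stands in for a real pipeline.

```python
from quixstreams import Application

# Opt in to exactly-once processing; the default is "at-least-once".
app = Application(
    broker_address="localhost:9092",      # placeholder broker
    consumer_group="my-consumer-group",   # placeholder group
    processing_guarantee="exactly-once",
    commit_interval=1,  # seconds; smaller values make transactional output readable sooner
)

input_topic = app.topic("input-topic")    # placeholder topic
sdf = app.dataframe(input_topic)
sdf = sdf.apply(lambda value: value)      # replace with real processing steps

app.run(sdf)
```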

quixstreams/app.py

Lines changed: 35 additions & 9 deletions

````diff
@@ -4,7 +4,7 @@
 import os
 import signal
 import warnings
-from typing import Optional, List, Callable, Union
+from typing import Optional, List, Callable, Union, Literal, get_args

 from confluent_kafka import TopicPartition
 from typing_extensions import Self
@@ -44,6 +44,7 @@
 __all__ = ("Application",)

 logger = logging.getLogger(__name__)
+ProcessingGuarantee = Literal["at-least-once", "exactly-once"]
 MessageProcessedCallback = Callable[[str, int, int], None]

 # Enforce idempotent producing for the internal RowProducer
@@ -78,7 +79,7 @@ class Application:
     ```python
     from quixstreams import Application

-    # Set up an `app = Application` and `sdf = StreamingDataFrame`;
+    # Set up an `app = Application` and `sdf = StreamingDataFrame`;
     # add some operations to `sdf` and then run everything.

     app = Application(broker_address='localhost:9092', consumer_group='group')
@@ -114,6 +115,7 @@ def __init__(
         topic_manager: Optional[TopicManager] = None,
         request_timeout: float = 30,
         topic_create_timeout: float = 60,
+        processing_guarantee: ProcessingGuarantee = "at-least-once",
     ):
         """
         :param broker_address: Connection settings for Kafka.
@@ -162,6 +164,7 @@ def __init__(
         :param topic_manager: A `TopicManager` instance
         :param request_timeout: timeout (seconds) for REST-based requests
         :param topic_create_timeout: timeout (seconds) for topic create finalization
+        :param processing_guarantee: Use "exactly-once" or "at-least-once" processing.

         <br><br>***Error Handlers***<br>
         To handle errors, `Application` accepts callbacks triggered when
@@ -180,6 +183,13 @@ def __init__(
         > NOTE: It is recommended to just use `quix_sdk_token` instead.
         """
         configure_logging(loglevel=loglevel)
+
+        if processing_guarantee not in get_args(ProcessingGuarantee):
+            raise ValueError(
+                f'Must provide a valid "processing_guarantee"; expected one of: '
+                f'{get_args(ProcessingGuarantee)}, got "{processing_guarantee}"'
+            )
+
         producer_extra_config = producer_extra_config or {}
         consumer_extra_config = consumer_extra_config or {}

@@ -248,6 +258,7 @@ def __init__(
         self._commit_interval = commit_interval
         self._producer_extra_config = producer_extra_config
         self._consumer_extra_config = consumer_extra_config
+        self._processing_guarantee = processing_guarantee
         self._consumer = RowConsumer(
             broker_address=broker_address,
             consumer_group=consumer_group,
@@ -264,6 +275,7 @@ def __init__(
                 "max.poll.interval.ms", _default_max_poll_interval_ms
             )
             / 1000,  # convert to seconds
+            transactional=self._uses_exactly_once,
         )
         self._consumer_poll_timeout = consumer_poll_timeout
         self._producer_poll_timeout = producer_poll_timeout
@@ -303,12 +315,17 @@ def __init__(
             producer=self._producer,
             consumer=self._consumer,
             state_manager=self._state_manager,
+            exactly_once=self._uses_exactly_once,
         )

     @property
-    def is_quix_app(self):
+    def is_quix_app(self) -> bool:
         return self._is_quix_app

+    @property
+    def _uses_exactly_once(self) -> bool:
+        return self._processing_guarantee == "exactly-once"
+
     @classmethod
     def Quix(
         cls,
@@ -331,6 +348,7 @@ def Quix(
         topic_manager: Optional[QuixTopicManager] = None,
         request_timeout: float = 30,
         topic_create_timeout: float = 60,
+        processing_guarantee: Literal["at-least-once", "exactly-once"] = "exactly-once",
     ) -> Self:
         """
         >***NOTE:*** DEPRECATED: use Application with `quix_sdk_token` argument instead.
@@ -398,6 +416,7 @@ def Quix(
         :param topic_manager: A `QuixTopicManager` instance
         :param request_timeout: timeout (seconds) for REST-based requests
         :param topic_create_timeout: timeout (seconds) for topic create finalization
+        :param processing_guarantee: Use "exactly-once" or "at-least-once" processing.

         <br><br>***Error Handlers***<br>
         To handle errors, `Application` accepts callbacks triggered when
@@ -445,6 +464,7 @@ def Quix(
             request_timeout=request_timeout,
             topic_create_timeout=topic_create_timeout,
             quix_config_builder=quix_config_builder,
+            processing_guarantee=processing_guarantee,
         )
         return app

@@ -625,14 +645,18 @@ def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
         Create and return a pre-configured Consumer instance.
         The Consumer is initialized with params passed to Application.

-        It's useful for consuming data from Kafka outside the standard Application processing flow.
-        (e.g. to consume test data from a topic).
-        Using it within the StreamingDataFrame functions is not recommended, as it creates a new Consumer instance
+        It's useful for consuming data from Kafka outside the standard
+        Application processing flow.
+        (e.g., to consume test data from a topic).
+        Using it within the StreamingDataFrame functions is not recommended, as it
+        creates a new Consumer instance
         each time, which is not optimized for repeated use in a streaming pipeline.

-        Note: By default this consumer does not autocommit consumed offsets to allow exactly-once processing.
+        Note: By default, this consumer does not autocommit the consumed offsets to allow
+        at-least-once processing.
         To store the offset call store_offsets() after processing a message.
-        If autocommit is necessary set `enable.auto.offset.store` to True in the consumer config when creating the app.
+        If autocommit is necessary set `enable.auto.offset.store` to True in
+        the consumer config when creating the app.

         Example Snippet:
@@ -705,14 +729,16 @@ def run(
             f'broker_address="{self._broker_address}" '
             f'consumer_group="{self._consumer_group}" '
             f'auto_offset_reset="{self._auto_offset_reset}" '
-            f"commit_interval={self._commit_interval}s"
+            f"commit_interval={self._commit_interval}s "
+            f'processing_guarantee="{self._processing_guarantee}"'
         )
         if self.is_quix_app:
             self._quix_runtime_init()

         self._setup_topics()

         exit_stack = contextlib.ExitStack()
+        exit_stack.enter_context(self._processing_context)
         exit_stack.enter_context(self._state_manager)
         exit_stack.enter_context(self._consumer)
         exit_stack.push(
````
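The constructor guard above derives its accepted values from the `ProcessingGuarantee` type alias, so the runtime check and the type annotation cannot drift apart. A standalone sketch of that pattern, runnable on its own (the `validate_guarantee` helper name is hypothetical):

```python
from typing import Literal, get_args

# A Literal alias serves both as a type hint and, via get_args(), as the
# runtime source of allowed values.
ProcessingGuarantee = Literal["at-least-once", "exactly-once"]


def validate_guarantee(processing_guarantee: str) -> None:
    # get_args() returns ("at-least-once", "exactly-once") for this Literal.
    if processing_guarantee not in get_args(ProcessingGuarantee):
        raise ValueError(
            f'Must provide a valid "processing_guarantee"; expected one of: '
            f'{get_args(ProcessingGuarantee)}, got "{processing_guarantee}"'
        )


validate_guarantee("exactly-once")     # passes silently
# validate_guarantee("at-most-once")   # would raise ValueError
```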

quixstreams/checkpointing/__init__.py

Lines changed: 2 additions & 2 deletions

```diff
@@ -1,2 +1,2 @@
-from .checkpoint import Checkpoint as Checkpoint
-from .exceptions import InvalidStoredOffset as InvalidStoredOffset
+from .checkpoint import Checkpoint
+from .exceptions import InvalidStoredOffset
```

quixstreams/checkpointing/checkpoint.py

Lines changed: 27 additions & 16 deletions

```diff
@@ -33,6 +33,7 @@ def __init__(
         producer: RowProducer,
         consumer: Consumer,
         state_manager: StateStoreManager,
+        exactly_once: bool = False,
     ):
         self._created_at = time.monotonic()
         # A mapping of <(topic, partition): processed offset>
@@ -45,6 +46,10 @@ def __init__(
         self._state_manager = state_manager
         self._consumer = consumer
         self._producer = producer
+        self._exactly_once = exactly_once
+
+        if self._exactly_once:
+            self._producer.begin_transaction()

     def expired(self) -> bool:
         """
@@ -102,6 +107,15 @@ def get_store_transaction(
         self._store_transactions[(topic, partition, store_name)] = transaction
         return transaction

+    def close(self):
+        """
+        Perform cleanup (when the checkpoint is empty) instead of committing.
+
+        Needed for exactly-once, as Kafka transactions are timeboxed.
+        """
+        if self._exactly_once:
+            self._producer.abort_transaction()
+
     def commit(self):
         """
         Commit the checkpoint.
@@ -113,10 +127,6 @@ def commit(self):
         4. Flush each state store partition to the disk.
         """

-        if not self._tp_offsets:
-            # No messages have been processed during this checkpoint, return
-            return
-
         # Step 1. Produce the changelogs
         for (
             topic,
@@ -148,15 +158,20 @@ def commit(self):
             TopicPartition(topic=topic, partition=partition, offset=offset + 1)
             for (topic, partition), offset in self._tp_offsets.items()
         ]
-        logger.debug("Checkpoint: commiting consumer")
-        try:
-            partitions = self._consumer.commit(offsets=offsets, asynchronous=False)
-        except KafkaException as e:
-            raise CheckpointConsumerCommitError(e.args[0]) from None
+        if self._exactly_once:
+            self._producer.commit_transaction(
+                offsets, self._consumer.consumer_group_metadata()
+            )
+        else:
+            logger.debug("Checkpoint: committing consumer")
+            try:
+                partitions = self._consumer.commit(offsets=offsets, asynchronous=False)
+            except KafkaException as e:
+                raise CheckpointConsumerCommitError(e.args[0]) from None

-        for partition in partitions:
-            if partition.error:
-                raise CheckpointConsumerCommitError(partition.error)
+            for partition in partitions:
+                if partition.error:
+                    raise CheckpointConsumerCommitError(partition.error)

         # Step 4. Flush state store partitions to the disk together with changelog
         # offsets
@@ -175,10 +190,6 @@ def commit(self):
             changelog_offset = (
                 produced_offsets.get(changelog_tp) if changelog_tp is not None else None
             )
-            if changelog_offset is not None:
-                # Increment the changelog offset by one to match the high watermark
-                # in Kafka
-                changelog_offset += 1
             transaction.flush(
                 processed_offset=offset, changelog_offset=changelog_offset
             )
```
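In the exactly-once branch of `commit()` above, the consumer offsets are handed to the producer's transaction instead of being committed on the consumer. Below is a rough sketch of the underlying confluent-kafka flow that `Checkpoint` drives through `RowProducer`; all config values are placeholders, and `RowProducer.commit_transaction()` presumably wraps the last two producer calls, so the wrapper may differ in detail.

```python
from confluent_kafka import Consumer, Producer, TopicPartition

# Transactional producers need a transactional.id for broker-side fencing.
producer = Producer({
    "bootstrap.servers": "localhost:9092",          # placeholder
    "transactional.id": "my-app-transactional-id",  # placeholder
})
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder
    "group.id": "my-consumer-group",        # placeholder
    "enable.auto.commit": False,            # offsets travel inside the transaction
})

producer.init_transactions()  # one-time registration/fencing with the broker
producer.begin_transaction()  # mirrors Checkpoint.__init__ with exactly_once=True

# ... poll messages, process them, and producer.produce(...) the results here ...
processed = [TopicPartition("input-topic", 0, 43)]  # processed offset + 1, as in the diff

if not processed:
    # Nothing was processed during this checkpoint: abort, mirroring Checkpoint.close()
    producer.abort_transaction()
else:
    # Atomically commit the produced messages together with the consumer offsets.
    producer.send_offsets_to_transaction(processed, consumer.consumer_group_metadata())
    producer.commit_transaction()  # mirrors Checkpoint.commit()
```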

quixstreams/kafka/consumer.py

Lines changed: 7 additions & 1 deletion

```diff
@@ -9,7 +9,7 @@
     Consumer as ConfluentConsumer,
     KafkaError,
 )
-from confluent_kafka.admin import ClusterMetadata
+from confluent_kafka.admin import ClusterMetadata, GroupMetadata

 from .configuration import ConnectionConfig
 from quixstreams.exceptions import PartitionAssignmentError, KafkaPartitionError
@@ -548,6 +548,12 @@ def close(self):
         self._consumer.close()
         logger.debug("Kafka consumer closed")

+    def consumer_group_metadata(self) -> GroupMetadata:
+        """
+        Used by the producer during consumer offset sending for an EOS transaction.
+        """
+        return self._consumer.consumer_group_metadata()
+
     @property
     def _consumer(self) -> ConfluentConsumer:
         """
```
