Skip to content

Commit 811a9d4

Browse files
Detach sinks and backpressure from topic-partitions (#786)
Co-authored-by: Remy Gwaramadze <gwaramadze@users.noreply.github.com>
1 parent a68d474 commit 811a9d4

File tree

20 files changed

+219
-303
lines changed

20 files changed

+219
-303
lines changed

quixstreams/app.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,9 @@ def __init__(
337337

338338
self._source_manager = SourceManager()
339339
self._sink_manager = SinkManager()
340-
self._pausing_manager = PausingManager(consumer=self._consumer)
340+
self._pausing_manager = PausingManager(
341+
consumer=self._consumer, topic_manager=self._topic_manager
342+
)
341343
self._processing_context = ProcessingContext(
342344
commit_interval=self._config.commit_interval,
343345
commit_every=self._config.commit_every,

quixstreams/checkpointing/checkpoint.py

Lines changed: 41 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,46 @@ def commit(self):
181181
Commit the checkpoint.
182182
183183
This method will:
184-
1. Produce the changelogs for each state store
185-
2. Flush the producer to ensure everything is delivered.
186-
3. Commit topic offsets.
187-
4. Flush each state store partition to the disk.
184+
1. Flush the registered sinks if any
185+
2. Produce the changelogs for each state store
186+
3. Flush the producer to ensure everything is delivered.
187+
4. Commit topic offsets.
188+
5. Flush each state store partition to the disk.
188189
"""
189190

190-
# Step 1. Produce the changelogs
191+
# Step 1. Flush sinks
192+
logger.debug("Checkpoint: flushing sinks")
193+
backpressured = False
194+
for sink in self._sink_manager.sinks:
195+
if backpressured:
196+
# Drop the accumulated data for the other sinks
197+
# if one of them is backpressured to limit the number of duplicates
198+
# when the data is reprocessed again
199+
sink.on_paused()
200+
continue
201+
202+
try:
203+
sink.flush()
204+
except SinkBackpressureError as exc:
205+
logger.warning(
206+
f'Backpressure for sink "{sink}" is detected, '
207+
f"all partitions will be paused and resumed again "
208+
f"in {exc.retry_after}s"
209+
)
210+
# The backpressure is detected from the sink
211+
# Pause the assignment to let it cool down and seek it back to
212+
# the first processed offsets of this Checkpoint (they must be equal
213+
# to the last committed offsets).
214+
self._pausing_manager.pause(
215+
resume_after=exc.retry_after,
216+
offsets_to_seek=self._starting_tp_offsets.copy(),
217+
)
218+
backpressured = True
219+
if backpressured:
220+
# Exit early if backpressure is detected
221+
return
222+
223+
# Step 2. Produce the changelogs
191224
for (
192225
topic,
193226
partition,
@@ -201,7 +234,7 @@ def commit(self):
201234
)
202235
transaction.prepare(processed_offsets={topic: offset})
203236

204-
# Step 2. Flush producer to trigger all delivery callbacks and ensure that
237+
# Step 3. Flush producer to trigger all delivery callbacks and ensure that
205238
# all messages are produced
206239
logger.debug("Checkpoint: flushing producer")
207240
unproduced_msg_count = self._producer.flush()
@@ -211,55 +244,10 @@ def commit(self):
211244
f"the producer flush timeout"
212245
)
213246

214-
logger.debug("Checkpoint: flushing sinks")
215-
sinks = self._sink_manager.sinks
216-
# Step 3. Flush sinks
217-
for (topic, partition), offset in self._tp_offsets.items():
218-
for sink in sinks:
219-
if self._pausing_manager.is_paused(topic=topic, partition=partition):
220-
# The topic-partition is paused, skip flushing other sinks for
221-
# this TP.
222-
# Note: when flushing multiple sinks for the same TP, some
223-
# of them can be flushed before one of the sinks is backpressured.
224-
sink.on_paused(topic=topic, partition=partition)
225-
continue
226-
227-
try:
228-
sink.flush(topic=topic, partition=partition)
229-
except SinkBackpressureError as exc:
230-
logger.warning(
231-
f'Backpressure for sink "{sink}" is detected, '
232-
f"the partition will be paused and resumed again "
233-
f"in {exc.retry_after}s; "
234-
f'partition="{topic}[{partition}]" '
235-
f"processed_offset={offset}"
236-
)
237-
# The backpressure is detected from the sink
238-
# Pause the partition to let it cool down and seek it back to
239-
# the first processed offset of this Checkpoint (it must be equal
240-
# to the last committed offset).
241-
offset_to_seek = self._starting_tp_offsets[(topic, partition)]
242-
self._pausing_manager.pause(
243-
topic=topic,
244-
partition=partition,
245-
resume_after=exc.retry_after,
246-
offset_to_seek=offset_to_seek,
247-
)
248-
249247
# Step 4. Commit offsets to Kafka
250-
# First, filter out offsets of the paused topic partitions.
251-
tp_offsets = {
252-
(topic, partition): offset
253-
for (topic, partition), offset in self._tp_offsets.items()
254-
if not self._pausing_manager.is_paused(topic=topic, partition=partition)
255-
}
256-
if not tp_offsets:
257-
# No offsets to commit because every partition is paused, exiting early
258-
return
259-
260248
offsets = [
261249
TopicPartition(topic=topic, partition=partition, offset=offset + 1)
262-
for (topic, partition), offset in tp_offsets.items()
250+
for (topic, partition), offset in self._tp_offsets.items()
263251
]
264252

265253
if self._exactly_once:
@@ -281,16 +269,7 @@ def commit(self):
281269
# offsets.
282270
# Get produced offsets after flushing the producer
283271
produced_offsets = self._producer.offsets
284-
for (
285-
topic,
286-
partition,
287-
store_name,
288-
), transaction in self._store_transactions.items():
289-
offset = tp_offsets.get((topic, partition))
290-
# Offset can be None if the partition is paused
291-
if offset is None:
292-
continue
293-
272+
for transaction in self._store_transactions.values():
294273
# Get the changelog topic-partition for the given transaction
295274
# It can be None if changelog topics are disabled in the app config
296275
changelog_tp = transaction.changelog_topic_partition

quixstreams/processing/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def resume_ready_partitions(self):
9898
self.pausing_manager.resume_if_ready()
9999

100100
def on_partition_revoke(self, topic: str, partition: int):
101-
self.pausing_manager.revoke(topic=topic, partition=partition)
101+
self.pausing_manager.reset()
102102

103103
def __enter__(self):
104104
self.sink_manager.start_sinks()

quixstreams/processing/pausing.py

Lines changed: 54 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import logging
22
import sys
33
import time
4-
from typing import Dict, Tuple
54

65
from confluent_kafka import TopicPartition
76

87
from quixstreams.kafka import BaseConsumer
8+
from quixstreams.models import TopicManager
99

1010
logger = logging.getLogger(__name__)
1111

@@ -18,91 +18,79 @@ class PausingManager:
1818
the timeout is elapsed.
1919
"""
2020

21-
_paused_tps: Dict[Tuple[str, int], float]
21+
_resume_at: float
2222

23-
def __init__(self, consumer: BaseConsumer):
23+
def __init__(self, consumer: BaseConsumer, topic_manager: TopicManager):
2424
self._consumer = consumer
25-
self._paused_tps = {}
26-
self._next_resume_at = _MAX_FLOAT
25+
self._topic_manager = topic_manager
26+
self.reset()
2727

2828
def pause(
2929
self,
30-
topic: str,
31-
partition: int,
32-
offset_to_seek: int,
30+
offsets_to_seek: dict[tuple[str, int], int],
3331
resume_after: float,
3432
):
3533
"""
36-
Pause the topic-partition for a certain period of time.
34+
Pause all assigned data partitions for a certain period of time and seek the partitions
35+
provided in the `offsets_to_seek` dict.
3736
3837
This method is supposed to be called in case of backpressure from Sinks.
3938
"""
40-
if self.is_paused(topic=topic, partition=partition):
41-
# Exit early if the TP is already paused
42-
return
43-
44-
# Add a TP to the dict to avoid repetitive pausing
4539
resume_at = time.monotonic() + resume_after
46-
self._paused_tps[(topic, partition)] = resume_at
47-
# Remember when the next TP should be resumed to exit early
48-
# in the resume_if_ready() calls.
49-
# Partitions are rarely paused, but the resume checks can be done
50-
# thousands times a sec.
51-
self._next_resume_at = min(self._next_resume_at, resume_at)
52-
tp = TopicPartition(topic=topic, partition=partition, offset=offset_to_seek)
53-
position, *_ = self._consumer.position([tp])
54-
logger.debug(
55-
f'Pausing topic partition "{topic}[{partition}]" for {resume_after}s; '
56-
f"current_offset={position.offset}"
57-
)
58-
self._consumer.pause(partitions=[tp])
59-
# Seek the TP back to the "offset_to_seek" to start from it on resume.
60-
# The "offset_to_seek" is provided by the Checkpoint and is expected to be the
61-
# first offset processed in the checkpoint.
62-
logger.debug(
63-
f'Seek the paused partition "{topic}[{partition}]" back to '
64-
f"offset {tp.offset}"
65-
)
66-
self._consumer.seek(partition=tp)
67-
68-
def is_paused(self, topic: str, partition: int) -> bool:
69-
"""
70-
Check if the topic-partition is already paused
71-
"""
72-
return (topic, partition) in self._paused_tps
40+
self._resume_at = min(self._resume_at, resume_at)
41+
42+
# Pause only data TPs excluding changelog TPs
43+
non_changelog_tps = self._get_non_changelog_assigned_tps()
44+
45+
for tp in non_changelog_tps:
46+
position, *_ = self._consumer.position([tp])
47+
logger.debug(
48+
f'Pausing topic partition "{tp.topic}[{tp.partition}]" for {resume_after}s; '
49+
f"position={position.offset}"
50+
)
51+
self._consumer.pause(partitions=[tp])
52+
# Seek the TP back to its offset from "offsets_to_seek" to start from it on resume.
53+
# Each offset in "offsets_to_seek" is provided by the Checkpoint and is expected to be the
54+
# first offset processed in the checkpoint.
55+
seek_offset = offsets_to_seek.get((tp.topic, tp.partition))
56+
if seek_offset is not None:
57+
logger.debug(
58+
f'Seek the paused partition "{tp.topic}[{tp.partition}]" back to '
59+
f"offset {seek_offset}"
60+
)
61+
self._consumer.seek(
62+
partition=TopicPartition(
63+
topic=tp.topic, partition=tp.partition, offset=seek_offset
64+
)
65+
)
7366

7467
def resume_if_ready(self):
7568
"""
76-
Resume consuming from topic-partitions after the wait period has elapsed.
69+
Resume consuming from assigned data partitions after the wait period has elapsed.
7770
"""
78-
now = time.monotonic()
79-
if self._next_resume_at > now:
80-
# Nothing to resume yet, exit early
71+
if self._resume_at > time.monotonic():
8172
return
8273

83-
tps_to_resume = [
84-
tp for tp, resume_at in self._paused_tps.items() if resume_at <= now
85-
]
86-
for topic, partition in tps_to_resume:
87-
logger.debug(f'Resuming topic partition "{topic}[{partition}]"')
74+
# Resume only data TPs excluding changelog TPs
75+
non_changelog_tps = self._get_non_changelog_assigned_tps()
76+
77+
for tp in non_changelog_tps:
78+
logger.debug(f'Resuming topic partition "{tp.topic}[{tp.partition}]"')
8879
self._consumer.resume(
89-
partitions=[TopicPartition(topic=topic, partition=partition)]
80+
partitions=[TopicPartition(topic=tp.topic, partition=tp.partition)]
9081
)
91-
self._paused_tps.pop((topic, partition))
92-
self._reset_next_resume_at()
82+
self.reset()
9383

94-
def revoke(self, topic: str, partition: int):
84+
def reset(self):
85+
# Reset the timeout back to its initial state
86+
self._resume_at = _MAX_FLOAT
87+
88+
def _get_non_changelog_assigned_tps(self) -> list[TopicPartition]:
9589
"""
96-
Remove partition from the list of paused TPs if it's revoked
90+
Get assigned topic partitions for non-changelog topics.
9791
"""
98-
tp = (topic, partition)
99-
if tp not in self._paused_tps:
100-
return
101-
self._paused_tps.pop(tp)
102-
self._reset_next_resume_at()
103-
104-
def _reset_next_resume_at(self):
105-
if self._paused_tps:
106-
self._next_resume_at = min(self._paused_tps.values())
107-
else:
108-
self._next_resume_at = _MAX_FLOAT
92+
return [
93+
tp
94+
for tp in self._consumer.assignment()
95+
if tp.topic in self._topic_manager.non_changelog_topics
96+
]

quixstreams/sinks/base/exceptions.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,12 @@ class SinkBackpressureError(QuixException):
88
An exception to be raised by Sinks during flush() call
99
to signal a backpressure event to the application.
1010
11-
When raised, the app will drop the accumulated sink batch,
12-
pause the corresponding topic partition for
13-
a timeout specified in `retry_after`, and resume it when it's elapsed.
11+
When raised, the app will drop the accumulated sink batches,
12+
pause all assigned topic partitions for
13+
a timeout specified in `retry_after`, and resume them when it's elapsed.
1414
1515
:param retry_after: a timeout in seconds to pause for
16-
:param topic: a topic name to pause
17-
:param partition: a partition number to pause
1816
"""
1917

20-
def __init__(self, retry_after: float, topic: str, partition: int):
18+
def __init__(self, retry_after: float):
2119
self.retry_after = retry_after
22-
self.topic = topic
23-
self.partition = partition

0 commit comments

Comments
 (0)