
Commit c51cdad

Implement timestamp alignment by buffering the incoming Kafka messages (#857)
Co-authored-by: Remy Gwaramadze <gwaramadze@users.noreply.github.com>
Parent: 4b4476e

12 files changed: +1070 −58 lines

quixstreams/app.py

Lines changed: 22 additions & 2 deletions
@@ -69,6 +69,11 @@
 # Enforce idempotent producing for the internal RowProducer
 _default_producer_extra_config = {"enable.idempotence": True}
 
+# Default config for the internal consumer
+_default_consumer_extra_config = {
+    "fetch.queue.backoff.ms": 100,  # Make the consumer fetch data more often
+}
+
 # Force assignment strategy to be "range" for co-partitioning in internal Consumers
 consumer_extra_config_overrides = {"partition.assignment.strategy": "range"}
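For context: `fetch.queue.backoff.ms` is a librdkafka consumer property controlling how long the client backs off before issuing the next fetch request once a partition's fetch-queue thresholds are hit. Setting it well below the librdkafka default should let a just-drained partition receive fresh data sooner, which matters when the buffering introduced in this commit holds messages back to keep timestamps aligned.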

@@ -151,6 +156,7 @@ def __init__(
         request_timeout: float = 30,
         topic_create_timeout: float = 60,
         processing_guarantee: ProcessingGuarantee = "at-least-once",
+        max_partition_buffer_size: int = 10000,
     ):
         """
         :param broker_address: Connection settings for Kafka.
@@ -210,6 +216,11 @@ def __init__(
         :param request_timeout: timeout (seconds) for REST-based requests
         :param topic_create_timeout: timeout (seconds) for topic create finalization
         :param processing_guarantee: Use "exactly-once" or "at-least-once" processing.
+        :param max_partition_buffer_size: the maximum number of messages to buffer per topic partition before it is considered full.
+            Buffering is used to consume messages in order across multiple partitions with the same partition number.
+            It is a soft limit, and the actual number of buffered messages can be up to 2x higher.
+            A lower value decreases memory use but increases latency.
+            Default - `10000`.
 
         <br><br>***Error Handlers***<br>
         To handle errors, `Application` accepts callbacks triggered when
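As a usage sketch (the broker address and buffer size are illustrative placeholders, not recommendations), the new parameter is tuned at construction time:

```python
from quixstreams import Application

# A smaller per-partition buffer trades memory for latency:
# less data is held for timestamp alignment, but the consumer
# may have to wait on the broker more often.
app = Application(
    broker_address="localhost:9092",
    max_partition_buffer_size=1000,
)
```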
@@ -305,7 +316,10 @@ def __init__(
                 **_default_producer_extra_config,
                 **producer_extra_config,
             },
-            consumer_extra_config=consumer_extra_config,
+            consumer_extra_config={
+                **_default_consumer_extra_config,
+                **consumer_extra_config,
+            },
             processing_guarantee=processing_guarantee,
             consumer_poll_timeout=consumer_poll_timeout,
             producer_poll_timeout=producer_poll_timeout,
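The `{**defaults, **user_config}` pattern means user-supplied keys always win over the library defaults; a minimal sketch of the semantics:

```python
# In dict unpacking, later keys override earlier ones, so a
# user-supplied value replaces the library default for the same key.
_default_consumer_extra_config = {"fetch.queue.backoff.ms": 100}
consumer_extra_config = {"fetch.queue.backoff.ms": 500}  # user override

merged = {**_default_consumer_extra_config, **consumer_extra_config}
assert merged == {"fetch.queue.backoff.ms": 500}
```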
@@ -315,6 +329,7 @@ def __init__(
             state_dir=state_dir,
             rocksdb_options=rocksdb_options,
             use_changelog_topics=use_changelog_topics,
+            max_partition_buffer_size=max_partition_buffer_size,
         )
 
         self._on_message_processed = on_message_processed
@@ -634,6 +649,7 @@ def _get_rowconsumer(
             consumer_group=self._config.consumer_group,
             auto_offset_reset=self._config.auto_offset_reset,
             auto_commit_enable=False,  # Disable auto commit and manage commits manually
+            max_partition_buffer_size=self._config.max_partition_buffer_size,
             extra_config=extra_config,
             on_error=on_error,
         )
@@ -905,7 +921,10 @@ def _quix_runtime_init(self):
     def _process_message(self, dataframe_composed):
         # Serve producer callbacks
        self._producer.poll(self._config.producer_poll_timeout)
-        rows = self._consumer.poll_row(timeout=self._config.consumer_poll_timeout)
+        rows = self._consumer.poll_row(
+            timeout=self._config.consumer_poll_timeout,
+            buffered=self._dataframe_registry.requires_time_alignment,
+        )
 
         if rows is None:
             self._run_tracker.set_current_message_tp(None)
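The internals of `poll_row(buffered=True)` are not part of this diff, but the underlying idea can be sketched: buffer messages per partition and only release the one with the smallest timestamp, holding back whenever some partition has nothing to compare against. The names below are hypothetical, not the actual `RowConsumer` implementation:

```python
from collections import deque
from typing import Optional

class AlignedBuffer:
    """Toy model of timestamp-aligned consumption across partitions."""

    def __init__(self) -> None:
        # partition -> FIFO queue of (timestamp_ms, value) tuples
        self._buffers: dict[int, deque] = {}

    def add(self, partition: int, timestamp_ms: int, value: object) -> None:
        self._buffers.setdefault(partition, deque()).append((timestamp_ms, value))

    def pop_aligned(self) -> Optional[tuple[int, object]]:
        # Release nothing until every tracked partition has data;
        # otherwise a lagging partition could later deliver an
        # older timestamp out of order.
        queues = list(self._buffers.values())
        if not queues or any(not q for q in queues):
            return None
        # Pop from the partition whose head message is the oldest.
        oldest = min(queues, key=lambda q: q[0][0])
        return oldest.popleft()

buf = AlignedBuffer()
buf.add(0, 105, "a")
buf.add(1, 100, "b")
assert buf.pop_aligned() == (100, "b")  # partition 1 holds the older message
```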
@@ -1100,6 +1119,7 @@ class ApplicationConfig(BaseSettings):
     state_dir: Path = Path("state")
     rocksdb_options: Optional[RocksDBOptionsType] = None
     use_changelog_topics: bool = True
+    max_partition_buffer_size: int = 10000
 
     @classmethod
     def settings_customise_sources(

quixstreams/dataframe/dataframe.py

Lines changed: 1 addition & 0 deletions
@@ -1611,6 +1611,7 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
         """
 
         merged_stream = self.stream.merge(other.stream)
+        self._registry.require_time_alignment()
         return self.__dataframe_clone__(
             *self.topics, *other.topics, stream=merged_stream
         )
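In practice, concatenating two dataframes is what flips this flag; a usage sketch (broker address and topic names are placeholders):

```python
from quixstreams import Application

app = Application(broker_address="localhost:9092")
orders = app.dataframe(app.topic("orders"))
refunds = app.dataframe(app.topic("refunds"))

# concat() merges the two streams and marks the topology as
# requiring timestamp-aligned consumption of both topics.
combined = orders.concat(refunds)
```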

quixstreams/dataframe/registry.py

Lines changed: 19 additions & 0 deletions
@@ -26,6 +26,15 @@ def __init__(self) -> None:
         self._repartition_origins: set[str] = set()
         self._topics_to_stream_ids: dict[str, set[str]] = {}
         self._stream_ids_to_topics: dict[str, set[str]] = {}
+        self._requires_time_alignment = False
+
+    @property
+    def requires_time_alignment(self) -> bool:
+        """
+        Check whether the registered StreamingDataFrames require topics to be read in a timestamp-aligned way.
+        This is normally required for operations like `.concat()` and joins.
+        """
+        return self._requires_time_alignment
 
     @property
     def consumer_topics(self) -> list[Topic]:
@@ -131,3 +140,13 @@ def get_topics_for_stream_id(self, stream_id: str) -> list[str]:
         :return: a list of topic names
         """
         return list(self._stream_ids_to_topics[stream_id])
+
+    def require_time_alignment(self):
+        """
+        Require time alignment for the topology.
+
+        This flag is set by individual StreamingDataFrames when operations like
+        `.concat()` or joins are triggered, and it informs the application to consume
+        messages in a timestamp-aligned way for correct processing.
+        """
+        self._requires_time_alignment = True
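The flag is a one-way latch: once any operation requires alignment, the whole topology consumes in aligned mode. A sketch, assuming the class in this file is named `DataFrameRegistry` (the name is not visible in this diff):

```python
registry = DataFrameRegistry()  # assumed class name

assert registry.requires_time_alignment is False
registry.require_time_alignment()  # e.g. triggered by .concat()
assert registry.requires_time_alignment is True
```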

quixstreams/kafka/consumer.py

Lines changed: 16 additions & 0 deletions
@@ -580,6 +580,22 @@ def consumer_group_metadata(self) -> GroupMetadata:
         """
         return self._consumer.consumer_group_metadata()
 
+    def consume(
+        self, num_messages: int = 1, timeout: Optional[float] = None
+    ) -> list[RawConfluentKafkaMessageProto]:
+        """
+        Consumes a list of messages (possibly empty on timeout).
+        Callbacks may be executed as a side effect of calling this method.
+
+        :param num_messages: The maximum number of messages to return.
+            Default: `1`.
+        :param timeout: The maximum time in seconds to block waiting for a message, event, or callback.
+            Default: `None` (infinite).
+        """
+        return self._consumer.consume(
+            num_messages=num_messages, timeout=timeout if timeout is not None else -1
+        )
+
     @property
     def _consumer(self) -> ConfluentConsumer:
         """

quixstreams/rowconsumer/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+from .consumer import RowConsumer as RowConsumer
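The `RowConsumer as RowConsumer` spelling follows the PEP 484 convention for marking a name as an explicit re-export, so type checkers such as mypy (under `--no-implicit-reexport`) treat it as part of the package's public surface rather than an incidental import.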
