Skip to content

Commit 16289d7

Browse files
authored
Delay starting sources until the consumer get assigned partitions (#495)
1 parent 97c72f3 commit 16289d7

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

quixstreams/app.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,7 @@ def _run_dataframe(self, dataframe):
862862

863863
def _run_sources(self):
864864
self._running = True
865+
self._source_manager.start_sources()
865866
while self._running:
866867
self._source_manager.raise_for_error()
867868

@@ -947,6 +948,12 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
947948
return
948949
logger.debug(f"Rebalancing: assigning partitions")
949950

951+
# Only start the sources once the consumer is assigned. Otherwise a source
952+
# can produce data before the consumer starts. If that happens on a new
953+
# consumer with `auto_offset_reset` set to `latest` the consumer will not
954+
# get the source data.
955+
self._source_manager.start_sources()
956+
950957
# First commit everything processed so far because assignment can take a while
951958
# and fail
952959
self._processing_context.commit_checkpoint(force=True)

quixstreams/sources/manager.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(self, source):
2828
self.source: BaseSource = source
2929

3030
self._exceptions: List[Exception] = []
31+
self._started = False
3132
self._stopping = False
3233

3334
# copy parent process log level to the child process
@@ -36,6 +37,10 @@ def __init__(self, source):
3637
# reader and writer pipe used to communicate from the child to the parent process
3738
self._rpipe, self._wpipe = multiprocessing.Pipe(duplex=False)
3839

40+
@property
41+
def started(self):
42+
return self._started
43+
3944
# --- CHILD PROCESS METHODS --- #
4045

4146
def _setup_signal_handlers(self):
@@ -54,6 +59,7 @@ def run(self) -> None:
5459
* Execution of the source `run` method
5560
* Reporting the source exceptions to the parent process
5661
"""
62+
self._started = True
5763
self._setup_signal_handlers()
5864
configure_logging(self._loglevel, str(self.source), pid=True)
5965
logger.info("Source started")
@@ -103,6 +109,7 @@ def _report_exception(self, err: BaseException) -> None:
103109

104110
def start(self) -> "SourceProcess":
105111
logger.info("Starting source %s", self.source)
112+
self._started = True
106113
return super().start()
107114

108115
def raise_for_error(self) -> None:
@@ -186,7 +193,8 @@ def topics(self) -> List[Topic]:
186193

187194
def start_sources(self) -> None:
188195
for process in self.processes:
189-
process.start()
196+
if not process.started:
197+
process.start()
190198

191199
def stop_sources(self) -> None:
192200
for process in self.processes:
@@ -221,7 +229,6 @@ def is_alive(self) -> bool:
221229
return False
222230

223231
def __enter__(self):
224-
self.start_sources()
225232
return self
226233

227234
def __exit__(self, *args, **kwargs):

0 commit comments

Comments
 (0)