Skip to content

Commit dd7c2ab

Browse files
authored
sources: Ensure process was started before checking for error (#506)
If the first assignment a consumer get is empty the sources won't be started. In that situation the app run loop will check for error in sources and fail.
1 parent d09e7fa commit dd7c2ab

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

quixstreams/sources/manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ def raise_for_error(self) -> None:
117117
Raise a `quixstreams.sources.manager.SourceException`
118118
if the child process was terminated with an exception.
119119
"""
120+
if not self.started:
121+
return
122+
120123
if super().is_alive():
121124
return
122125

tests/test_quixstreams/test_sources/test_manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def test_terminate_source(self):
9999
"when,exitcode", [("run", 0), ("cleanup", 0), ("stop", -9)]
100100
)
101101
@pytest.mark.parametrize("pickleable", [True, False])
102-
def test_raise_for_error_run(self, when, exitcode, pickleable):
102+
def test_raise_for_error(self, when, exitcode, pickleable):
103103
manager = SourceManager()
104104

105105
finished = multiprocessing.Event()
@@ -110,6 +110,9 @@ def test_raise_for_error_run(self, when, exitcode, pickleable):
110110
source.configure(Topic("topic", None), None)
111111
process = manager.register(source)
112112

113+
# never raise when not started
114+
manager.raise_for_error()
115+
113116
manager.start_sources()
114117

115118
finished.wait(1)

0 commit comments

Comments
 (0)