Skip to content

Commit e52f475

Browse files
authored
bugfix for topic creation with quix-based external broker (#530)
Confirmed this solved the issue based on testing with @luisquix
1 parent 525e671 commit e52f475

File tree

3 files changed

+19
-10
lines changed

3 files changed

+19
-10
lines changed

quixstreams/platforms/quix/config.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ def create_topics(
371371
topics: List[Topic],
372372
timeout: Optional[float] = None,
373373
finalize_timeout: Optional[float] = None,
374+
prepend: bool = True,
374375
):
375376
"""
376377
Create topics in a Quix cluster.
@@ -379,17 +380,18 @@ def create_topics(
379380
:param timeout: response timeout (seconds); Default 30
380381
:param finalize_timeout: topic finalization timeout (seconds); Default 60
381382
marked as "Ready" (and thus ready to produce to/consume from).
383+
:param prepend: whether to prepend workspace_id during creation attempt.
382384
"""
383385
logger.info("Attempting to create topics...")
384386
current_topics = {t["id"]: t for t in self.get_topics(timeout=timeout)}
385387
finalize = set()
386388
for topic in topics:
387-
topic_name = self.prepend_workspace_id(topic.name)
388-
exists = self.prepend_workspace_id(topic_name) in current_topics
389-
if not exists or current_topics[topic_name]["status"] != "Ready":
389+
name = self.prepend_workspace_id(topic.name) if prepend else topic.name
390+
exists = name in current_topics
391+
if not exists or current_topics[name]["status"] != "Ready":
390392
if exists:
391393
logger.debug(
392-
f"Topic {self.strip_workspace_id_prefix(topic_name)} exists but does "
394+
f"Topic {self.strip_workspace_id_prefix(name)} exists but does "
393395
f"not have 'Ready' status. Added to finalize check."
394396
)
395397
else:
@@ -398,14 +400,14 @@ def create_topics(
398400
# TODO: more robust error handling to better identify issues
399401
# See how it's handled in the admin client and maybe consolidate
400402
# logic via TopicManager
401-
except HTTPError as e:
403+
except QuixApiRequestFailure as e:
402404
# Topic was maybe created by another instance
403-
if "already exists" not in e.response.text:
405+
if "already exists" not in e.error_text:
404406
raise
405-
finalize.add(topic_name)
407+
finalize.add(name)
406408
else:
407409
logger.debug(
408-
f"Topic {self.strip_workspace_id_prefix(topic_name)} exists and is Ready"
410+
f"Topic {self.strip_workspace_id_prefix(name)} exists and is Ready"
409411
)
410412
logger.info(
411413
"Topic creations acknowledged; waiting for 'Ready' statuses..."

quixstreams/platforms/quix/topic_manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ def _create_topics(
6363
:param timeout: creation acknowledge timeout (seconds)
6464
:param create_timeout: topic finalization timeout (seconds)
6565
"""
66+
# note: prepending was already performed as needed, so we skip doing it
67+
# within `create_topics`
6668
self._quix_config_builder.create_topics(
67-
topics, timeout=timeout, finalize_timeout=create_timeout
69+
topics, timeout=timeout, finalize_timeout=create_timeout, prepend=False
6870
)
6971

7072
def _resolve_topic_name(self, name: str) -> str:

tests/test_quixstreams/test_platforms/test_quix/test_config.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ def test_create_topics_parallel_create_attempt(self, topic_manager_topic_factory
596596

597597
mock_response = create_autospec(Response)
598598
mock_response.text = "already exists"
599+
mock_response.status_code = 400
599600

600601
timeout = 4.5
601602
finalize_timeout = 9.5
@@ -605,7 +606,11 @@ def test_create_topics_parallel_create_attempt(self, topic_manager_topic_factory
605606
)
606607
stack = ExitStack()
607608
create_topic = stack.enter_context(patch.object(cfg_builder, "_create_topic"))
608-
create_topic.side_effect = HTTPError(response=mock_response)
609+
create_topic.side_effect = QuixApiRequestFailure(
610+
url="url",
611+
status_code=mock_response.status_code,
612+
error_text=mock_response.text,
613+
)
609614
get_topics = stack.enter_context(patch.object(cfg_builder, "get_topics"))
610615
get_topics.return_value = get_topics_return
611616
finalize = stack.enter_context(patch.object(cfg_builder, "_finalize_create"))

0 commit comments

Comments
 (0)