Skip to content

Commit 51c8064

Browse files
authored
Bug: configuring broker_address take priority over quix config (#384)
1 parent 26db741 commit 51c8064

File tree

2 files changed

+40
-37
lines changed

2 files changed

+40
-37
lines changed

quixstreams/app.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ def __init__(
121121
Accepts string with Kafka broker host and port formatted as `<host>:<port>`,
122122
or a ConnectionConfig object if authentication is required.
123123
Either this OR `quix_sdk_token` must be set to use `Application` (not both).
124+
Takes priority over quix auto-configuration.
124125
Linked Environment Variable: `Quix__Broker__Address`.
125126
Default: `None`
126127
:param quix_sdk_token: If using the Quix Cloud, the SDK token to connect with.
@@ -198,27 +199,32 @@ def __init__(
198199
"Quix__Consumer_Group", "quixstreams-default"
199200
)
200201

201-
if quix_config_builder:
202-
quix_app_source = "Quix Config Builder"
203-
if quix_config_builder and quix_sdk_token:
204-
raise warnings.warn(
205-
"'quix_config_builder' is not necessary when an SDK token is defined; "
206-
"we recommend letting the Application generate it automatically"
207-
)
208-
209-
if quix_sdk_token and not quix_config_builder:
210-
quix_app_source = "Quix SDK Token"
211-
quix_config_builder = QuixKafkaConfigsBuilder(quix_sdk_token=quix_sdk_token)
202+
if broker_address:
203+
# If broker_address is passed to the app it takes priority over any quix configuration
204+
self._is_quix_app = False
205+
topic_manager_factory = TopicManager
206+
if isinstance(broker_address, str):
207+
broker_address = ConnectionConfig(bootstrap_servers=broker_address)
208+
else:
209+
self._is_quix_app = True
210+
211+
if quix_config_builder:
212+
quix_app_source = "Quix Config Builder"
213+
if quix_sdk_token:
214+
warnings.warn(
215+
"'quix_config_builder' is not necessary when an SDK token is defined; "
216+
"we recommend letting the Application generate it automatically"
217+
)
218+
elif quix_sdk_token:
219+
quix_app_source = "Quix SDK Token"
220+
quix_config_builder = QuixKafkaConfigsBuilder(
221+
quix_sdk_token=quix_sdk_token
222+
)
223+
else:
224+
raise ValueError(
225+
'Either "broker_address" or "quix_sdk_token" must be provided'
226+
)
212227

213-
if broker_address and quix_config_builder:
214-
raise ValueError(
215-
'Cannot provide both "broker_address" and "quix_sdk_token"'
216-
)
217-
elif not (broker_address or quix_config_builder):
218-
raise ValueError(
219-
'Either "broker_address" or "quix_sdk_token" must be provided'
220-
)
221-
elif quix_config_builder:
222228
# SDK Token or QuixKafkaConfigsBuilder were provided
223229
logger.info(
224230
f"{quix_app_source} detected; "
@@ -235,13 +241,6 @@ def __init__(
235241
consumer_group = quix_app_config.consumer_group
236242
consumer_extra_config.update(quix_app_config.librdkafka_extra_config)
237243
producer_extra_config.update(quix_app_config.librdkafka_extra_config)
238-
else:
239-
# Only broker address is provided
240-
topic_manager_factory = TopicManager
241-
if isinstance(broker_address, str):
242-
broker_address = ConnectionConfig(bootstrap_servers=broker_address)
243-
244-
self._is_quix_app = bool(quix_config_builder)
245244

246245
self._broker_address = broker_address
247246
self._consumer_group = consumer_group
@@ -306,6 +305,10 @@ def __init__(
306305
state_manager=self._state_manager,
307306
)
308307

308+
@property
309+
def is_quix_app(self):
310+
return self._is_quix_app
311+
309312
@classmethod
310313
def Quix(
311314
cls,
@@ -704,7 +707,7 @@ def run(
704707
f'auto_offset_reset="{self._auto_offset_reset}" '
705708
f"commit_interval={self._commit_interval}s"
706709
)
707-
if self._is_quix_app:
710+
if self.is_quix_app:
708711
self._quix_runtime_init()
709712

710713
self._setup_topics()

tests/test_quixstreams/test_app.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -757,12 +757,13 @@ def get_cfg_builder(quix_sdk_token):
757757
) as consumer_init_mock, patch(
758758
"quixstreams.app.RowProducer"
759759
) as producer_init_mock:
760-
Application(
760+
app = Application(
761761
consumer_group=consumer_group,
762762
quix_sdk_token=quix_sdk_token,
763763
consumer_extra_config=extra_config,
764764
producer_extra_config=extra_config,
765765
)
766+
assert app.is_quix_app
766767

767768
# Check if items from the Quix config have been passed
768769
# to the low-level configs of producer and consumer
@@ -895,14 +896,13 @@ def get_cfg_builder(quix_sdk_token):
895896
assert consumer_call_kwargs["consumer_group"] == expected_workspace_cgroup
896897
assert consumer_call_kwargs["extra_config"] == expected_consumer_extra_config
897898

898-
def test_init_with_broker_id_raises(self):
899-
with pytest.raises(ValueError) as e_info:
900-
Application(
901-
broker_address="address",
902-
quix_config_builder=create_autospec(QuixKafkaConfigsBuilder),
903-
)
904-
error_str = 'Cannot provide both "broker_address" and "quix_sdk_token"'
905-
assert error_str in e_info.value.args
899+
def test_init_with_broker_id_dont_raises(self):
900+
app = Application(
901+
broker_address="address",
902+
quix_config_builder=create_autospec(QuixKafkaConfigsBuilder),
903+
)
904+
905+
assert not app.is_quix_app
906906

907907
def test_topic_name_and_config(self, quix_app_factory):
908908
"""

0 commit comments

Comments
 (0)