Skip to content

Commit f838575

Browse files
authored
Add Quix__State__Dir environment variable to configure app state directory (#844)
1 parent 96276d6 commit f838575

File tree

2 files changed

+27
-8
lines changed

2 files changed

+27
-8
lines changed

quixstreams/app.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def __init__(
168168
>***NOTE:*** the environment variable is set for you in the Quix Cloud
169169
:param consumer_group: Kafka consumer group.
170170
Passed as `group.id` to `confluent_kafka.Consumer`.
171-
Linked Environment Variable: `Quix__Consumer__Group`.
171+
Linked Environment Variable: `Quix__Consumer_Group`.
172172
Default - "quixstreams-default" (set during init)
173173
>***NOTE:*** Quix Applications will prefix it with the Quix workspace id.
174174
:param commit_interval: How often to commit the processed messages in seconds.
@@ -188,6 +188,7 @@ def __init__(
188188
:param producer_extra_config: A dictionary with additional options that
189189
will be passed to `confluent_kafka.Producer` as is.
190190
:param state_dir: path to the application state directory.
191+
Linked Environment Variable: `Quix__State__Dir`.
191192
Default - `"state"`.
192193
:param rocksdb_options: RocksDB options.
193194
If `None`, the default options will be used.
@@ -232,7 +233,9 @@ def __init__(
232233
consumer_extra_config = consumer_extra_config or {}
233234

234235
if state_dir is None:
235-
state_dir = "/app/state" if is_quix_deployment() else "state"
236+
state_dir = os.getenv(
237+
"Quix__State__Dir", "/app/state" if is_quix_deployment() else "state"
238+
)
236239
state_dir = Path(state_dir)
237240

238241
# We can't use os.getenv as defaults (and have testing work nicely)

tests/test_quixstreams/test_app.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -479,12 +479,13 @@ def test_consumer_group_env(self):
479479
"""
480480
Sanity check consumer_group gets set from the environment via getenv.
481481
"""
482-
consumer_group = "my_group"
483-
with patch.dict(os.environ, {"Quix__Consumer__group": consumer_group}):
484-
app = Application(
485-
broker_address="my_address", consumer_group=consumer_group
486-
)
487-
assert app.config.consumer_group == consumer_group
482+
with patch.dict(os.environ, {"Quix__Consumer_Group": "environ_group"}):
483+
app = Application(broker_address="my_address")
484+
assert app.config.consumer_group == "environ_group"
485+
486+
with patch.dict(os.environ, {"Quix__Consumer_Group": "environ_group"}):
487+
app = Application(broker_address="my_address", consumer_group="app_group")
488+
assert app.config.consumer_group == "app_group"
488489

489490
def test_consumer_group_default(self):
490491
"""
@@ -494,6 +495,21 @@ def test_consumer_group_default(self):
494495
app = Application(broker_address="my_address")
495496
assert app.config.consumer_group == "quixstreams-default"
496497

498+
def test_state_dir_env(self):
499+
"""
500+
Sanity check state_dir gets set from the environment via getenv.
501+
"""
502+
app = Application(broker_address="my_address")
503+
assert app.config.state_dir == Path("state")
504+
505+
with patch.dict(os.environ, {"Quix__State__Dir": "/path/to/state"}):
506+
app = Application(broker_address="my_address")
507+
assert app.config.state_dir == Path("/path/to/state")
508+
509+
with patch.dict(os.environ, {"Quix__State__Dir": "/path/to/state"}):
510+
app = Application(broker_address="my_address", state_dir="/path/to/other")
511+
assert app.config.state_dir == Path("/path/to/other")
512+
497513

498514
class TestAppGroupBy:
499515
def test_group_by(

0 commit comments

Comments
 (0)