Skip to content

Commit f021321

Browse files
authored
Add API to export the application configuration (#470)
1 parent fa6cf26 commit f021321

File tree

4 files changed

+210
-81
lines changed

4 files changed

+210
-81
lines changed

quixstreams/app.py

Lines changed: 154 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44
import os
55
import signal
66
import warnings
7-
from typing import Optional, List, Callable, Union, Literal, get_args
7+
from typing import Optional, List, Callable, Union, Literal, Tuple, Type
88

99
from confluent_kafka import TopicPartition
1010
from typing_extensions import Self
11+
from pathlib import Path
12+
from pydantic import Field, AliasGenerator
13+
from pydantic_settings import PydanticBaseSettingsSource, SettingsConfigDict
1114

1215
from .context import set_message_context, copy_context
1316
from .dataframe import StreamingDataFrame
@@ -41,8 +44,9 @@
4144
from .state import StateStoreManager
4245
from .state.recovery import RecoveryManager
4346
from .state.rocksdb import RocksDBOptionsType
47+
from .utils.settings import BaseSettings
4448

45-
__all__ = ("Application",)
49+
__all__ = ("Application", "ApplicationConfig")
4650

4751
logger = logging.getLogger(__name__)
4852
ProcessingGuarantee = Literal["at-least-once", "exactly-once"]
@@ -155,7 +159,7 @@ def __init__(
155159
:param producer_extra_config: A dictionary with additional options that
156160
will be passed to `confluent_kafka.Producer` as is.
157161
:param state_dir: path to the application state directory.
158-
Default - `".state"`.
162+
Default - `"state"`.
159163
:param rocksdb_options: RocksDB options.
160164
If `None`, the default options will be used.
161165
:param consumer_poll_timeout: timeout for `RowConsumer.poll()`. Default - `1.0`s
@@ -195,22 +199,9 @@ def __init__(
195199
"""
196200
configure_logging(loglevel=loglevel)
197201

198-
if processing_guarantee not in get_args(ProcessingGuarantee):
199-
raise ValueError(
200-
f'Must provide a valid "processing_guarantee"; expected one of: '
201-
f'{get_args(ProcessingGuarantee)}, got "{processing_guarantee}"'
202-
)
203-
204202
producer_extra_config = producer_extra_config or {}
205203
consumer_extra_config = consumer_extra_config or {}
206204

207-
# Add default values to the producer config, but allow them to be overwritten
208-
# by the provided producer_extra_config dict
209-
producer_extra_config = {
210-
**_default_producer_extra_config,
211-
**producer_extra_config,
212-
}
213-
214205
# We can't use os.getenv as defaults (and have testing work nicely)
215206
# since it evaluates getenv when the function is defined.
216207
# In general this is just a most robust approach.
@@ -263,85 +254,100 @@ def __init__(
263254
consumer_extra_config.update(quix_app_config.librdkafka_extra_config)
264255
producer_extra_config.update(quix_app_config.librdkafka_extra_config)
265256

266-
self._broker_address = broker_address
267-
self._consumer_group = consumer_group
268-
self._auto_offset_reset = auto_offset_reset
269-
self._commit_interval = commit_interval
270-
self._commit_every = commit_every
271-
self._producer_extra_config = producer_extra_config
272-
self._consumer_extra_config = consumer_extra_config
273-
self._processing_guarantee = processing_guarantee
274-
self._consumer = RowConsumer(
257+
self._config = ApplicationConfig(
275258
broker_address=broker_address,
276259
consumer_group=consumer_group,
277260
auto_offset_reset=auto_offset_reset,
261+
commit_interval=commit_interval,
262+
commit_every=commit_every,
263+
# Add default values to the producer config, but allow them to be overwritten
264+
# by the provided producer_extra_config dict
265+
producer_extra_config={
266+
**_default_producer_extra_config,
267+
**producer_extra_config,
268+
},
269+
consumer_extra_config=consumer_extra_config,
270+
processing_guarantee=processing_guarantee,
271+
consumer_poll_timeout=consumer_poll_timeout,
272+
producer_poll_timeout=producer_poll_timeout,
273+
auto_create_topics=auto_create_topics,
274+
request_timeout=request_timeout,
275+
topic_create_timeout=topic_create_timeout,
276+
state_dir=state_dir,
277+
rocksdb_options=rocksdb_options,
278+
use_changelog_topics=use_changelog_topics,
279+
)
280+
281+
self._on_message_processed = on_message_processed
282+
self._on_processing_error = on_processing_error or default_on_processing_error
283+
284+
self._consumer = RowConsumer(
285+
broker_address=self._config.broker_address,
286+
consumer_group=self._config.consumer_group,
287+
auto_offset_reset=self._config.auto_offset_reset,
278288
auto_commit_enable=False, # Disable auto commit and manage commits manually
279-
extra_config=consumer_extra_config,
289+
extra_config=self._config.consumer_extra_config,
280290
on_error=on_consumer_error,
281291
)
282292
self._producer = RowProducer(
283-
broker_address=broker_address,
284-
extra_config=producer_extra_config,
293+
broker_address=self._config.broker_address,
294+
extra_config=self._config.producer_extra_config,
285295
on_error=on_producer_error,
286-
flush_timeout=consumer_extra_config.get(
287-
"max.poll.interval.ms", _default_max_poll_interval_ms
288-
)
289-
/ 1000, # convert to seconds
290-
transactional=self._uses_exactly_once,
296+
flush_timeout=self._config.flush_timeout,
297+
transactional=self._config.exactly_once,
291298
)
292-
self._consumer_poll_timeout = consumer_poll_timeout
293-
self._producer_poll_timeout = producer_poll_timeout
294-
self._on_processing_error = on_processing_error or default_on_processing_error
295-
self._on_message_processed = on_message_processed
296-
self._auto_create_topics = auto_create_topics
297299
self._running = False
298300
self._failed = False
299301

300302
if not topic_manager:
301303
topic_manager = topic_manager_factory(
302304
topic_admin=TopicAdmin(
303-
broker_address=broker_address,
304-
extra_config=producer_extra_config,
305+
broker_address=self._config.broker_address,
306+
extra_config=self._config.producer_extra_config,
305307
),
306-
consumer_group=consumer_group,
307-
timeout=request_timeout,
308-
create_timeout=topic_create_timeout,
308+
consumer_group=self._config.consumer_group,
309+
timeout=self._config.request_timeout,
310+
create_timeout=self._config.topic_create_timeout,
309311
)
310312
self._topic_manager = topic_manager
313+
314+
producer = None
315+
recovery_manager = None
316+
if self._config.use_changelog_topics:
317+
producer = self._producer
318+
recovery_manager = RecoveryManager(
319+
consumer=self._consumer,
320+
topic_manager=self._topic_manager,
321+
)
322+
311323
self._state_manager = StateStoreManager(
312-
group_id=consumer_group,
313-
state_dir=state_dir,
314-
rocksdb_options=rocksdb_options,
315-
producer=self._producer if use_changelog_topics else None,
316-
recovery_manager=(
317-
RecoveryManager(
318-
consumer=self._consumer,
319-
topic_manager=self._topic_manager,
320-
)
321-
if use_changelog_topics
322-
else None
323-
),
324+
group_id=self._config.consumer_group,
325+
state_dir=self._config.state_dir,
326+
rocksdb_options=self._config.rocksdb_options,
327+
producer=producer,
328+
recovery_manager=recovery_manager,
324329
)
330+
325331
self._sink_manager = SinkManager()
326332
self._pausing_manager = PausingManager(consumer=self._consumer)
327333
self._processing_context = ProcessingContext(
328-
commit_interval=self._commit_interval,
329-
commit_every=commit_every,
334+
commit_interval=self._config.commit_interval,
335+
commit_every=self._config.commit_every,
330336
producer=self._producer,
331337
consumer=self._consumer,
332338
state_manager=self._state_manager,
333-
exactly_once=self._uses_exactly_once,
339+
exactly_once=self._config.exactly_once,
334340
sink_manager=self._sink_manager,
335341
pausing_manager=self._pausing_manager,
336342
)
337343

338344
@property
339-
def is_quix_app(self) -> bool:
340-
return self._is_quix_app
345+
def config(self) -> "ApplicationConfig":
346+
return self._config
341347

342348
@property
343-
def _uses_exactly_once(self) -> bool:
344-
return self._processing_guarantee == "exactly-once"
349+
def is_quix_app(self) -> bool:
350+
return self._is_quix_app
345351

346352
@classmethod
347353
def Quix(
@@ -642,7 +648,7 @@ def get_producer(self) -> Producer:
642648
```python
643649
from quixstreams import Application
644650
645-
app = Application.Quix(...)
651+
app = Application(...)
646652
topic = app.topic("input")
647653
648654
with app.get_producer() as producer:
@@ -653,8 +659,8 @@ def get_producer(self) -> Producer:
653659
self._setup_topics()
654660

655661
return Producer(
656-
broker_address=self._broker_address,
657-
extra_config=self._producer_extra_config,
662+
broker_address=self._config.broker_address,
663+
extra_config=self._config.producer_extra_config,
658664
)
659665

660666
def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
@@ -680,7 +686,7 @@ def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
680686
```python
681687
from quixstreams import Application
682688
683-
app = Application.Quix(...)
689+
app = Application(...)
684690
topic = app.topic("input")
685691
686692
with app.get_consumer() as consumer:
@@ -697,11 +703,11 @@ def get_consumer(self, auto_commit_enable: bool = True) -> Consumer:
697703
self._setup_topics()
698704

699705
return Consumer(
700-
broker_address=self._broker_address,
701-
consumer_group=self._consumer_group,
702-
auto_offset_reset=self._auto_offset_reset,
706+
broker_address=self._config.broker_address,
707+
consumer_group=self._config.consumer_group,
708+
auto_offset_reset=self._config.auto_offset_reset,
703709
auto_commit_enable=auto_commit_enable,
704-
extra_config=self._consumer_extra_config,
710+
extra_config=self._config.consumer_extra_config,
705711
)
706712

707713
def clear_state(self):
@@ -743,12 +749,12 @@ def run(
743749

744750
logger.info(
745751
f"Starting the Application with the config: "
746-
f'broker_address="{self._broker_address}" '
747-
f'consumer_group="{self._consumer_group}" '
748-
f'auto_offset_reset="{self._auto_offset_reset}" '
749-
f"commit_interval={self._commit_interval}s "
750-
f"commit_every={self._commit_every} "
751-
f'processing_guarantee="{self._processing_guarantee}"'
752+
f'broker_address="{self._config.broker_address}" '
753+
f'consumer_group="{self._config.consumer_group}" '
754+
f'auto_offset_reset="{self._config.auto_offset_reset}" '
755+
f"commit_interval={self._config.commit_interval}s "
756+
f"commit_every={self._config.commit_every} "
757+
f'processing_guarantee="{self._config.processing_guarantee}"'
752758
)
753759
if self.is_quix_app:
754760
self._quix_runtime_init()
@@ -805,14 +811,14 @@ def _setup_topics(self):
805811
f'"{topic}"' for topic in self._topic_manager.all_topics
806812
)
807813
logger.info(f"Topics required for this application: {topics_list}")
808-
if self._auto_create_topics:
814+
if self._config.auto_create_topics:
809815
self._topic_manager.create_all_topics()
810816
self._topic_manager.validate_all_topics()
811817

812818
def _process_message(self, dataframe_composed):
813819
# Serve producer callbacks
814-
self._producer.poll(self._producer_poll_timeout)
815-
rows = self._consumer.poll_row(timeout=self._consumer_poll_timeout)
820+
self._producer.poll(self._config.producer_poll_timeout)
821+
rows = self._consumer.poll_row(timeout=self._config.consumer_poll_timeout)
816822

817823
if rows is None:
818824
return
@@ -964,3 +970,72 @@ def _on_sigint(self, *_):
964970
def _on_sigterm(self, *_):
965971
logger.debug(f"Received SIGTERM, stopping the processing loop")
966972
self.stop()
973+
974+
975+
class ApplicationConfig(BaseSettings):
976+
"""
977+
Immutable object holding the application configuration
978+
979+
For details see :class:`quixstreams.Application`
980+
"""
981+
982+
model_config = SettingsConfigDict(
983+
frozen=True,
984+
revalidate_instances="always",
985+
alias_generator=AliasGenerator(
986+
# used during model_dumps
987+
serialization_alias=lambda field_name: field_name.replace("_", "."),
988+
),
989+
)
990+
991+
broker_address: ConnectionConfig
992+
consumer_group: str
993+
auto_offset_reset: AutoOffsetReset = "latest"
994+
commit_interval: float = 5.0
995+
commit_every: int = 0
996+
producer_extra_config: dict = Field(default_factory=dict)
997+
consumer_extra_config: dict = Field(default_factory=dict)
998+
processing_guarantee: ProcessingGuarantee = "at-least-once"
999+
consumer_poll_timeout: float = 1.0
1000+
producer_poll_timeout: float = 0.0
1001+
auto_create_topics: bool = True
1002+
request_timeout: float = 30
1003+
topic_create_timeout: float = 60
1004+
state_dir: Path = "state"
1005+
rocksdb_options: Optional[RocksDBOptionsType] = None
1006+
use_changelog_topics: bool = True
1007+
1008+
@classmethod
1009+
def settings_customise_sources(
1010+
cls,
1011+
settings_cls: Type[BaseSettings],
1012+
init_settings: PydanticBaseSettingsSource,
1013+
env_settings: PydanticBaseSettingsSource,
1014+
dotenv_settings: PydanticBaseSettingsSource,
1015+
file_secret_settings: PydanticBaseSettingsSource,
1016+
) -> Tuple[PydanticBaseSettingsSource, ...]:
1017+
"""
1018+
Included to ignore reading/setting values from the environment
1019+
"""
1020+
return (init_settings,)
1021+
1022+
def copy(self, **kwargs) -> Self:
1023+
"""
1024+
Update the application config and return a copy
1025+
"""
1026+
copy = self.model_copy(update=kwargs)
1027+
copy.model_validate(copy, strict=True)
1028+
return copy
1029+
1030+
@property
1031+
def flush_timeout(self) -> float:
1032+
return (
1033+
self.consumer_extra_config.get(
1034+
"max.poll.interval.ms", _default_max_poll_interval_ms
1035+
)
1036+
/ 1000
1037+
) # convert to seconds
1038+
1039+
@property
1040+
def exactly_once(self) -> bool:
1041+
return self.processing_guarantee == "exactly-once"

quixstreams/state/rocksdb/types.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from typing import Protocol, Optional, Literal
2+
from typing_extensions import runtime_checkable
23

34
import rocksdict
45

@@ -7,6 +8,7 @@
78
CompressionType = Literal["none", "snappy", "zlib", "bz2", "lz4", "lz4hc", "zstd"]
89

910

11+
@runtime_checkable
1012
class RocksDBOptionsType(Protocol):
1113
write_buffer_size: int
1214
target_file_size_base: int

tests/test_quixstreams/test_app.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,15 +523,15 @@ def test_consumer_group_env(self):
523523
app = Application(
524524
broker_address="my_address", consumer_group=consumer_group
525525
)
526-
assert app._consumer_group == consumer_group
526+
assert app.config.consumer_group == consumer_group
527527

528528
def test_consumer_group_default(self):
529529
"""
530530
Sanity check behavior around getenv defaults
531531
"""
532532
with patch.dict(os.environ, {}, clear=True):
533533
app = Application(broker_address="my_address")
534-
assert app._consumer_group == "quixstreams-default"
534+
assert app.config.consumer_group == "quixstreams-default"
535535

536536

537537
class TestAppGroupBy:

0 commit comments

Comments
 (0)