Skip to content

Commit 6c6305d

Browse files
authored
Feature: ConnectionConfig (#371)
- Provide a more convenient way for users to connect to Kafka clusters that require authentication with `ConnectionConfig` - Removed rarely used parameters from `Application` + consumer/producer (assignment strategy, partitioner) - Added arguments for logger and callbacks for producer/consumer so users could change them - Improved `QuixConfigBuilder` config extraction to use new API endpoint, which simplified it.
1 parent 0fa09d1 commit 6c6305d

27 files changed

+4411
-4142
lines changed

LICENSES/LICENSE.pydantic

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2017 to present Pydantic Services Inc. and individual contributors.
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

docs/api-reference/application.md

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
class Application()
1111
```
1212

13-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L59)
13+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L54)
1414

1515
The main Application class.
1616

@@ -58,12 +58,11 @@ app.run(dataframe=df)
5858
#### Application.\_\_init\_\_
5959

6060
```python
61-
def __init__(broker_address: Optional[str] = None,
61+
def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
6262
quix_sdk_token: Optional[str] = None,
6363
consumer_group: Optional[str] = None,
6464
auto_offset_reset: AutoOffsetReset = "latest",
6565
commit_interval: float = 5.0,
66-
partitioner: Partitioner = "murmur2",
6766
consumer_extra_config: Optional[dict] = None,
6867
producer_extra_config: Optional[dict] = None,
6968
state_dir: str = "state",
@@ -78,17 +77,21 @@ def __init__(broker_address: Optional[str] = None,
7877
auto_create_topics: bool = True,
7978
use_changelog_topics: bool = True,
8079
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
81-
topic_manager: Optional[TopicManager] = None)
80+
topic_manager: Optional[TopicManager] = None,
81+
request_timeout: float = 30,
82+
topic_create_timeout: float = 60)
8283
```
8384

84-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L97)
85+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L92)
8586

8687

8788
<br>
8889
***Arguments:***
8990

90-
- `broker_address`: Kafka broker host and port in format `<host>:<port>`.
91-
Passed as `bootstrap.servers` to `confluent_kafka.Consumer`.
91+
- `broker_address`: Connection settings for Kafka.
92+
Used by Producer, Consumer, and Admin clients.
93+
Accepts string with Kafka broker host and port formatted as `<host>:<port>`,
94+
or a ConnectionConfig object if authentication is required.
9295
Either this OR `quix_sdk_token` must be set to use `Application` (not both).
9396
Linked Environment Variable: `Quix__Broker__Address`.
9497
Default: `None`
@@ -105,8 +108,6 @@ Default - "quixstreams-default" (set during init)
105108
- `commit_interval`: How often to commit the processed messages in seconds.
106109
Default - 5.0.
107110
- `auto_offset_reset`: Consumer `auto.offset.reset` setting
108-
- `partitioner`: A function to be used to determine the outgoing message
109-
partition.
110111
- `consumer_extra_config`: A dictionary with additional options that
111112
will be passed to `confluent_kafka.Consumer` as is.
112113
- `producer_extra_config`: A dictionary with additional options that
@@ -130,6 +131,8 @@ Default - `True`
130131
- `use_changelog_topics`: Use changelog topics to back stateful operations
131132
Default - `True`
132133
- `topic_manager`: A `TopicManager` instance
134+
- `request_timeout`: timeout (seconds) for REST-based requests
135+
- `topic_create_timeout`: timeout (seconds) for topic create finalization
133136
<br><br>***Error Handlers***<br>
134137
To handle errors, `Application` accepts callbacks triggered when
135138
exceptions occur on different stages of stream processing. If the callback
@@ -157,7 +160,6 @@ instead of the default one.
157160
def Quix(cls,
158161
consumer_group: Optional[str] = None,
159162
auto_offset_reset: AutoOffsetReset = "latest",
160-
partitioner: Partitioner = "murmur2",
161163
consumer_extra_config: Optional[dict] = None,
162164
producer_extra_config: Optional[dict] = None,
163165
state_dir: str = "state",
@@ -172,10 +174,12 @@ def Quix(cls,
172174
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
173175
auto_create_topics: bool = True,
174176
use_changelog_topics: bool = True,
175-
topic_manager: Optional[QuixTopicManager] = None) -> Self
177+
topic_manager: Optional[QuixTopicManager] = None,
178+
request_timeout: float = 30,
179+
topic_create_timeout: float = 60) -> Self
176180
```
177181

178-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L303)
182+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L305)
179183

180184
>***NOTE:*** DEPRECATED: use Application with `quix_sdk_token` argument instead.
181185

@@ -223,8 +227,6 @@ Linked Environment Variable: `Quix__Consumer__Group`.
223227
Default - "quixstreams-default" (set during init).
224228
>***NOTE:*** Quix Applications will prefix it with the Quix workspace id.
225229
- `auto_offset_reset`: Consumer `auto.offset.reset` setting
226-
- `partitioner`: A function to be used to determine the outgoing message
227-
partition.
228230
- `consumer_extra_config`: A dictionary with additional options that
229231
will be passed to `confluent_kafka.Consumer` as is.
230232
- `producer_extra_config`: A dictionary with additional options that
@@ -248,6 +250,8 @@ Default - `True`
248250
- `use_changelog_topics`: Use changelog topics to back stateful operations
249251
Default - `True`
250252
- `topic_manager`: A `QuixTopicManager` instance
253+
- `request_timeout`: timeout (seconds) for REST-based requests
254+
- `topic_create_timeout`: timeout (seconds) for topic create finalization
251255
<br><br>***Error Handlers***<br>
252256
To handle errors, `Application` accepts callbacks triggered when
253257
exceptions occur on different stages of stream processing. If the callback
@@ -285,7 +289,7 @@ def topic(name: str,
285289
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
286290
```
287291

288-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L439)
292+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L443)
289293

290294
Create a topic definition.
291295

@@ -366,7 +370,7 @@ topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
366370
def dataframe(topic: Topic) -> StreamingDataFrame
367371
```
368372

369-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L519)
373+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L523)
370374

371375
A simple helper method that generates a `StreamingDataFrame`, which is used
372376

@@ -416,7 +420,7 @@ to be used as an input topic.
416420
def stop(fail: bool = False)
417421
```
418422

419-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L558)
423+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L562)
420424

421425
Stop the internal poll loop and the message processing.
422426

@@ -443,7 +447,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
443447
def get_producer() -> Producer
444448
```
445449

446-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L581)
450+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L585)
447451

448452
Create and return a pre-configured Producer instance.
449453
The Producer is initialized with params passed to Application.
@@ -478,7 +482,7 @@ with app.get_producer() as producer:
478482
def get_consumer(auto_commit_enable: bool = True) -> Consumer
479483
```
480484

481-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L612)
485+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L615)
482486

483487
Create and return a pre-configured Consumer instance.
484488
The Consumer is initialized with params passed to Application.
@@ -523,7 +527,7 @@ with app.get_consumer() as consumer:
523527
def clear_state()
524528
```
525529

526-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L656)
530+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L658)
527531

528532
Clear the state of the application.
529533

@@ -537,7 +541,7 @@ Clear the state of the application.
537541
def run(dataframe: StreamingDataFrame)
538542
```
539543

540-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/app.py#L662)
544+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/app.py#L664)
541545

542546
Start processing data from Kafka using provided `StreamingDataFrame`
543547

docs/api-reference/context.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
def set_message_context(context: Optional[MessageContext])
1313
```
1414

15-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/context.py#L21)
15+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/context.py#L21)
1616

1717
Set a MessageContext for the current message in the given `contextvars.Context`
1818

@@ -55,7 +55,7 @@ sdf = sdf.update(lambda value: alter_context(value))
5555
def message_context() -> MessageContext
5656
```
5757

58-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/context.py#L52)
58+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/context.py#L52)
5959

6060
Get a MessageContext for the current message, which houses most of the message
6161

@@ -96,7 +96,7 @@ instance of `MessageContext`
9696
def message_key() -> Any
9797
```
9898

99-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/3d3aca1106fe7f136d5005bdfd380dc4118c118b/quixstreams/context.py#L83)
99+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/e4fe9845a6b3ae9985af686f3d54f5c78074c770/quixstreams/context.py#L83)
100100

101101
Get the current message's key.
102102

0 commit comments

Comments
 (0)