10
10
class Application ()
11
11
```
12
12
13
- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams / app .py # L55 )
13
+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams / app .py # L56 )
14
14
15
15
The main Application class .
16
16
@@ -40,7 +40,7 @@ Most functionality is explained the various methods, except for
40
40
```python
41
41
from quixstreams import Application
42
42
43
- # Set up an `app = Application` and `sdf = StreamingDataFrame`;
43
+ # Set up an `app = Application` and `sdf = StreamingDataFrame`;
44
44
# add some operations to `sdf` and then run everything.
45
45
46
46
app = Application (broker_address = ' localhost:9092' , consumer_group = ' group' )
@@ -79,10 +79,11 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
79
79
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None ,
80
80
topic_manager: Optional[TopicManager] = None ,
81
81
request_timeout: float = 30 ,
82
- topic_create_timeout: float = 60 )
82
+ topic_create_timeout: float = 60 ,
83
+ processing_guarantee: ProcessingGuarantee = " at-least-once" )
83
84
```
84
85
85
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L93 )
86
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L94 )
86
87
87
88
88
89
< br>
@@ -134,6 +135,7 @@ Default - `True`
134
135
- `topic_manager` : A `TopicManager` instance
135
136
- `request_timeout` : timeout (seconds) for REST - based requests
136
137
- `topic_create_timeout` : timeout (seconds) for topic create finalization
138
+ - `processing_guarantee` : Use " exactly-once" or " at-least-once" processing.
137
139
< br>< br> *** Error Handlers*** < br>
138
140
To handle errors, `Application` accepts callbacks triggered when
139
141
exceptions occur on different stages of stream processing. If the callback
@@ -158,29 +160,33 @@ instead of the default one.
158
160
159
161
```python
160
162
@ classmethod
161
- def Quix(cls ,
162
- consumer_group: Optional[str ] = None ,
163
- auto_offset_reset: AutoOffsetReset = " latest" ,
164
- consumer_extra_config: Optional[dict ] = None ,
165
- producer_extra_config: Optional[dict ] = None ,
166
- state_dir: str = " state" ,
167
- rocksdb_options: Optional[RocksDBOptionsType] = None ,
168
- on_consumer_error: Optional[ConsumerErrorCallback] = None ,
169
- on_processing_error: Optional[ProcessingErrorCallback] = None ,
170
- on_producer_error: Optional[ProducerErrorCallback] = None ,
171
- on_message_processed: Optional[MessageProcessedCallback] = None ,
172
- consumer_poll_timeout: float = 1.0 ,
173
- producer_poll_timeout: float = 0.0 ,
174
- loglevel: Optional[LogLevel] = " INFO" ,
175
- quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None ,
176
- auto_create_topics: bool = True ,
177
- use_changelog_topics: bool = True ,
178
- topic_manager: Optional[QuixTopicManager] = None ,
179
- request_timeout: float = 30 ,
180
- topic_create_timeout: float = 60 ) -> Self
163
+ def Quix(
164
+ cls ,
165
+ consumer_group: Optional[str ] = None ,
166
+ auto_offset_reset: AutoOffsetReset = " latest" ,
167
+ consumer_extra_config: Optional[dict ] = None ,
168
+ producer_extra_config: Optional[dict ] = None ,
169
+ state_dir: str = " state" ,
170
+ rocksdb_options: Optional[RocksDBOptionsType] = None ,
171
+ on_consumer_error: Optional[ConsumerErrorCallback] = None ,
172
+ on_processing_error: Optional[ProcessingErrorCallback] = None ,
173
+ on_producer_error: Optional[ProducerErrorCallback] = None ,
174
+ on_message_processed: Optional[MessageProcessedCallback] = None ,
175
+ consumer_poll_timeout: float = 1.0 ,
176
+ producer_poll_timeout: float = 0.0 ,
177
+ loglevel: Optional[LogLevel] = " INFO" ,
178
+ quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None ,
179
+ auto_create_topics: bool = True ,
180
+ use_changelog_topics: bool = True ,
181
+ topic_manager: Optional[QuixTopicManager] = None ,
182
+ request_timeout: float = 30 ,
183
+ topic_create_timeout: float = 60 ,
184
+ processing_guarantee: Literal[" at-least-once" ,
185
+ " exactly-once" ] = " exactly-once"
186
+ ) -> Self
181
187
```
182
188
183
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L313 )
189
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L330 )
184
190
185
191
> *** NOTE :*** DEPRECATED : use Application with `quix_sdk_token` argument instead.
186
192
@@ -253,6 +259,7 @@ Default - `True`
253
259
- `topic_manager` : A `QuixTopicManager` instance
254
260
- `request_timeout` : timeout (seconds) for REST - based requests
255
261
- `topic_create_timeout` : timeout (seconds) for topic create finalization
262
+ - `processing_guarantee` : Use " exactly-once" or " at-least-once" processing.
256
263
< br>< br> *** Error Handlers*** < br>
257
264
To handle errors, `Application` accepts callbacks triggered when
258
265
exceptions occur on different stages of stream processing. If the callback
@@ -290,7 +297,7 @@ def topic(name: str,
290
297
timestamp_extractor: Optional[TimestampExtractor] = None ) -> Topic
291
298
```
292
299
293
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L451 )
300
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L471 )
294
301
295
302
Create a topic definition.
296
303
@@ -371,7 +378,7 @@ topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
371
378
def dataframe(topic: Topic) -> StreamingDataFrame
372
379
```
373
380
374
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L531 )
381
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L551 )
375
382
376
383
A simple helper method that generates a `StreamingDataFrame` , which is used
377
384
@@ -421,7 +428,7 @@ to be used as an input topic.
421
428
def stop(fail: bool = False )
422
429
```
423
430
424
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L570 )
431
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L590 )
425
432
426
433
Stop the internal poll loop and the message processing.
427
434
@@ -448,7 +455,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
448
455
def get_producer() -> Producer
449
456
```
450
457
451
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L593 )
458
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L613 )
452
459
453
460
Create and return a pre- configured Producer instance.
454
461
The Producer is initialized with params passed to Application.
@@ -483,19 +490,23 @@ with app.get_producer() as producer:
483
490
def get_consumer(auto_commit_enable: bool = True ) -> Consumer
484
491
```
485
492
486
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L623 )
493
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L643 )
487
494
488
495
Create and return a pre- configured Consumer instance.
489
496
The Consumer is initialized with params passed to Application.
490
497
491
- It' s useful for consuming data from Kafka outside the standard Application processing flow.
492
- (e.g. to consume test data from a topic).
493
- Using it within the StreamingDataFrame functions is not recommended, as it creates a new Consumer instance
498
+ It' s useful for consuming data from Kafka outside the standard
499
+ Application processing flow.
500
+ (e.g., to consume test data from a topic).
501
+ Using it within the StreamingDataFrame functions is not recommended, as it
502
+ creates a new Consumer instance
494
503
each time, which is not optimized for repeated use in a streaming pipeline.
495
504
496
- Note: By default this consumer does not autocommit consumed offsets to allow exactly- once processing.
505
+ Note: By default, this consumer does not autocommit the consumed offsets to allow
506
+ at- least- once processing.
497
507
To store the offset call store_offsets() after processing a message.
498
- If autocommit is necessary set `enable.auto.offset.store` to True in the consumer config when creating the app.
508
+ If autocommit is necessary set `enable.auto.offset.store` to True in
509
+ the consumer config when creating the app.
499
510
500
511
501
512
< br>
@@ -528,7 +539,7 @@ with app.get_consumer() as consumer:
528
539
def clear_state()
529
540
```
530
541
531
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L666 )
542
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L690 )
532
543
533
544
Clear the state of the application.
534
545
@@ -542,7 +553,7 @@ Clear the state of the application.
542
553
def run(dataframe: StreamingDataFrame)
543
554
```
544
555
545
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ ea3d07177df3f11deb3c51e8337534408f5f68c1 / quixstreams/ app.py# L672 )
556
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ 209d74b1262aa6d90dddd1a01473c83d9d10fdce / quixstreams/ app.py# L696 )
546
557
547
558
Start processing data from Kafka using provided `StreamingDataFrame`
548
559
0 commit comments