10
10
class Application ()
11
11
```
12
12
13
- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L63)
13
+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / chore / reorganize - connectors / quixstreams / app .py # L63)
14
14
15
15
The main Application class .
16
16
@@ -66,7 +66,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
66
66
commit_every: int = 0 ,
67
67
consumer_extra_config: Optional[dict ] = None ,
68
68
producer_extra_config: Optional[dict ] = None ,
69
- state_dir: str = " state" ,
69
+ state_dir: Union[ str , Path] = Path( " state" ) ,
70
70
rocksdb_options: Optional[RocksDBOptionsType] = None ,
71
71
on_consumer_error: Optional[ConsumerErrorCallback] = None ,
72
72
on_processing_error: Optional[ProcessingErrorCallback] = None ,
@@ -84,7 +84,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
84
84
processing_guarantee: ProcessingGuarantee = " at-least-once" )
85
85
```
86
86
87
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L101)
87
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L101)
88
88
89
89
90
90
< br>
@@ -170,126 +170,15 @@ instead of the default one.
170
170
171
171
```python
172
172
@ classmethod
173
- def Quix(
174
- cls ,
175
- consumer_group: Optional[str ] = None ,
176
- auto_offset_reset: AutoOffsetReset = " latest" ,
177
- consumer_extra_config: Optional[dict ] = None ,
178
- producer_extra_config: Optional[dict ] = None ,
179
- state_dir: str = " state" ,
180
- rocksdb_options: Optional[RocksDBOptionsType] = None ,
181
- on_consumer_error: Optional[ConsumerErrorCallback] = None ,
182
- on_processing_error: Optional[ProcessingErrorCallback] = None ,
183
- on_producer_error: Optional[ProducerErrorCallback] = None ,
184
- on_message_processed: Optional[MessageProcessedCallback] = None ,
185
- consumer_poll_timeout: float = 1.0 ,
186
- producer_poll_timeout: float = 0.0 ,
187
- loglevel: Optional[Union[int , LogLevel]] = " INFO" ,
188
- quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None ,
189
- auto_create_topics: bool = True ,
190
- use_changelog_topics: bool = True ,
191
- topic_manager: Optional[QuixTopicManager] = None ,
192
- request_timeout: float = 30 ,
193
- topic_create_timeout: float = 60 ,
194
- processing_guarantee: Literal[" at-least-once" ,
195
- " exactly-once" ] = " exactly-once"
196
- ) -> Self
173
+ def Quix(cls , * args, ** kwargs)
197
174
```
198
175
199
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L353 )
176
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L355 )
200
177
201
- > *** NOTE : *** DEPRECATED : use Application with `quix_sdk_token` argument instead .
178
+ RAISES EXCEPTION : DEPRECATED .
202
179
203
- Initialize an Application to work with Quix Cloud,
204
- assuming environment is properly configured (by default in Quix Cloud).
205
-
206
- It takes the credentials from the environment and configures consumer and
207
- producer to properly connect to the Quix Cloud.
208
-
209
- > *** NOTE :*** Quix Cloud requires `consumer_group` and topic names to be
210
- prefixed with workspace id .
211
- If the application is created via `Application.Quix()` , the real consumer
212
- group will be `< workspace_id> - < consumer_group> ` ,
213
- and the real topic names will be `< workspace_id> - < topic_name> ` .
214
-
215
-
216
-
217
-
218
- < br>
219
- ** * Example Snippet:***
220
-
221
- ```python
222
- from quixstreams import Application
223
-
224
- # Set up an `app = Application.Quix` and `sdf = StreamingDataFrame`;
225
- # add some operations to `sdf` and then run everything. Also shows off how to
226
- # use the quix-specific serializers and deserializers.
227
-
228
- app = Application.Quix()
229
- input_topic = app.topic(" topic-in" , value_deserializer = " quix" )
230
- output_topic = app.topic(" topic-out" , value_serializer = " quix_timeseries" )
231
- df = app.dataframe(topic_in)
232
- df = df.to_topic(output_topic)
233
-
234
- app.run()
235
- ```
236
-
237
-
238
- < br>
239
- ** * Arguments:***
240
-
241
- - `consumer_group` : Kafka consumer group.
242
- Passed as `group.id` to `confluent_kafka.Consumer` .
243
- Linked Environment Variable: `Quix__Consumer__Group` .
244
- Default - " quixstreams-default" (set during init).
245
- > *** NOTE :*** Quix Applications will prefix it with the Quix workspace id .
246
- - `auto_offset_reset` : Consumer `auto.offset.reset` setting
247
- - `consumer_extra_config` : A dictionary with additional options that
248
- will be passed to `confluent_kafka.Consumer` as is .
249
- - `producer_extra_config` : A dictionary with additional options that
250
- will be passed to `confluent_kafka.Producer` as is .
251
- - `state_dir` : path to the application state directory.
252
- Default - `" .state" ` .
253
- - `rocksdb_options` : RocksDB options.
254
- If `None ` , the default options will be used.
255
- - `consumer_poll_timeout` : timeout for `RowConsumer.poll()` . Default - `1.0 ` s
256
- - `producer_poll_timeout` : timeout for `RowProducer.poll()` . Default - `0 ` s.
257
- - `on_message_processed` : a callback triggered when message is successfully
258
- processed.
259
- - `loglevel` : a log level for " quixstreams" logger.
260
- Should be a string or `None ` .
261
- If `None ` is passed, no logging will be configured.
262
- You may pass `None ` and configure " quixstreams" logger
263
- externally using `logging` library.
264
- Default - `" INFO" ` .
265
- - `auto_create_topics` : Create all `Topic` s made via `Application.topic()`
266
- Default - `True `
267
- - `use_changelog_topics` : Use changelog topics to back stateful operations
268
- Default - `True `
269
- - `topic_manager` : A `QuixTopicManager` instance
270
- - `request_timeout` : timeout (seconds) for REST - based requests
271
- - `topic_create_timeout` : timeout (seconds) for topic create finalization
272
- - `processing_guarantee` : Use " exactly-once" or " at-least-once" processing.
273
- < br>< br> *** Error Handlers*** < br>
274
- To handle errors, `Application` accepts callbacks triggered when
275
- exceptions occur on different stages of stream processing. If the callback
276
- returns `True ` , the exception will be ignored. Otherwise, the exception
277
- will be propagated and the processing will eventually stop.
278
- - `on_consumer_error` : triggered when internal `RowConsumer` fails to poll
279
- Kafka or cannot deserialize a message.
280
- - `on_processing_error` : triggered when exception is raised within
281
- `StreamingDataFrame.process()` .
282
- - `on_producer_error` : triggered when RowProducer fails to serialize
283
- or to produce a message to Kafka.
284
- < br>< br> *** Quix Cloud Parameters*** < br>
285
- - `quix_config_builder` : instance of `QuixKafkaConfigsBuilder` to be used
286
- instead of the default one.
287
-
288
-
289
- < br>
290
- ** * Returns:***
291
-
292
- `Application` object
180
+ use Application() with " quix_sdk_token" parameter or set the " Quix__Sdk__Token"
181
+ environment variable.
293
182
294
183
< a id = " quixstreams.app.Application.topic" >< / a>
295
184
@@ -307,7 +196,7 @@ def topic(name: str,
307
196
timestamp_extractor: Optional[TimestampExtractor] = None ) -> Topic
308
197
```
309
198
310
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L494 )
199
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L370 )
311
200
312
201
Create a topic definition.
313
202
@@ -389,14 +278,19 @@ def dataframe(topic: Optional[Topic] = None,
389
278
source: Optional[BaseSource] = None ) -> StreamingDataFrame
390
279
```
391
280
392
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L574 )
281
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L450 )
393
282
394
283
A simple helper method that generates a `StreamingDataFrame` , which is used
395
284
396
285
to define your message processing pipeline.
397
286
398
- See :class :`quixstreams.dataframe.StreamingDataFrame` for more details.
287
+ The topic is what the `StreamingDataFrame` will use as its input , unless
288
+ a source is provided (`topic` is optional when using a `source` ).
289
+
290
+ If both `topic` AND `source` are provided, the source will write to that topic
291
+ instead of its default topic (which the `StreamingDataFrame` then consumes).
399
292
293
+ See :class :`quixstreams.dataframe.StreamingDataFrame` for more details.
400
294
401
295
402
296
< br>
@@ -422,6 +316,7 @@ app.run()
422
316
423
317
- `topic` : a `quixstreams.models.Topic` instance
424
318
to be used as an input topic.
319
+ - `source` : a `quixstreams.sources` " BaseSource" instance
425
320
426
321
427
322
< br>
@@ -439,7 +334,7 @@ to be used as an input topic.
439
334
def stop(fail: bool = False )
440
335
```
441
336
442
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L620 )
337
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L505 )
443
338
444
339
Stop the internal poll loop and the message processing.
445
340
@@ -466,7 +361,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
466
361
def get_producer() -> Producer
467
362
```
468
363
469
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L665 )
364
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L550 )
470
365
471
366
Create and return a pre- configured Producer instance.
472
367
The Producer is initialized with params passed to Application.
@@ -501,7 +396,7 @@ with app.get_producer() as producer:
501
396
def get_consumer(auto_commit_enable: bool = True ) -> Consumer
502
397
```
503
398
504
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L695 )
399
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L580 )
505
400
506
401
Create and return a pre- configured Consumer instance.
507
402
@@ -558,7 +453,7 @@ with app.get_consumer() as consumer:
558
453
def clear_state()
559
454
```
560
455
561
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L745 )
456
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L630 )
562
457
563
458
Clear the state of the application.
564
459
@@ -572,10 +467,12 @@ Clear the state of the application.
572
467
def add_source(source: BaseSource, topic: Optional[Topic] = None ) -> Topic
573
468
```
574
469
575
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L751 )
470
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L636 )
576
471
577
472
Add a source to the application.
578
473
474
+ Use when no transformations (which requires a `StreamingDataFrame` ) are needed.
475
+
579
476
See :class :`quixstreams.sources.base.BaseSource` for more details.
580
477
581
478
@@ -593,10 +490,10 @@ Default: the source default
593
490
# ### Application.run
594
491
595
492
```python
596
- def run(dataframe: StreamingDataFrame)
493
+ def run(dataframe: Optional[ StreamingDataFrame] = None )
597
494
```
598
495
599
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L770 )
496
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L657 )
600
497
601
498
Start processing data from Kafka using provided `StreamingDataFrame`
602
499
@@ -622,12 +519,6 @@ df.apply(lambda value, context: print('New message', value)
622
519
app.run()
623
520
```
624
521
625
-
626
- < br>
627
- ** * Arguments:***
628
-
629
- - `dataframe` : instance of `StreamingDataFrame`
630
-
631
522
< a id = " quixstreams.app.Application.setup_topics" >< / a>
632
523
633
524
< br>< br>
@@ -638,7 +529,7 @@ app.run()
638
529
def setup_topics()
639
530
```
640
531
641
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L890 )
532
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L779 )
642
533
643
534
Validate and create the topics
644
535
@@ -650,7 +541,7 @@ Validate and create the topics
650
541
class ApplicationConfig(BaseSettings)
651
542
```
652
543
653
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L1066 )
544
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L955 )
654
545
655
546
Immutable object holding the application configuration
656
547
@@ -673,7 +564,7 @@ def settings_customise_sources(
673
564
) -> Tuple[PydanticBaseSettingsSource, ... ]
674
565
```
675
566
676
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L1101 )
567
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L990 )
677
568
678
569
Included to ignore reading/ setting values from the environment
679
570
@@ -687,7 +578,7 @@ Included to ignore reading/setting values from the environment
687
578
def copy(** kwargs) -> Self
688
579
```
689
580
690
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main / quixstreams/ app.py# L1114 )
581
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ chore / reorganize - connectors / quixstreams/ app.py# L1003 )
691
582
692
583
Update the application config and return a copy
693
584
0 commit comments