10
10
class Application ()
11
11
```
12
12
13
- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L61 )
13
+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L63 )
14
14
15
15
The main Application class .
16
16
@@ -74,7 +74,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
74
74
on_message_processed: Optional[MessageProcessedCallback] = None ,
75
75
consumer_poll_timeout: float = 1.0 ,
76
76
producer_poll_timeout: float = 0.0 ,
77
- loglevel: Optional[LogLevel] = " INFO" ,
77
+ loglevel: Optional[Union[ int , LogLevel] ] = " INFO" ,
78
78
auto_create_topics: bool = True ,
79
79
use_changelog_topics: bool = True ,
80
80
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None ,
@@ -84,7 +84,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
84
84
processing_guarantee: ProcessingGuarantee = " at-least-once" )
85
85
```
86
86
87
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L99 )
87
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L101 )
88
88
89
89
90
90
< br>
@@ -184,7 +184,7 @@ def Quix(
184
184
on_message_processed: Optional[MessageProcessedCallback] = None ,
185
185
consumer_poll_timeout: float = 1.0 ,
186
186
producer_poll_timeout: float = 0.0 ,
187
- loglevel: Optional[LogLevel] = " INFO" ,
187
+ loglevel: Optional[Union[ int , LogLevel] ] = " INFO" ,
188
188
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None ,
189
189
auto_create_topics: bool = True ,
190
190
use_changelog_topics: bool = True ,
@@ -196,7 +196,7 @@ def Quix(
196
196
) -> Self
197
197
```
198
198
199
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L353 )
199
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L350 )
200
200
201
201
> *** NOTE :*** DEPRECATED : use Application with `quix_sdk_token` argument instead.
202
202
@@ -307,7 +307,7 @@ def topic(name: str,
307
307
timestamp_extractor: Optional[TimestampExtractor] = None ) -> Topic
308
308
```
309
309
310
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L494 )
310
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L491 )
311
311
312
312
Create a topic definition.
313
313
@@ -385,10 +385,11 @@ topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
385
385
# ### Application.dataframe
386
386
387
387
```python
388
- def dataframe(topic: Topic) -> StreamingDataFrame
388
+ def dataframe(topic: Optional[Topic] = None ,
389
+ source: Optional[BaseSource] = None ) -> StreamingDataFrame
389
390
```
390
391
391
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L574 )
392
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L571 )
392
393
393
394
A simple helper method that generates a `StreamingDataFrame` , which is used
394
395
@@ -438,7 +439,7 @@ to be used as an input topic.
438
439
def stop(fail: bool = False )
439
440
```
440
441
441
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L613 )
442
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L617 )
442
443
443
444
Stop the internal poll loop and the message processing.
444
445
@@ -465,7 +466,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
465
466
def get_producer() -> Producer
466
467
```
467
468
468
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L636 )
469
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L662 )
469
470
470
471
Create and return a pre- configured Producer instance.
471
472
The Producer is initialized with params passed to Application.
@@ -500,7 +501,7 @@ with app.get_producer() as producer:
500
501
def get_consumer(auto_commit_enable: bool = True ) -> Consumer
501
502
```
502
503
503
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L666 )
504
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L692 )
504
505
505
506
Create and return a pre- configured Consumer instance.
506
507
The Consumer is initialized with params passed to Application.
@@ -549,10 +550,34 @@ with app.get_consumer() as consumer:
549
550
def clear_state()
550
551
```
551
552
552
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L713 )
553
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L739 )
553
554
554
555
Clear the state of the application.
555
556
557
+ < a id = " quixstreams.app.Application.add_source" >< / a>
558
+
559
+ < br>< br>
560
+
561
+ # ### Application.add\_source
562
+
563
+ ```python
564
+ def add_source(source: BaseSource, topic: Optional[Topic] = None ) -> Topic
565
+ ```
566
+
567
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L745)
568
+
569
+ Add a source to the application.
570
+
571
+ See :class :`quixstreams.sources.base.BaseSource` for more details.
572
+
573
+
574
+ < br>
575
+ ** * Arguments:***
576
+
577
+ - `source` : a :class :`quixstreams.sources.BaseSource` instance
578
+ - `topic` : the :class :`quixstreams.models.Topic` instance the source will produce to
579
+ Default: the source default
580
+
556
581
< a id = " quixstreams.app.Application.run" >< / a>
557
582
558
583
< br>< br>
@@ -563,7 +588,7 @@ Clear the state of the application.
563
588
def run(dataframe: StreamingDataFrame)
564
589
```
565
590
566
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L719 )
591
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L764 )
567
592
568
593
Start processing data from Kafka using provided `StreamingDataFrame`
569
594
@@ -603,7 +628,7 @@ app.run(dataframe=df)
603
628
class ApplicationConfig(BaseSettings)
604
629
```
605
630
606
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L975 )
631
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1049 )
607
632
608
633
Immutable object holding the application configuration
609
634
@@ -626,7 +651,7 @@ def settings_customise_sources(
626
651
) -> Tuple[PydanticBaseSettingsSource, ... ]
627
652
```
628
653
629
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1009 )
654
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1083 )
630
655
631
656
Included to ignore reading/ setting values from the environment
632
657
@@ -640,7 +665,7 @@ Included to ignore reading/setting values from the environment
640
665
def copy(** kwargs) -> Self
641
666
```
642
667
643
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1022 )
668
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1096 )
644
669
645
670
Update the application config and return a copy
646
671
0 commit comments