10
10
class Application ()
11
11
```
12
12
13
- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L80 )
13
+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L90 )
14
14
15
15
The main Application class .
16
16
@@ -85,7 +85,7 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
85
85
processing_guarantee: ProcessingGuarantee = " at-least-once" )
86
86
```
87
87
88
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L118 )
88
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L128 )
89
89
90
90
91
91
< br>
@@ -174,7 +174,7 @@ instead of the default one.
174
174
def Quix(cls , * args, ** kwargs)
175
175
```
176
176
177
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L364 )
177
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L375 )
178
178
179
179
RAISES EXCEPTION : DEPRECATED .
180
180
@@ -197,7 +197,7 @@ def topic(name: str,
197
197
timestamp_extractor: Optional[TimestampExtractor] = None ) -> Topic
198
198
```
199
199
200
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L396 )
200
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L407 )
201
201
202
202
Create a topic definition.
203
203
@@ -279,7 +279,7 @@ def dataframe(topic: Optional[Topic] = None,
279
279
source: Optional[BaseSource] = None ) -> StreamingDataFrame
280
280
```
281
281
282
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L476 )
282
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L487 )
283
283
284
284
A simple helper method that generates a `StreamingDataFrame` , which is used
285
285
@@ -335,7 +335,7 @@ to be used as an input topic.
335
335
def stop(fail: bool = False )
336
336
```
337
337
338
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L532 )
338
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L543 )
339
339
340
340
Stop the internal poll loop and the message processing.
341
341
@@ -362,7 +362,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
362
362
def get_producer() -> Producer
363
363
```
364
364
365
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L577 )
365
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L588 )
366
366
367
367
Create and return a pre- configured Producer instance.
368
368
The Producer is initialized with params passed to Application.
@@ -397,7 +397,7 @@ with app.get_producer() as producer:
397
397
def get_consumer(auto_commit_enable: bool = True ) -> Consumer
398
398
```
399
399
400
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L632 )
400
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L643 )
401
401
402
402
Create and return a pre- configured Consumer instance.
403
403
@@ -454,7 +454,7 @@ with app.get_consumer() as consumer:
454
454
def clear_state()
455
455
```
456
456
457
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L682 )
457
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L693 )
458
458
459
459
Clear the state of the application.
460
460
@@ -468,7 +468,7 @@ Clear the state of the application.
468
468
def add_source(source: BaseSource, topic: Optional[Topic] = None ) -> Topic
469
469
```
470
470
471
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L688 )
471
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L699 )
472
472
473
473
Add a source to the application.
474
474
@@ -492,16 +492,41 @@ Note: the names of default topics are prefixed with "source__".
492
492
# ### Application.run
493
493
494
494
```python
495
- def run(dataframe: Optional[StreamingDataFrame] = None )
495
+ def run(dataframe: Optional[StreamingDataFrame] = None ,
496
+ timeout: float = 0.0 ,
497
+ count: int = 0 )
496
498
```
497
499
498
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L721 )
500
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L732 )
499
501
500
502
Start processing data from Kafka using provided `StreamingDataFrame`
501
503
502
504
Once started, it can be safely terminated with a `SIGTERM ` signal
503
505
(like Kubernetes does) or a typical `KeyboardInterrupt ` (`Ctrl+ C` ).
504
506
507
+ Alternatively, stop conditions can be set (typically for debugging purposes);
508
+ has the option of stopping after a number of messages, timeout, or both.
509
+
510
+ Not setting a timeout or count limit will result in the Application running
511
+ indefinitely (expected production behavior).
512
+
513
+
514
+ Stop Condition Details:
515
+
516
+ A timeout will immediately stop an Application once no new messages have
517
+ been consumed after T seconds (after rebalance and recovery).
518
+
519
+ A count will process N total records from ANY input / SDF topics (so
520
+ multiple input topics will very likely differ in their consume total!) after
521
+ an initial rebalance and recovery.
522
+ THEN , any remaining processes from things such as groupby (which uses internal
523
+ topics) will also be validated to ensure the results of said messages are
524
+ fully processed (this does NOT count towards the process total).
525
+ Note that without a timeout, the Application runs until the count is hit.
526
+
527
+ If timeout and count are used together (which is the recommended pattern for
528
+ debugging), either condition will trigger a stop.
529
+
505
530
506
531
507
532
< br>
@@ -518,9 +543,19 @@ topic = app.topic('test-topic')
518
543
df = app.dataframe(topic)
519
544
df.apply(lambda value , context : print (' New message' , value)
520
545
521
- app.run()
546
+ app.run() # could pass `timeout=5` here, for example
522
547
```
523
548
549
+
550
+ < br>
551
+ ** * Arguments:***
552
+
553
+ - `dataframe` : DEPRECATED - do not use; sdfs are now automatically tracked.
554
+ - `timeout` : maximum time to wait for a new message.
555
+ Default = 0.0 (infinite)
556
+ - `count` : how many input topic messages to process before stopping.
557
+ Default = 0 (infinite)
558
+
524
559
< a id = " quixstreams.app.Application.setup_topics" >< / a>
525
560
526
561
< br>< br>
@@ -531,7 +566,7 @@ app.run()
531
566
def setup_topics()
532
567
```
533
568
534
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L846 )
569
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L910 )
535
570
536
571
Validate the application topics
537
572
@@ -543,7 +578,7 @@ Validate the application topics
543
578
class ApplicationConfig(BaseSettings)
544
579
```
545
580
546
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1014 )
581
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1081 )
547
582
548
583
Immutable object holding the application configuration
549
584
@@ -566,7 +601,7 @@ def settings_customise_sources(
566
601
) -> Tuple[PydanticBaseSettingsSource, ... ]
567
602
```
568
603
569
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1049 )
604
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1116 )
570
605
571
606
Included to ignore reading/ setting values from the environment
572
607
@@ -580,7 +615,7 @@ Included to ignore reading/setting values from the environment
580
615
def copy(** kwargs) -> Self
581
616
```
582
617
583
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1062 )
618
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1129 )
584
619
585
620
Update the application config and return a copy
586
621
0 commit comments