10
10
class Application ()
11
11
```
12
12
13
- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L89 )
13
+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / app .py # L94 )
14
14
15
15
The main Application class .
16
16
@@ -82,10 +82,11 @@ def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
82
82
topic_manager: Optional[TopicManager] = None ,
83
83
request_timeout: float = 30 ,
84
84
topic_create_timeout: float = 60 ,
85
- processing_guarantee: ProcessingGuarantee = " at-least-once" )
85
+ processing_guarantee: ProcessingGuarantee = " at-least-once" ,
86
+ max_partition_buffer_size: int = 10000 )
86
87
```
87
88
88
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L127 )
89
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L132 )
89
90
90
91
91
92
< br>
@@ -148,6 +149,12 @@ Default - `True`
148
149
- `request_timeout` : timeout (seconds) for REST - based requests
149
150
- `topic_create_timeout` : timeout (seconds) for topic create finalization
150
151
- `processing_guarantee` : Use " exactly-once" or " at-least-once" processing.
152
+ - `max_partition_buffer_size` : the maximum number of messages to buffer per topic partition to consider it full.
153
+ The buffering is used to consume messages in - order between multiple partitions with the same number.
154
+ It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
155
+ Lower value decreases the memory use, but increases the latency.
156
+ Default - `10000 ` .
157
+
151
158
< br>< br> *** Error Handlers*** < br>
152
159
To handle errors, `Application` accepts callbacks triggered when
153
160
exceptions occur on different stages of stream processing. If the callback
@@ -175,7 +182,7 @@ instead of the default one.
175
182
def Quix(cls , * args, ** kwargs)
176
183
```
177
184
178
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L374 )
185
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L389 )
179
186
180
187
RAISES EXCEPTION : DEPRECATED .
181
188
@@ -198,7 +205,7 @@ def topic(name: str,
198
205
timestamp_extractor: Optional[TimestampExtractor] = None ) -> Topic
199
206
```
200
207
201
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L406 )
208
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L421 )
202
209
203
210
Create a topic definition.
204
211
@@ -280,7 +287,7 @@ def dataframe(topic: Optional[Topic] = None,
280
287
source: Optional[BaseSource] = None ) -> StreamingDataFrame
281
288
```
282
289
283
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L486 )
290
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L501 )
284
291
285
292
A simple helper method that generates a `StreamingDataFrame` , which is used
286
293
@@ -336,7 +343,7 @@ to be used as an input topic.
336
343
def stop(fail: bool = False )
337
344
```
338
345
339
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L542 )
346
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L557 )
340
347
341
348
Stop the internal poll loop and the message processing.
342
349
@@ -363,7 +370,7 @@ to unhandled exception, and it shouldn't commit the current checkpoint.
363
370
def get_producer() -> Producer
364
371
```
365
372
366
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L587 )
373
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L602 )
367
374
368
375
Create and return a pre- configured Producer instance.
369
376
The Producer is initialized with params passed to Application.
@@ -398,7 +405,7 @@ with app.get_producer() as producer:
398
405
def get_consumer(auto_commit_enable: bool = True ) -> Consumer
399
406
```
400
407
401
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L641 )
408
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L657 )
402
409
403
410
Create and return a pre- configured Consumer instance.
404
411
@@ -455,7 +462,7 @@ with app.get_consumer() as consumer:
455
462
def clear_state()
456
463
```
457
464
458
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L690 )
465
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L706 )
459
466
460
467
Clear the state of the application.
461
468
@@ -469,7 +476,7 @@ Clear the state of the application.
469
476
def add_source(source: BaseSource, topic: Optional[Topic] = None ) -> Topic
470
477
```
471
478
472
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L696 )
479
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L712 )
473
480
474
481
Add a source to the application.
475
482
@@ -498,7 +505,7 @@ def run(dataframe: Optional[StreamingDataFrame] = None,
498
505
count: int = 0 )
499
506
```
500
507
501
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L729 )
508
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L745 )
502
509
503
510
Start processing data from Kafka using provided `StreamingDataFrame`
504
511
@@ -565,7 +572,7 @@ Default = 0 (infinite)
565
572
class ApplicationConfig(BaseSettings)
566
573
```
567
574
568
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1070 )
575
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1089 )
569
576
570
577
Immutable object holding the application configuration
571
578
@@ -588,7 +595,7 @@ def settings_customise_sources(
588
595
) -> Tuple[PydanticBaseSettingsSource, ... ]
589
596
```
590
597
591
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1105 )
598
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1125 )
592
599
593
600
Included to ignore reading/ setting values from the environment
594
601
@@ -602,7 +609,7 @@ Included to ignore reading/setting values from the environment
602
609
def copy(** kwargs) -> " ApplicationConfig"
603
610
```
604
611
605
- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1118 )
612
+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ app.py# L1138 )
606
613
607
614
Update the application config and return a copy
608
615
0 commit comments