@@ -40,19 +40,27 @@ To use a different timestamp in window aggregations, you need to provide a
40
40
41
41
A timestamp extractor is a callable object accepting these positional arguments:
42
42
43
- - Message value
44
- - Message headers
45
- - Kafka message timestamp in milliseconds
46
- - Kafka timestamp type:
47
- - "0" - timestamp not available,
48
- - "1" - "create time" (specified by a producer)
49
- - "2" - "log-append time" (when the broker received a message)
43
+ 1 . Message value
44
+ 2 . Message headers
45
+ 3 . Kafka message timestamp in milliseconds
46
+ 4 . Kafka timestamp type.
47
+
48
+ Kafka timestamp type can have these values:
49
+
50
+ * ` 0 ` - timestamp not available,
51
+ * ` 1 ` - "create time" (specified by a producer)
52
+ * ` 2 ` - "log-append time" (when the broker received a message)
50
53
51
54
Timestamp extractor must always return timestamp ** as an integer in milliseconds** .
52
55
53
56
Example:
54
57
55
58
``` python
59
+ from quixstreams import Application
60
+ from quixstreams.models import TimestampType
61
+ from typing import Any, Optional, List, Tuple
62
+
63
+ # Create an Application
56
64
app = Application(... )
57
65
58
66
@@ -71,6 +79,12 @@ def custom_ts_extractor(
71
79
# Passing the timestamp extractor to the topic.
72
80
# The window functions will now use the extracted timestamp instead of the Kafka timestamp.
73
81
topic = app.topic(" input-topic" , timestamp_extractor = custom_ts_extractor)
82
+
83
+ # Create a StreamingDataFrame and keep processing
84
+ sdf = app.dataframe(topic)
85
+
86
+ if __name__ == ' __main__' :
87
+ app.run()
74
88
```
75
89
76
90
### Timestamps of the aggregation results
@@ -80,8 +94,11 @@ Since version 2.6, all windowed aggregations always set timestamps equal to the
80
94
** Example:**
81
95
82
96
``` python
97
+ from quixstreams import Application
83
98
from datetime import timedelta
84
99
100
+ app = Application(... )
101
+
85
102
sdf = app.dataframe(... )
86
103
87
104
# Input:
@@ -167,6 +184,11 @@ Here is how to do it using tumbling windows:
167
184
168
185
``` python
169
186
from datetime import timedelta
187
+ from quixstreams import Application
188
+
189
+ app = Application(... )
190
+ sdf = app.dataframe(... )
191
+
170
192
171
193
sdf = (
172
194
# Extract "temperature" value from the message
@@ -251,7 +273,10 @@ Expected output:
251
273
252
274
``` python
253
275
from datetime import timedelta
276
+ from quixstreams import Application
254
277
278
+ app = Application(... )
279
+ sdf = app.dataframe(... )
255
280
256
281
sdf = (
257
282
# Extract "temperature" value from the message
@@ -324,7 +349,9 @@ Here is how you can do that with `reduce()`:
324
349
325
350
``` python
326
351
from datetime import timedelta
352
+ from quixstreams import Application
327
353
354
+ app = Application(... )
328
355
sdf = app.dataframe(... )
329
356
330
357
@@ -397,7 +424,9 @@ Count all received events over a 10-minute tumbling window.
397
424
398
425
``` python
399
426
from datetime import timedelta
427
+ from quixstreams import Application
400
428
429
+ app = Application(... )
401
430
sdf = app.dataframe(... )
402
431
403
432
@@ -435,7 +464,9 @@ Imagine you receive the temperature data from the sensor, and you need to calcul
435
464
436
465
``` python
437
466
from datetime import timedelta
467
+ from quixstreams import Application
438
468
469
+ app = Application(... )
439
470
sdf = app.dataframe(... )
440
471
441
472
# Input:
@@ -475,6 +506,12 @@ Since it is rather generic, you may need to transform it into your own schema.
475
506
Here is how you can do that:
476
507
477
508
``` python
509
+ from datetime import timedelta
510
+ from quixstreams import Application
511
+
512
+ app = Application(... )
513
+ sdf = app.dataframe(... )
514
+
478
515
sdf = (
479
516
# Define a tumbling window of 10 minutes
480
517
sdf.tumbling_window(timedelta(minutes = 10 ))
@@ -520,8 +557,9 @@ Example:
520
557
521
558
``` python
522
559
from datetime import timedelta
560
+ from quixstreams import Application
523
561
524
-
562
+ app = Application( ... )
525
563
sdf = app.dataframe(... )
526
564
527
565
# Define a 1 hour tumbling window with a grace period of 10 seconds.
@@ -549,8 +587,9 @@ To emit results for each processed message in the stream, use the following API:
549
587
550
588
``` python
551
589
from datetime import timedelta
590
+ from quixstreams import Application
552
591
553
-
592
+ app = Application( ... )
554
593
sdf = app.dataframe(... )
555
594
556
595
# Calculate a sum of values over a window of 10 seconds
@@ -576,8 +615,9 @@ Here is how to emit results only once for each window interval after it's closed
576
615
577
616
``` python
578
617
from datetime import timedelta
618
+ from quixstreams import Application
579
619
580
-
620
+ app = Application( ... )
581
621
sdf = app.dataframe(... )
582
622
583
623
# Calculate a sum of values over a window of 10 seconds
0 commit comments