Skip to content

Commit b607a52

Browse files
authored
Feature: Sliding Windows (#515)
* Store timestamp in state value * Decouple state operations from aggregate functions * Check max expired window start instead of min valid window end * Pass watermark to expire_windows * Decouple window deletion from window expiration * Feature: Sliding Windows * Do not emit right windows via .final() * Do not emit right windows via .current() * Move SlidingWindow to a separate module * Don't use deque * Replace watermark with max_start_time * Set expiration watermark only once * Remove timeit tests These were complicated. Custom performance tests are better. * Refactoring tests and fixing problems * Test presence of exact windows in the state * Log late_by_ms from FixedTimeWindow * Rename watermark variables * Log expired windows from SlidingWindow * Correct existing windowing docs Corrected timestamp keys and changed temperature readings to numbers that are easier to calculate without calculator and also look more familiar in both Celsius and Fahrenheit scales. * Add sliding windows docs * Create docstring describing sliding window algorithm * Fix imports * Fix TestWindowedRocksDBPartitionTransaction after rebase * Latest deleted window timestamps per key * Create helper TimestampsCache class
1 parent 40fe296 commit b607a52

File tree

16 files changed

+1608
-288
lines changed

16 files changed

+1608
-288
lines changed

docs/windowing.md

Lines changed: 102 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -167,17 +167,17 @@ Input:
167167

168168
(Here the `"timestamp"` column illustrates Kafka message timestamps)
169169
```json
170-
{"temperature": 65, "timestamp": 100}
171-
{"temperature": 52, "timestamp": 200}
172-
{"temperature": 61, "timestamp": 300}
170+
{"temperature": 30, "timestamp": 100}
171+
{"temperature": 29, "timestamp": 200}
172+
{"temperature": 28, "timestamp": 300}
173173
```
174174

175175
Expected output:
176176

177177
```json
178-
{"avg_temperature": 65, "window_start": 0, "window_end": 3600000}
179-
{"avg_temperature": 58.5, "window_start": 0, "window_end": 3600000}
180-
{"avg_temperature": 59.333, "window_start": 0, "window_end": 3600000}
178+
{"avg_temperature": 30, "window_start_ms": 0, "window_end_ms": 3600000}
179+
{"avg_temperature": 29.5, "window_start_ms": 0, "window_end_ms": 3600000}
180+
{"avg_temperature": 29, "window_start_ms": 0, "window_end_ms": 3600000}
181181
```
182182

183183
Here is how to do it using tumbling windows:
@@ -253,21 +253,21 @@ Input:
253253
(Here the `"timestamp"` column illustrates Kafka message timestamps)
254254

255255
```json
256-
{"temperature": 65, "timestamp": 50000}
257-
{"temperature": 52, "timestamp": 60000}
258-
{"temperature": 61, "timestamp": 62000}
256+
{"temperature": 30, "timestamp": 50000}
257+
{"temperature": 29, "timestamp": 60000}
258+
{"temperature": 28, "timestamp": 62000}
259259
```
260260

261261
Expected output:
262262

263263
```json
264-
{"avg_temperature": 65, "window_start": 0, "window_end": 3600000}
264+
{"avg_temperature": 30, "window_start_ms": 0, "window_end_ms": 3600000}
265265

266-
{"avg_temperature": 58.5, "window_start": 0, "window_end": 3600000}
267-
{"avg_temperature": 65, "window_start": 60000, "window_end": 4200000}
266+
{"avg_temperature": 29.5, "window_start_ms": 0, "window_end_ms": 3600000}
267+
{"avg_temperature": 30, "window_start_ms": 60000, "window_end_ms": 4200000}
268268

269-
{"avg_temperature": 59.333, "window_start": 0, "window_end": 3600000}
270-
{"avg_temperature": 56.5, "window_start": 60000, "window_end": 4200000}
269+
{"avg_temperature": 29, "window_start_ms": 0, "window_end_ms": 3600000}
270+
{"avg_temperature": 28.5, "window_start_ms": 60000, "window_end_ms": 4200000}
271271
```
272272

273273

@@ -305,6 +305,94 @@ sdf = (
305305
```
306306

307307

308+
## Sliding Windows
309+
Sliding windows are overlapping time-based windows that advance with each incoming message, rather than at fixed time intervals like hopping windows. They have a fixed 1 ms resolution and perform better and are less resource-intensive than hopping windows with a 1 ms step. Sliding windows do not produce redundant windows; every interval has a distinct aggregation.
310+
311+
Sliding windows provide optimal performance for tasks requiring high-precision real-time monitoring. However, if the task is not time-critical or the data stream is extremely dense, tumbling or hopping windows may perform better.
312+
313+
For example, a sliding window of 1 hour will generate the following intervals as messages A, B, C, and D arrive:
314+
315+
```
316+
Sliding Windows
317+
318+
Time
319+
[00:00:00.000, 01:00:00.000): ......A
320+
[00:00:00.001, 01:00:00.001): ......B
321+
[00:00:00.003, 01:00:00.003): ......C
322+
[00:00:00.006, 01:00:00.006): ......D
323+
324+
```
325+
326+
Note that both the start and the end of the interval are inclusive.
327+
328+
In sliding windows, each timestamp can be assigned to multiple intervals because these intervals overlap.
329+
330+
For example, a timestamp `01:33:13.000` will match intervals for all messages incoming between `01:33:13.000` and `02:33:13.000`. Borderline windows including this timestamp will be:
331+
332+
- `00:33:13.000 - 01:33:13.000`
333+
- `01:33:13.000 - 02:33:13.000`
334+
335+
336+
**Example:**
337+
338+
Imagine you receive temperature readings from sensors, and you need to calculate the average temperature for the last hour, producing updates for each incoming message. The message key is a sensor ID, so the aggregations will be grouped by each sensor.
339+
340+
Input:
341+
(Here the `"timestamp"` column illustrates Kafka message timestamps)
342+
343+
```json
344+
{"temperature": 30, "timestamp": 3600000}
345+
{"temperature": 29, "timestamp": 4800000}
346+
{"temperature": 28, "timestamp": 4800001}
347+
{"temperature": 27, "timestamp": 7200000}
348+
{"temperature": 26, "timestamp": 7200001}
349+
```
350+
351+
Expected output:
352+
353+
```json
354+
{"avg_temperature": 30, "window_start_ms": 0, "window_end_ms": 3600000}
355+
{"avg_temperature": 29.5, "window_start_ms": 1200000, "window_end_ms": 4800000}
356+
{"avg_temperature": 29, "window_start_ms": 1200001, "window_end_ms": 4800001}
357+
{"avg_temperature": 28.5, "window_start_ms": 3600000, "window_end_ms": 7200000}
358+
{"avg_temperature": 27.5, "window_start_ms": 3600001, "window_end_ms": 7200001} # reading 30 is outside of the window
359+
```
360+
361+
362+
```python
363+
from datetime import timedelta
364+
from quixstreams import Application
365+
366+
app = Application(...)
367+
sdf = app.dataframe(...)
368+
369+
sdf = (
370+
# Extract "temperature" value from the message
371+
sdf.apply(lambda value: value["temperature"])
372+
373+
# Define a sliding window of 1h
374+
# You can also pass duration_ms as integer of milliseconds
375+
.sliding_window(duration_ms=timedelta(hours=1))
376+
377+
# Specify the "mean" aggregate function
378+
.mean()
379+
380+
# Emit updates for each incoming message
381+
.current()
382+
383+
# Unwrap the aggregated result to match the expected output format
384+
.apply(
385+
lambda result: {
386+
"avg_temperature": result["value"],
387+
"window_start_ms": result["start"],
388+
"window_end_ms": result["end"],
389+
}
390+
)
391+
)
392+
393+
```
394+
395+
308396
## Supported Aggregations
309397

310398
Currently, windows support the following aggregation functions:

quixstreams/dataframe/dataframe.py

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,11 @@
5252
from .registry import DataframeRegistry
5353
from .series import StreamingSeries
5454
from .utils import ensure_milliseconds
55-
from .windows import HoppingWindowDefinition, TumblingWindowDefinition
55+
from .windows import (
56+
HoppingWindowDefinition,
57+
SlidingWindowDefinition,
58+
TumblingWindowDefinition,
59+
)
5660

5761
ApplyCallbackStateful = Callable[[Any, State], Any]
5862
ApplyWithMetadataCallbackStateful = Callable[[Any, Any, int, Any, State], Any]
@@ -821,11 +825,11 @@ def tumbling_window(
821825
.sum()
822826
823827
# Specify how the results should be emitted downstream.
824-
# "all()" will emit results as they come for each updated window,
828+
# "current()" will emit results as they come for each updated window,
825829
# possibly producing multiple messages per key-window pair
826830
# "final()" will emit windows only when they are closed and cannot
827831
# receive any updates anymore.
828-
.all()
832+
.current()
829833
)
830834
```
831835
@@ -900,11 +904,11 @@ def hopping_window(
900904
.sum()
901905
902906
# Specify how the results should be emitted downstream.
903-
# "all()" will emit results as they come for each updated window,
907+
# "current()" will emit results as they come for each updated window,
904908
# possibly producing multiple messages per key-window pair
905909
# "final()" will emit windows only when they are closed and cannot
906910
# receive any updates anymore.
907-
.all()
911+
.current()
908912
)
909913
```
910914
@@ -951,6 +955,87 @@ def hopping_window(
951955
name=name,
952956
)
953957

958+
def sliding_window(
959+
self,
960+
duration_ms: Union[int, timedelta],
961+
grace_ms: Union[int, timedelta] = 0,
962+
name: Optional[str] = None,
963+
) -> SlidingWindowDefinition:
964+
"""
965+
Create a sliding window transformation on this StreamingDataFrame.
966+
Sliding windows continuously evaluate the stream with a fixed step of 1 ms
967+
allowing for overlapping, but not redundant windows of a fixed size.
968+
969+
Sliding windows are similar to hopping windows with step_ms set to 1,
970+
but are siginificantly more perforant.
971+
972+
They allow performing stateful aggregations like `sum`, `reduce`, etc.
973+
on top of the data and emit results downstream.
974+
975+
Notes:
976+
977+
- The timestamp of the aggregation result is set to the window start timestamp.
978+
- Every window is grouped by the current Kafka message key.
979+
- Messages with `None` key will be ignored.
980+
- The time windows always use the current event time.
981+
- Windows are inclusive on both the start end end time.
982+
- Every window contains a distinct aggregation.
983+
984+
Example Snippet:
985+
986+
```python
987+
app = Application()
988+
sdf = app.dataframe(...)
989+
990+
sdf = (
991+
# Define a sliding window of 60s with a grace period of 10s
992+
sdf.sliding_window(
993+
duration_ms=timedelta(seconds=60),
994+
grace_ms=timedelta(seconds=10)
995+
)
996+
997+
# Specify the aggregation function
998+
.sum()
999+
1000+
# Specify how the results should be emitted downstream.
1001+
# "current()" will emit results as they come for each updated window,
1002+
# possibly producing multiple messages per key-window pair
1003+
# "final()" will emit windows only when they are closed and cannot
1004+
# receive any updates anymore.
1005+
.current()
1006+
)
1007+
```
1008+
1009+
:param duration_ms: The length of each window.
1010+
Can be specified as either an `int` representing milliseconds or a
1011+
`timedelta` object.
1012+
>***NOTE:*** `timedelta` objects will be rounded to the closest millisecond
1013+
value.
1014+
1015+
:param grace_ms: The grace period for data arrival.
1016+
It allows late-arriving data (data arriving after the window
1017+
has theoretically closed) to be included in the window.
1018+
Can be specified as either an `int` representing milliseconds
1019+
or as a `timedelta` object.
1020+
>***NOTE:*** `timedelta` objects will be rounded to the closest millisecond
1021+
value.
1022+
1023+
:param name: The unique identifier for the window. If not provided, it will be
1024+
automatically generated based on the window's properties.
1025+
1026+
:return: `SlidingWindowDefinition` instance representing the sliding window
1027+
configuration.
1028+
This object can be further configured with aggregation functions
1029+
like `sum`, `count`, etc. applied to the StreamingDataFrame.
1030+
"""
1031+
1032+
duration_ms = ensure_milliseconds(duration_ms)
1033+
grace_ms = ensure_milliseconds(grace_ms)
1034+
1035+
return SlidingWindowDefinition(
1036+
duration_ms=duration_ms, grace_ms=grace_ms, dataframe=self, name=name
1037+
)
1038+
9541039
def drop(
9551040
self,
9561041
columns: Union[str, List[str]],
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
from .base import WindowResult
2-
from .definitions import HoppingWindowDefinition, TumblingWindowDefinition
2+
from .definitions import (
3+
HoppingWindowDefinition,
4+
SlidingWindowDefinition,
5+
TumblingWindowDefinition,
6+
)
37

48
__all__ = [
59
"HoppingWindowDefinition",
10+
"SlidingWindowDefinition",
611
"TumblingWindowDefinition",
712
"WindowResult",
813
]

quixstreams/dataframe/windows/base.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@
33

44
from typing_extensions import TypedDict
55

6-
from quixstreams.state import WindowedState
7-
86

97
class WindowResult(TypedDict):
108
start: int
119
end: int
1210
value: Any
1311

1412

15-
WindowAggregateFunc = Callable[[int, int, int, Any, WindowedState], Any]
13+
WindowAggregateFunc = Callable[[Any, Any], Any]
1614
WindowMergeFunc = Callable[[Any], Any]
1715

1816

0 commit comments

Comments
 (0)