
Commit f2c9027

tim-quix, gwaramadze, and daniil-quix authored
Stop conditions with last message received timeout (#780)
Co-authored-by: Remy Gwaramadze <gwaramadze@users.noreply.github.com>
Co-authored-by: Daniil Gusev <daniil@quix.io>
1 parent 6f7938f commit f2c9027

File tree

14 files changed (+1063, -121 lines)


docs/debugging.md

Lines changed: 244 additions & 0 deletions
@@ -0,0 +1,244 @@
# Inspecting Data and Debugging

Sometimes, there is a need for more direct insight into your data processing with
a `StreamingDataFrame`, especially with specific operations.

Here are some helpful tools and strategies for inspecting your `Application` and data,
whether for debugging or confirming things are working as expected!

## Printing Data

This is generally the simplest approach for inspecting your `Application`.

You can print the current record's value at desired points in a `StreamingDataFrame`.

### Single Record Printing

The most log-friendly approach (especially if you do not have a live terminal
session, such as when running in Kubernetes) is `StreamingDataFrame.print()`,
which prints the current record value wherever it's called.

It prints multiple lines by default, but it can print a single line with the kwarg `pretty=False`.

It can additionally include the record metadata with the kwarg `metadata=True`:

```python
sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Print the current record's value, key, timestamp and headers
sdf.print(metadata=True)
# It will print the record's data wrapped into a dict for readability:
# { 'value': {'number': 12183},
#   'key': b'key',
#   'timestamp': 1721129697951,
#   'headers': [('header_name', b'header-value')]
# }
```
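
To print each record as a single line instead, use the `pretty` kwarg mentioned above:

```python
# Same inspection, printed as one line per record
sdf.print(pretty=False)
```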

### Table Printing

A more user-friendly way to monitor your data stream is using `StreamingDataFrame.print_table()`.
It creates a live-updating table that shows the most recent records:

```python
sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Show last 5 records with metadata columns
sdf.print_table(size=5, title="My Stream")

# For wide datasets, limit columns to improve readability
sdf.print_table(
    size=5,
    title="My Stream",
    columns=["id", "name", "value"],
    column_widths={"name": 20}
)
```

This will produce a live table like:

```
                             My Stream
┏━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ _key       ┃ _timestamp ┃ id     ┃ name                 ┃ value   ┃
┡━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ b'53fe8e4' │ 1738685136 │ 876    │ Charlie              │ 42.5    │
│ b'91bde51' │ 1738685137 │ 11     │ Alice                │ 18.3    │
│ b'6617dfe' │ 1738685138 │ 133    │ Bob                  │ 73.1    │
│ b'f47ac93' │ 1738685139 │ 244    │ David                │ 55.7    │
│ b'038e524' │ 1738685140 │ 567    │ Eve                  │ 31.9    │
└────────────┴────────────┴────────┴──────────────────────┴─────────┘
```

Note that the "name" column is resized to the desired width of 20 characters.

You can monitor multiple points in your pipeline by adding multiple `print_table` calls:

```python
sdf = app.dataframe(topic)
sdf.print_table(title="Raw Input")

sdf = sdf.filter(lambda value: ...)
sdf.print_table(title="Filtered Values")

sdf = sdf.apply(lambda value: ...)
sdf.print_table(title="Final Output")
```

## Interacting with Data

If you'd like to store or manipulate data from `Application` processing directly, you can
run an "interactive" `Application` by executing `Application.run()` within an
IPython session (terminal, PyCharm, VS Code, etc.) or Jupyter Notebook cell.

To do so, you'll need to both:

- Provide stop conditions to `Application.run()`
- Use the Quix Streams `ListSink` + `StreamingDataFrame.sink()` to store data.

Basically:

1. The `Application` runs for the specified period in the interactive session.
2. While running, data is stored in the specified `ListSink` variable(s).
3. Once the `Application` stops, those variable(s) are accessible as normal.

The details of this pattern are further explained below.

### Application.run() stop conditions

In a production setting, `Application.run()` should be called only once with no
arguments, which means it will run indefinitely (until it encounters an error or
is manually stopped by the user).

However, for debugging, the following kwargs can be passed to stop the `Application`
when the applicable condition is met:

- `timeout`: maximum time to wait for a new message (default `0.0` == infinite)
- `count`: number of messages to process from main SDF input topics (default `0` == infinite)

If used together (which is the recommended pattern for debugging), either condition
will trigger the stop.
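
For example (a minimal sketch, assuming `app` is an already-configured `Application`; the specific numbers are arbitrary):

```python
# Stop after 100 messages are processed, OR after 10 seconds pass
# without receiving a new message, whichever condition is met first.
app.run(count=100, timeout=10.0)
```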

#### Count Behavior

There are a few things to be aware of with `count`:

- It only counts messages from (input) topics passed by the user to a `StreamingDataFrame`.
    - This means things like repartition topics (group by) are NOT counted.

- It's a total message count _across all input topics_, NOT a count for each input topic.
    - ex: for `count=20`, `topic_a` could get 5 messages, and `topic_b` 15 messages (see the sketch after this list).

- Things like `SDF.apply(expand=True)` and branching do not affect counts.

- AFTER the count is reached, the `Application` flushes any respective
  repartition topics so all downstream processing is included.
    - Repartition highwaters are recorded when the condition is met, and they are consumed up to
      those watermarks.
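
To make the combined-total semantics concrete, here is a hypothetical two-topic sketch (topic names follow the example above, and it assumes your quixstreams version supports multiple dataframes in one `Application`):

```python
sdf_a = app.dataframe(app.topic("topic_a"))
sdf_b = app.dataframe(app.topic("topic_b"))

# Stops once 20 messages total have been processed across BOTH topics,
# e.g. 5 from topic_a and 15 from topic_b.
app.run(count=20)
```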

#### Timeout Behavior

A couple of things to note about `timeout`:

- Though it can be used standalone, it's recommended to pair it with a `count`.

- Tracking starts once the first partition assignment (or recovery, if needed) finishes.
    - There is a 60s wait buffer for the first assignment to trigger.

- Using only `timeout` when collecting data from a high-volume topic
  with a `ListSink` could cause out-of-memory errors.

#### Multiple Application.run() calls

It is safe to do subsequent `Application.run()` calls (even with
new arguments!); it will simply pick up where it left off.

There is no need to do any manual cleanup when finished; each run cleans up after itself.

> **NOTE:** You do not need to re-execute the entire Application setup, just `.run()`.
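
For instance (a minimal sketch of an interactive session; the argument values are arbitrary):

```python
app.run(count=10, timeout=5)   # processes up to 10 messages, then stops
app.run(count=20, timeout=3)   # safe: picks up where the previous run left off
```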

### ListSink

`ListSink` primarily exists for use alongside `Application.run()` stop conditions.

It collects data where it's used, which can then be interacted with like
a list once the `Application` stops.

#### Using ListSink

To use a `ListSink`, simply store one in a separate variable and pass it
like any other `Sink`:

```python
from quixstreams import Application
from quixstreams.sinks.core.list import ListSink

app = Application(broker_address="localhost:9092")
topic = app.topic("some-topic")
list_sink = ListSink()  # sink will be a list-like object
sdf = app.dataframe(topic=topic).sink(list_sink)
app.run(count=50, timeout=10)  # get up to 50 records (stops if no messages for 10s)
```

You can then interact with it once the `Application` stops:

```shell
> print(list_sink)
[{"thing": "x"}, {"thing": "y"}, {"thing": "z"}]

> list_sink[0]
{"thing": "x"}
```

> **NOTE:** Though the result is actually a `ListSink`, it behaves like a list... you can even pass it
to `pandas.DataFrame()` and it will work as expected!
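
For example (a sketch assuming `pandas` is installed and `list_sink` holds the records collected above):

```python
import pandas as pd

# ListSink behaves like a list of record dicts, so this works as expected
df = pd.DataFrame(list_sink)
```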

#### ListSink Limitations

- You can use any number of `ListSink`s in an `Application`
    - Each one must have its own variable.

- `ListSink` does not limit its own size
    - Be sure to use it with `Application.run()` stopping conditions.

- `ListSink` does not "refresh" itself per `.run()`; it collects data indefinitely
    - You can remove the currently stored data by calling `list_sink.clear()` (see the sketch after this list).
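
For example, to start a fresh collection between runs (a minimal interactive sketch):

```python
app.run(count=50, timeout=10)  # first collection
print(list_sink)               # inspect the collected records
list_sink.clear()              # empty the sink so the next run starts fresh
app.run(count=50, timeout=10)  # resumes consuming and refills the sink
```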

### Interactive limitations

Currently, Quix Streams `Source` functionality is limited due
to `multiprocessing`'s `if __name__ == '__main__':` requirements; specifically:

- You cannot run any source-based `Application` "interactively"
  (i.e. using `ListSink` to inspect values)
    - There is potentially a way around this for [Jupyter Notebooks](connectors/sources/custom-sources.md#custom-sources-and-jupyter-notebook)
- You can only run a plain source (no SDF) with the `timeout` argument.

## Using Breakpoints

For detailed examination, you can set breakpoints using `StreamingDataFrame.update()`:

```python
import pdb

sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Set a breakpoint
sdf.update(lambda value: pdb.set_trace())
```
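
Since this pauses on every record, you may want to break only on records of interest. A minimal sketch (the `"id"` field is hypothetical):

```python
# Break only when a specific (hypothetical) field matches a value
sdf.update(lambda value: pdb.set_trace() if value["id"] == 876 else None)
```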

## Application loglevel to DEBUG

Though likely not helpful for the average user, it is possible to change the
`Application` loglevel to `DEBUG` for more under-the-hood insight.

Just do `Application(loglevel="DEBUG")`.

> **NOTE**: This does NOT grant any insights into the message data directly.

docs/processing.md

Lines changed: 0 additions & 88 deletions
@@ -625,94 +625,6 @@ You may also achieve the same result with `sdf[sdf.apply()]` syntax:
sdf = sdf[sdf.apply(lambda value: value['field_a'] > 0)]
```

## Debugging

To debug code in `StreamingDataFrame`, you can use several built-in tools for
data inspection and monitoring.

### Simple Value Printing

Using `StreamingDataFrame.print()` to print the current record's value and metadata in the stream:

```python
sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Print the current record's value, key, timestamp and headers
sdf.print(metadata=True)
# It will print the record's data wrapped into a dict for readability:
# { 'value': {'number': 12183},
#   'key': b'key',
#   'timestamp': 1721129697951,
#   'headers': [('header_name', b'header-value')]
# }
```

### Table Printing

The most convenient way to monitor your data stream is using `StreamingDataFrame.print_table()`.
It creates a live-updating table that shows the most recent records:

```python
sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Show last 5 records with metadata columns
sdf.print_table(size=5, title="My Stream")

# For wide datasets, limit columns to improve readability
sdf.print_table(
    size=5,
    title="My Stream",
    columns=["id", "name", "value"],
    column_widths={"name": 20}
)
```

This will produce a live table like:

```
                             My Stream
┏━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ _key       ┃ _timestamp ┃ id     ┃ name                 ┃ value   ┃
┡━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ b'53fe8e4' │ 1738685136 │ 876    │ Charlie              │ 42.5    │
│ b'91bde51' │ 1738685137 │ 11     │ Alice                │ 18.3    │
│ b'6617dfe' │ 1738685138 │ 133    │ Bob                  │ 73.1    │
│ b'f47ac93' │ 1738685139 │ 244    │ David                │ 55.7    │
│ b'038e524' │ 1738685140 │ 567    │ Eve                  │ 31.9    │
└────────────┴────────────┴────────┴──────────────────────┴─────────┘
```

Note that the "name" column is resized to desired width of 20 characters.

You can monitor multiple points in your pipeline by adding multiple print_table calls:

```python
sdf = app.dataframe(topic)
sdf.print_table(title="Raw Input")

sdf = sdf.filter(lambda value: ...)
sdf.print_table(title="Filtered Values")

sdf = sdf.apply(lambda value: ...)
sdf.print_table(title="Final Output")
```

### Using Breakpoints

For detailed examination, you can set breakpoints using `StreamingDataFrame.update()`:

```python
import pdb

sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Set a breakpoint
sdf.update(lambda value: pdb.set_trace())
```

## Updating Kafka Timestamps

In Quix Streams, each processed item has a timestamp assigned.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ nav:
  - How to:
    - Produce Data to Kafka: producer.md
    - Process & Transform Data: processing.md
+   - Inspecting Data & Debugging: debugging.md
    - GroupBy Operation: groupby.md
    - Windows & Aggregations: windowing.md
  - Configuration: configuration.md

quixstreams/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -5,4 +5,4 @@

__all__ = ["Application", "message_context", "MessageContext", "State"]

-__version__ = "3.10.0"
+__version__ = "3.11.0dev1"
