
Commit badc591

Update docs (#406)

Add more info about print() and drop() to the processing.md

1 parent 8b20c0c commit badc591

1 file changed: docs/processing.md (+67 −5 lines)
@@ -143,6 +143,52 @@ sdf = sdf.apply(lambda value: {'temperature': value['temperature'],
> To make projection on top of non-mapping values (like custom objects), use
> the `.apply()` approach.
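
For example, a minimal sketch of such a projection with `.apply()` (the `reading` object and its `temperature`/`timestamp` attributes are illustrative, not from the docs):

```python
# Project attributes of a custom object into a plain dictionary
sdf = sdf.apply(
    lambda reading: {
        "temperature": reading.temperature,
        "timestamp": reading.timestamp,
    }
)
```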
## Dropping Columns

Similarly to projections, you can drop unnecessary columns from incoming records using the `StreamingDataFrame.drop()` method.

It accepts either a single column name as a string or a list of names.

The `.drop()` method updates the existing `StreamingDataFrame` object and returns the same `StreamingDataFrame` instance, so you can chain other methods after the `.drop()` call.

Internally, it mutates the record's value and deletes the keys in place.
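
Because it returns the same instance, `.drop()` composes with other operations. A minimal chaining sketch (the `apply` step and the `temperature_f` field are illustrative, not from the original docs):

```python
sdf = app.dataframe(...)

# Drop a key, then keep transforming the same StreamingDataFrame
sdf = sdf.drop("metadata").apply(
    lambda value: {**value, "temperature_f": value["temperature"] * 9 / 5 + 32}
)
```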
**Example**:

In this example, assume you receive temperature readings in the following format:

```json
{
  "temperature": 35.5,
  "timestamp": 1710865771.3750699,
  "metadata": {
    "sensor_id": "sensor-1"
  }
}
```

and you need to drop the "metadata" key from the record:

```json
{
  "temperature": 35.5,
  "timestamp": 1710865771.3750699
}
```

Here is how to do that with `StreamingDataFrame`:

```python
sdf = app.dataframe(...)

# Dropping the "metadata" key from the record's value, assuming it's a dictionary
sdf.drop("metadata")

# You may also drop multiple keys by providing a list of names:
sdf.drop(["metadata", "timestamp"])
```

> **_NOTE:_** The `StreamingDataFrame.drop()` method works only with mapping-like values such as dictionaries.
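
Since `.drop()` only supports mappings, other value types can be handled with `.apply()` instead, by returning a new value without the unwanted field. A sketch of the same operation expressed that way for the dictionary case (illustrative, not from the original docs):

```python
# Equivalent of sdf.drop("metadata"), expressed via .apply()
sdf = sdf.apply(lambda value: {k: v for k, v in value.items() if k != "metadata"})
```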

## Transforming Data

### Generating New Data
@@ -547,7 +593,26 @@ sdf = sdf[sdf.apply(lambda value: value['field_a'] > 0)]
To debug code in `StreamingDataFrame`, you can use the usual tools such as print statements, logging,
and breakpoints.

**Example 1**:

Using `StreamingDataFrame.print()` to print the current record's value and metadata in the stream:

```python
sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Print the current record's value, key, timestamp, and headers
sdf.print(metadata=True)
# It will print the record's data wrapped into a dict for readability:
# { 'value': {'number': 12183},
#   'key': b'key',
#   'timestamp': 1721129697951,
#   'headers': [('header_name', b'header-value')]
# }
```
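
`print()` can also be called with no arguments to show only the record's value. A small aside (assuming `metadata` defaults to `False`; this diff only shows the `metadata=True` form):

```python
# Print only the record's value, without key, timestamp, or headers
sdf.print()
```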

**Example 2**:

Here is how to use `StreamingDataFrame.update()` to set a breakpoint and examine the
value between operations:
@@ -559,10 +624,7 @@ sdf = app.dataframe(...)
# some SDF transformations happening here ...

# Set a breakpoint (this assumes `import pdb` appears earlier in the snippet)
sdf.update(lambda value: pdb.set_trace())
```
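
Logging works the same way: pass a callback to `.update()` that logs the value instead of printing it. A minimal sketch (the logger name and message format are illustrative, not from the docs):

```python
import logging

logger = logging.getLogger("sdf-debug")

# Log each record's value as it passes through this point in the pipeline
sdf.update(lambda value: logger.debug("Value: %s", value))
```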

## Updating Kafka Timestamps
