Skip to content

Commit 000d795

Browse files
authored
add SDF.print(), allow inplace for SDF.update() and SDF.to_topic() (#403)
- Added `SDF.print(pretty: bool, metadata: bool)`, which prints the value, and optionally metadata - Enables `SDF.print()`, `SDF.update()` and `SDF.to_topic()` to be used without reassigning them (and you can chain them as normal) ```python sdf = SDF() sdf = sdf.apply(a_func) sdf.update(other_func).print(metadata=True).to_topic(my_topic) ```
1 parent 0a73ced commit 000d795

File tree

3 files changed

+166
-16
lines changed

3 files changed

+166
-16
lines changed

docs/processing.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,15 +501,23 @@ For example, to log input data, or to update a counter in the State.
501501
The return of the callback passed to `.update()` will be ignored, and the original input
502502
will be sent to downstream operations instead.
503503

504+
This operation occurs in-place, meaning reassigning the operation to your `sdf` is
505+
entirely OPTIONAL; the original `StreamingDataFrame` is still returned to allow the
506+
chaining of commands like `sdf.update().print()`.
507+
508+
> Note: chains that include any non-inplace function will still require reassignment:
509+
> `sdf = sdf.update().filter().print()`
510+
504511
**Example:**
505512

506513
```python
507514
# Mutate a list by appending a new item to it
508515
# The updated list will be passed downstream
509516
sdf = sdf.update(lambda some_list: some_list.append(1))
510517

511-
# Using .update() to print a value to the console
512-
sdf = sdf.update(lambda value: print("Received value: ", value))
518+
# OR instead (no reassignment):
519+
sdf.update(lambda some_list: some_list.append(1))
520+
513521
```
514522

515523
### StreamingDataFrame.filter()

quixstreams/dataframe/dataframe.py

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import contextvars
44
import functools
55
import operator
6+
import pprint
67
from copy import deepcopy
78
from datetime import timedelta
89
from typing import (
@@ -282,6 +283,9 @@ def update(
282283
The result of the function will be ignored, and the original value will be
283284
passed downstream.
284285
286+
This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
287+
original `StreamingDataFrame` is returned for chaining (`sdf.update().print()`).
288+
285289
286290
Example Snippet:
287291
@@ -297,7 +301,8 @@ def func(values: list, state: State):
297301
298302
sdf = StreamingDataframe()
299303
sdf = sdf.update(func, stateful=True)
300-
sdf = sdf.update(lambda value: print("Received value: ", value))
304+
# does not require reassigning
305+
sdf.update(lambda v: v.append(1))
301306
```
302307
303308
:param func: function to update value
@@ -306,6 +311,7 @@ def func(values: list, state: State):
306311
:param metadata: if True, the callback will receive key, timestamp and headers
307312
along with the value.
308313
Default - `False`.
314+
:return: the updated StreamingDataFrame instance (reassignment NOT required).
309315
"""
310316
if stateful:
311317
self._register_store()
@@ -319,15 +325,14 @@ def func(values: list, state: State):
319325
func=cast(UpdateWithMetadataCallbackStateful, with_metadata_func),
320326
processing_context=self._processing_context,
321327
)
322-
stream = self.stream.add_update(
328+
return self._add_update(
323329
cast(UpdateWithMetadataCallback, stateful_func), metadata=True
324330
)
325331
else:
326-
stream = self.stream.add_update(
332+
return self._add_update(
327333
cast(Union[UpdateCallback, UpdateWithMetadataCallback], func),
328334
metadata=metadata,
329335
)
330-
return self.__dataframe_clone__(stream=stream)
331336

332337
@overload
333338
def filter(self, func: FilterCallback) -> Self: ...
@@ -546,6 +551,9 @@ def to_topic(
546551
"""
547552
Produce current value to a topic. You can optionally specify a new key.
548553
554+
This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
555+
original `StreamingDataFrame` is returned for chaining (`sdf.update().print()`).
556+
549557
Example Snippet:
550558
551559
```python
@@ -560,17 +568,18 @@ def to_topic(
560568
561569
sdf = app.dataframe(input_topic)
562570
sdf = sdf.to_topic(output_topic_0)
563-
sdf = sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])
571+
# does not require reassigning
572+
sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])
564573
```
565574
566575
:param topic: instance of `Topic`
567576
:param key: a callable to generate a new message key, optional.
568577
If passed, the return type of this callable must be serializable
569578
by `key_serializer` defined for this Topic object.
570579
By default, the current message key will be used.
571-
580+
:return: the updated StreamingDataFrame instance (reassignment NOT required).
572581
"""
573-
return self.update(
582+
return self._add_update(
574583
lambda value, orig_key, timestamp, headers: self._produce(
575584
topic=topic,
576585
value=value,
@@ -673,6 +682,48 @@ def _set_headers_callback(
673682
stream = self.stream.add_transform(func=_set_headers_callback)
674683
return self.__dataframe_clone__(stream=stream)
675684

685+
def print(self, pretty: bool = True, metadata: bool = False) -> Self:
    """
    Print out the current message value (and optionally, the message metadata) to
    stdout (console) (like the built-in `print` function).

    Can also output a more dict-friendly format with `pretty=True`.

    This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
    original `StreamingDataFrame` is returned for chaining (`sdf.update().print()`).

    > NOTE: prints the current (edited) values, not the original values.

    Example Snippet:

    ```python
    from quixstreams import Application


    app = Application()
    input_topic = app.topic("data")

    sdf = app.dataframe(input_topic)
    sdf["edited_col"] = sdf["orig_col"] + "edited"
    # print the updated message value with the newly added column
    sdf.print()
    ```

    :param pretty: Whether to use "pprint" formatting, which uses new-lines and
        indents for easier console reading (but might be worse for log parsing).
    :param metadata: Whether to additionally print the key, timestamp, and headers
    :return: the updated StreamingDataFrame instance (reassignment NOT required).
    """
    # Field names in the order the update callback receives its arguments.
    print_args = ("value", "key", "timestamp", "headers")
    if pretty:
        # sort_dicts=False preserves insertion order so "value" is printed first.
        printer = functools.partial(pprint.pprint, indent=2, sort_dicts=False)
    else:
        printer = print
    return self._add_update(
        # zip() truncates to however many args were actually passed:
        # just "value" without metadata, all four fields with it.
        lambda *args: printer(dict(zip(print_args, args))),
        metadata=metadata,
    )
676727
def compose(
677728
self,
678729
sink: Optional[Callable[[Any, Any, int, Any], None]] = None,
@@ -929,6 +980,14 @@ def _produce(
929980
)
930981
self._producer.produce_row(row=row, topic=topic, key=key, timestamp=timestamp)
931982

983+
def _add_update(
    self,
    func: Union[UpdateCallback, UpdateWithMetadataCallback],
    metadata: bool = False,
) -> Self:
    """
    Register an "update" operation on the underlying stream **in place**.

    Unlike operations that clone the dataframe (via `__dataframe_clone__`),
    this mutates `self._stream` and returns `self`; this is what lets
    `update()`, `to_topic()` and `print()` work without reassignment while
    still supporting chaining.

    :param func: the update callback to register on the stream.
    :param metadata: if True, `func` also receives key, timestamp and headers
        along with the value. Default - `False`.
    :return: this same `StreamingDataFrame` instance (not a clone).
    """
    self._stream = self._stream.add_update(func, metadata=metadata)
    return self
990+
932991
def _register_store(self):
933992
"""
934993
Register the default store for input topic in StateStoreManager
@@ -986,7 +1045,7 @@ def __setitem__(self, item_key: Any, item: Union[Self, object]):
9861045
# Update an item key with a result of another sdf.apply()
9871046
diff = self.stream.diff(item.stream)
9881047
other_sdf_composed = diff.compose_returning()
989-
stream = self.stream.add_update(
1048+
self._add_update(
9901049
lambda value, key, timestamp, headers: operator.setitem(
9911050
value,
9921051
item_key,
@@ -997,18 +1056,15 @@ def __setitem__(self, item_key: Any, item: Union[Self, object]):
9971056
elif isinstance(item, StreamingSeries):
9981057
# Update an item key with a result of another series
9991058
series_composed = item.compose_returning()
1000-
stream = self.stream.add_update(
1059+
self._add_update(
10011060
lambda value, key, timestamp, headers: operator.setitem(
10021061
value, item_key, series_composed(value, key, timestamp, headers)[0]
10031062
),
10041063
metadata=True,
10051064
)
10061065
else:
10071066
# Update an item key with a constant
1008-
stream = self.stream.add_update(
1009-
lambda value: operator.setitem(value, item_key, item)
1010-
)
1011-
self._stream = stream
1067+
self._add_update(lambda value: operator.setitem(value, item_key, item))
10121068

10131069
@overload
10141070
def __getitem__(self, item: str) -> StreamingSeries: ...

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import operator
23
import uuid
34
from collections import namedtuple
@@ -369,6 +370,26 @@ def test_set_headers(self, original_headers, new_headers, dataframe_factory):
369370
)[0]
370371
assert result == expected
371372

373+
@pytest.mark.parametrize(
    "metadata,expected",
    [
        (False, str({"value": {"x": 1}})),
        (
            True,
            str({"value": {"x": 1}, "key": b"key", "timestamp": 0, "headers": []}),
        ),
    ],
)
def test_print(self, dataframe_factory, metadata, expected, capsys):
    """`print()` emits the value (plus metadata when requested) to stdout."""
    sdf = dataframe_factory()
    # No reassignment needed: print() registers an in-place update operation.
    sdf.print(metadata=metadata)

    record = {"x": 1}
    sdf.test(value=record, key=b"key", timestamp=0, headers=[])

    captured = capsys.readouterr()
    assert expected in captured.out
392+
372393

373394
class TestStreamingDataFrameApplyExpand:
374395
def test_apply_expand(self, dataframe_factory):
@@ -418,14 +439,76 @@ def test_setitem_expand_not_allowed(self, dataframe_factory):
418439
_ = sdf[sdf.apply(lambda v: [v, v], expand=True)]
419440

420441

442+
class TestStreamingDataFrameUpdate:
443+
def test_update_no_reassign(self, dataframe_factory):
444+
"""
445+
"Update" operations should be applied regardless of a reassignment,
446+
and anything else requires assignment.
447+
"""
448+
sdf = dataframe_factory()
449+
sdf_tree_1 = sdf.stream.tree()
450+
sdf_id_1 = id(sdf)
451+
452+
# non-update non-reassignment (no change!)
453+
sdf.apply(lambda v: v)
454+
sdf_tree_2 = sdf.stream.tree()
455+
sdf_id_2 = id(sdf)
456+
assert sdf_id_1 == sdf_id_2
457+
assert sdf_tree_1 == sdf_tree_2
458+
459+
# non-update reassignment
460+
sdf = sdf.apply(lambda v: v)
461+
sdf_tree_3 = sdf.stream.tree()
462+
sdf_id_3 = id(sdf)
463+
assert sdf_id_2 != sdf_id_3
464+
assert sdf_tree_2 != sdf_tree_3
465+
466+
# update non-reassignment
467+
sdf.update(lambda v: v)
468+
sdf_tree_4 = sdf.stream.tree()
469+
sdf_id_4 = id(sdf)
470+
assert sdf_id_3 == sdf_id_4
471+
assert sdf_tree_3 != sdf_tree_4
472+
473+
# update reassignment
474+
sdf = sdf.update(lambda v: v)
475+
sdf_tree_5 = sdf.stream.tree()
476+
sdf_id_5 = id(sdf)
477+
assert sdf_id_4 == sdf_id_5
478+
assert sdf_tree_4 != sdf_tree_5
479+
480+
def test_chaining_inplace_with_non_inplace(self, dataframe_factory):
481+
"""
482+
When chaining together inplace and non-inplace, reassigning must happen else
483+
everything starting with the non-inplace will be lost.
484+
"""
485+
sdf = dataframe_factory()
486+
sdf.update(lambda v: v.append(1)).apply(lambda v: v + [2]).update(
487+
lambda v: v.append(3)
488+
)
489+
sdf = sdf.apply(lambda v: v + [4])
490+
491+
value = []
492+
key, timestamp, headers = b"key", 0, []
493+
494+
assert sdf.test(value, key, timestamp, headers)[0] == (
495+
[1, 4],
496+
key,
497+
timestamp,
498+
headers,
499+
)
500+
501+
421502
class TestStreamingDataFrameToTopic:
503+
@pytest.mark.parametrize("reassign", [True, False])
422504
def test_to_topic(
423505
self,
424506
dataframe_factory,
425507
row_consumer_factory,
426508
row_producer_factory,
427509
topic_manager_topic_factory,
428510
message_context_factory,
511+
reassign,
429512
):
430513
topic = topic_manager_topic_factory(
431514
key_serializer="str",
@@ -436,7 +519,10 @@ def test_to_topic(
436519
producer = row_producer_factory()
437520

438521
sdf = dataframe_factory(producer=producer)
439-
sdf = sdf.to_topic(topic)
522+
if reassign:
523+
sdf = sdf.to_topic(topic)
524+
else:
525+
sdf.to_topic(topic)
440526

441527
value = {"x": 1, "y": 2}
442528
key, timestamp = "key", 10

0 commit comments

Comments
 (0)