Skip to content

Commit 44045ca

Browse files
Update documentation (#882)
Co-authored-by: daniil-quix <133032822+daniil-quix@users.noreply.github.com>
1 parent 17c9317 commit 44045ca

File tree

3 files changed

+618
-173
lines changed

3 files changed

+618
-173
lines changed

docs/api-reference/dataframe.md

Lines changed: 116 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
class StreamingDataFrame()
1111
```
1212

13-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L85)
13+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L87)
1414

1515
`StreamingDataFrame` is the main object you will use for ETL work.
1616

@@ -73,7 +73,7 @@ sdf = sdf.to_topic(topic_obj)
7373
def stream_id() -> str
7474
```
7575

76-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L170)
76+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L172)
7777

7878
An identifier of the data stream this StreamingDataFrame
7979
manipulates in the application.
@@ -107,7 +107,7 @@ def apply(func: Union[
107107
metadata: bool = False) -> "StreamingDataFrame"
108108
```
109109

110-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L229)
110+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L231)
111111

112112
Apply a function to transform the value and return a new value.
113113

@@ -165,7 +165,7 @@ def update(func: Union[
165165
metadata: bool = False) -> "StreamingDataFrame"
166166
```
167167

168-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L337)
168+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L335)
169169

170170
Apply a function to mutate value in-place or to perform a side effect
171171

@@ -233,7 +233,7 @@ def filter(func: Union[
233233
metadata: bool = False) -> "StreamingDataFrame"
234234
```
235235

236-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L444)
236+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L438)
237237

238238
Filter value using provided function.
239239

@@ -287,7 +287,7 @@ def group_by(
287287
) -> "StreamingDataFrame"
288288
```
289289

290-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L533)
290+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L523)
291291

292292
"Groups" messages by re-keying them via the provided group_by operation
293293

@@ -352,7 +352,7 @@ a clone with this operation added (assign to keep its effect).
352352
def contains(keys: Union[str, list[str]]) -> StreamingSeries
353353
```
354354

355-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L643)
355+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L633)
356356

357357
Check if keys are present in the Row value.
358358

@@ -394,7 +394,7 @@ def to_topic(
394394
key: Optional[Callable[[Any], Any]] = None) -> "StreamingDataFrame"
395395
```
396396

397-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L674)
397+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L664)
398398

399399
Produce current value to a topic. You can optionally specify a new key.
400400

@@ -448,7 +448,7 @@ def set_timestamp(
448448
func: Callable[[Any, Any, int, Any], int]) -> "StreamingDataFrame"
449449
```
450450

451-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L719)
451+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L709)
452452

453453
Set a new timestamp based on the current message value and its metadata.
454454

@@ -501,7 +501,7 @@ def set_headers(
501501
) -> "StreamingDataFrame"
502502
```
503503

504-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L762)
504+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L752)
505505

506506
Set new message headers based on the current message value and metadata.
507507

@@ -550,7 +550,7 @@ a new StreamingDataFrame instance
550550
def print(pretty: bool = True, metadata: bool = False) -> "StreamingDataFrame"
551551
```
552552

553-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L813)
553+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L803)
554554

555555
Print out the current message value (and optionally, the message metadata) to
556556

@@ -613,7 +613,7 @@ def print_table(
613613
int]] = None) -> "StreamingDataFrame"
614614
```
615615

616-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L859)
616+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L849)
617617

618618
Print a table with the most recent records.
619619

@@ -706,7 +706,7 @@ sdf.print_table(size=5, title="Live Records", slowdown=1)
706706
def compose(sink: Optional[VoidExecutor] = None) -> dict[str, VoidExecutor]
707707
```
708708

709-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L975)
709+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L965)
710710

711711
Compose all functions of this StreamingDataFrame into one big closure.
712712

@@ -760,7 +760,7 @@ def test(value: Any,
760760
topic: Optional[Topic] = None) -> List[Any]
761761
```
762762

763-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1009)
763+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L999)
764764

765765
A shorthand to test `StreamingDataFrame` with provided value
766766

@@ -800,7 +800,7 @@ def tumbling_window(
800800
) -> TumblingTimeWindowDefinition
801801
```
802802

803-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1048)
803+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1038)
804804

805805
Create a time-based tumbling window transformation on this StreamingDataFrame.
806806

@@ -892,7 +892,7 @@ def tumbling_count_window(
892892
name: Optional[str] = None) -> TumblingCountWindowDefinition
893893
```
894894

895-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1137)
895+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1127)
896896

897897
Create a count-based tumbling window transformation on this StreamingDataFrame.
898898

@@ -965,7 +965,7 @@ def hopping_window(
965965
) -> HoppingTimeWindowDefinition
966966
```
967967

968-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1187)
968+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1177)
969969

970970
Create a time-based hopping window transformation on this StreamingDataFrame.
971971

@@ -1068,7 +1068,7 @@ def hopping_count_window(
10681068
name: Optional[str] = None) -> HoppingCountWindowDefinition
10691069
```
10701070

1071-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1290)
1071+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1280)
10721072

10731073
Create a count-based hopping window transformation on this StreamingDataFrame.
10741074

@@ -1146,7 +1146,7 @@ def sliding_window(
11461146
) -> SlidingTimeWindowDefinition
11471147
```
11481148

1149-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1347)
1149+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1337)
11501150

11511151
Create a time-based sliding window transformation on this StreamingDataFrame.
11521152

@@ -1244,7 +1244,7 @@ def sliding_count_window(
12441244
name: Optional[str] = None) -> SlidingCountWindowDefinition
12451245
```
12461246

1247-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1442)
1247+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1432)
12481248

12491249
Create a count-based sliding window transformation on this StreamingDataFrame.
12501250

@@ -1314,7 +1314,7 @@ sdf = (
13141314
def fill(*columns: str, **mapping: Any) -> "StreamingDataFrame"
13151315
```
13161316

1317-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1495)
1317+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1485)
13181318

13191319
Fill missing values in the message value with a constant value.
13201320

@@ -1371,7 +1371,7 @@ def drop(columns: Union[str, List[str]],
13711371
errors: Literal["ignore", "raise"] = "raise") -> "StreamingDataFrame"
13721372
```
13731373

1374-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1547)
1374+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1537)
13751375

13761376
Drop column(s) from the message value (value must support `del`, like a dict).
13771377

@@ -1415,7 +1415,7 @@ a new StreamingDataFrame instance
14151415
def sink(sink: BaseSink)
14161416
```
14171417

1418-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1591)
1418+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1581)
14191419

14201420
Sink the processed data to the specified destination.
14211421

@@ -1443,7 +1443,7 @@ operations, but branches can still be generated from its originating SDF.
14431443
def concat(other: "StreamingDataFrame") -> "StreamingDataFrame"
14441444
```
14451445

1446-
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1629)
1446+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1619)
14471447

14481448
Concatenate two StreamingDataFrames together and return a new one.
14491449

@@ -1470,6 +1470,98 @@ The same is true for the repartition topics created by `.group_by()`.
14701470

14711471
a new StreamingDataFrame
14721472

1473+
<a id="quixstreams.dataframe.dataframe.StreamingDataFrame.join_asof"></a>
1474+
1475+
<br><br>
1476+
1477+
#### StreamingDataFrame.join\_asof
1478+
1479+
```python
1480+
def join_asof(right: "StreamingDataFrame",
1481+
how: JoinAsOfHow = "inner",
1482+
on_merge: Union[OnOverlap, Callable[[Any, Any], Any]] = "raise",
1483+
grace_ms: Union[int, timedelta] = timedelta(days=7),
1484+
name: Optional[str] = None) -> "StreamingDataFrame"
1485+
```
1486+
1487+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1649)
1488+
1489+
Join the left dataframe with the records of the right dataframe with
1490+
1491+
the same key whose timestamp is less than or equal to the left timestamp.
1492+
This join is built with the enrichment use case in mind, where the left side
1493+
represents some measurements and the right side is metadata.
1494+
1495+
To be joined, the underlying topics of the dataframes must have the same number of partitions
1496+
and use the same partitioner (all keys should be distributed across partitions using the same algorithm).
1497+
1498+
Joining dataframes belonging to the same topics (aka "self-join") is not supported as of now.
1499+
1500+
How it works:
1501+
- Records from the right side get written to the state store without emitting any updates downstream.
1502+
- Records on the left side query the right store for the values with the same **key** and the timestamp lower or equal to the record's timestamp.
1503+
Left side emits data downstream.
1504+
- If the match is found, the two records are merged together into a new one according to the `on_merge` logic.
1505+
- The size of the right store is controlled by the "grace_ms":
1506+
a newly added "right" record expires other values with the same key with timestamps below "<current timestamp> - <grace_ms>".
1507+
1508+
1509+
<br>
1510+
***Arguments:***
1511+
1512+
- `right`: a StreamingDataFrame to join with.
1513+
- `how`: the join strategy. Can be one of:
1514+
- "inner" - emits the result when the match on the right side is found for the left record.
1515+
- "left" - emits the result for each left record even if there is no match on the right side.
1516+
Default - `"inner"`.
1517+
- `on_merge`: how to merge the matched records together assuming they are dictionaries:
1518+
- "raise" - fail with an error if the same keys are found in both dictionaries
1519+
- "keep-left" - prefer the keys from the left record.
1520+
- "keep-right" - prefer the keys from the right record
1521+
- callback - a callback in form "(<left>, <right>) -> <new record>" to merge the records manually.
1522+
Use it to customize the merging logic or when one of the records is not a dictionary.
1523+
- `grace_ms`: how long to keep the right records in the store in event time.
1524+
(the time is taken from the records' timestamps).
1525+
It can be specified as either an `int` representing milliseconds or as a `timedelta` object.
1526+
The records are expired per key when the new record gets added.
1527+
Default - 7 days.
1528+
- `name`: The unique identifier of the underlying state store for the "right" dataframe.
1529+
If not provided, it will be generated based on the underlying topic names.
1530+
Provide a custom name if you need to join the same right dataframe multiple times
1531+
within the application.
1532+
1533+
**Example**:
1534+
1535+
1536+
```python
1537+
from datetime import timedelta
1538+
from quixstreams import Application
1539+
1540+
app = Application()
1541+
1542+
sdf_measurements = app.dataframe(app.topic("measurements"))
1543+
sdf_metadata = app.dataframe(app.topic("metadata"))
1544+
1545+
# Join records from the topic "measurements"
1546+
# with the latest effective records from the topic "metadata"
1547+
# using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time.
1548+
sdf_joined = sdf_measurements.join_asof(sdf_metadata, how="inner", grace_ms=timedelta(days=14))
1549+
```
1550+
1551+
<a id="quixstreams.dataframe.dataframe.StreamingDataFrame.register_store"></a>
1552+
1553+
<br><br>
1554+
1555+
#### StreamingDataFrame.register\_store
1556+
1557+
```python
1558+
def register_store(store_type: Optional[StoreTypes] = None) -> None
1559+
```
1560+
1561+
[[VIEW SOURCE]](https://github.com/quixio/quix-streams/blob/main/quixstreams/dataframe/dataframe.py#L1745)
1562+
1563+
Register the default store for the current stream_id in StateStoreManager.
1564+
14731565
<a id="quixstreams.dataframe.series"></a>
14741566

14751567
## quixstreams.dataframe.series

0 commit comments

Comments
 (0)