# `StreamingDataFrame.concat()`: concatenating multiple topics into a stream

Use `StreamingDataFrame.concat()` to combine two or more topics into a new stream containing all the elements from all the topics.

Use it when you need:

- To process multiple topics as a single stream.
- To combine the branches of the same StreamingDataFrame back together.

## Examples

**Example 1:** Aggregate e-commerce orders from different locations into one stream and calculate the average order size in 1h windows.

```python
from datetime import timedelta

from quixstreams import Application
from quixstreams.dataframe.windows import Mean

app = Application(...)

# Define the orders topics
topic_uk = app.topic("orders-uk")
topic_de = app.topic("orders-de")

# Create StreamingDataFrames for each location
orders_uk = app.dataframe(topic_uk)
orders_de = app.dataframe(topic_de)

# Simulate the currency conversion step for each topic before concatenating them.
orders_uk["amount_usd"] = orders_uk["amount"].apply(convert_currency("GBP", "USD"))
orders_de["amount_usd"] = orders_de["amount"].apply(convert_currency("EUR", "USD"))

# Concatenate the orders from different locations into a new StreamingDataFrame.
# The new dataframe will have all records from both topics.
orders_combined = orders_uk.concat(orders_de)

# Calculate the average order size in USD within a 1h tumbling window,
# emitting a result when each window closes.
orders_combined.tumbling_window(timedelta(hours=1)).agg(avg_amount_usd=Mean("amount_usd")).final()


if __name__ == '__main__':
    app.run()
```

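Example 1 assumes a `convert_currency` helper that returns a conversion callback for `.apply()`. A minimal sketch of such a helper might look like the following; the name, signature, and the hard-coded rate table are all hypothetical placeholders, not part of the library:

```python
# Hypothetical helper assumed by Example 1.
# The rates below are made-up placeholders, not real FX data.
RATES_TO_USD = {"USD": 1.0, "GBP": 1.25, "EUR": 1.10}

def convert_currency(source: str, target: str):
    """Return a callable converting a numeric amount from `source` to `target`."""
    rate = RATES_TO_USD[source] / RATES_TO_USD[target]

    def convert(amount: float) -> float:
        return round(amount * rate, 2)

    return convert
```

For example, `convert_currency("GBP", "USD")(100.0)` returns `125.0` with the placeholder rates above.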

**Example 2:** Combine branches of the same `StreamingDataFrame` back together.
See the [Branching](branching.md) page for more details about branching.

```python
from quixstreams import Application
app = Application(...)

input_topic = app.topic("orders")
output_topic = app.topic("output")

# Create a dataframe with all orders
all_orders = app.dataframe(input_topic)

# Create branches with DE and UK orders:
orders_de = all_orders[all_orders["country"] == "DE"]
orders_uk = all_orders[all_orders["country"] == "UK"]

# Do some conditional processing for DE and UK orders here
# ...

# Combine the branches back with .concat()
all_orders = orders_de.concat(orders_uk)

# Send data to the output topic
all_orders.to_topic(output_topic)


if __name__ == '__main__':
    app.run()
```


## Message ordering between partitions

When using `StreamingDataFrame.concat()` to combine different topics, the application's internal consumer switches to a special "buffered" mode.

In this mode, it buffers messages per partition in order to process them in timestamp order across the different topics.
Timestamp alignment applies only to partitions **with the same number**: partition zero is aligned with other zero partitions, but not with partition one.

Why is this needed?
Consider two topics A and B with the following timestamps:

- **Topic A (partition 0):** 11, 15
- **Topic B (partition 0):** 12, 17

By default, Kafka does not guarantee the processing order to be **11**, **12**, **15**, **17**, because ordering is guaranteed only within a single partition.

With timestamp alignment, this order is achieved as long as the messages are already present in the topic partitions (i.e. it does not handle cases where the producer is delayed).
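Conceptually, the alignment behaves like a timestamp-ordered merge of the same-numbered partition buffers. The sketch below illustrates the resulting ordering guarantee only; it is not the actual consumer implementation:

```python
import heapq

# Messages already present in partition 0 of each topic, as (timestamp, value) pairs.
topic_a_p0 = [(11, "a1"), (15, "a2")]
topic_b_p0 = [(12, "b1"), (17, "b2")]

# The buffered consumer effectively merges same-numbered partitions by timestamp,
# so records from both topics are processed in a single timestamp order.
merged = list(heapq.merge(topic_a_p0, topic_b_p0))

print([ts for ts, _ in merged])  # [11, 12, 15, 17]
```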


## Stateful operations on concatenated dataframes

To perform stateful operations like windowed aggregations on a concatenated StreamingDataFrame, the underlying topics **must have the same number of partitions**.
The application will raise an error when this condition is not met.

In addition, **the message keys must be distributed using the same partitioning algorithm.**
Otherwise, the same keys may land on different partitions and access different state stores, leading to incorrect results.
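To see why identical partitioning matters, consider a simplified `hash(key) % num_partitions` partitioner. This is only an illustration (real Kafka clients typically use murmur2): if two producers disagree on either the hash function or the partition count, the same key can land in different partitions, and therefore in different state stores.

```python
import zlib

def partition_for(key: bytes, num_partitions: int, hash_fn=zlib.crc32) -> int:
    # Simplified stand-in for a Kafka partitioner (real clients typically use murmur2).
    return hash_fn(key) % num_partitions

key = b"customer-42"

# Same hash function and same partition count: the key always maps to one partition,
# so all of its records hit the same state store.
assert partition_for(key, 6) == partition_for(key, 6)

# With a different partition count (or a different hash function), the same key
# may map to a different partition, splitting its state across stores.
print(partition_for(key, 6), partition_for(key, 8))
```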