Skip to content

Commit 53c36ee

Browse files
committed
[JOIN] Implement join
1 parent ce2f8b3 commit 53c36ee

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

quixstreams/dataframe/dataframe.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from quixstreams.sinks import BaseSink
5151
from quixstreams.state.base import State
5252
from quixstreams.state.base.transaction import PartitionTransaction
53+
from quixstreams.state.rocksdb.timestamped import TimestampedStore
5354
from quixstreams.utils.printing import (
5455
DEFAULT_COLUMN_NAME,
5556
DEFAULT_LIVE,
@@ -1645,6 +1646,27 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
16451646
*self.topics, *other.topics, stream=merged_stream
16461647
)
16471648

1649+
def join(self, right: "StreamingDataFrame") -> "StreamingDataFrame":
    """
    Join this dataframe with ``right``, enriching every left-side record
    with a value previously stored from the right side under the same key.

    How it works:

    - A ``TimestampedStore`` is registered on the right dataframe's state
      manager under ``right.stream_id``.
    - Every right-side record is written to that store (keyed by the
      message key and timestamp) and then dropped by an always-false
      filter, so right-side records never appear in the output.
    - Every left-side record looks up a right-side value via ``get_last``
      for its own key and timestamp, and is merged with it. On field-name
      collisions the right-side fields win. When nothing is found, the
      left value is passed on as a shallow copy.

    Both left and right values are unpacked with ``**``, so they are
    expected to be mappings.

    :param right: the dataframe whose records are stored and looked up.
    :return: a new dataframe producing the merged left-side records.
    """
    # TODO: ensure copartitioning of left and right?
    right.processing_context.state_manager.register_store(
        stream_id=right.stream_id,
        store_type=TimestampedStore,
        changelog_config=self._topic_manager.derive_topic_config(right.topics),
    )

    # NOTE: both callbacks close over ``right``. Closures bind late, so
    # ``right`` must NOT be reassigned below - otherwise the callbacks would
    # resolve the transaction against the derived (filtered) dataframe
    # instead of the one the store was registered for.
    def left_func(value, key, timestamp, headers):
        right_tx = _get_transaction(right)
        # presumably returns the latest stored value at or before
        # `timestamp` - confirm against TimestampedStore
        right_value = right_tx.get_last(timestamp=timestamp, prefix=key)
        # A missing right-side value is treated as an empty mapping.
        return {**value, **(right_value or {})}

    def right_func(value, key, timestamp, headers):
        right_tx = _get_transaction(right)
        right_tx.set(timestamp=timestamp, value=value, prefix=key)

    left = self.apply(left_func, metadata=True)
    # Store each right-side record, then discard it: the right branch exists
    # only for its side effect on the store.
    stored_right = right.update(right_func, metadata=True)
    silenced_right = stored_right.filter(lambda value: False)
    return left.concat(silenced_right)
1669+
16481670
def ensure_topics_copartitioned(self):
16491671
partitions_counts = set(t.broker_config.num_partitions for t in self._topics)
16501672
if len(partitions_counts) > 1:

0 commit comments

Comments
 (0)