Skip to content

Commit 7e78b7e

Browse files
committed
First test
1 parent 40f98f4 commit 7e78b7e

File tree

1 file changed

+38
-0
lines changed

1 file changed

+38
-0
lines changed

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2641,3 +2641,41 @@ def test_concat_stateful_mismatching_partitions_fails(
26412641
match="The underlying topics must have the same number of partitions to use State",
26422642
):
26432643
sdf1.concat(sdf2).update(lambda v, state: None, stateful=True)
2644+
2645+
2646+
class TestStreamingDataFrameJoin:
2647+
def test_join(
2648+
self,
2649+
topic_manager_factory,
2650+
dataframe_factory,
2651+
state_manager,
2652+
message_context_factory,
2653+
):
2654+
topic_manager = topic_manager_factory()
2655+
left_topic = topic_manager.topic(str(uuid.uuid4()))
2656+
right_topic = topic_manager.topic(str(uuid.uuid4()))
2657+
2658+
left_sdf = dataframe_factory(topic=left_topic, state_manager=state_manager)
2659+
right_sdf = dataframe_factory(topic=right_topic, state_manager=state_manager)
2660+
sdf_joined = left_sdf.join(right_sdf)
2661+
2662+
state_manager.on_partition_assign(
2663+
stream_id=right_sdf.stream_id,
2664+
partition=0,
2665+
committed_offsets={},
2666+
)
2667+
2668+
sdf_joined.test(
2669+
value={"right": 1},
2670+
key=b"key",
2671+
timestamp=1,
2672+
topic=right_topic,
2673+
ctx=message_context_factory(topic=right_topic.name),
2674+
)
2675+
assert sdf_joined.test(
2676+
value={"left": 2},
2677+
key=b"key",
2678+
timestamp=2,
2679+
topic=left_topic,
2680+
ctx=message_context_factory(topic=left_topic.name),
2681+
) == [({"left": 2, "right": 1}, b"key", 2, None)]

0 commit comments

Comments
 (0)