@@ -2644,38 +2644,62 @@ def test_concat_stateful_mismatching_partitions_fails(
2644
2644
2645
2645
2646
2646
class TestStreamingDataFrameJoin :
2647
- def test_join (
2648
- self ,
2649
- topic_manager_factory ,
2650
- dataframe_factory ,
2651
- state_manager ,
2652
- message_context_factory ,
2653
- ):
2654
- topic_manager = topic_manager_factory ()
2655
- left_topic = topic_manager .topic (str (uuid .uuid4 ()))
2656
- right_topic = topic_manager .topic (str (uuid .uuid4 ()))
2647
+ @pytest .fixture
2648
+ def topic_manager (self , topic_manager_factory ):
2649
+ return topic_manager_factory ()
2650
+
2651
+ @pytest .fixture
2652
+ def create_topic (self , topic_manager ):
2653
+ def _create_topic (num_partitions = 1 ):
2654
+ return topic_manager .topic (
2655
+ str (uuid .uuid4 ()),
2656
+ create_config = TopicConfig (
2657
+ num_partitions = num_partitions ,
2658
+ replication_factor = 1 ,
2659
+ ),
2660
+ )
2657
2661
2658
- left_sdf = dataframe_factory (topic = left_topic , state_manager = state_manager )
2659
- right_sdf = dataframe_factory (topic = right_topic , state_manager = state_manager )
2660
- sdf_joined = left_sdf .join (right_sdf )
2662
+ return _create_topic
2661
2663
2662
- state_manager .on_partition_assign (
2663
- stream_id = right_sdf .stream_id ,
2664
- partition = 0 ,
2665
- committed_offsets = {},
2666
- )
2664
+ @pytest .fixture
2665
+ def create_sdf (self , dataframe_factory , state_manager ):
2666
+ def _create_sdf (topic ):
2667
+ return dataframe_factory (topic = topic , state_manager = state_manager )
2667
2668
2668
- sdf_joined .test (
2669
- value = {"right" : 1 },
2670
- key = b"key" ,
2671
- timestamp = 1 ,
2672
- topic = right_topic ,
2673
- ctx = message_context_factory (topic = right_topic .name ),
2674
- )
2675
- assert sdf_joined .test (
2676
- value = {"left" : 2 },
2677
- key = b"key" ,
2678
- timestamp = 2 ,
2679
- topic = left_topic ,
2680
- ctx = message_context_factory (topic = left_topic .name ),
2681
- ) == [({"left" : 2 , "right" : 1 }, b"key" , 2 , None )]
2669
+ return _create_sdf
2670
+
2671
+ @pytest .fixture
2672
+ def assign_partition (self , state_manager ):
2673
+ def _assign_partition (sdf ):
2674
+ state_manager .on_partition_assign (
2675
+ stream_id = sdf .stream_id ,
2676
+ partition = 0 ,
2677
+ committed_offsets = {},
2678
+ )
2679
+
2680
+ return _assign_partition
2681
+
2682
+ @pytest .fixture
2683
+ def publish (self , message_context_factory ):
2684
+ def _publish (sdf , topic , value , key , timestamp ):
2685
+ return sdf .test (
2686
+ value = value ,
2687
+ key = key ,
2688
+ timestamp = timestamp ,
2689
+ topic = topic ,
2690
+ ctx = message_context_factory (topic = topic .name ),
2691
+ )
2692
+
2693
+ return _publish
2694
+
2695
+ def test_join (self , create_topic , create_sdf , assign_partition , publish ):
2696
+ left_topic , right_topic = create_topic (), create_topic ()
2697
+ left_sdf , right_sdf = create_sdf (left_topic ), create_sdf (right_topic )
2698
+ joined_sdf = left_sdf .join (right_sdf )
2699
+ assign_partition (right_sdf )
2700
+
2701
+ publish (joined_sdf , right_topic , value = {"right" : 1 }, key = b"key" , timestamp = 1 )
2702
+ joined_value = publish (
2703
+ joined_sdf , left_topic , value = {"left" : 2 }, key = b"key" , timestamp = 2
2704
+ )
2705
+ assert joined_value == [({"left" : 2 , "right" : 1 }, b"key" , 2 , None )]
0 commit comments