Skip to content

Commit 275e2c0

Browse files
committed
Rename to join_latest
1 parent aa51a5b commit 275e2c0

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

quixstreams/dataframe/dataframe.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1653,7 +1653,7 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
16531653
*self.topics, *other.topics, stream=merged_stream
16541654
)
16551655

1656-
def join(
1656+
def join_latest(
16571657
self,
16581658
right: "StreamingDataFrame",
16591659
how: JoinHow = "inner",

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -2645,7 +2645,7 @@ def test_concat_stateful_mismatching_partitions_fails(
26452645
sdf1.concat(sdf2).update(lambda v, state: None, stateful=True)
26462646

26472647

2648-
class TestStreamingDataFrameJoin:
2648+
class TestStreamingDataFrameJoinLatest:
26492649
@pytest.fixture
26502650
def topic_manager(self, topic_manager_factory):
26512651
return topic_manager_factory()
@@ -2748,7 +2748,7 @@ def test_how(
27482748
):
27492749
left_topic, right_topic = create_topic(), create_topic()
27502750
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
2751-
joined_sdf = left_sdf.join(right_sdf, how=how)
2751+
joined_sdf = left_sdf.join_latest(right_sdf, how=how)
27522752
assign_partition(right_sdf)
27532753

27542754
publish(joined_sdf, right_topic, value=right, key=b"key", timestamp=1)
@@ -2766,14 +2766,14 @@ def test_how_invalid_value(self, create_topic, create_sdf):
27662766
f"Valid values are: {', '.join(get_args(JoinHow))}."
27672767
)
27682768
with pytest.raises(ValueError, match=match):
2769-
left_sdf.join(right_sdf, how="invalid")
2769+
left_sdf.join_latest(right_sdf, how="invalid")
27702770

27712771
def test_mismatching_partitions_fails(self, create_topic, create_sdf):
27722772
left_topic, right_topic = create_topic(), create_topic(num_partitions=2)
27732773
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
27742774

27752775
with pytest.raises(TopicPartitionsMismatch):
2776-
left_sdf.join(right_sdf)
2776+
left_sdf.join_latest(right_sdf)
27772777

27782778
@pytest.mark.parametrize(
27792779
"on_overlap, right, left, expected",
@@ -2835,7 +2835,7 @@ def test_on_overlap(
28352835
):
28362836
left_topic, right_topic = create_topic(), create_topic()
28372837
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
2838-
joined_sdf = left_sdf.join(right_sdf, how="left", on_overlap=on_overlap)
2838+
joined_sdf = left_sdf.join_latest(right_sdf, how="left", on_overlap=on_overlap)
28392839
assign_partition(right_sdf)
28402840

28412841
publish(joined_sdf, right_topic, value=right, key=b"key", timestamp=1)
@@ -2858,7 +2858,7 @@ def test_on_overlap_invalid_value(self, create_topic, create_sdf):
28582858
f"Valid values are: {', '.join(get_args(JoinOnOverlap))}."
28592859
)
28602860
with pytest.raises(ValueError, match=match):
2861-
left_sdf.join(right_sdf, on_overlap="invalid")
2861+
left_sdf.join_latest(right_sdf, on_overlap="invalid")
28622862

28632863
def test_custom_merger(self, create_topic, create_sdf, assign_partition, publish):
28642864
left_topic, right_topic = create_topic(), create_topic()
@@ -2867,7 +2867,7 @@ def test_custom_merger(self, create_topic, create_sdf, assign_partition, publish
28672867
def merger(left, right):
28682868
return {"left": left, "right": right}
28692869

2870-
joined_sdf = left_sdf.join(right_sdf, merger=merger)
2870+
joined_sdf = left_sdf.join_latest(right_sdf, merger=merger)
28712871
assign_partition(right_sdf)
28722872

28732873
publish(joined_sdf, right_topic, value=1, key=b"key", timestamp=1)
@@ -2884,7 +2884,7 @@ def test_retention_ms(
28842884
left_topic, right_topic = create_topic(), create_topic()
28852885
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
28862886

2887-
joined_sdf = left_sdf.join(right_sdf, retention_ms=10)
2887+
joined_sdf = left_sdf.join_latest(right_sdf, retention_ms=10)
28882888
assign_partition(right_sdf)
28892889

28902890
# min eligible timestamp is 15 - 10 = 5

0 commit comments

Comments (0)