Skip to content

Commit 2df4b51

Browse files
committed
Add retention_ms param
1 parent 633ce57 commit 2df4b51

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

quixstreams/dataframe/dataframe.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1659,6 +1659,7 @@ def join(
16591659
how: JoinHow = "inner",
16601660
on_overlap: JoinOnOverlap = "raise",
16611661
merger: Optional[Callable[[Any, Any], Any]] = None,
1662+
retention_ms: Union[int, timedelta] = timedelta(days=7),
16621663
) -> "StreamingDataFrame":
16631664
if how not in get_args(JoinHow):
16641665
raise ValueError(
@@ -1675,6 +1676,7 @@ def join(
16751676
right.register_store(store_type=TimestampedStore)
16761677

16771678
is_inner_join = how == "inner"
1679+
retention_ms = ensure_milliseconds(retention_ms)
16781680

16791681
if merger is None:
16801682
if on_overlap == "keep-left":
@@ -1710,7 +1712,11 @@ def merger(left_value, right_value):
17101712

17111713
def left_func(value, key, timestamp, headers):
17121714
right_tx = _get_transaction(right)
1713-
right_value = right_tx.get_last(timestamp=timestamp, prefix=key)
1715+
right_value = right_tx.get_last(
1716+
timestamp=timestamp,
1717+
prefix=key,
1718+
retention_ms=retention_ms,
1719+
)
17141720
if is_inner_join and not right_value:
17151721
return DISCARDED
17161722
return merger(value, right_value)

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2872,3 +2872,48 @@ def merger(left, right):
28722872
publish(joined_sdf, right_topic, value=1, key=b"key", timestamp=1)
28732873
joined_value = publish(joined_sdf, left_topic, value=2, key=b"key", timestamp=2)
28742874
assert joined_value == [({"left": 2, "right": 1}, b"key", 2, None)]
2875+
2876+
@pytest.mark.parametrize(
2877+
"retention_ms, right_timestamp, left_timestamp, joined",
2878+
[
2879+
# Retention strategy includes right values greater or equal to 5 - 3 = 2
2880+
(3, 2, 5, True),
2881+
(timedelta(milliseconds=3), 2, 5, True),
2882+
# Retention strategy ignores right values lower than 6 - 3 = 3
2883+
(3, 2, 6, False),
2884+
(timedelta(milliseconds=3), 2, 6, False),
2885+
],
2886+
)
2887+
def test_retention_ms(
2888+
self,
2889+
create_topic,
2890+
create_sdf,
2891+
assign_partition,
2892+
publish,
2893+
retention_ms,
2894+
right_timestamp,
2895+
left_timestamp,
2896+
joined,
2897+
):
2898+
left_topic, right_topic = create_topic(), create_topic()
2899+
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
2900+
2901+
joined_sdf = left_sdf.join(right_sdf, retention_ms=retention_ms)
2902+
assign_partition(right_sdf)
2903+
2904+
publish(
2905+
joined_sdf,
2906+
right_topic,
2907+
value={"right": 1},
2908+
key=b"key",
2909+
timestamp=right_timestamp,
2910+
)
2911+
joined_value = publish(
2912+
joined_sdf,
2913+
left_topic,
2914+
value={"left": 2},
2915+
key=b"key",
2916+
timestamp=left_timestamp,
2917+
)
2918+
2919+
assert bool(joined_value) == joined

0 commit comments

Comments
 (0)