Skip to content

Commit aa51a5b

Browse files
committed
Correct after rebase
1 parent 2df4b51 commit aa51a5b

File tree

2 files changed

+27
-39
lines changed

2 files changed

+27
-39
lines changed

quixstreams/dataframe/dataframe.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1666,18 +1666,13 @@ def join(
16661666
f"Invalid how value: {how}. "
16671667
f"Valid values are: {', '.join(get_args(JoinHow))}."
16681668
)
1669+
16691670
if on_overlap not in get_args(JoinOnOverlap):
16701671
raise ValueError(
16711672
f"Invalid on_overlap value: {on_overlap}. "
16721673
f"Valid values are: {', '.join(get_args(JoinOnOverlap))}."
16731674
)
16741675

1675-
self.ensure_topics_copartitioned(*self.topics, *right.topics)
1676-
right.register_store(store_type=TimestampedStore)
1677-
1678-
is_inner_join = how == "inner"
1679-
retention_ms = ensure_milliseconds(retention_ms)
1680-
16811676
if merger is None:
16821677
if on_overlap == "keep-left":
16831678

@@ -1710,20 +1705,26 @@ def merger(left_value, right_value):
17101705
)
17111706
return {**left_value, **right_value}
17121707

1708+
self.ensure_topics_copartitioned(*self.topics, *right.topics)
1709+
right.register_store(store_type=TimestampedStore)
1710+
is_inner_join = how == "inner"
1711+
retention_ms = ensure_milliseconds(retention_ms)
1712+
17131713
def left_func(value, key, timestamp, headers):
17141714
right_tx = _get_transaction(right)
1715-
right_value = right_tx.get_last(
1716-
timestamp=timestamp,
1717-
prefix=key,
1718-
retention_ms=retention_ms,
1719-
)
1715+
right_value = right_tx.get_last(timestamp=timestamp, prefix=key)
17201716
if is_inner_join and not right_value:
17211717
return DISCARDED
17221718
return merger(value, right_value)
17231719

17241720
def right_func(value, key, timestamp, headers):
17251721
right_tx = _get_transaction(right)
1726-
right_tx.set(timestamp=timestamp, value=value, prefix=key)
1722+
right_tx.set_for_timestamp(
1723+
timestamp=timestamp,
1724+
value=value,
1725+
prefix=key,
1726+
retention_ms=retention_ms,
1727+
)
17271728

17281729
left = self.apply(left_func, metadata=True).filter(
17291730
lambda value: value is not DISCARDED

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import warnings
55
from collections import namedtuple
66
from datetime import timedelta
7+
from functools import partial
78
from typing import Any, get_args
89
from unittest import mock
910

@@ -2873,47 +2874,33 @@ def merger(left, right):
28732874
joined_value = publish(joined_sdf, left_topic, value=2, key=b"key", timestamp=2)
28742875
assert joined_value == [({"left": 2, "right": 1}, b"key", 2, None)]
28752876

2876-
@pytest.mark.parametrize(
2877-
"retention_ms, right_timestamp, left_timestamp, joined",
2878-
[
2879-
# Retention strategy includes right values greater or equal to 5 - 3 = 2
2880-
(3, 2, 5, True),
2881-
(timedelta(milliseconds=3), 2, 5, True),
2882-
# Retention strategy ignores right values lower than 6 - 3 = 3
2883-
(3, 2, 6, False),
2884-
(timedelta(milliseconds=3), 2, 6, False),
2885-
],
2886-
)
28872877
def test_retention_ms(
28882878
self,
28892879
create_topic,
28902880
create_sdf,
28912881
assign_partition,
28922882
publish,
2893-
retention_ms,
2894-
right_timestamp,
2895-
left_timestamp,
2896-
joined,
28972883
):
28982884
left_topic, right_topic = create_topic(), create_topic()
28992885
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
29002886

2901-
joined_sdf = left_sdf.join(right_sdf, retention_ms=retention_ms)
2887+
joined_sdf = left_sdf.join(right_sdf, retention_ms=10)
29022888
assign_partition(right_sdf)
29032889

2904-
publish(
2905-
joined_sdf,
2906-
right_topic,
2907-
value={"right": 1},
2908-
key=b"key",
2909-
timestamp=right_timestamp,
2910-
)
2911-
joined_value = publish(
2890+
# min eligible timestamp is 15 - 10 = 5 (latest right timestamp minus retention_ms)
2891+
publish(joined_sdf, right_topic, value={"right": 1}, key=b"key", timestamp=15)
2892+
2893+
# min eligible timestamp is still 5: timestamp 4 falls below it, timestamp 5 is exactly on the boundary
2894+
publish(joined_sdf, right_topic, value={"right": 3}, key=b"key", timestamp=4)
2895+
publish(joined_sdf, right_topic, value={"right": 2}, key=b"key", timestamp=5)
2896+
2897+
publish_left = partial(
2898+
publish,
29122899
joined_sdf,
29132900
left_topic,
2914-
value={"left": 2},
2901+
value={"left": 4},
29152902
key=b"key",
2916-
timestamp=left_timestamp,
29172903
)
29182904

2919-
assert bool(joined_value) == joined
2905+
assert publish_left(timestamp=4) == []
2906+
assert publish_left(timestamp=5) == [({"left": 4, "right": 2}, b"key", 5, None)]

0 commit comments

Comments
 (0)