Skip to content

Commit 777fc8e

Browse files
authored
adjust the changelog name for named window operations to accommodate branching (#563)
1 parent fc8ac2d commit 777fc8e

File tree

5 files changed

+15
-10
lines changed

5 files changed

+15
-10
lines changed

quixstreams/dataframe/windows/definitions.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -286,10 +286,8 @@ def __init__(
286286
)
287287

288288
def _get_name(self, func_name: str) -> str:
289-
return (
290-
self._name
291-
or f"hopping_window_{self._duration_ms}_{self._step_ms}_{func_name}"
292-
)
289+
prefix = f"{self._name}_hopping_window" if self._name else "hopping_window"
290+
return f"{prefix}_{self._duration_ms}_{self._step_ms}_{func_name}"
293291

294292
def _create_window(
295293
self,
@@ -321,7 +319,8 @@ def __init__(
321319
)
322320

323321
def _get_name(self, func_name: str) -> str:
324-
return self._name or f"tumbling_window_{self._duration_ms}_{func_name}"
322+
prefix = f"{self._name}_tumbling_window" if self._name else "tumbling_window"
323+
return f"{prefix}_{self._duration_ms}_{func_name}"
325324

326325
def _create_window(
327326
self,

quixstreams/state/manager.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,10 @@ def register_windowed_store(self, topic_name: str, store_name: str):
168168
"""
169169
store = self._stores.get(topic_name, {}).get(store_name)
170170
if store:
171-
raise WindowedStoreAlreadyRegisteredError()
171+
raise WindowedStoreAlreadyRegisteredError(
172+
"This window range and type combination already exists; "
173+
"to use this window, provide a unique name via the `name` parameter."
174+
)
172175
self._stores.setdefault(topic_name, {})[store_name] = WindowedRocksDBStore(
173176
name=store_name,
174177
topic=topic_name,

tests/test_quixstreams/test_app.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,6 +1511,9 @@ def test_changelog_recovery_window_store(
15111511
store_name = "window"
15121512
window_duration_ms = 5000
15131513
window_step_ms = 2000
1514+
actual_store_name = (
1515+
f"{store_name}_hopping_window_{window_duration_ms}_{window_step_ms}_sum"
1516+
)
15141517
msg_tick_ms = 1000
15151518
msg_int_value = 10
15161519
partition_timestamps = {
@@ -1591,13 +1594,13 @@ def validate_state():
15911594
with state_manager_factory(
15921595
group_id=consumer_group, state_dir=state_dir
15931596
) as state_manager:
1594-
state_manager.register_windowed_store(topic.name, store_name)
1597+
state_manager.register_windowed_store(topic.name, actual_store_name)
15951598
for p_num, windows in expected_window_updates.items():
15961599
state_manager.on_partition_assign(
15971600
topic=topic.name, partition=p_num, committed_offset=-1001
15981601
)
15991602
store = state_manager.get_store(
1600-
topic=topic.name, store_name=store_name
1603+
topic=topic.name, store_name=actual_store_name
16011604
)
16021605

16031606
# in this test, each expiration check only deletes one window,

tests/test_quixstreams/test_dataframe/test_windows/test_hopping.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class TestHoppingWindow:
2121
@pytest.mark.parametrize(
2222
"duration, grace, step, provided_name, func_name, expected_name",
2323
[
24-
(10, 5, 3, "custom_window", "sum", "custom_window"),
24+
(10, 5, 3, "custom_window", "sum", "custom_window_hopping_window_10_3_sum"),
2525
(10, 5, 3, None, "sum", "hopping_window_10_3_sum"),
2626
(15, 5, 3, None, "count", "hopping_window_15_3_count"),
2727
],

tests/test_quixstreams/test_dataframe/test_windows/test_tumbling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class TestTumblingWindow:
1919
@pytest.mark.parametrize(
2020
"duration, grace, provided_name, func_name, expected_name",
2121
[
22-
(10, 5, "custom_window", "sum", "custom_window"),
22+
(10, 5, "custom_window", "sum", "custom_window_tumbling_window_10_sum"),
2323
(10, 5, None, "sum", "tumbling_window_10_sum"),
2424
(15, 5, None, "count", "tumbling_window_15_count"),
2525
],

0 commit comments

Comments
 (0)