Skip to content

Commit 58524c5

Browse files
authored
concat(): Require timestamp alignment only when SDFs have different topics (#866)
1 parent fd9688b commit 58524c5

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

quixstreams/dataframe/dataframe.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import contextvars
44
import functools
5+
import itertools
56
import operator
67
import pprint
78
import typing
@@ -1644,7 +1645,13 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
16441645
"""
16451646

16461647
merged_stream = self.stream.merge(other.stream)
1647-
self._registry.require_time_alignment()
1648+
1649+
# Enable partition time alignment only when concatenated StreamingDataFrames
1650+
# have different topics (they could be just branches)
1651+
total_topics = {t.name for t in itertools.chain(self.topics, other.topics)}
1652+
if len(total_topics) > 1:
1653+
self._registry.require_time_alignment()
1654+
16481655
return self.__dataframe_clone__(
16491656
*self.topics, *other.topics, stream=merged_stream
16501657
)

tests/test_quixstreams/test_dataframe/test_dataframe.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2558,7 +2558,8 @@ def test_concat_same_topic_success(self, topic_manager_factory, dataframe_factor
25582558
topic_manager = topic_manager_factory()
25592559
topic = topic_manager.topic(str(uuid.uuid4()))
25602560

2561-
sdf = dataframe_factory(topic)
2561+
registry = DataFrameRegistry()
2562+
sdf = dataframe_factory(topic, registry=registry)
25622563
sdf_branch1 = sdf.apply(lambda v: v + 1)
25632564
sdf_branch2 = sdf.apply(lambda v: v + 2)
25642565

@@ -2570,6 +2571,8 @@ def test_concat_same_topic_success(self, topic_manager_factory, dataframe_factor
25702571
(2, b"key1", 1, None),
25712572
(3, b"key1", 1, None),
25722573
]
2574+
# Timestamp alignment is not required the concated SDFs are branches
2575+
assert not registry.requires_time_alignment
25732576

25742577
def test_concat_stateful_success(
25752578
self,

0 commit comments

Comments
 (0)