Skip to content

Commit 0f8074b

Browse files
committed
Merge PR #10707: Fixed: Make block combine use stream instead of flow
commit_hash:946462d1ea7e74758c7d6f86cc30cd674dc2195e
1 parent 39b2d17 commit 0f8074b

File tree

5 files changed

+1140
-692
lines changed

5 files changed

+1140
-692
lines changed

yql/essentials/core/type_ann/type_ann_blocks.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
791791
}
792792

793793
TTypeAnnotationNode::TListType blockItemTypes;
794-
if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
794+
if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
795795
return IGraphTransformer::TStatus::Error;
796796
}
797797

@@ -817,7 +817,7 @@ IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input,
817817
}
818818

819819
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
820-
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
820+
input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
821821
return IGraphTransformer::TStatus::Ok;
822822
}
823823

@@ -828,7 +828,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu
828828
}
829829

830830
TTypeAnnotationNode::TListType blockItemTypes;
831-
if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
831+
if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
832832
return IGraphTransformer::TStatus::Error;
833833
}
834834

@@ -867,7 +867,7 @@ IGraphTransformer::TStatus BlockCombineHashedWrapper(const TExprNode::TPtr& inpu
867867

868868
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
869869
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
870-
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
870+
input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
871871
return IGraphTransformer::TStatus::Ok;
872872
}
873873

@@ -879,7 +879,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
879879
}
880880

881881
TTypeAnnotationNode::TListType blockItemTypes;
882-
if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
882+
if (!EnsureWideStreamBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
883883
return IGraphTransformer::TStatus::Error;
884884
}
885885
YQL_ENSURE(blockItemTypes.size() > 0);
@@ -917,7 +917,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
917917
}
918918

919919
// disallow any scalar columns except for streamIndex column
920-
auto itemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
920+
auto itemTypes = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
921921
for (ui32 i = 0; i + 1 < itemTypes.size(); ++i) {
922922
bool isScalar = itemTypes[i]->GetKind() == ETypeAnnotationKind::Scalar;
923923
if (isScalar && i != streamIndex) {
@@ -929,7 +929,7 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
929929

930930
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
931931
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
932-
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
932+
input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
933933
return IGraphTransformer::TStatus::Ok;
934934
}
935935

yql/essentials/core/yql_aggregate_expander.cpp

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,8 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
699699
} else {
700700
stream = AggList;
701701
}
702-
auto blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
702+
703+
TExprNode::TPtr blocks = MakeInputBlocks(stream, keyIdxs, outputColumns, aggs, false, false);
703704
if (!blocks) {
704705
return nullptr;
705706
}
@@ -708,22 +709,30 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {
708709
if (hashed) {
709710
aggWideFlow = Ctx.Builder(Node->Pos())
710711
.Callable("WideFromBlocks")
711-
.Callable(0, "BlockCombineHashed")
712-
.Add(0, blocks)
713-
.Callable(1, "Void")
712+
.Callable(0, "ToFlow")
713+
.Callable(0, "BlockCombineHashed")
714+
.Callable(0, "FromFlow")
715+
.Add(0, blocks)
716+
.Seal()
717+
.Callable(1, "Void")
718+
.Seal()
719+
.Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
720+
.Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
714721
.Seal()
715-
.Add(2, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
716-
.Add(3, Ctx.NewList(Node->Pos(), std::move(aggs)))
717722
.Seal()
718723
.Seal()
719724
.Build();
720725
} else {
721726
aggWideFlow = Ctx.Builder(Node->Pos())
722-
.Callable("BlockCombineAll")
723-
.Add(0, blocks)
724-
.Callable(1, "Void")
727+
.Callable("ToFlow")
728+
.Callable(0, "BlockCombineAll")
729+
.Callable(0, "FromFlow")
730+
.Add(0, blocks)
731+
.Seal()
732+
.Callable(1, "Void")
733+
.Seal()
734+
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
725735
.Seal()
726-
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
727736
.Seal()
728737
.Build();
729738
}
@@ -2891,23 +2900,31 @@ TExprNode::TPtr TAggregateExpander::TryGenerateBlockMergeFinalizeHashed() {
28912900
TExprNode::TPtr aggBlocks;
28922901
if (!isMany) {
28932902
aggBlocks = Ctx.Builder(Node->Pos())
2894-
.Callable("BlockMergeFinalizeHashed")
2895-
.Add(0, blocks)
2896-
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
2897-
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
2903+
.Callable("ToFlow")
2904+
.Callable(0, "BlockMergeFinalizeHashed")
2905+
.Callable(0, "FromFlow")
2906+
.Add(0, blocks)
2907+
.Seal()
2908+
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
2909+
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
2910+
.Seal()
28982911
.Seal()
28992912
.Build();
29002913
} else {
29012914
auto manyStreamsSetting = GetSetting(*Node->Child(3), "many_streams");
29022915
YQL_ENSURE(manyStreamsSetting, "Missing many_streams setting");
29032916

29042917
aggBlocks = Ctx.Builder(Node->Pos())
2905-
.Callable("BlockMergeManyFinalizeHashed")
2906-
.Add(0, blocks)
2907-
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
2908-
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
2909-
.Atom(3, ToString(streamIdxColumn))
2910-
.Add(4, manyStreamsSetting->TailPtr())
2918+
.Callable("ToFlow")
2919+
.Callable(0, "BlockMergeManyFinalizeHashed")
2920+
.Callable(0, "FromFlow")
2921+
.Add(0, blocks)
2922+
.Seal()
2923+
.Add(1, Ctx.NewList(Node->Pos(), std::move(keyIdxs)))
2924+
.Add(2, Ctx.NewList(Node->Pos(), std::move(aggs)))
2925+
.Atom(3, ToString(streamIdxColumn))
2926+
.Add(4, manyStreamsSetting->TailPtr())
2927+
.Seal()
29112928
.Seal()
29122929
.Build();
29132930
}

0 commit comments

Comments
 (0)