Skip to content

Commit 50c6140

Browse files
committed
YQL-19424: Use WideStream instead of WideFlow in WideToBlocks computation node
commit_hash:0c0bfb556ff1f51f3293899c0364cd56c3965c41
1 parent 8e899a3 commit 50c6140

File tree

28 files changed

+466
-390
lines changed

28 files changed

+466
-390
lines changed

yql/essentials/core/peephole_opt/yql_opt_peephole_physical.cpp

Lines changed: 72 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -128,32 +128,22 @@ TExprNode::TPtr RebuildArgumentsOnlyLambdaForBlocks(const TExprNode& lambda, TEx
128128

129129
TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
130130
Y_UNUSED(types);
131-
132-
// Static assert to ensure backward compatible change: if the
133-
// constant below is true, both input and output types of
134-
// WideToBlocks callable have to be WideStream; otherwise,
135-
// both input and output types have to be WideFlow.
136-
// FIXME: When all spots using WideToBlocks are adjusted
137-
// to work with WideStream, drop the assertion below.
138-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
139-
140131
const auto& input = node->Head();
141-
if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
142-
const auto& wideFromBlocks = input.Head();
143-
// Technically, the code below rewrites the following sequence
144-
// (WideToBlocks (ToFlow (WideFromBlocks (<input>)))))
145-
// into (ReplicateScalars (<input>)), but ToFlow/FromFlow
146-
// wrappers will be removed when all other nodes in block
147-
// pipeline start using WideStream instead of the WideFlow.
148-
// Hence, the logging is left intact.
149-
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideFromBlocks.Content();
132+
if (input.IsCallable("WideFromBlocks")) {
133+
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << input.Content();
150134
// If tail is FromFlow, its input is WideFlow and can be
151135
// used intact; otherwise the input is WideStream, so the
152136
// new input should be converted to WideFlow.
153-
const auto tail = wideFromBlocks.HeadPtr();
137+
const auto tail = input.HeadPtr();
154138
const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
155139
: ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
156-
return ctx.NewCallable(node->Pos(), "ReplicateScalars", { flowInput });
140+
return ctx.Builder(node->Pos())
141+
.Callable("FromFlow")
142+
.Callable(0, "ReplicateScalars")
143+
.Add(0, flowInput)
144+
.Seal()
145+
.Seal()
146+
.Build();
157147
}
158148

159149
if (input.IsCallable({"Extend", "OrderedExtend"})) {
@@ -171,26 +161,10 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
171161

172162
TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
173163
Y_UNUSED(types);
174-
175-
// Static assert to ensure backward compatible change: if the
176-
// constant below is true, both input and output types of
177-
// WideToBlocks callable have to be WideStream; otherwise,
178-
// both input and output types have to be WideFlow.
179-
// FIXME: When all spots using WideToBlocks are adjusted
180-
// to work with WideStream, drop the assertion below.
181-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
182-
183164
const auto& input = node->Head();
184-
if (input.IsCallable("FromFlow") && input.Head().IsCallable("WideToBlocks")) {
185-
const auto& wideToBlocks = input.Head();
186-
// Technically, the code below rewrites the following sequence
187-
// (WideFromBlocks (FromFlow (WideToBlocks (<input>))))
188-
// into (FromFlow (<input>)) (to match the ToFlow parent),
189-
// but ToFlow/FromFlow wrappers will be removed when all
190-
// other nodes in block pipeline start using WideStream
191-
// instead of the WideFlow. Hence, the logging is left intact.
192-
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << wideToBlocks.Content();
193-
return ctx.NewCallable(node->Pos(), "FromFlow", {wideToBlocks.HeadPtr()});
165+
if (input.IsCallable("WideToBlocks")) {
166+
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << input.Content();
167+
return input.HeadPtr();
194168
}
195169

196170
if (input.IsCallable("FromFlow") && input.Head().IsCallable("ReplicateScalars")) {
@@ -6268,22 +6242,17 @@ TExprNode::TPtr OptimizeWideMapBlocks(const TExprNode::TPtr& node, TExprContext&
62686242

62696243
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to blocks, extra nodes: " << newNodes
62706244
<< ", extra columns: " << rewritePositions.size();
6271-
6272-
// Static assert to ensure backward compatible change: if the
6273-
// constant below is true, both input and output types of
6274-
// WideToBlocks callable have to be WideStream; otherwise,
6275-
// both input and output types have to be WideFlow.
6276-
// FIXME: When all spots using WideToBlocks are adjusted
6277-
// to work with WideStream, drop the assertion below.
6278-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
6279-
62806245
auto ret = ctx.Builder(node->Pos())
62816246
.Callable("ToFlow")
62826247
.Callable(0, "WideFromBlocks")
62836248
.Callable(0, "FromFlow")
62846249
.Callable(0, "WideMap")
6285-
.Callable(0, "WideToBlocks")
6286-
.Add(0, node->HeadPtr())
6250+
.Callable(0, "ToFlow")
6251+
.Callable(0, "WideToBlocks")
6252+
.Callable(0, "FromFlow")
6253+
.Add(0, node->HeadPtr())
6254+
.Seal()
6255+
.Seal()
62876256
.Seal()
62886257
.Add(1, blockLambda)
62896258
.Seal()
@@ -6318,18 +6287,14 @@ TExprNode::TPtr OptimizeWideFilterBlocks(const TExprNode::TPtr& node, TExprConte
63186287
return node;
63196288
}
63206289

6321-
// Static assert to ensure backward compatible change: if the
6322-
// constant below is true, both input and output types of
6323-
// WideToBlocks callable have to be WideStream; otherwise,
6324-
// both input and output types have to be WideFlow.
6325-
// FIXME: When all spots using WideToBlocks are adjusted
6326-
// to work with WideStream, drop the assertion below.
6327-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
6328-
63296290
auto blockMapped = ctx.Builder(node->Pos())
63306291
.Callable("WideMap")
6331-
.Callable(0, "WideToBlocks")
6332-
.Add(0, node->HeadPtr())
6292+
.Callable(0, "ToFlow")
6293+
.Callable(0, "WideToBlocks")
6294+
.Callable(0, "FromFlow")
6295+
.Add(0, node->HeadPtr())
6296+
.Seal()
6297+
.Seal()
63336298
.Seal()
63346299
.Add(1, blockLambda)
63356300
.Seal()
@@ -6446,22 +6411,17 @@ TExprNode::TPtr OptimizeSkipTakeToBlocks(const TExprNode::TPtr& node, TExprConte
64466411

64476412
TStringBuf newName = node->Content() == "Skip" ? "WideSkipBlocks" : "WideTakeBlocks";
64486413
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
6449-
6450-
// Static assert to ensure backward compatible change: if the
6451-
// constant below is true, both input and output types of
6452-
// WideToBlocks callable have to be WideStream; otherwise,
6453-
// both input and output types have to be WideFlow.
6454-
// FIXME: When all spots using WideToBlocks are adjusted
6455-
// to work with WideStream, drop the assertion below.
6456-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
6457-
64586414
return ctx.Builder(node->Pos())
64596415
.Callable("ToFlow")
64606416
.Callable(0, "WideFromBlocks")
64616417
.Callable(0, "FromFlow")
64626418
.Callable(0, newName)
6463-
.Callable(0, "WideToBlocks")
6464-
.Add(0, node->HeadPtr())
6419+
.Callable(0, "ToFlow")
6420+
.Callable(0, "WideToBlocks")
6421+
.Callable(0, "FromFlow")
6422+
.Add(0, node->HeadPtr())
6423+
.Seal()
6424+
.Seal()
64656425
.Seal()
64666426
.Add(1, node->ChildPtr(1))
64676427
.Seal()
@@ -6504,16 +6464,15 @@ TExprNode::TPtr OptimizeTopOrSortBlocks(const TExprNode::TPtr& node, TExprContex
65046464
TString newName = node->Content() + TString("Blocks");
65056465
YQL_CLOG(DEBUG, CorePeepHole) << "Convert " << node->Content() << " to " << newName;
65066466
auto children = node->ChildrenList();
6507-
6508-
// Static assert to ensure backward compatible change: if the
6509-
// constant below is true, both input and output types of
6510-
// WideToBlocks callable have to be WideStream; otherwise,
6511-
// both input and output types have to be WideFlow.
6512-
// FIXME: When all spots using WideToBlocks are adjusted
6513-
// to work with WideStream, drop the assertion below.
6514-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
6515-
6516-
children[0] = ctx.NewCallable(node->Pos(), "WideToBlocks", { children[0] });
6467+
children[0] = ctx.Builder(node->Pos())
6468+
.Callable("ToFlow")
6469+
.Callable(0, "WideToBlocks")
6470+
.Callable(0, "FromFlow")
6471+
.Add(0, children[0])
6472+
.Seal()
6473+
.Seal()
6474+
.Seal()
6475+
.Build();
65176476
return ctx.Builder(node->Pos())
65186477
.Callable("ToFlow")
65196478
.Callable(0, "WideFromBlocks")
@@ -6726,27 +6685,46 @@ TExprNode::TPtr OptimizeWideMaps(const TExprNode::TPtr& node, TExprContext& ctx)
67266685
.Seal()
67276686
.Add(1, DropUnusedArgs(node->Tail(), unused, ctx))
67286687
.Seal().Build();
6729-
} else if (input.IsCallable("WideToBlocks")) {
6730-
// Static assert to ensure backward compatible change: if the
6731-
// constant below is true, both input and output types of
6732-
// WideToBlocks callable have to be WideStream; otherwise,
6733-
// both input and output types have to be WideFlow.
6734-
// FIXME: When all spots using WideToBlocks are adjusted
6735-
// to work with WideStream, drop the assertion below.
6736-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
6737-
6688+
} else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideToBlocks")) {
67386689
auto actualUnused = unused;
67396690
if (actualUnused.back() + 1U == node->Tail().Head().ChildrenSize())
67406691
actualUnused.pop_back();
67416692
if (!actualUnused.empty()) {
6742-
YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << input.Content() << " with " << actualUnused.size() << " unused fields.";
6693+
const auto& wideToBlocks = input.Head();
6694+
// WideToBlocks uses WideStream instead of WideFlow,
6695+
// so it's wrapped with ToFlow/FromFlow. Hence, to drop
6696+
// unused fields for a particular WideToBlocks node,
6697+
// the optimizer has to rewrite the FromFlow child, but
6698+
// logging is left intact.
6699+
YQL_CLOG(DEBUG, CorePeepHole) << node->Content() << " over " << wideToBlocks.Content() << " with " << actualUnused.size() << " unused fields.";
6700+
const auto tail = wideToBlocks.HeadPtr();
6701+
const auto width = tail->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>()->GetSize();
6702+
const auto flowInput = tail->IsCallable("FromFlow") ? tail->HeadPtr()
6703+
: ctx.NewCallable(tail->Pos(), "ToFlow", { tail });
67436704
return ctx.Builder(node->Pos())
67446705
.Callable(node->Content())
6745-
.Callable(0, input.Content())
6746-
.Add(0, MakeWideMapForDropUnused(input.HeadPtr(), actualUnused, ctx))
6706+
.Callable(0, "ToFlow")
6707+
.Callable(0, "WideToBlocks")
6708+
.Callable(0, "FromFlow")
6709+
.Callable(0, "WideMap")
6710+
.Add(0, flowInput)
6711+
.Lambda(1)
6712+
.Params("items", width)
6713+
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
6714+
for (auto i = 0U, j = 0U; i < width; ++i) {
6715+
if (unused.cend() == std::find(unused.cbegin(), unused.cend(), i))
6716+
parent.Arg(j++, "items", i);
6717+
}
6718+
return parent;
6719+
})
6720+
.Seal()
6721+
.Seal()
6722+
.Seal()
6723+
.Seal()
67476724
.Seal()
67486725
.Add(1, DropUnusedArgs(node->Tail(), actualUnused, ctx))
6749-
.Seal().Build();
6726+
.Seal()
6727+
.Build();
67506728
}
67516729
} else if (input.IsCallable("ToFlow") && input.Head().IsCallable("WideFromBlocks")) {
67526730
const auto& wideFromBlocks = input.Head();

yql/essentials/core/type_ann/type_ann_blocks.cpp

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -868,24 +868,15 @@ IGraphTransformer::TStatus BlockMergeFinalizeHashedWrapper(const TExprNode::TPtr
868868

869869
IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
870870
Y_UNUSED(output);
871-
872-
// Static assert to ensure backward compatible change: if the
873-
// constant below is true, both input and output types of
874-
// WideToBlocks callable have to be WideStream; otherwise,
875-
// both input and output types have to be WideFlow.
876-
// FIXME: When all spots using WideToBlocks are adjusted
877-
// to work with WideStream, drop the assertion below.
878-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
879-
880871
if (!EnsureArgsCount(*input, 1U, ctx.Expr)) {
881872
return IGraphTransformer::TStatus::Error;
882873
}
883874

884-
if (!EnsureWideFlowType(input->Head(), ctx.Expr)) {
875+
if (!EnsureWideStreamType(input->Head(), ctx.Expr)) {
885876
return IGraphTransformer::TStatus::Error;
886877
}
887878

888-
const auto multiType = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>();
879+
const auto multiType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>();
889880
TTypeAnnotationNode::TListType retMultiType;
890881
for (const auto& type : multiType->GetItems()) {
891882
if (type->IsBlockOrScalar()) {
@@ -902,7 +893,7 @@ IGraphTransformer::TStatus WideToBlocksWrapper(const TExprNode::TPtr& input, TEx
902893

903894
retMultiType.push_back(ctx.Expr.MakeType<TScalarExprType>(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64)));
904895
auto outputItemType = ctx.Expr.MakeType<TMultiExprType>(retMultiType);
905-
input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(outputItemType));
896+
input->SetTypeAnn(ctx.Expr.MakeType<TStreamExprType>(outputItemType));
906897
return IGraphTransformer::TStatus::Ok;
907898
}
908899

yql/essentials/core/yql_aggregate_expander.cpp

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -679,17 +679,15 @@ TExprNode::TPtr TAggregateExpander::MakeInputBlocks(const TExprNode::TPtr& strea
679679

680680
auto extractorLambda = Ctx.NewLambda(Node->Pos(), Ctx.NewArguments(Node->Pos(), std::move(extractorArgs)), std::move(extractorRoots));
681681
auto mappedWideFlow = Ctx.NewCallable(Node->Pos(), "WideMap", { wideFlow, extractorLambda });
682-
683-
// Static assert to ensure backward compatible change: if the
684-
// constant below is true, both input and output types of
685-
// WideToBlocks callable have to be WideStream; otherwise,
686-
// both input and output types have to be WideFlow.
687-
// FIXME: When all spots using WideToBlocks are adjusted
688-
// to work with WideStream, drop the assertion below.
689-
static_assert(!NYql::NBlockStreamIO::WideToBlocks);
690-
691-
auto blocks = Ctx.NewCallable(Node->Pos(), "WideToBlocks", { mappedWideFlow });
692-
return blocks;
682+
return Ctx.Builder(Node->Pos())
683+
.Callable("ToFlow")
684+
.Callable(0, "WideToBlocks")
685+
.Callable(0, "FromFlow")
686+
.Add(0, mappedWideFlow)
687+
.Seal()
688+
.Seal()
689+
.Seal()
690+
.Build();
693691
}
694692

695693
TExprNode::TPtr TAggregateExpander::TryGenerateBlockCombineAllOrHashed() {

yql/essentials/core/yql_expr_type_annotation.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ TStringBuf NormalizeCallableName(TStringBuf name);
355355
void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
356356

357357
namespace NBlockStreamIO {
358-
constexpr bool WideToBlocks = false;
358+
constexpr bool WideToBlocks = true;
359359
} // namespace NBlockStreamIO
360360

361361
}

0 commit comments

Comments
 (0)