Skip to content

Commit d3b5643

Browse files
authored
Make DQ use only WideStream for ReplicateScalars (#16792)
1 parent 2a34cb8 commit d3b5643

File tree

1 file changed

+17
-52
lines changed

1 file changed

+17
-52
lines changed

ydb/library/yql/dq/opt/dq_opt_build.cpp

Lines changed: 17 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -645,15 +645,6 @@ TDqPhyStage DqEnableWideChannelsInputForStage(const TDqPhyStage& stage, TExprCon
645645
}
646646

647647
bool CanPullReplicateScalars(const TDqPhyStage& stage) {
648-
if constexpr (!NYql::NBlockStreamIO::ReplicateScalars) {
649-
auto maybeFromFlow = stage.Program().Body().Maybe<TCoFromFlow>();
650-
if (!maybeFromFlow) {
651-
return false;
652-
}
653-
654-
return bool(maybeFromFlow.Cast().Input().Maybe<TCoReplicateScalars>());
655-
}
656-
657648
return bool(stage.Program().Body().Maybe<TCoReplicateScalars>());
658649
}
659650

@@ -698,55 +689,29 @@ TDqPhyStage DqPullReplicateScalarsFromInputs(const TDqPhyStage& stage, TExprCont
698689
TDqPhyStage childStage = conn.Output().Stage().Cast<TDqPhyStage>();
699690
TCoLambda childProgram(ctx.DeepCopyLambda(childStage.Program().Ref()));
700691

701-
TMaybeNode<TExprBase> newChildStage;
702-
TExprNode::TPtr argReplace;
703-
TExprNode::TPtr newArgNode = newArg.Ptr();
704-
if constexpr (!NYql::NBlockStreamIO::ReplicateScalars) {
705-
TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoFromFlow>().Input().Cast<TCoReplicateScalars>();
706-
707-
// replace FromFlow(ReplicateScalars(x, ...)) with FromFlow(x)
708-
newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos())
709-
.InitFrom(childStage)
710-
.Program()
711-
.Args(childProgram.Args())
712-
.Body(ctx.ChangeChild(childProgram.Body().Ref(), TCoFromFlow::idx_Input, childReplicateScalars.Input().Ptr()))
713-
.Build()
714-
.Done();
715-
argReplace = Build<TCoFromFlow>(ctx, arg.Pos())
716-
.Input<TCoReplicateScalars>()
717-
.Input<TCoToFlow>()
718-
.Input(newArgNode)
719-
.Build()
720-
.Indexes(childReplicateScalars.Indexes())
721-
.Build()
722-
.Done()
723-
.Ptr();
724-
} else {
725-
TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoReplicateScalars>();
726-
727-
// replace (ReplicateScalars(x, ...)) with (x)
728-
newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos())
729-
.InitFrom(childStage)
730-
.Program()
731-
.Args(childProgram.Args())
732-
.Body(childReplicateScalars.Input())
733-
.Build()
734-
.Done();
735-
736-
argReplace = Build<TCoReplicateScalars>(ctx, arg.Pos())
737-
.Input(newArgNode)
738-
.Indexes(childReplicateScalars.Indexes())
739-
.Done()
740-
.Ptr();
741-
}
692+
TCoReplicateScalars childReplicateScalars = childProgram.Body().Cast<TCoReplicateScalars>();
742693

694+
// replace (ReplicateScalars(x, ...)) with (x)
695+
auto newChildStage = Build<TDqPhyStage>(ctx, childStage.Pos())
696+
.InitFrom(childStage)
697+
.Program()
698+
.Args(childProgram.Args())
699+
.Body(childReplicateScalars.Input())
700+
.Build()
701+
.Done();
743702
auto newOutput = Build<TDqOutput>(ctx, conn.Output().Pos())
744703
.InitFrom(conn.Output())
745-
.Stage(newChildStage.Cast().Ptr())
704+
.Stage(newChildStage)
746705
.Done();
706+
newInputs.push_back(ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr()));
747707

708+
TExprNode::TPtr newArgNode = newArg.Ptr();
709+
TExprNode::TPtr argReplace = Build<TCoReplicateScalars>(ctx, arg.Pos())
710+
.Input(newArgNode)
711+
.Indexes(childReplicateScalars.Indexes())
712+
.Done()
713+
.Ptr();
748714
argsMap.emplace(arg.Raw(), argReplace);
749-
newInputs.push_back(ctx.ChangeChild(conn.Ref(), TDqConnection::idx_Output, newOutput.Ptr()));
750715
} else {
751716
argsMap.emplace(arg.Raw(), newArg.Ptr());
752717
newInputs.push_back(stage.Inputs().Item(i).Ptr());

0 commit comments

Comments
 (0)