Skip to content

Commit 2c63185

Browse files
authored
Use AssumeConstraints for YtDqProcessWrite (#8530)
1 parent da5623c commit 2c63185

File tree

9 files changed

+379
-51
lines changed

9 files changed

+379
-51
lines changed

ydb/library/yql/core/yql_expr_constraint.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
399399
"Bad constraints yson-value: " << CurrentExceptionMessage()));
400400
return IGraphTransformer::TStatus::Error;
401401
}
402+
if (!set) {
403+
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "AssumeConstraints with empty constraints set"));
404+
return IGraphTransformer::TStatus::Error;
405+
}
402406
for (auto constraint: set.GetAllConstraints()) {
403407
if (!constraint->IsApplicableToType(*input->GetTypeAnn())) {
404408
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << *constraint

ydb/library/yql/providers/yt/common/yql_configuration.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,6 @@ constexpr ui16 DEFAULT_MAX_COLUMN_GROUPS = 64;
8989

9090
constexpr bool DEFAULT_DISABLE_FUSE_OPERATIONS = false;
9191

92+
constexpr bool DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS = false;
93+
9294
} // NYql

ydb/library/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ TYtConfiguration::TYtConfiguration()
486486
REGISTER_SETTING(*this, MinColumnGroupSize).Lower(2);
487487
REGISTER_SETTING(*this, MaxColumnGroups);
488488
REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
489+
REGISTER_SETTING(*this, _EnableYtDqProcessWriteConstraints);
489490
}
490491

491492
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) {

ydb/library/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ struct TYtSettings {
282282
NCommon::TConfSetting<ui16, false> MinColumnGroupSize;
283283
NCommon::TConfSetting<ui16, false> MaxColumnGroups;
284284
NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
285+
NCommon::TConfSetting<bool, false> _EnableYtDqProcessWriteConstraints;
285286
};
286287

287288
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings);

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
2020
#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TYtPhysicalOptProposalTransformer::name)
2121
AddHandler(0, &TCoMux::Match, HNDL(Mux));
2222
AddHandler(0, &TYtWriteTable::Match, HNDL(Write));
23-
AddHandler(0, &TYtWriteTable::Match, HNDL(DqWrite));
23+
if (!State_->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) {
24+
AddHandler(0, &TYtWriteTable::Match, HNDL(DqWrite));
25+
}
2426
AddHandler(0, Names({TCoLength::CallableName(), TCoHasItems::CallableName()}), HNDL(Length));
2527
AddHandler(0, &TCoSort::Match, HNDL(Sort<false>));
2628
AddHandler(0, &TCoTopSort::Match, HNDL(Sort<true>));
@@ -33,7 +35,13 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
3335
AddHandler(0, &TCoOrderedLMap::Match, HNDL(LMap<TCoOrderedLMap>));
3436
AddHandler(0, &TCoEquiJoin::Match, HNDL(EquiJoin));
3537
AddHandler(0, &TCoCountBase::Match, HNDL(TakeOrSkip));
36-
AddHandler(0, &TYtWriteTable::Match, HNDL(Fill));
38+
if (State_->Configuration->_EnableYtDqProcessWriteConstraints.Get().GetOrElse(DEFAULT_ENABLE_DQ_WRITE_CONSTRAINTS)) {
39+
AddHandler(0, &TYtMaterialize::Match, HNDL(DqMaterialize));
40+
AddHandler(0, &TYtMaterialize::Match, HNDL(Materialize));
41+
AddHandler(0, &TYtWriteTable::Match, HNDL(FillToMaterialize));
42+
} else {
43+
AddHandler(0, &TYtWriteTable::Match, HNDL(Fill));
44+
}
3745
AddHandler(0, &TResPull::Match, HNDL(ResPull));
3846
if (State_->Configuration->UseNewPredicateExtraction.Get().GetOrElse(DEFAULT_USE_NEW_PREDICATE_EXTRACTION)) {
3947
AddHandler(0, Names({TYtMap::CallableName(), TYtMapReduce::CallableName()}), HNDL(ExtractKeyRange));
@@ -42,7 +50,8 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
4250
AddHandler(0, Names({TYtMap::CallableName(), TYtMapReduce::CallableName()}), HNDL(ExtractKeyRangeLegacy));
4351
}
4452
AddHandler(0, &TCoExtendBase::Match, HNDL(Extend));
45-
AddHandler(0, &TCoAssumeSorted::Match, HNDL(AssumeSorted));
53+
AddHandler(0, &TCoAssumeSorted::Match, HNDL(AssumeConstraints));
54+
AddHandler(0, &TCoAssumeConstraints::Match, HNDL(AssumeConstraints));
4655
AddHandler(0, &TYtMapReduce::Match, HNDL(AddTrivialMapperForNativeYtTypes));
4756
AddHandler(0, &TYtDqWrite::Match, HNDL(YtDqWrite));
4857
AddHandler(0, &TYtDqProcessWrite::Match, HNDL(YtDqProcessWrite));

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,18 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
3434

3535
NNodes::TMaybeNode<NNodes::TExprBase> DqWrite(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const;
3636

37+
NNodes::TMaybeNode<NNodes::TExprBase> Materialize(NNodes::TExprBase node, TExprContext& ctx) const;
38+
39+
NNodes::TMaybeNode<NNodes::TExprBase> DqMaterialize(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const;
40+
3741
NNodes::TMaybeNode<NNodes::TExprBase> YtDqProcessWrite(NNodes::TExprBase node, TExprContext& ctx) const;
3842

3943
NNodes::TMaybeNode<NNodes::TExprBase> Write(NNodes::TExprBase node, TExprContext& ctx) const;
4044

4145
NNodes::TMaybeNode<NNodes::TExprBase> Fill(NNodes::TExprBase node, TExprContext& ctx) const;
4246

47+
NNodes::TMaybeNode<NNodes::TExprBase> FillToMaterialize(NNodes::TExprBase node, TExprContext& ctx) const;
48+
4349
NNodes::TMaybeNode<NNodes::TExprBase> TakeOrSkip(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
4450

4551
NNodes::TMaybeNode<NNodes::TExprBase> Extend(NNodes::TExprBase node, TExprContext& ctx) const;
@@ -73,7 +79,8 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
7379
NNodes::TMaybeNode<NNodes::TExprBase> FuseInnerMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
7480

7581
NNodes::TMaybeNode<NNodes::TExprBase> FuseOuterMap(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
76-
NNodes::TMaybeNode<NNodes::TExprBase> AssumeSorted(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
82+
83+
NNodes::TMaybeNode<NNodes::TExprBase> AssumeConstraints(NNodes::TExprBase node, TExprContext& ctx, const TGetParents& getParents) const;
7784

7885
NNodes::TMaybeNode<NNodes::TExprBase> LambdaFieldsSubset(NNodes::TYtWithUserJobsOpBase op, size_t lambdaIdx, TExprContext& ctx, const TGetParents& getParents) const;
7986

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_sort.cpp

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -462,40 +462,38 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::TopSort(TExprBase node,
462462
}
463463

464464

465-
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeSorted(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
465+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeConstraints(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
466466
if (State_->Types->EvaluationInProgress || State_->PassiveExecution) {
467467
return node;
468468
}
469469

470-
auto assume = node.Cast<TCoAssumeSorted>();
470+
auto assume = node.Cast<TCoInputBase>();
471471
auto input = assume.Input();
472472
if (!IsYtProviderInput(input)) {
473473
return node;
474474
}
475475

476476
auto sorted = node.Ref().GetConstraint<TSortedConstraintNode>();
477-
if (!sorted) {
478-
// Drop AssumeSorted with unsupported sort modes
479-
return input;
480-
}
481477

482478
auto maybeOp = input.Maybe<TYtOutput>().Operation();
483479
bool needSeparateOp = !maybeOp
484480
|| maybeOp.Raw()->StartsExecution()
485481
|| (maybeOp.Raw()->HasResult() && maybeOp.Raw()->GetResult().Type() == TExprNode::World)
486482
|| IsOutputUsedMultipleTimes(maybeOp.Ref(), *getParents())
487-
|| maybeOp.Maybe<TYtMapReduce>()
488-
|| maybeOp.Maybe<TYtEquiJoin>();
483+
|| (sorted && maybeOp.Maybe<TYtMapReduce>())
484+
|| (sorted && maybeOp.Maybe<TYtEquiJoin>());
489485

490-
bool canMerge = false;
486+
bool canMerge = !sorted;
491487
bool equalSort = false;
492-
if (auto inputSort = input.Ref().GetConstraint<TSortedConstraintNode>()) {
493-
if (sorted->IsPrefixOf(*inputSort)) {
494-
canMerge = true;
495-
equalSort = sorted->Equals(*inputSort);
488+
if (sorted) {
489+
if (auto inputSort = input.Ref().GetConstraint<TSortedConstraintNode>()) {
490+
if (sorted->IsPrefixOf(*inputSort)) {
491+
canMerge = true;
492+
equalSort = sorted->Equals(*inputSort);
493+
}
496494
}
497495
}
498-
if (equalSort && maybeOp.Maybe<TYtSort>()) {
496+
if (equalSort && maybeOp.Maybe<TYtSort>() && node.Ref().GetAllConstraints().size() == 1 /* only sort constraint */) {
499497
return input;
500498
}
501499

@@ -508,9 +506,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeSorted(TExprBase
508506

509507
const bool useNativeDescSort = State_->Configuration->UseNativeDescSort.Get().GetOrElse(DEFAULT_USE_NATIVE_DESC_SORT);
510508

511-
TKeySelectorBuilder builder(assume.Pos(), ctx, useNativeDescSort, outItemType);
512-
builder.ProcessConstraint(*sorted);
513-
needSeparateOp = needSeparateOp || (builder.NeedMap() && !equalSort);
509+
THolder<TKeySelectorBuilder> builder;
510+
if (sorted) {
511+
builder = MakeHolder<TKeySelectorBuilder>(assume.Pos(), ctx, useNativeDescSort, outItemType);
512+
builder->ProcessConstraint(*sorted);
513+
needSeparateOp = needSeparateOp || (builder->NeedMap() && !equalSort && !maybeOp.Maybe<TYtDqProcessWrite>());
514+
}
514515

515516
if (needSeparateOp) {
516517
TYtOutTableInfo outTable(outItemType, State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE);
@@ -533,14 +534,17 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeSorted(TExprBase
533534
&& firstNativeType == path->GetNativeYtType();
534535
});
535536
if (canMerge) {
536-
outTable.RowSpec->CopySortness(ctx, *inputPaths.front()->Table->RowSpec, TYqlRowSpecInfo::ECopySort::WithDesc);
537-
outTable.RowSpec->ClearSortness(ctx, sorted->GetContent().size());
537+
if (sorted) {
538+
outTable.RowSpec->CopySortness(ctx, *inputPaths.front()->Table->RowSpec, TYqlRowSpecInfo::ECopySort::WithDesc);
539+
outTable.RowSpec->ClearSortness(ctx, sorted->GetContent().size());
540+
}
538541
outTable.SetUnique(assume.Ref().GetConstraint<TDistinctConstraintNode>(), assume.Pos(), ctx);
542+
539543
if (firstNativeType) {
540544
outTable.RowSpec->CopyTypeOrders(*firstNativeType);
541545
}
542546

543-
YQL_ENSURE(sorted->GetContent().size() == outTable.RowSpec->SortMembers.size());
547+
YQL_ENSURE(!sorted || sorted->GetContent().size() == outTable.RowSpec->SortMembers.size());
544548
const bool useExplicitColumns = AnyOf(inputPaths, [] (const TYtPathInfo::TPtr& path) {
545549
return !path->Table->IsTemp || (path->Table->RowSpec && path->Table->RowSpec->HasAuxColumns());
546550
});
@@ -570,14 +574,16 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeSorted(TExprBase
570574
.Done();
571575
}
572576
else {
573-
builder.FillRowSpecSort(*outTable.RowSpec);
577+
if (builder) {
578+
builder->FillRowSpecSort(*outTable.RowSpec);
579+
}
574580
outTable.SetUnique(assume.Ref().GetConstraint<TDistinctConstraintNode>(), assume.Pos(), ctx);
575581

576-
TCoLambda mapper = builder.NeedMap()
582+
TCoLambda mapper = builder && builder->NeedMap()
577583
? Build<TCoLambda>(ctx, assume.Pos())
578584
.Args({"stream"})
579585
.Body<TExprApplier>()
580-
.Apply(TCoLambda(builder.MakeRemapLambda(true)))
586+
.Apply(TCoLambda(builder->MakeRemapLambda(true)))
581587
.With(0, "stream")
582588
.Build()
583589
.Done()
@@ -587,17 +593,19 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeSorted(TExprBase
587593
.Done();
588594

589595
auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, assume.Pos());
590-
settingsBuilder
591-
.Add()
592-
.Name()
593-
.Value(ToString(EYtSettingType::KeepSorted))
594-
.Build()
595-
.Build()
596-
.Add()
597-
.Name()
598-
.Value(ToString(EYtSettingType::Ordered))
596+
if (sorted) {
597+
settingsBuilder
598+
.Add()
599+
.Name()
600+
.Value(ToString(EYtSettingType::KeepSorted))
601+
.Build()
599602
.Build()
600-
.Build();
603+
.Add()
604+
.Name()
605+
.Value(ToString(EYtSettingType::Ordered))
606+
.Build()
607+
.Build();
608+
}
601609
if (State_->Configuration->UseFlow.Get().GetOrElse(DEFAULT_USE_FLOW)) {
602610
settingsBuilder
603611
.Add()
@@ -625,21 +633,43 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeSorted(TExprBase
625633

626634
auto op = GetOutputOp(input.Cast<TYtOutput>());
627635
TExprNode::TPtr newOp = op.Ptr();
628-
if (!op.Maybe<TYtSort>()) {
636+
637+
if (builder && builder->NeedMap() && maybeOp.Maybe<TYtDqProcessWrite>()) {
638+
TNodeOnNodeOwnedMap remaps;
639+
VisitExpr(maybeOp.Cast<TYtDqProcessWrite>().Input().Ptr(), [&builder, &remaps, &ctx](const TExprNode::TPtr& n) {
640+
if (TYtOutput::Match(n.Get())) {
641+
// Stop traversing dependent operations
642+
return false;
643+
}
644+
if (TYtDqWrite::Match(n.Get())) {
645+
auto newInput = Build<TExprApplier>(ctx, n->Pos())
646+
.Apply(TCoLambda(builder->MakeRemapLambda(true)))
647+
.With(0, TExprBase(n->ChildPtr(TYtDqWrite::idx_Input)))
648+
.Done();
649+
remaps[n.Get()] = ctx.ChangeChild(*n, TYtDqWrite::idx_Input, newInput.Ptr());
650+
}
651+
return true;
652+
});
653+
newOp = ctx.ChangeChild(*newOp, TYtDqProcessWrite::idx_Input, ctx.ReplaceNodes(newOp->ChildPtr(TYtDqProcessWrite::idx_Input), remaps));
654+
}
655+
656+
if (!op.Maybe<TYtSort>() && sorted) {
629657
if (auto settings = op.Maybe<TYtTransientOpBase>().Settings()) {
630658
if (!NYql::HasSetting(settings.Ref(), EYtSettingType::KeepSorted)) {
631-
newOp = ctx.ChangeChild(op.Ref(), TYtTransientOpBase::idx_Settings, NYql::AddSetting(settings.Ref(), EYtSettingType::KeepSorted, {}, ctx));
659+
newOp = ctx.ChangeChild(*newOp, TYtTransientOpBase::idx_Settings, NYql::AddSetting(settings.Ref(), EYtSettingType::KeepSorted, {}, ctx));
632660
}
633661
} else if (auto settings = op.Maybe<TYtFill>().Settings()) {
634662
if (!NYql::HasSetting(settings.Ref(), EYtSettingType::KeepSorted)) {
635-
newOp = ctx.ChangeChild(op.Ref(), TYtFill::idx_Settings, NYql::AddSetting(settings.Ref(), EYtSettingType::KeepSorted, {}, ctx));
663+
newOp = ctx.ChangeChild(*newOp, TYtFill::idx_Settings, NYql::AddSetting(settings.Ref(), EYtSettingType::KeepSorted, {}, ctx));
636664
}
637665
}
638666
}
639667
if (!equalSort) {
640668
const size_t index = FromString(input.Cast<TYtOutput>().OutIndex().Value());
641669
TYtOutTableInfo outTable(op.Output().Item(index));
642-
builder.FillRowSpecSort(*outTable.RowSpec);
670+
if (builder) {
671+
builder->FillRowSpecSort(*outTable.RowSpec);
672+
}
643673
outTable.RowSpec->SetConstraints(assume.Ref().GetConstraintSet());
644674
outTable.SetUnique(assume.Ref().GetConstraint<TDistinctConstraintNode>(), assume.Pos(), ctx);
645675

@@ -658,6 +688,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::AssumeSorted(TExprBase
658688
return Build<TYtOutput>(ctx, assume.Pos())
659689
.Operation(newOp)
660690
.OutIndex(input.Cast<TYtOutput>().OutIndex())
691+
.Mode(sorted ? TMaybeNode<TCoAtom>() : input.Cast<TYtOutput>().Mode())
661692
.Done();
662693
}
663694

0 commit comments

Comments
 (0)