Skip to content

Commit 6a94d72

Browse files
authored
[yt provider] Move UnorderedPublishTarget to finalizing (#9402)
1 parent 0ad5074 commit 6a94d72

File tree

9 files changed

+235
-135
lines changed

9 files changed

+235
-135
lines changed

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
7474
AddHandler(2, &TYtEquiJoin::Match, HNDL(RuntimeEquiJoin));
7575
AddHandler(2, &TStatWriteTable::Match, HNDL(ReplaceStatWriteTable));
7676
AddHandler(2, &TYtMap::Match, HNDL(MapToMerge));
77-
AddHandler(2, &TYtPublish::Match, HNDL(UnorderedPublishTarget));
7877
AddHandler(2, &TYtMap::Match, HNDL(PushDownYtMapOverSortedMerge));
7978
AddHandler(2, &TYtMerge::Match, HNDL(ForceTransform));
8079
AddHandler(2, &TYtMerge::Match, HNDL(MergeToCopy));

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
122122

123123
NNodes::TMaybeNode<NNodes::TExprBase> MapToMerge(NNodes::TExprBase node, TExprContext& ctx) const;
124124

125-
NNodes::TMaybeNode<NNodes::TExprBase> UnorderedPublishTarget(NNodes::TExprBase node, TExprContext& ctx) const;
126-
127125
NNodes::TMaybeNode<NNodes::TExprBase> AddTrivialMapperForNativeYtTypes(NNodes::TExprBase node, TExprContext& ctx) const;
128126

129127
NNodes::TMaybeNode<NNodes::TExprBase> YtDqWrite(NNodes::TExprBase node, TExprContext& ctx) const;

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_write.cpp

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -773,41 +773,4 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::Fill(TExprBase node, TE
773773
.Done();
774774
}
775775

776-
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::UnorderedPublishTarget(TExprBase node, TExprContext& ctx) const {
777-
auto publish = node.Cast<TYtPublish>();
778-
779-
auto cluster = TString{publish.DataSink().Cluster().Value()};
780-
auto pubTableInfo = TYtTableInfo(publish.Publish());
781-
if (auto commitEpoch = pubTableInfo.CommitEpoch.GetOrElse(0)) {
782-
const TYtTableDescription& nextDescription = State_->TablesData->GetTable(cluster, pubTableInfo.Name, commitEpoch);
783-
YQL_ENSURE(nextDescription.RowSpec);
784-
if (!nextDescription.RowSpec->IsSorted()) {
785-
bool modified = false;
786-
TVector<TYtOutput> outs;
787-
for (auto out: publish.Input()) {
788-
if (!IsUnorderedOutput(out) && TYqlRowSpecInfo(GetOutTable(out).Cast<TYtOutTable>().RowSpec()).IsSorted()) {
789-
outs.push_back(Build<TYtOutput>(ctx, out.Pos())
790-
.InitFrom(out)
791-
.Mode()
792-
.Value(ToString(EYtSettingType::Unordered))
793-
.Build()
794-
.Done());
795-
modified = true;
796-
} else {
797-
outs.push_back(out);
798-
}
799-
}
800-
if (modified) {
801-
return Build<TYtPublish>(ctx, publish.Pos())
802-
.InitFrom(publish)
803-
.Input()
804-
.Add(outs)
805-
.Build()
806-
.Done();
807-
}
808-
}
809-
}
810-
return node;
811-
}
812-
813776
} // namespace NYql

ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
606606
if (contentRowSpecs) {
607607
size_t from = 0;
608608
if (initialWrite) {
609-
++nextDescription.WriteValidateCount;
609+
nextDescription.RowSpecSortReady = true;
610610
if (nextDescription.IsReplaced) {
611611
nextDescription.RowSpec->CopySortness(ctx, *contentRowSpecs.front(), TYqlRowSpecInfo::ECopySort::Exact);
612612
if (auto contentNativeType = contentRowSpecs.front()->GetNativeYtType()) {
@@ -630,7 +630,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
630630
<< " setting cannot be used with a unsorted table"));
631631
return TStatus::Error;
632632
}
633-
} else {
633+
} else if (nextDescription.RowSpecSortReady) {
634634
if (!nextDescription.MonotonicKeys) {
635635
nextDescription.MonotonicKeys = monotonicKeys;
636636
} else if (*nextDescription.MonotonicKeys != monotonicKeys) {
@@ -642,43 +642,50 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
642642
}
643643
}
644644

645-
const bool uniqueKeys = nextDescription.RowSpec->UniqueKeys;
646-
for (size_t s = from; s < contentRowSpecs.size(); ++s) {
647-
const bool hasSortChanges = nextDescription.RowSpec->MakeCommonSortness(ctx, *contentRowSpecs[s]);
648-
const bool breaksSorting = hasSortChanges || !nextDescription.RowSpec->CompareSortness(*contentRowSpecs[s], false);
649-
if (monotonicKeys) {
650-
if (breaksSorting) {
651-
ctx.AddError(TIssue(pos, TStringBuilder()
652-
<< "Inserts with "
653-
<< ToString(EYtSettingType::MonotonicKeys).Quote()
654-
<< " setting must not change output table sorting"));
655-
return TStatus::Error;
645+
if (nextDescription.RowSpecSortReady) {
646+
const bool uniqueKeys = nextDescription.RowSpec->UniqueKeys;
647+
for (size_t s = from; s < contentRowSpecs.size(); ++s) {
648+
const bool hasSortChanges = nextDescription.RowSpec->MakeCommonSortness(ctx, *contentRowSpecs[s]);
649+
const bool breaksSorting = hasSortChanges || !nextDescription.RowSpec->CompareSortness(*contentRowSpecs[s], false);
650+
if (monotonicKeys) {
651+
if (breaksSorting) {
652+
ctx.AddError(TIssue(pos, TStringBuilder()
653+
<< "Inserts with "
654+
<< ToString(EYtSettingType::MonotonicKeys).Quote()
655+
<< " setting must not change output table sorting"));
656+
return TStatus::Error;
657+
}
658+
nextDescription.RowSpec->UniqueKeys = uniqueKeys;
656659
}
657-
nextDescription.RowSpec->UniqueKeys = uniqueKeys;
658-
}
659-
if (nextDescription.WriteValidateCount < 2) {
660-
TStringBuilder warning;
661-
if (breaksSorting) {
662-
warning << "Sort order of written data differs from the order of "
663-
<< outTableInfo.Name.Quote() << " table content. Result table content will be ";
664-
if (nextDescription.RowSpec->IsSorted()) {
665-
warning << "ordered by ";
666-
for (size_t i: xrange(nextDescription.RowSpec->SortMembers.size())) {
667-
if (i != 0) {
668-
warning << ',';
660+
ui32 mutationId = 0;
661+
if (auto setting = NYql::GetSetting(settings, EYtSettingType::MutationId)) {
662+
mutationId = FromString<ui32>(setting->Child(1)->Content());
663+
}
664+
665+
if (++nextDescription.WriteValidateCount[mutationId] < 2) {
666+
TStringBuilder warning;
667+
if (breaksSorting) {
668+
warning << "Sort order of written data differs from the order of "
669+
<< outTableInfo.Name.Quote() << " table content. Result table content will be ";
670+
if (nextDescription.RowSpec->IsSorted()) {
671+
warning << "ordered by ";
672+
for (size_t i: xrange(nextDescription.RowSpec->SortMembers.size())) {
673+
if (i != 0) {
674+
warning << ',';
675+
}
676+
warning << nextDescription.RowSpec->SortMembers[i] << '('
677+
<< (nextDescription.RowSpec->SortDirections[i] ? "asc" : "desc") << ")";
669678
}
670-
warning << nextDescription.RowSpec->SortMembers[i] << '('
671-
<< (nextDescription.RowSpec->SortDirections[i] ? "asc" : "desc") << ")";
679+
} else {
680+
warning << "unordered";
672681
}
673-
} else {
674-
warning << "unordered";
682+
} else if (uniqueKeys && !nextDescription.RowSpec->UniqueKeys) {
683+
warning << "Result table content will have non unique keys";
675684
}
676-
} else if (uniqueKeys && !nextDescription.RowSpec->UniqueKeys) {
677-
warning << "Result table content will have non unique keys";
678-
}
679685

680-
if (warning && !ctx.AddWarning(YqlIssue(pos, EYqlIssueCode::TIssuesIds_EIssueCode_YT_SORT_ORDER_CHANGE, warning))) {
681-
return TStatus::Error;
686+
if (warning && !ctx.AddWarning(YqlIssue(pos, EYqlIssueCode::TIssuesIds_EIssueCode_YT_SORT_ORDER_CHANGE, warning))) {
687+
return TStatus::Error;
688+
}
682689
}
683690
}
684691
}

0 commit comments

Comments
 (0)