Skip to content

Commit af476a9

Browse files
authored
Improve column groups calculation (#9269)
1 parent c52a789 commit af476a9

File tree

17 files changed

+362
-201
lines changed

17 files changed

+362
-201
lines changed

ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3224,7 +3224,7 @@ class TYtNativeGateway : public IYtGateway {
32243224

32253225
TFuture<void> DoMerge(TYtMerge merge, const TExecContext<TRunOptions>::TPtr& execCtx) {
32263226
YQL_ENSURE(execCtx->OutTables_.size() == 1);
3227-
bool forceTransform = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::ForceTransform);
3227+
bool forceTransform = NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::TransformColGroups);
32283228
bool combineChunks = NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::CombineChunks);
32293229
TMaybe<ui64> limit = GetLimit(merge.Settings().Ref());
32303230

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,8 +76,8 @@ TYtPhysicalOptProposalTransformer::TYtPhysicalOptProposalTransformer(TYtState::T
7676
AddHandler(2, &TYtMap::Match, HNDL(MapToMerge));
7777
AddHandler(2, &TYtPublish::Match, HNDL(UnorderedPublishTarget));
7878
AddHandler(2, &TYtMap::Match, HNDL(PushDownYtMapOverSortedMerge));
79-
AddHandler(2, &TYtMerge::Match, HNDL(MergeToCopy));
8079
AddHandler(2, &TYtMerge::Match, HNDL(ForceTransform));
80+
AddHandler(2, &TYtMerge::Match, HNDL(MergeToCopy));
8181
#undef HNDL
8282
}
8383

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_merge.cpp

Lines changed: 39 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -405,7 +405,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::MergeToCopy(TExprBase n
405405
return node;
406406
}
407407

408-
if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::CombineChunks)) {
408+
if (NYql::HasAnySetting(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::TransformColGroups | EYtSettingType::CombineChunks)) {
409409
return node;
410410
}
411411

@@ -475,26 +475,47 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::ForceTransform(TExprBas
475475
return node;
476476
}
477477

478-
if (NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::ForceTransform)) {
479-
return node;
478+
if (!NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::ForceTransform)
479+
&& NYql::HasSetting(merge.Input().Item(0).Settings().Ref(), EYtSettingType::Sample)) {
480+
return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::AddSetting(merge.Settings().Ref(), EYtSettingType::ForceTransform, {}, ctx)));
480481
}
481482

482483
const auto cluster = merge.DataSink().Cluster().StringValue();
483-
const bool hasOutGroup = NYql::HasSetting(merge.Output().Item(0).Settings().Ref(), EYtSettingType::ColumnGroups);
484-
const bool lookup = State_->Configuration->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) == NYT::OF_LOOKUP_ATTR;
485-
const bool enabledColGroup = State_->Configuration->ColumnGroupMode.Get().GetOrElse(EColumnGroupMode::Disable) != EColumnGroupMode::Disable;
486-
const bool hasNonTmpInput = AnyOf(merge.Input().Item(0).Paths(), [](const TYtPath& path) {
487-
return path.Table().Maybe<TYtTable>() && !NYql::HasSetting(path.Table().Cast<TYtTable>().Settings().Ref(), EYtSettingType::Anonymous);
488-
});
489-
const bool hasSampling = NYql::HasSetting(merge.Input().Item(0).Settings().Ref(), EYtSettingType::Sample);
490-
491-
const bool addForceTransform = hasSampling
492-
|| (!lookup && enabledColGroup != hasOutGroup)
493-
|| (!lookup && hasOutGroup && hasNonTmpInput)
494-
;
495-
496-
if (addForceTransform) {
497-
return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::AddSetting(merge.Settings().Ref(), EYtSettingType::ForceTransform, {}, ctx)));
484+
if (State_->Configuration->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) == NYT::OF_LOOKUP_ATTR) {
485+
return node;
486+
}
487+
488+
TString outGroup;
489+
if (auto setting = NYql::GetSetting(merge.Output().Item(0).Settings().Ref(), EYtSettingType::ColumnGroups)) {
490+
outGroup = setting->Tail().Content();
491+
}
492+
493+
std::vector<TString> inputColGroupSpecs;
494+
for (const auto& path: merge.Input().Item(0).Paths()) {
495+
inputColGroupSpecs.emplace_back();
496+
if (auto table = path.Table().Maybe<TYtTable>()) {
497+
if (auto tableDesc = State_->TablesData->FindTable(cluster, TString{TYtTableInfo::GetTableLabel(table.Cast())}, TEpochInfo::Parse(table.Cast().Epoch().Ref()))) {
498+
inputColGroupSpecs.back() = tableDesc->ColumnGroupSpec;
499+
}
500+
} else if (auto out = path.Table().Maybe<TYtOutput>()) {
501+
if (auto setting = NYql::GetSetting(GetOutputOp(out.Cast()).Output().Item(FromString<ui32>(out.Cast().OutIndex().Value())).Settings().Ref(), EYtSettingType::ColumnGroups)) {
502+
inputColGroupSpecs.back() = setting->Tail().Content();
503+
}
504+
}
505+
}
506+
507+
bool needTransformColGroups = false;
508+
if (!outGroup.empty() && AnyOf(inputColGroupSpecs, [&outGroup](const auto& g) { return outGroup != g; })) {
509+
needTransformColGroups = true;
510+
}
511+
if (outGroup.empty() && AnyOf(inputColGroupSpecs, [](const auto& g) { return !g.empty(); })) {
512+
needTransformColGroups = true;
513+
}
514+
515+
if (needTransformColGroups && !NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups)) {
516+
return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::AddSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups, {}, ctx)));
517+
} else if (!needTransformColGroups && NYql::HasSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups)) {
518+
return TExprBase(ctx.ChangeChild(merge.Ref(), TYtMerge::idx_Settings, NYql::RemoveSetting(merge.Settings().Ref(), EYtSettingType::TransformColGroups, ctx)));
498519
}
499520
return node;
500521
}

ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp

Lines changed: 24 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -933,6 +933,29 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
933933
return TStatus::Error;
934934
}
935935

936+
TStringBuf outGroup;
937+
if (auto setting = NYql::GetSetting(copy.Output().Item(0).Settings().Ref(), EYtSettingType::ColumnGroups)) {
938+
outGroup = setting->Tail().Content();
939+
}
940+
941+
TStringBuf inputColGroupSpec;
942+
const auto& path = copy.Input().Item(0).Paths().Item(0);
943+
if (auto table = path.Table().Maybe<TYtTable>()) {
944+
if (auto tableDesc = State_->TablesData->FindTable(copy.DataSink().Cluster().StringValue(), TString{TYtTableInfo::GetTableLabel(table.Cast())}, TEpochInfo::Parse(table.Cast().Epoch().Ref()))) {
945+
inputColGroupSpec = tableDesc->ColumnGroupSpec;
946+
}
947+
} else if (auto out = path.Table().Maybe<TYtOutput>()) {
948+
if (auto setting = NYql::GetSetting(GetOutputOp(out.Cast()).Output().Item(FromString<ui32>(out.Cast().OutIndex().Value())).Settings().Ref(), EYtSettingType::ColumnGroups)) {
949+
inputColGroupSpec = setting->Tail().Content();
950+
}
951+
}
952+
953+
if (outGroup != inputColGroupSpec) {
954+
ctx.AddError(TIssue(ctx.GetPosition(copy.Output().Item(0).Settings().Pos()), TStringBuilder() << TYtCopy::CallableName()
955+
<< "has input/output tables with different " << EYtSettingType::ColumnGroups << " values"));
956+
return TStatus::Error;
957+
}
958+
936959
input->SetTypeAnn(MakeOutputOperationType(copy, ctx));
937960
return TStatus::Ok;
938961
}
@@ -950,7 +973,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
950973

951974
auto merge = TYtMerge(input);
952975

953-
if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq, ctx)) {
976+
if (!ValidateSettings(merge.Settings().Ref(), EYtSettingType::ForceTransform | EYtSettingType::TransformColGroups | EYtSettingType::CombineChunks | EYtSettingType::Limit | EYtSettingType::KeepSorted | EYtSettingType::NoDq, ctx)) {
954977
return TStatus::Error;
955978
}
956979

ydb/library/yql/providers/yt/provider/yql_yt_op_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -422,6 +422,7 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T
422422
case EYtSettingType::IgnoreNonExisting:
423423
case EYtSettingType::WarnNonExisting:
424424
case EYtSettingType::ForceTransform:
425+
case EYtSettingType::TransformColGroups:
425426
case EYtSettingType::CombineChunks:
426427
case EYtSettingType::WithQB:
427428
case EYtSettingType::Inline:

ydb/library/yql/providers/yt/provider/yql_yt_op_settings.h

Lines changed: 9 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -70,11 +70,11 @@ enum class EYtSettingType: ui64 {
7070
StatColumns /* "statcolumns" */,
7171
SysColumns /* "syscolumns" */,
7272
IgnoreTypeV3 /* "ignoretypev3" "ignore_type_v3" */,
73-
// Table content
73+
// Table content
7474
MemUsage /* "memUsage" */,
7575
ItemsCount /* "itemsCount" */,
7676
RowFactor /* "rowFactor" */,
77-
// Operations
77+
// Operations
7878
Ordered /* "ordered" */, // hybrid supported
7979
KeyFilter /* "keyFilter" */,
8080
KeyFilter2 /* "keyFilter2" */,
@@ -86,6 +86,7 @@ enum class EYtSettingType: ui64 {
8686
ReduceBy /* "reduceBy" */, // hybrid supported
8787
ReduceFilterBy /* "reduceFilterBy" */,
8888
ForceTransform /* "forceTransform" */, // hybrid supported
89+
TransformColGroups /* "transformColGroups" */, // hybrid supported
8990
WeakFields /* "weakFields" */,
9091
Sharded /* "sharded" */,
9192
CombineChunks /* "combineChunks" */,
@@ -95,16 +96,16 @@ enum class EYtSettingType: ui64 {
9596
Flow /* "flow" */, // hybrid supported
9697
KeepSorted /* "keepSorted" */, // hybrid supported
9798
KeySwitch /* "keySwitch" */, // hybrid supported
98-
// Out tables
99+
// Out tables
99100
UniqueBy /* "uniqueBy" */,
100101
OpHash /* "opHash" */,
101-
// Operations
102+
// Operations
102103
MapOutputType /* "mapOutputType" */, // hybrid supported
103104
ReduceInputType /* "reduceInputType" */, // hybrid supported
104105
NoDq /* "noDq" */,
105-
// Read
106+
// Read
106107
Split /* "split" */,
107-
// Write hints
108+
// Write hints
108109
CompressionCodec /* "compression_codec" "compressioncodec"*/,
109110
ErasureCodec /* "erasure_codec" "erasurecodec" */,
110111
Expiration /* "expiration" */,
@@ -133,7 +134,7 @@ using TBase = std::bitset<YtSettingTypesCount>;
133134
using ::NYql::EYtSettingTypes::bitset::bitset;
134135

135136
EYtSettingTypes(EYtSettingType type)
136-
: TBase(std::bitset<YtSettingTypesCount>(1) << static_cast<ui64>(type))
137+
: TBase(std::bitset<YtSettingTypesCount>(1) << static_cast<ui64>(type))
137138
{}
138139

139140
EYtSettingTypes& operator|=(const EYtSettingTypes& other) {
@@ -165,7 +166,7 @@ const auto DqReadSupportedSettings = EYtSettingType::SysColumns | EYtSettingType
165166
const auto DqOpSupportedSettings = EYtSettingType::Ordered | EYtSettingType::Limit | EYtSettingType::SortLimitBy | EYtSettingType::SortBy |
166167
EYtSettingType::ReduceBy | EYtSettingType::ForceTransform | EYtSettingType::JobCount | EYtSettingType::JoinReduce |
167168
EYtSettingType::FirstAsPrimary | EYtSettingType::Flow | EYtSettingType::KeepSorted | EYtSettingType::KeySwitch |
168-
EYtSettingType::ReduceInputType | EYtSettingType::MapOutputType | EYtSettingType::Sharded;
169+
EYtSettingType::ReduceInputType | EYtSettingType::MapOutputType | EYtSettingType::Sharded | EYtSettingType::TransformColGroups;
169170

170171
///////////////////////////////////////////////////////////////////////////////////////////////
171172

0 commit comments

Comments
 (0)