Skip to content

Commit f0efe3a

Browse files
committed
Fix extra merge in YtPublish for column groups
commit_hash:57b9b05601823e224dc9d4fea919f61406148690
1 parent ddd1377 commit f0efe3a

File tree

2 files changed

+78
-41
lines changed

2 files changed

+78
-41
lines changed

yt/yql/providers/yt/gateway/native/yql_yt_native.cpp

Lines changed: 63 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,15 +1060,6 @@ class TYtNativeGateway : public IYtGateway {
10601060
}
10611061
const bool initial = NYql::HasSetting(publish.Settings().Ref(), EYtSettingType::Initial);
10621062

1063-
std::unordered_map<EYtSettingType, TString> strOpts;
1064-
for (const auto& setting : publish.Settings().Ref().Children()) {
1065-
if (setting->ChildrenSize() == 2) {
1066-
strOpts.emplace(FromString<EYtSettingType>(setting->Head().Content()), setting->Tail().Content());
1067-
} else if (setting->ChildrenSize() == 1) {
1068-
strOpts.emplace(FromString<EYtSettingType>(setting->Head().Content()), TString());;
1069-
}
1070-
}
1071-
10721063
YQL_CLOG(INFO, ProviderYt) << "Mode: " << mode << ", IsInitial: " << initial;
10731064

10741065
TSession::TPtr session = GetSession(options.SessionId());
@@ -1079,15 +1070,35 @@ class TYtNativeGateway : public IYtGateway {
10791070
TVector<TSrcTable> src;
10801071
ui64 chunksCount = 0;
10811072
ui64 dataSize = 0;
1082-
std::unordered_set<TString> columnGroups;
1073+
TSet<TString> srcColumnGroupAlts;
1074+
bool first = true;
1075+
const TStructExprType* itemType = nullptr;
10831076
for (auto out: publish.Input()) {
10841077
auto outTableWithCluster = GetOutTableWithCluster(out);
10851078
auto outTable = outTableWithCluster.first.Cast<TYtOutTable>();
10861079
src.emplace_back(outTable.Name().StringValue(), outTableWithCluster.second);
1087-
if (auto columnGroupSetting = NYql::GetSetting(outTable.Settings().Ref(), EYtSettingType::ColumnGroups)) {
1088-
columnGroups.emplace(columnGroupSetting->Tail().Content());
1089-
} else {
1090-
columnGroups.emplace();
1080+
if (first) {
1081+
itemType = GetSeqItemType(*outTable.Ref().GetTypeAnn()).Cast<TStructExprType>();
1082+
if (auto columnGroupSetting = NYql::GetSetting(outTable.Settings().Ref(), EYtSettingType::ColumnGroups)) {
1083+
srcColumnGroupAlts.emplace(columnGroupSetting->Tail().Content());
1084+
TString expanded;
1085+
if (ExpandDefaultColumnGroup(columnGroupSetting->Tail().Content(), *itemType, expanded)) {
1086+
srcColumnGroupAlts.insert(expanded);
1087+
}
1088+
}
1089+
first = false;
1090+
} else if (!srcColumnGroupAlts.empty()) {
1091+
if (auto columnGroupSetting = NYql::GetSetting(outTable.Settings().Ref(), EYtSettingType::ColumnGroups)) {
1092+
if (!srcColumnGroupAlts.contains(columnGroupSetting->Tail().Content())) {
1093+
TString expanded;
1094+
if (!ExpandDefaultColumnGroup(columnGroupSetting->Tail().Content(), *GetSeqItemType(*outTable.Ref().GetTypeAnn()).Cast<TStructExprType>(), expanded)
1095+
|| !srcColumnGroupAlts.contains(expanded)) {
1096+
srcColumnGroupAlts.clear();
1097+
}
1098+
}
1099+
} else {
1100+
srcColumnGroupAlts.clear();
1101+
}
10911102
}
10921103
auto stat = TYtTableStatInfo(outTable.Stat());
10931104
chunksCount += stat.ChunkCount;
@@ -1099,7 +1110,38 @@ class TYtNativeGateway : public IYtGateway {
10991110
if (src.size() > 10) {
11001111
YQL_CLOG(INFO, ProviderYt) << "...total input tables=" << src.size();
11011112
}
1102-
TString srcColumnGroups = columnGroups.size() == 1 ? *columnGroups.cbegin() : TString();
1113+
1114+
bool forceMerge = false;
1115+
bool forceTransform = false;
1116+
std::unordered_map<EYtSettingType, TString> strOpts;
1117+
for (const auto& setting : publish.Settings().Ref().Children()) {
1118+
const auto settingType = FromString<EYtSettingType>(setting->Head().Content());
1119+
if (setting->ChildrenSize() == 2) {
1120+
TString value = TString{setting->Tail().Content()};
1121+
if (EYtSettingType::ColumnGroups == settingType) {
1122+
bool groupDiff = false;
1123+
if (srcColumnGroupAlts.empty()) {
1124+
groupDiff = true;
1125+
} else {
1126+
if (!srcColumnGroupAlts.contains(value)) {
1127+
TString expanded;
1128+
YQL_ENSURE(itemType);
1129+
if (ExpandDefaultColumnGroup(value, *itemType, expanded)) {
1130+
value = std::move(expanded);
1131+
groupDiff = !srcColumnGroupAlts.contains(value);
1132+
}
1133+
}
1134+
}
1135+
if (groupDiff) {
1136+
forceMerge = forceTransform = true;
1137+
YQL_CLOG(INFO, ProviderYt) << "Column groups diff forces merge";
1138+
}
1139+
}
1140+
strOpts.emplace(settingType, value);
1141+
} else if (setting->ChildrenSize() == 1) {
1142+
strOpts.emplace(settingType, TString());
1143+
}
1144+
}
11031145

11041146
bool combineChunks = false;
11051147
if (auto minChunkSize = options.Config()->MinPublishedAvgChunkSize.Get()) {
@@ -1111,6 +1153,7 @@ class TYtNativeGateway : public IYtGateway {
11111153
YQL_CLOG(INFO, ProviderYt) << "Output: " << cluster << '.' << dst;
11121154
if (combineChunks) {
11131155
YQL_CLOG(INFO, ProviderYt) << "Use chunks combining";
1156+
forceMerge = true;
11141157
}
11151158
if (Services_.Config->GetLocalChainTest()) {
11161159
if (!src.empty()) {
@@ -1130,9 +1173,9 @@ class TYtNativeGateway : public IYtGateway {
11301173
const ui32 dstEpoch = TEpochInfo::Parse(publish.Publish().Epoch().Ref()).GetOrElse(0);
11311174
auto execCtx = MakeExecCtx(std::move(options), session, cluster, node.Get(), &ctx);
11321175

1133-
return session->Queue_->Async([execCtx, src = std::move(src), dst, dstEpoch, isAnonymous, mode, initial, srcColumnGroups, combineChunks, strOpts = std::move(strOpts)] () mutable {
1176+
return session->Queue_->Async([execCtx, src = std::move(src), dst, dstEpoch, isAnonymous, mode, initial, combineChunks, forceMerge, forceTransform, strOpts = std::move(strOpts)] () mutable {
11341177
YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
1135-
return ExecPublish(execCtx, std::move(src), dst, dstEpoch, isAnonymous, mode, initial, srcColumnGroups, combineChunks, strOpts);
1178+
return ExecPublish(execCtx, std::move(src), dst, dstEpoch, isAnonymous, mode, initial, combineChunks, forceMerge, forceTransform, strOpts);
11361179
})
11371180
.Apply([nodePos] (const TFuture<void>& f) {
11381181
try {
@@ -2414,8 +2457,9 @@ class TYtNativeGateway : public IYtGateway {
24142457
const bool isAnonymous,
24152458
EYtWriteMode mode,
24162459
const bool initial,
2417-
const TString& srcColumnGroups,
24182460
const bool combineChunks,
2461+
bool forceMerge,
2462+
bool forceTransform,
24192463
const std::unordered_map<EYtSettingType, TString>& strOpts)
24202464
{
24212465
TString tmpFolder = GetTablesTmpFolder(*execCtx->Options_.Config());
@@ -2490,8 +2534,6 @@ class TYtNativeGateway : public IYtGateway {
24902534
);
24912535
}
24922536

2493-
bool forceMerge = combineChunks;
2494-
24952537
NYT::MergeNodes(yqlAttrs, GetUserAttributes(execCtx->GetEntryForCluster(src.back().Cluster)->Tx, src.back().Name, true));
24962538
NYT::MergeNodes(yqlAttrs, YqlOpOptionsToAttrs(execCtx->Session_->OperationOptions_));
24972539
if (EYtWriteMode::RenewKeepMeta == mode) {
@@ -2582,8 +2624,6 @@ class TYtNativeGateway : public IYtGateway {
25822624
}
25832625
}
25842626

2585-
bool forceTransform = false;
2586-
25872627
#define DEFINE_OPT(name, attr, transform) \
25882628
auto dst##name = isAnonymous \
25892629
? execCtx->Options_.Config()->Temporary##name.Get(cluster) \
@@ -2612,10 +2652,6 @@ class TYtNativeGateway : public IYtGateway {
26122652
NYT::TNode columnGroupsSpec;
26132653
if (const auto it = strOpts.find(EYtSettingType::ColumnGroups); it != strOpts.cend() && execCtx->Options_.Config()->OptimizeFor.Get(cluster).GetOrElse(NYT::OF_LOOKUP_ATTR) != NYT::OF_LOOKUP_ATTR) {
26142654
columnGroupsSpec = NYT::NodeFromYsonString(it->second);
2615-
if (it->second != srcColumnGroups) {
2616-
forceMerge = forceTransform = true;
2617-
YQL_CLOG(INFO, ProviderYt) << "Column groups diff forces merge, src=" << srcColumnGroups << ", dst=" << it->second;
2618-
}
26192655
}
26202656

26212657
TFuture<void> res;
@@ -2656,7 +2692,7 @@ class TYtNativeGateway : public IYtGateway {
26562692
input = TRichYPath(std::get<0>(*p)).TransactionId(std::get<1>(*p)).OriginalPath(NYT::AddPathPrefix(dstPath, NYT::TConfig::Get()->Prefix)).Columns(columns);
26572693
}
26582694
} else {
2659-
input = TRichYPath(dstPath).Columns(columns);
2695+
input = TRichYPath(dstPath).Columns(columns);
26602696
}
26612697
mergeSpec.AddInput(input);
26622698
}

yt/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -441,19 +441,6 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
441441
}
442442
const bool initialWrite = NYql::HasSetting(settings, EYtSettingType::Initial);
443443
const bool monotonicKeys = NYql::HasSetting(settings, EYtSettingType::MonotonicKeys);
444-
TString columnGroup;
445-
TSet<TString> columnGroupAlts;
446-
if (auto setting = NYql::GetSetting(settings, EYtSettingType::ColumnGroups)) {
447-
if (!ValidateColumnGroups(*setting, *itemType->Cast<TStructExprType>(), ctx)) {
448-
return TStatus::Error;
449-
}
450-
columnGroup = setting->Tail().Content();
451-
columnGroupAlts.insert(columnGroup);
452-
TString exandedSpec;
453-
if (ExpandDefaultColumnGroup(setting->Tail().Content(), *itemType->Cast<TStructExprType>(), exandedSpec)) {
454-
columnGroupAlts.insert(std::move(exandedSpec));
455-
}
456-
}
457444

458445
if (!initialWrite && mode != EYtWriteMode::Append) {
459446
ctx.AddError(TIssue(pos, TStringBuilder() <<
@@ -587,8 +574,22 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
587574
<< GetTypeDiff(*description.RowType, *itemType)));
588575
return TStatus::Error;
589576
}
577+
}
590578

591-
if (!columnGroupAlts.empty() && !AnyOf(columnGroupAlts, [&](const auto& grp) { return description.ColumnGroupSpecAlts.contains(grp); })) {
579+
TString columnGroup;
580+
TSet<TString> columnGroupAlts;
581+
// Check and expand column groups _after_ type alignment (TryConvertTo)
582+
if (auto setting = NYql::GetSetting(settings, EYtSettingType::ColumnGroups)) {
583+
if (!ValidateColumnGroups(*setting, *itemType->Cast<TStructExprType>(), ctx)) {
584+
return TStatus::Error;
585+
}
586+
columnGroup = setting->Tail().Content();
587+
columnGroupAlts.insert(columnGroup);
588+
TString exandedSpec;
589+
if (ExpandDefaultColumnGroup(setting->Tail().Content(), *itemType->Cast<TStructExprType>(), exandedSpec)) {
590+
columnGroupAlts.insert(std::move(exandedSpec));
591+
}
592+
if (checkLayout && !AnyOf(columnGroupAlts, [&](const auto& grp) { return description.ColumnGroupSpecAlts.contains(grp); })) {
592593
ctx.AddError(TIssue(pos, TStringBuilder()
593594
<< "Insert with different "
594595
<< ToString(EYtSettingType::ColumnGroups).Quote()

0 commit comments

Comments
 (0)