Skip to content

Commit 8950220

Browse files
authored
[yt provider] Check table content weight for local execution (#11207)
1 parent 05a2731 commit 8950220

16 files changed

+60
-27
lines changed

ydb/library/yql/providers/yt/common/yql_configuration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ constexpr ui32 DEFAULT_MAX_INPUT_TABLES = 3000;
8787
constexpr ui32 DEFAULT_MAX_OUTPUT_TABLES = 100;
8888
constexpr ui64 DEFAULT_APPLY_STORED_CONSTRAINTS = 0ULL;
8989

90-
constexpr bool DEFAULT_TABLE_CONTENT_LOCAL_EXEC = false;
90+
constexpr ui64 DEFAULT_TABLE_CONTENT_LOCAL_EXEC = 0;
9191

9292
constexpr ui32 DEFAULT_BATCH_LIST_FOLDER_CONCURRENCY = 5;
9393

ydb/library/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
#include <ydb/library/yql/public/udf/udf_data_type.h>
66

77
#include <library/cpp/yson/node/node_io.h>
8-
98
#include <library/cpp/regex/pcre/regexp.h>
9+
#include <library/cpp/string_utils/parse_size/parse_size.h>
1010

1111
#include <util/generic/yexception.h>
1212
#include <util/generic/size_literals.h>
@@ -252,7 +252,17 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
252252
REGISTER_SETTING(*this, TableContentTmpFolder);
253253
REGISTER_SETTING(*this, TableContentColumnarStatistics);
254254
REGISTER_SETTING(*this, TableContentUseSkiff);
255-
REGISTER_SETTING(*this, TableContentLocalExecution);
255+
REGISTER_SETTING(*this, TableContentLocalExecution)
256+
.Parser([](const TString& v) {
257+
// backward compatible parse from bool
258+
bool value = true;
259+
if (!v || TryFromString<bool>(v, value)) {
260+
return value ? 10_MB : 0_MB;
261+
} else {
262+
return NSize::ParseSize(v);
263+
}
264+
})
265+
.Upper(5_GB);
256266
REGISTER_SETTING(*this, DisableJobSplitting);
257267
REGISTER_SETTING(*this, UseColumnarStatistics)
258268
.Parser([](const TString& v) {

ydb/library/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ struct TYtSettings {
250250
NCommon::TConfSetting<NSize::TSize, false> TableContentMinAvgChunkSize;
251251
NCommon::TConfSetting<ui32, false> TableContentMaxInputTables;
252252
NCommon::TConfSetting<ui32, false> TableContentMaxChunksForNativeDelivery;
253-
NCommon::TConfSetting<bool, false> TableContentLocalExecution;
253+
NCommon::TConfSetting<NSize::TSize, false> TableContentLocalExecution;
254254
NCommon::TConfSetting<bool, false> UseTypeV2;
255255
NCommon::TConfSetting<bool, false> UseNativeYtTypes;
256256
NCommon::TConfSetting<bool, false> UseNativeDescSort;

ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,13 @@ TGatewayTransformer::TGatewayTransformer(const TExecContextBase& execCtx, TYtSet
7171
}
7272
}
7373

74-
TCallableVisitFunc TGatewayTransformer::operator()(TInternName name) {
74+
TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) {
75+
auto name = internName.Str();
76+
const bool small = name.SkipPrefix("Small");
7577
if (name == TYtTableContent::CallableName()) {
7678

7779
*TableContentFlag_ = true;
78-
*RemoteExecutionFlag_ = *RemoteExecutionFlag_ || !Settings_->TableContentLocalExecution.Get().GetOrElse(DEFAULT_TABLE_CONTENT_LOCAL_EXEC);
80+
*RemoteExecutionFlag_ = *RemoteExecutionFlag_ || !small;
7981

8082
if (EPhase::Content == Phase_ || EPhase::All == Phase_) {
8183
return [&](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_fuse.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ TMaybe<bool> TYtPhysicalOptProposalTransformer::CanFuseLambdas(const TCoLambda&
2323

2424
TExprNode::TPtr updatedBody = innerLambda.Body().Ptr();
2525
if (maxJobMemoryLimit) {
26-
auto status = UpdateTableContentMemoryUsage(innerLambda.Body().Ptr(), updatedBody, State_, ctx);
26+
auto status = UpdateTableContentMemoryUsage(innerLambda.Body().Ptr(), updatedBody, State_, ctx, false);
2727
if (status.Level != TStatus::Ok) {
2828
return {};
2929
}
@@ -36,7 +36,7 @@ TMaybe<bool> TYtPhysicalOptProposalTransformer::CanFuseLambdas(const TCoLambda&
3636

3737
updatedBody = outerLambda.Body().Ptr();
3838
if (maxJobMemoryLimit) {
39-
auto status = UpdateTableContentMemoryUsage(outerLambda.Body().Ptr(), updatedBody, State_, ctx);
39+
auto status = UpdateTableContentMemoryUsage(outerLambda.Body().Ptr(), updatedBody, State_, ctx, false);
4040
if (status.Level != TStatus::Ok) {
4141
return {};
4242
}

ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
227227
}
228228

229229
bool hasNonDeterministicFunctions = false;
230-
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
230+
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx, false); status.Level != TStatus::Ok) {
231231
return SyncStatus(status);
232232
}
233233

@@ -652,7 +652,7 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
652652
}
653653

654654
bool hasNonDeterministicFunctions = false;
655-
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
655+
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx, false); status.Level != TStatus::Ok) {
656656
return SyncStatus(status);
657657
}
658658

ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class TYtDataSourceExecTransformer : public TExecTransformerBase {
135135
}
136136

137137
bool hasNonDeterministicFunctions = false;
138-
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx); status.Level != IGraphTransformer::TStatus::Ok) {
138+
if (const auto status = PeepHoleOptimizeBeforeExec(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx, true); status.Level != IGraphTransformer::TStatus::Ok) {
139139
return SyncStatus(status);
140140
}
141141

ydb/library/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -846,7 +846,8 @@ class TYtDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
846846
return TStatus::Error;
847847
}
848848

849-
if (!ValidateSettings(tableContent.Settings().Ref(), EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor | EYtSettingType::Split, ctx)) {
849+
const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor | EYtSettingType::Split | EYtSettingType::Small;
850+
if (!ValidateSettings(tableContent.Settings().Ref(), allowed, ctx)) {
850851
return TStatus::Error;
851852
}
852853

ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "yql_yt_provider_impl.h"
33
#include "yql_yt_op_settings.h"
44
#include "yql_yt_op_hash.h"
5+
#include "yql_yt_optimize.h"
56

67
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
78
#include <ydb/library/yql/providers/yt/lib/mkql_helpers/mkql_helpers.h>
@@ -712,6 +713,11 @@ std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> CalculateNo
712713
auto typeTransformer = CreateTypeAnnotationTransformer(callableTransformer, *state->Types);
713714

714715
TExprNode::TPtr optimized;
716+
status = UpdateTableContentMemoryUsage(list, optimized, state, ctx, true);
717+
if (status.Level == IGraphTransformer::TStatus::Error) {
718+
return SyncStatus(status);
719+
}
720+
715721
bool hasNonDeterministicFunctions = false;
716722
status = PeepHoleOptimizeNode(list, optimized, ctx, *state->Types, typeTransformer.Get(), hasNonDeterministicFunctions);
717723
if (status.Level == IGraphTransformer::TStatus::Error) {

ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ TExprNode::TPtr THorizontalJoinOptimizer::HandleList(const TExprNode::TPtr& node
665665

666666
TExprNode::TPtr updatedLambda = map.Mapper().Ptr();
667667
if (MaxJobMemoryLimit) {
668-
auto status = UpdateTableContentMemoryUsage(map.Mapper().Ptr(), updatedLambda, State_, ctx);
668+
auto status = UpdateTableContentMemoryUsage(map.Mapper().Ptr(), updatedLambda, State_, ctx, false);
669669
if (status.Level != IGraphTransformer::TStatus::Ok) {
670670
return {};
671671
}
@@ -1346,7 +1346,7 @@ bool TMultiHorizontalJoinOptimizer::HandleGroup(const TVector<TYtMap>& maps, TEx
13461346

13471347
TExprNode::TPtr updatedLambda = map.Mapper().Ptr();
13481348
if (MaxJobMemoryLimit) {
1349-
if (UpdateTableContentMemoryUsage(map.Mapper().Ptr(), updatedLambda, State_, ctx).Level != IGraphTransformer::TStatus::Ok) {
1349+
if (UpdateTableContentMemoryUsage(map.Mapper().Ptr(), updatedLambda, State_, ctx, false).Level != IGraphTransformer::TStatus::Ok) {
13501350
return false;
13511351
}
13521352
}
@@ -1734,7 +1734,7 @@ bool TOutHorizontalJoinOptimizer::HandleGroup(TPositionHandle pos, const TGroupK
17341734

17351735
TExprNode::TPtr updatedLambda = map.Mapper().Ptr();
17361736
if (MaxJobMemoryLimit) {
1737-
if (UpdateTableContentMemoryUsage(map.Mapper().Ptr(), updatedLambda, State_, ctx).Level != IGraphTransformer::TStatus::Ok) {
1737+
if (UpdateTableContentMemoryUsage(map.Mapper().Ptr(), updatedLambda, State_, ctx, false).Level != IGraphTransformer::TStatus::Ok) {
17381738
return false;
17391739
}
17401740
}

0 commit comments

Comments
 (0)