Skip to content

Commit 54b47ab

Browse files
authored
Better scheduler for S3 Sources (#9620)
1 parent 7492f96 commit 54b47ab

File tree

9 files changed

+97
-3
lines changed

9 files changed

+97
-3
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,6 +1858,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
18581858
size_t sourceScanPartitionsCount = 0;
18591859
for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) {
18601860
auto& tx = Request.Transactions[txIdx];
1861+
auto scheduledTaskCount = ScheduleByCost(tx, ResourceSnapshot);
18611862
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
18621863
auto& stage = tx.Body->GetStages(stageIdx);
18631864
auto& stageInfo = TasksGraph.GetStageInfo(TStageId(txIdx, stageIdx));
@@ -1903,8 +1904,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
19031904
UnknownAffectedShardCount = true;
19041905
}
19051906
break;
1906-
case NKqpProto::TKqpSource::kExternalSource:
1907-
BuildReadTasksFromSource(stageInfo, ResourceSnapshot);
1907+
case NKqpProto::TKqpSource::kExternalSource: {
1908+
auto it = scheduledTaskCount.find(stageIdx);
1909+
BuildReadTasksFromSource(stageInfo, ResourceSnapshot, it != scheduledTaskCount.end() ? it->second.TaskCount : 0);
1910+
}
19081911
break;
19091912
default:
19101913
YQL_ENSURE(false, "unknown source type");

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ struct TShardRangesWithShardId {
8181
const TShardKeyRanges* Ranges;
8282
};
8383

84+
struct TStageScheduleInfo {
85+
double StageCost = 0.0;
86+
ui32 TaskCount = 0;
87+
};
8488

8589
TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
8690
const NKikimrKqp::TRlPath& path);
@@ -819,6 +823,40 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
819823
}
820824
}
821825

826+
std::map<ui32, TStageScheduleInfo> ScheduleByCost(const IKqpGateway::TPhysicalTxData& tx, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
827+
std::map<ui32, TStageScheduleInfo> result;
828+
if (!resourceSnapshot.empty()) // can't schedule w/o node count
829+
{
830+
// collect costs and schedule stages with external sources only
831+
double totalCost = 0.0;
832+
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
833+
auto& stage = tx.Body->GetStages(stageIdx);
834+
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
835+
if (stage.GetStageCost() > 0.0 && stage.GetTaskCount() == 0) {
836+
totalCost += stage.GetStageCost();
837+
result.emplace(stageIdx, TStageScheduleInfo{.StageCost = stage.GetStageCost()});
838+
}
839+
}
840+
}
841+
// assign task counts
842+
if (!result.empty()) {
843+
// allow use 2/3 of threads in single stage
844+
ui32 maxStageTaskCount = (TStagePredictor::GetUsableThreads() * 2 + 2) / 3;
845+
// total limit per mode is x2
846+
ui32 maxTotalTaskCount = maxStageTaskCount * 2;
847+
for (auto& [_, stageInfo] : result) {
848+
// schedule tasks evenly between nodes
849+
stageInfo.TaskCount =
850+
std::max<ui32>(
851+
std::min(static_cast<ui32>(maxTotalTaskCount * stageInfo.StageCost / totalCost), maxStageTaskCount)
852+
, 1
853+
) * resourceSnapshot.size();
854+
}
855+
}
856+
}
857+
return result;
858+
}
859+
822860
void BuildSysViewScanTasks(TStageInfo& stageInfo) {
823861
Y_DEBUG_ABORT_UNLESS(stageInfo.Meta.IsSysView());
824862

@@ -924,7 +962,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
924962
}
925963
}
926964

927-
void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
965+
void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot, ui32 scheduledTaskCount) {
928966
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
929967

930968
YQL_ENSURE(stage.GetSources(0).HasExternalSource());
@@ -936,6 +974,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
936974
ui32 taskCount = externalSource.GetPartitionedTaskParams().size();
937975

938976
auto taskCountHint = stage.GetTaskCount();
977+
if (taskCountHint == 0) {
978+
taskCountHint = scheduledTaskCount;
979+
}
980+
939981
if (taskCountHint) {
940982
if (taskCount > taskCountHint) {
941983
taskCount = taskCountHint;

ydb/core/kqp/opt/kqp_statistics_transformer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,8 @@ void InferStatisticsForDqSourceWrap(const TExprNode::TPtr& input, TTypeAnnotatio
615615
}
616616
}
617617

618+
auto dataSourceStats = stats;
619+
618620
auto rowType = wrapBase.Cast().RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
619621
if (specific->FullRawRowAvgSize == 0.0) {
620622
auto newSpecific = std::make_shared<TS3ProviderStatistics>(*specific);
@@ -659,6 +661,10 @@ void InferStatisticsForDqSourceWrap(const TExprNode::TPtr& input, TTypeAnnotatio
659661
if (newSpecific->Compression) {
660662
stats->Cost *= 1.5;
661663
}
664+
{
665+
auto specific = const_cast<TS3ProviderStatistics*>(dynamic_cast<const TS3ProviderStatistics*>((dataSourceStats->Specific.get())));
666+
specific->Costs[TStructExprType::MakeHash(rowType->GetItems())] = stats->Cost;
667+
}
662668
}
663669
typeCtx->SetStats(input.Get(), stats);
664670
}

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
2222
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
2323
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
24+
#include <ydb/library/yql/providers/s3/statistics/yql_s3_statistics.h>
2425
#include <ydb/library/yql/core/yql_opt_utils.h>
2526

27+
2628
namespace NKikimr {
2729
namespace NKqp {
2830

@@ -672,8 +674,11 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
672674
}
673675
}
674676

677+
double stageCost = 0.0;
675678
VisitExpr(stage.Program().Ptr(), [&](const TExprNode::TPtr& exprNode) {
679+
676680
TExprBase node(exprNode);
681+
677682
if (auto maybeReadTable = node.Maybe<TKqpWideReadTable>()) {
678683
auto readTable = maybeReadTable.Cast();
679684
auto tableMeta = TablesData->ExistingTable(Cluster, readTable.Table().Path()).Metadata;
@@ -754,12 +759,15 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
754759
FillOlapProgram(readTableRanges, miniKqlResultType, *tableMeta, *tableOp.MutableReadOlapRange(), ctx);
755760
FillResultType(miniKqlResultType, *tableOp.MutableReadOlapRange());
756761
tableOp.MutableReadOlapRange()->SetReadType(NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS);
762+
} else if (auto maybeDqSourceWrapBase = node.Maybe<TDqSourceWrapBase>()) {
763+
stageCost += GetDqSourceWrapBaseCost(maybeDqSourceWrapBase.Cast(), TypesCtx);
757764
} else {
758765
YQL_ENSURE(!node.Maybe<TKqpReadTable>());
759766
}
760767
return true;
761768
});
762769

770+
stageProto.SetStageCost(stageCost);
763771
const auto& secureParams = FindSecureParams(stage.Program().Ptr(), TypesCtx, SecretNames);
764772
stageProto.MutableSecureParams()->insert(secureParams.begin(), secureParams.end());
765773

ydb/core/kqp/query_compiler/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ PEERDIR(
1919
ydb/library/yql/minikql
2020
ydb/library/yql/providers/common/mkql
2121
ydb/library/yql/providers/dq/common
22+
ydb/library/yql/providers/s3/expr_nodes
2223
)
2324

2425
YQL_LAST_ABI_VERSION()

ydb/core/protos/kqp_physical.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ message TKqpPhyStage {
377377
map<string, string> SecureParams = 12;
378378
bool AllowWithSpilling = 13;
379379
uint32 TaskCount = 14;
380+
double StageCost = 15;
380381
}
381382

382383
message TKqpPhyResult {

ydb/library/yql/providers/s3/statistics/ya.make

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
LIBRARY()
22

33
SRCS(
4+
yql_s3_statistics.cpp
45
)
56

67
PEERDIR(
78
ydb/library/yql/core
9+
ydb/library/yql/providers/dq/expr_nodes
10+
ydb/library/yql/providers/s3/expr_nodes
811
)
912

1013
YQL_LAST_ABI_VERSION()
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
2+
3+
#include "yql_s3_statistics.h"
4+
5+
6+
namespace NYql {
7+
8+
double GetDqSourceWrapBaseCost(const NNodes::TDqSourceWrapBase& wrapBase, TTypeAnnotationContext& typeCtx) {
9+
if (auto maybeS3DataSource = wrapBase.DataSource().Maybe<NNodes::TS3DataSource>()) {
10+
auto s3DataSource = maybeS3DataSource.Cast();
11+
auto stats = typeCtx.GetStats(s3DataSource.Raw());
12+
if (stats && stats->Specific) {
13+
const TS3ProviderStatistics* specific = dynamic_cast<const TS3ProviderStatistics*>((stats->Specific.get()));
14+
auto rowType = wrapBase.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
15+
auto it = specific->Costs.find(TStructExprType::MakeHash(rowType->GetItems()));
16+
if (it != specific->Costs.end()) {
17+
return it->second;
18+
}
19+
}
20+
}
21+
return 0.0;
22+
}
23+
24+
} // namespace NYql

ydb/library/yql/providers/s3/statistics/yql_s3_statistics.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#pragma once
22

33
#include <ydb/library/yql/core/yql_statistics.h>
4+
#include <ydb/library/yql/core/yql_type_annotation.h>
5+
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
6+
#include <map>
47

58
namespace NYql {
69

@@ -13,6 +16,9 @@ struct TS3ProviderStatistics : public IProviderStatistics {
1316
double PrunedDecodedRowAvgSize = 0.0;
1417
TString Format;
1518
TString Compression;
19+
std::unordered_map<ui64, double> Costs;
1620
};
1721

22+
double GetDqSourceWrapBaseCost(const NNodes::TDqSourceWrapBase& wrapBase, TTypeAnnotationContext& typeCtx);
23+
1824
} // namespace NYql

0 commit comments

Comments
 (0)