Skip to content

Commit 3709281

Browse files
authored
Pragma ydb.OverridePlanner to tune runtime execution (#6904)
1 parent ba1a746 commit 3709281

File tree

7 files changed

+211
-2
lines changed

7 files changed

+211
-2
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -962,7 +962,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
962962

963963
ui32 taskCount = externalSource.GetPartitionedTaskParams().size();
964964

965-
if (!resourceSnapshot.empty()) {
965+
auto taskCountHint = stage.GetTaskCount();
966+
if (taskCountHint) {
967+
if (taskCount > taskCountHint) {
968+
taskCount = taskCountHint;
969+
}
970+
} else if (!resourceSnapshot.empty()) {
966971
ui32 maxTaskcount = resourceSnapshot.size() * 2;
967972
if (taskCount > maxTaskcount) {
968973
taskCount = maxTaskcount;
@@ -1342,7 +1347,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
13421347
}
13431348

13441349
if (isShuffle) {
1345-
partitionsCount = std::max(partitionsCount, GetMaxTasksAggregation(stageInfo, inputTasks, nodesCount));
1350+
if (stage.GetTaskCount()) {
1351+
partitionsCount = stage.GetTaskCount();
1352+
} else {
1353+
partitionsCount = std::max(partitionsCount, GetMaxTasksAggregation(stageInfo, inputTasks, nodesCount));
1354+
}
13461355
}
13471356

13481357
for (ui32 i = 0; i < partitionsCount; ++i) {

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2551,6 +2551,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
25512551
stats["UseLlvm"] = "undefined";
25522552
}
25532553

2554+
stats["PhysicalStageId"] = (*stat)->GetStageId();
25542555
stats["Tasks"] = (*stat)->GetTotalTasksCount();
25552556

25562557
stats["StageDurationUs"] = (*stat)->GetStageDurationUs();

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
6767
REGISTER_SETTING(*this, OptEnableOlapPushdown);
6868
REGISTER_SETTING(*this, OptEnableOlapProvideComputeSharding);
6969
REGISTER_SETTING(*this, OverrideStatistics);
70+
REGISTER_SETTING(*this, OverridePlanner);
7071

7172

7273
REGISTER_SETTING(*this, OptUseFinalizeByKey);

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ struct TKikimrSettings {
5252
NCommon::TConfSetting<NDq::EHashJoinMode, false> HashJoinMode;
5353
NCommon::TConfSetting<TString, false> OverrideStatistics;
5454
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
55+
NCommon::TConfSetting<TString, false> OverridePlanner;
5556

5657
/* Disable optimizer rules */
5758
NCommon::TConfSetting<bool, false> OptDisableTopSort;

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,33 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
515515
CompileTransaction(tx, *queryProto.AddTransactions(), ctx);
516516
}
517517

518+
auto overridePlanner = Config->OverridePlanner.Get();
519+
if (overridePlanner) {
520+
NJson::TJsonReaderConfig jsonConfig;
521+
NJson::TJsonValue jsonNode;
522+
if (NJson::ReadJsonTree(*overridePlanner, &jsonConfig, &jsonNode)) {
523+
for (auto& stageOverride : jsonNode.GetArray()) {
524+
ui32 txId = 0;
525+
if (auto* txNode = stageOverride.GetValueByPath("tx")) {
526+
txId = txNode->GetIntegerSafe();
527+
}
528+
if (txId < static_cast<ui32>(queryProto.GetTransactions().size())) {
529+
auto& tx = *queryProto.MutableTransactions(txId);
530+
ui32 stageId = 0;
531+
if (auto* stageNode = stageOverride.GetValueByPath("stage")) {
532+
stageId = stageNode->GetIntegerSafe();
533+
}
534+
if (stageId < static_cast<ui32>(tx.GetStages().size())) {
535+
auto& stage = *tx.MutableStages(stageId);
536+
if (auto* tasksNode = stageOverride.GetValueByPath("tasks")) {
537+
stage.SetTaskCount(tasksNode->GetIntegerSafe());
538+
}
539+
}
540+
}
541+
}
542+
}
543+
}
544+
518545
for (ui32 i = 0; i < query.Results().Size(); ++i) {
519546
const auto& result = query.Results().Item(i);
520547

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,6 +1961,175 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19611961

19621962
ValidateTables(client, oltpTable, olapTable);
19631963
}
1964+
1965+
Y_UNIT_TEST(OverridePlannerDefaults) {
1966+
const TString root = "/Root/";
1967+
const TString source = "source";
1968+
const TString table1 = "table1";
1969+
const TString table2 = "table2";
1970+
const TString bucket = "bucket";
1971+
const TString object1 = "object1";
1972+
const TString object2 = "object2";
1973+
const TString content1 = "foo,bar\naaa,0\nbbb,2";
1974+
const TString content2 = "foo,bar\naaa,1\nbbb,3";
1975+
1976+
Aws::S3::S3Client s3Client = MakeS3Client();
1977+
CreateBucket(bucket, s3Client);
1978+
UploadObject(bucket, table1 + "/" + object1, content1, s3Client);
1979+
UploadObject(bucket, table1 + "/" + object2, content2, s3Client);
1980+
UploadObject(bucket, table2 + "/" + object1, content1, s3Client);
1981+
UploadObject(bucket, table2 + "/" + object2, content2, s3Client);
1982+
1983+
auto kikimr = NTestUtils::MakeKikimrRunner();
1984+
1985+
auto tc = kikimr->GetTableClient();
1986+
auto session = tc.CreateSession().GetValueSync().GetSession();
1987+
const TString query = fmt::format(R"(
1988+
CREATE EXTERNAL DATA SOURCE `{source}` WITH (
1989+
SOURCE_TYPE="ObjectStorage",
1990+
LOCATION="{location}",
1991+
AUTH_METHOD="NONE"
1992+
);
1993+
CREATE EXTERNAL TABLE `{table1}` (
1994+
foo STRING NOT NULL,
1995+
bar UINT32 NOT NULL
1996+
) WITH (
1997+
DATA_SOURCE="{source}",
1998+
LOCATION="/{location_table1}/",
1999+
FORMAT="csv_with_names"
2000+
);
2001+
CREATE EXTERNAL TABLE `{table2}` (
2002+
foo STRING NOT NULL,
2003+
bar UINT32 NOT NULL
2004+
) WITH (
2005+
DATA_SOURCE="{source}",
2006+
LOCATION="/{location_table2}/",
2007+
FORMAT="csv_with_names"
2008+
);
2009+
)",
2010+
"source"_a = root + source,
2011+
"table1"_a = root + table1,
2012+
"table2"_a = root + table2,
2013+
"location_table1"_a = table1,
2014+
"location_table2"_a = table2,
2015+
"location"_a = TStringBuilder() << GetEnv("S3_ENDPOINT") << '/' << bucket
2016+
);
2017+
2018+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
2019+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2020+
2021+
ui32 source1_id = 0;
2022+
ui32 source2_id = 0;
2023+
ui32 join_id = 0;
2024+
ui32 limit_id = 0;
2025+
auto queryClient = kikimr->GetQueryClient();
2026+
2027+
{
2028+
// default planner values
2029+
2030+
const TString sql = fmt::format(R"(
2031+
SELECT SUM(t1.bar + t2.bar) as sum FROM `{table1}` as t1 JOIN /*+grace()*/ `{table2}`as t2 ON t1.foo = t2.foo
2032+
)",
2033+
"table1"_a = root + table1,
2034+
"table2"_a = root + table2);
2035+
2036+
TExecuteQueryResult queryResult = queryClient.ExecuteQuery(
2037+
sql,
2038+
TTxControl::BeginTx().CommitTx(),
2039+
TExecuteQuerySettings().ExecMode(EExecMode::Execute).StatsMode(EStatsMode::Full)).GetValueSync();
2040+
2041+
UNIT_ASSERT_C(queryResult.IsSuccess(), queryResult.GetIssues().ToString());
2042+
UNIT_ASSERT(queryResult.GetStats());
2043+
UNIT_ASSERT(queryResult.GetStats()->GetPlan());
2044+
NJson::TJsonValue plan;
2045+
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
2046+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 2);
2047+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][1]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 2);
2048+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 4);
2049+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 1);
2050+
2051+
source1_id = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["PhysicalStageId"].GetIntegerSafe();
2052+
source2_id = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][1]["Plans"][0]["Stats"]["PhysicalStageId"].GetIntegerSafe();
2053+
join_id = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["PhysicalStageId"].GetIntegerSafe();
2054+
limit_id = plan["Plan"]["Plans"][0]["Plans"][0]["Stats"]["PhysicalStageId"].GetIntegerSafe();
2055+
}
2056+
2057+
{
2058+
// scale down
2059+
2060+
const TString sql = fmt::format(R"(
2061+
pragma ydb.OverridePlanner = @@ [
2062+
{{ "tx": 0, "stage": {source1_id}, "tasks": 1 }},
2063+
{{ "tx": 0, "stage": {source2_id}, "tasks": 1 }},
2064+
{{ "tx": 0, "stage": {join_id}, "tasks": 1 }},
2065+
{{ "tx": 0, "stage": {limit_id}, "tasks": 1 }}
2066+
] @@;
2067+
2068+
SELECT SUM(t1.bar + t2.bar) as sum FROM `{table1}` as t1 JOIN /*+grace()*/ `{table2}`as t2 ON t1.foo = t2.foo
2069+
)",
2070+
"source1_id"_a = source1_id,
2071+
"source2_id"_a = source2_id,
2072+
"join_id"_a = join_id,
2073+
"limit_id"_a = limit_id,
2074+
"table1"_a = root + table1,
2075+
"table2"_a = root + table2);
2076+
2077+
TExecuteQueryResult queryResult = queryClient.ExecuteQuery(
2078+
sql,
2079+
TTxControl::BeginTx().CommitTx(),
2080+
TExecuteQuerySettings().ExecMode(EExecMode::Execute).StatsMode(EStatsMode::Full)).GetValueSync();
2081+
2082+
UNIT_ASSERT_C(queryResult.IsSuccess(), queryResult.GetIssues().ToString());
2083+
UNIT_ASSERT(queryResult.GetStats());
2084+
UNIT_ASSERT(queryResult.GetStats()->GetPlan());
2085+
NJson::TJsonValue plan;
2086+
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
2087+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 1);
2088+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][1]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 1);
2089+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 1);
2090+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 1);
2091+
}
2092+
2093+
{
2094+
// scale up
2095+
2096+
const TString sql = fmt::format(R"(
2097+
pragma ydb.OverridePlanner = @@ [
2098+
{{ "tx": 0, "stage": {source1_id}, "tasks": 10 }},
2099+
{{ "tx": 0, "stage": {source2_id}, "tasks": 10 }},
2100+
{{ "tx": 0, "stage": {join_id}, "tasks": 10 }},
2101+
{{ "tx": 0, "stage": {limit_id}, "tasks": 10 }}
2102+
] @@;
2103+
2104+
SELECT SUM(t1.bar + t2.bar) as sum FROM `{table1}` as t1 JOIN /*+grace()*/ `{table2}`as t2 ON t1.foo = t2.foo
2105+
)",
2106+
"source1_id"_a = source1_id,
2107+
"source2_id"_a = source2_id,
2108+
"join_id"_a = join_id,
2109+
"limit_id"_a = limit_id,
2110+
"table1"_a = root + table1,
2111+
"table2"_a = root + table2);
2112+
2113+
TExecuteQueryResult queryResult = queryClient.ExecuteQuery(
2114+
sql,
2115+
TTxControl::BeginTx().CommitTx(),
2116+
TExecuteQuerySettings().ExecMode(EExecMode::Execute).StatsMode(EStatsMode::Full)).GetValueSync();
2117+
2118+
UNIT_ASSERT_C(queryResult.IsSuccess(), queryResult.GetIssues().ToString());
2119+
UNIT_ASSERT(queryResult.GetStats());
2120+
UNIT_ASSERT(queryResult.GetStats()->GetPlan());
2121+
NJson::TJsonValue plan;
2122+
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
2123+
// only 2 files => sources stay with 2 tasks
2124+
// join scales to 10 tasks
2125+
// limit ignores hint and keeps being in the only task
2126+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 2);
2127+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][1]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 2);
2128+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 10);
2129+
UNIT_ASSERT_VALUES_EQUAL(plan["Plan"]["Plans"][0]["Plans"][0]["Stats"]["Tasks"].GetIntegerSafe(), 1);
2130+
}
2131+
}
2132+
19642133
}
19652134

19662135
} // namespace NKikimr::NKqp

ydb/core/protos/kqp_physical.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ message TKqpPhyStage {
376376
repeated TKqpSink Sinks = 11;
377377
map<string, string> SecureParams = 12;
378378
bool AllowWithSpilling = 13;
379+
uint32 TaskCount = 14;
379380
}
380381

381382
message TKqpPhyResult {

0 commit comments

Comments
 (0)