Skip to content

Commit 23aa5f7

Browse files
authored
merge to analytics stable YQ-4184 supported OverridePlanner for olap reads (#16032) (#18919)
2 parents 2a7fe82 + a5d9ac5 commit 23aa5f7

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1609,6 +1609,11 @@ class TKqpExecuterBase : public TActor<TDerived> {
16091609
const ui64 /*nodeId*/,
16101610
bool enableShuffleElimination = false
16111611
) const {
1612+
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
1613+
if (const auto taskCount = stage.GetTaskCount()) {
1614+
return taskCount;
1615+
}
1616+
16121617
ui32 result = 0;
16131618
if (isOlapScan) {
16141619
if (AggregationSettings.HasCSScanThreadsPerNode()) {
@@ -1618,7 +1623,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
16181623
result = predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {});
16191624
}
16201625
} else {
1621-
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
16221626
result = AggregationSettings.GetDSScanMinimalThreads();
16231627
if (stage.GetProgram().GetSettings().GetHasSort()) {
16241628
result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads());

ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,78 @@ Y_UNIT_TEST_SUITE(KqpS3PlanTest) {
225225
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Node Type"].GetStringSafe(), "Stage");
226226
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Stats"]["Tasks"], 1);
227227
}
228+
229+
Y_UNIT_TEST(S3Insert) {
230+
{
231+
Aws::S3::S3Client s3Client = MakeS3Client();
232+
CreateBucket("test_insert", s3Client);
233+
}
234+
235+
NKikimrConfig::TAppConfig appConfig;
236+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
237+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
238+
auto kikimr = NTestUtils::MakeKikimrRunner(appConfig);
239+
240+
auto tc = kikimr->GetTableClient();
241+
auto session = tc.CreateSession().GetValueSync().GetSession();
242+
{
243+
const TString query = fmt::format(R"sql(
244+
CREATE EXTERNAL DATA SOURCE insert_data_sink WITH (
245+
SOURCE_TYPE="ObjectStorage",
246+
LOCATION="{insert_location}",
247+
AUTH_METHOD="NONE"
248+
);
249+
)sql",
250+
"insert_location"_a = GetBucketLocation("test_insert")
251+
);
252+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
253+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
254+
}
255+
256+
auto queryClient = kikimr->GetQueryClient();
257+
{
258+
const TString query = R"sql(
259+
CREATE TABLE olap_source (
260+
PRIMARY KEY (data)
261+
) WITH (STORE = COLUMN)
262+
AS SELECT * FROM AS_TABLE([
263+
<|data: "test_data"|>
264+
]);
265+
)sql";
266+
auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
267+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
268+
}
269+
270+
const TString sql = R"sql(
271+
PRAGMA ydb.OverridePlanner = @@ [
272+
{ "tx": 0, "stage": 0, "tasks": 42 }
273+
] @@;
274+
275+
INSERT INTO insert_data_sink.`/test/`
276+
WITH (FORMAT = "parquet")
277+
SELECT * FROM olap_source
278+
)sql";
279+
280+
TExecuteQueryResult queryResult = queryClient.ExecuteQuery(
281+
sql,
282+
TTxControl::NoTx(),
283+
TExecuteQuerySettings().StatsMode(EStatsMode::Full)).GetValueSync();
284+
285+
UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), NYdb::EStatus::SUCCESS, queryResult.GetIssues().ToString());
286+
UNIT_ASSERT(queryResult.GetStats());
287+
UNIT_ASSERT(queryResult.GetStats()->GetPlan());
288+
Cerr << "Plan: " << *queryResult.GetStats()->GetPlan() << Endl;
289+
NJson::TJsonValue plan;
290+
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
291+
292+
const auto& writeStagePlan = plan["Plan"]["Plans"][0]["Plans"][0];
293+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage-Sink");
294+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Stats"]["Tasks"], 42);
295+
296+
const auto& readStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0];
297+
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Node Type"].GetStringSafe(), "TableFullScan");
298+
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Stats"]["Tasks"], 42);
299+
}
228300
}
229301

230302
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)