Skip to content

Commit 45af15c

Browse files
committed
Fix #16475 (#18247)
1 parent 94fa60c commit 45af15c

File tree

7 files changed

+118
-27
lines changed

7 files changed

+118
-27
lines changed

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,8 @@ class TxPlanSerializer {
245245
planNode.TypeName = "Effect";
246246
Visit(TExprBase(stage), planNode);
247247
} else if (stageBase.Outputs()) { // Sink
248+
AFL_ENSURE(stageBase.Outputs().Cast().Size() == 1);
248249
auto& planNode = AddPlanNode(phaseNode);
249-
planNode.TypeName = "Sink";
250250
Visit(TExprBase(stage), planNode);
251251
}
252252
}
@@ -961,7 +961,8 @@ class TxPlanSerializer {
961961
if (auto outputs = expr.Cast<TDqStageBase>().Outputs()) {
962962
for (auto output : outputs.Cast()) {
963963
if (auto sink = output.Maybe<TDqSink>()) {
964-
Visit(sink.Cast(), expr.Cast<TDqStageBase>(), stagePlanNode);
964+
AFL_ENSURE(outputs.Cast().Size() == 1);
965+
Visit(sink.Cast(), expr.Cast<TDqStageBase>(), planNode);
965966
}
966967
}
967968
}

ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp

Lines changed: 94 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,12 @@ Y_UNIT_TEST_SUITE(KqpS3PlanTest) {
143143
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
144144

145145
const auto& writeStagePlan = plan["Plan"]["Plans"][0]["Plans"][0];
146-
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage-Sink");
147-
UNIT_ASSERT(writeStagePlan["Operators"].GetArraySafe().size() >= 1);
148-
const auto& sinkOp = writeStagePlan["Operators"].GetArraySafe()[0];
146+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage");
147+
148+
const auto& sinkPlan = plan["Plan"]["Plans"][0];
149+
UNIT_ASSERT_VALUES_EQUAL(sinkPlan["Node Type"].GetStringSafe(), "Sink");
150+
UNIT_ASSERT(sinkPlan["Operators"].GetArraySafe().size() >= 1);
151+
const auto& sinkOp = sinkPlan["Operators"].GetArraySafe()[0];
149152
UNIT_ASSERT_VALUES_EQUAL(sinkOp["ExternalDataSource"].GetStringSafe(), "write_data_source");
150153
UNIT_ASSERT_VALUES_EQUAL(sinkOp["Compression"].GetStringSafe(), "gzip");
151154

@@ -218,13 +221,100 @@ Y_UNIT_TEST_SUITE(KqpS3PlanTest) {
218221
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
219222

220223
const auto& writeStagePlan = plan["Plan"]["Plans"][0]["Plans"][0];
221-
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage-Sink");
224+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage");
222225
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Stats"]["Tasks"], 1);
223226

227+
const auto& sinkPlan = plan["Plan"]["Plans"][0];
228+
UNIT_ASSERT_VALUES_EQUAL(sinkPlan["Node Type"].GetStringSafe(), "Sink");
229+
UNIT_ASSERT(sinkPlan["Operators"].GetArraySafe().size() >= 1);
230+
const auto& sinkOp = sinkPlan["Operators"].GetArraySafe()[0];
231+
UNIT_ASSERT_VALUES_EQUAL(sinkOp["Name"].GetStringSafe(), "FillTable");
232+
UNIT_ASSERT_VALUES_EQUAL(sinkOp["Table"].GetStringSafe(), "result_table");
233+
224234
const auto& readStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0];
225235
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Node Type"].GetStringSafe(), "Stage");
226236
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Stats"]["Tasks"], 1);
227237
}
238+
239+
Y_UNIT_TEST(S3Insert) {
240+
{
241+
Aws::S3::S3Client s3Client = MakeS3Client();
242+
CreateBucket("test_insert", s3Client);
243+
}
244+
245+
NKikimrConfig::TAppConfig appConfig;
246+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
247+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
248+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
249+
auto kikimr = NTestUtils::MakeKikimrRunner(appConfig);
250+
251+
auto tc = kikimr->GetTableClient();
252+
auto session = tc.CreateSession().GetValueSync().GetSession();
253+
{
254+
const TString query = fmt::format(R"sql(
255+
CREATE EXTERNAL DATA SOURCE insert_data_sink WITH (
256+
SOURCE_TYPE="ObjectStorage",
257+
LOCATION="{insert_location}",
258+
AUTH_METHOD="NONE"
259+
);
260+
)sql",
261+
"insert_location"_a = GetBucketLocation("test_insert")
262+
);
263+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
264+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
265+
}
266+
267+
auto queryClient = kikimr->GetQueryClient();
268+
{
269+
const TString query = R"sql(
270+
CREATE TABLE olap_source (
271+
PRIMARY KEY (data)
272+
) WITH (STORE = COLUMN)
273+
AS SELECT * FROM AS_TABLE([
274+
<|data: "test_data"|>
275+
]);
276+
)sql";
277+
auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
278+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
279+
}
280+
281+
const TString sql = R"sql(
282+
PRAGMA ydb.OverridePlanner = @@ [
283+
{ "tx": 0, "stage": 0, "tasks": 42 }
284+
] @@;
285+
286+
INSERT INTO insert_data_sink.`/test/`
287+
WITH (FORMAT = "parquet")
288+
SELECT * FROM olap_source
289+
)sql";
290+
291+
TExecuteQueryResult queryResult = queryClient.ExecuteQuery(
292+
sql,
293+
TTxControl::NoTx(),
294+
TExecuteQuerySettings().StatsMode(EStatsMode::Full)).GetValueSync();
295+
296+
UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), NYdb::EStatus::SUCCESS, queryResult.GetIssues().ToString());
297+
UNIT_ASSERT(queryResult.GetStats());
298+
UNIT_ASSERT(queryResult.GetStats()->GetPlan());
299+
Cerr << "Plan: " << *queryResult.GetStats()->GetPlan() << Endl;
300+
NJson::TJsonValue plan;
301+
UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan));
302+
303+
const auto& writeStagePlan = plan["Plan"]["Plans"][0]["Plans"][0];
304+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage");
305+
UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Stats"]["Tasks"], 42);
306+
307+
const auto& sinkPlan = plan["Plan"]["Plans"][0];
308+
UNIT_ASSERT_VALUES_EQUAL(sinkPlan["Node Type"].GetStringSafe(), "Sink");
309+
UNIT_ASSERT(sinkPlan["Operators"].GetArraySafe().size() >= 1);
310+
const auto& sinkOp = sinkPlan["Operators"].GetArraySafe()[0];
311+
UNIT_ASSERT_VALUES_EQUAL(sinkOp["ExternalDataSource"].GetStringSafe(), "insert_data_sink");
312+
UNIT_ASSERT_VALUES_EQUAL(sinkOp["Extension"].GetStringSafe(), ".parquet");
313+
314+
const auto& readStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0];
315+
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Node Type"].GetStringSafe(), "TableFullScan");
316+
UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Stats"]["Tasks"], 42);
317+
}
228318
}
229319

230320
} // namespace NKikimr::NKqp

ydb/core/kqp/ut/query/kqp_stats_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ Y_UNIT_TEST_TWIN(DataQueryWithEffects, UseSink) {
279279
NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
280280

281281
if (UseSink) {
282-
auto node = FindPlanNodeByKv(plan, "Node Type", "Stage-Sink");
282+
auto node = FindPlanNodeByKv(plan, "Node Type", "Stage");
283283
UNIT_ASSERT_EQUAL(node.GetMap().at("Stats").GetMapSafe().at("Tasks").GetIntegerSafe(), 1);
284284
} else {
285285
auto node = FindPlanNodeByKv(plan, "Node Type", "Upsert-ConstantExpr");

ydb/tests/functional/canonical/canondata/test_sql.TestCanonicalFolder1.test_case_explain.script-script_/explain.script.plan

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -542,11 +542,11 @@
542542
"Name": "Iterator"
543543
}
544544
],
545-
"PlanNodeId": 1,
546-
"Tables": [
547-
"base_explain_script_script/ScriptingTest"
548-
]
545+
"PlanNodeId": 1
549546
}
547+
],
548+
"Tables": [
549+
"base_explain_script_script/ScriptingTest"
550550
]
551551
}
552552
],
@@ -598,11 +598,11 @@
598598
"Name": "Iterator"
599599
}
600600
],
601-
"PlanNodeId": 1,
602-
"Tables": [
603-
"base_explain_script_script/ScriptingTest"
604-
]
601+
"PlanNodeId": 1
605602
}
603+
],
604+
"Tables": [
605+
"base_explain_script_script/ScriptingTest"
606606
]
607607
}
608608
],

ydb/tests/functional/canonical/canondata/test_sql.TestCanonicalFolder1.test_case_join_group_by_lookup.script-script_/join_group_by_lookup.script.plan

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,11 @@
317317
"Name": "Iterator"
318318
}
319319
],
320-
"PlanNodeId": 1,
321-
"Tables": [
322-
"base_join_group_by_lookup_script_script/Temp"
323-
]
320+
"PlanNodeId": 1
324321
}
322+
],
323+
"Tables": [
324+
"base_join_group_by_lookup_script_script/Temp"
325325
]
326326
}
327327
],

ydb/tests/functional/canonical/canondata/test_sql.TestCanonicalFolder1.test_case_simple_ct.script-script_/simple_ct.script.plan

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@
3333
"Name": "Iterator"
3434
}
3535
],
36-
"PlanNodeId": 1,
37-
"Tables": [
38-
"base_simple_ct_script_script/Questions"
39-
]
36+
"PlanNodeId": 1
4037
}
38+
],
39+
"Tables": [
40+
"base_simple_ct_script_script/Questions"
4141
]
4242
}
4343
],

ydb/tests/functional/canonical/canondata/test_sql.TestCanonicalFolder1.test_case_table_types.script-script_/table_types.script.plan

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,11 @@
158158
"Name": "Iterator"
159159
}
160160
],
161-
"PlanNodeId": 1,
162-
"Tables": [
163-
"base_table_types_script_script/TableTypes"
164-
]
161+
"PlanNodeId": 1
165162
}
163+
],
164+
"Tables": [
165+
"base_table_types_script_script/TableTypes"
166166
]
167167
}
168168
],

0 commit comments

Comments
 (0)