diff --git a/ydb/core/kqp/host/kqp_statement_rewrite.cpp b/ydb/core/kqp/host/kqp_statement_rewrite.cpp index fe34a40e7f9b..750b103c5b9d 100644 --- a/ydb/core/kqp/host/kqp_statement_rewrite.cpp +++ b/ydb/core/kqp/host/kqp_statement_rewrite.cpp @@ -278,7 +278,9 @@ namespace { create = exprCtx.ReplaceNode(std::move(create), *tableNameNode, exprCtx.NewAtom(pos, tmpTableName)); } - const auto topLevelRead = NYql::FindTopLevelRead(insertData.Ptr()); + NYql::TNodeOnNodeOwnedMap deepClones; + auto insertDataCopy = exprCtx.DeepCopy(insertData.Ref(), exprCtx, deepClones, false, false); + const auto topLevelRead = NYql::FindTopLevelRead(insertDataCopy); NYql::TExprNode::TListType insertSettings; insertSettings.push_back( @@ -305,7 +307,7 @@ namespace { }), }), }), - insertData.Ptr(), + insertDataCopy, exprCtx.NewList(pos, std::move(insertSettings)), }); diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp index 9f82728cf597..3b66d3c89ef6 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_s3_plan_ut.cpp @@ -159,6 +159,69 @@ Y_UNIT_TEST_SUITE(KqpS3PlanTest) { UNIT_ASSERT_VALUES_EQUAL(sourceOp["ReadColumns"].GetArraySafe()[0].GetStringSafe(), "key"); UNIT_ASSERT_VALUES_EQUAL(sourceOp["ReadColumns"].GetArraySafe()[1].GetStringSafe(), "value"); } + + Y_UNIT_TEST(S3CreateTableAsSelect) { + { + Aws::S3::S3Client s3Client = MakeS3Client(); + CreateBucketWithObject("test_ctas_read", "test_ctas_read1", TEST_CONTENT, s3Client); + UploadObject("test_ctas_read", "test_ctas_read2", TEST_CONTENT, s3Client); + } + + auto kikimr = NTestUtils::MakeKikimrRunner(); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE read_data_source WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{read_location}", + AUTH_METHOD="NONE" + ); + CREATE EXTERNAL TABLE read_table ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="read_data_source", + LOCATION="/", + FORMAT="json_each_row" + ); + )sql", + "read_location"_a = GetBucketLocation("test_ctas_read") + ); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + const TString sql = R"sql( + PRAGMA ydb.MaxTasksPerStage = "1"; + + CREATE TABLE result_table ( + PRIMARY KEY (key) + ) + WITH (STORE = COLUMN) + AS SELECT * FROM read_table + )sql"; + + auto queryClient = kikimr->GetQueryClient(); + TExecuteQueryResult queryResult = queryClient.ExecuteQuery( + sql, + TTxControl::NoTx(), + TExecuteQuerySettings().StatsMode(EStatsMode::Full)).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(queryResult.GetStatus(), NYdb::EStatus::SUCCESS, queryResult.GetIssues().ToString()); + UNIT_ASSERT(queryResult.GetStats()); + UNIT_ASSERT(queryResult.GetStats()->GetPlan()); + Cerr << "Plan: " << *queryResult.GetStats()->GetPlan() << Endl; + NJson::TJsonValue plan; + UNIT_ASSERT(NJson::ReadJsonTree(*queryResult.GetStats()->GetPlan(), &plan)); + + const auto& writeStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]; + UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Node Type"].GetStringSafe(), "Stage-Sink"); + UNIT_ASSERT_VALUES_EQUAL(writeStagePlan["Stats"]["Tasks"], 1); + + const auto& readStagePlan = plan["Plan"]["Plans"][0]["Plans"][0]["Plans"][0]["Plans"][0]; + UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Node Type"].GetStringSafe(), "Stage"); + UNIT_ASSERT_VALUES_EQUAL(readStagePlan["Stats"]["Tasks"], 1); + } } } // namespace NKikimr::NKqp