Skip to content

Commit 4fbb0a8

Browse files
authored
Fix CTAS with view (#19365)
1 parent 49c3781 commit 4fbb0a8

File tree

4 files changed

+129
-3
lines changed

4 files changed

+129
-3
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1648,7 +1648,13 @@ class TKqpHost : public IKqpHost {
16481648

16491649
YQL_ENSURE(ExprCtxStorage);
16501650

1651-
auto prepareData = PrepareRewrite(compileResult.QueryExpr, *ExprCtxStorage, *TypesCtx, SessionCtx, Cluster);
1651+
auto prepareData = PrepareRewrite(
1652+
compileResult.QueryExpr,
1653+
*ExprCtxStorage,
1654+
*TypesCtx,
1655+
SessionCtx,
1656+
*FuncRegistry,
1657+
Cluster);
16521658

16531659
return MakeIntrusive<TAsyncSplitQueryResult>(
16541660
compileResult.QueryExpr,

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ namespace {
9898
NYql::TExprContext& exprCtx,
9999
NYql::TTypeAnnotationContext& typeCtx,
100100
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
101+
const NMiniKQL::IFunctionRegistry& funcRegistry,
101102
const TString& cluster) {
102103
NYql::NNodes::TExprBase expr(root);
103104
auto maybeWrite = expr.Maybe<NYql::NNodes::TCoWrite>();
@@ -135,6 +136,7 @@ namespace {
135136

136137
auto typeTransformer = NYql::TTransformationPipeline(&typeCtx)
137138
.AddServiceTransformers()
139+
.AddExpressionEvaluation(funcRegistry)
138140
.AddPreTypeAnnotation()
139141
.AddIOAnnotation()
140142
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(cluster, sessionCtx->TablesPtr(), typeCtx, sessionCtx->ConfigPtr()))
@@ -428,6 +430,7 @@ TPrepareRewriteInfo PrepareRewrite(
428430
NYql::TExprContext& exprCtx,
429431
NYql::TTypeAnnotationContext& typeCtx,
430432
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
433+
const NMiniKQL::IFunctionRegistry& funcRegistry,
431434
const TString& cluster) {
432435
// CREATE TABLE AS statement can be used only with perstatement execution.
433436
// Thus we assume that there is only one such statement. (it was checked in CheckRewrite)
@@ -440,7 +443,7 @@ TPrepareRewriteInfo PrepareRewrite(
440443
});
441444
YQL_ENSURE(createTableAsNode);
442445

443-
return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, cluster);
446+
return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, funcRegistry, cluster);
444447
}
445448

446449
TVector<NYql::TExprNode::TPtr> RewriteExpression(

ydb/core/kqp/host/kqp_statement_rewrite.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ TPrepareRewriteInfo PrepareRewrite(
2525
NYql::TExprContext& exprCtx,
2626
NYql::TTypeAnnotationContext& typeCtx,
2727
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
28+
const NMiniKQL::IFunctionRegistry& funcRegistry,
2829
const TString& cluster);
2930

3031
TVector<NYql::TExprNode::TPtr> RewriteExpression(

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2663,7 +2663,123 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
26632663
UPDATE test_table SET data = "a"
26642664
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
26652665
UNIT_ASSERT_VALUES_EQUAL_C(hangingResult.GetStatus(), EStatus::SUCCESS, hangingResult.GetIssues().ToString());
2666-
}
2666+
}
2667+
2668+
Y_UNIT_TEST(CreateAsSelectView) {
2669+
NKikimrConfig::TAppConfig appConfig;
2670+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
2671+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
2672+
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
2673+
auto settings = TKikimrSettings()
2674+
.SetAppConfig(appConfig)
2675+
.SetWithSampleTables(false);
2676+
TKikimrRunner kikimr(settings);
2677+
2678+
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize();
2679+
2680+
auto client = kikimr.GetQueryClient();
2681+
2682+
{
2683+
auto result = client.ExecuteQuery( R"(
2684+
CREATE TABLE `l_source` (
2685+
id Uint64,
2686+
num Uint64,
2687+
unused String,
2688+
PRIMARY KEY (id)
2689+
);
2690+
2691+
CREATE TABLE `r_source` (
2692+
id Uint64,
2693+
id2 Uint64,
2694+
unused String,
2695+
PRIMARY KEY (id)
2696+
);
2697+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2698+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2699+
}
2700+
2701+
{
2702+
auto result = client.ExecuteQuery( R"(
2703+
CREATE VIEW `l`
2704+
with (security_invoker = TRUE)
2705+
AS (
2706+
SELECT
2707+
id,
2708+
num
2709+
FROM
2710+
`l_source`
2711+
);
2712+
2713+
CREATE VIEW `r`
2714+
with (security_invoker = TRUE)
2715+
AS (
2716+
SELECT
2717+
id,
2718+
id2
2719+
FROM
2720+
`r_source`
2721+
);
2722+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2723+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2724+
}
2725+
2726+
{
2727+
auto prepareResult = client.ExecuteQuery(R"(
2728+
INSERT INTO `/Root/l_source` (id, num) VALUES
2729+
(1u, 1u), (100u, 100u), (10u, 10u);
2730+
INSERT INTO `/Root/r_source` (id, id2) VALUES
2731+
(1u, 1u), (100u, 100u), (10u, 10u);
2732+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2733+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
2734+
}
2735+
2736+
{
2737+
auto prepareResult = client.ExecuteQuery(R"(
2738+
CREATE TABLE `table1`
2739+
(PRIMARY KEY (id))
2740+
AS (
2741+
SELECT
2742+
id, num
2743+
FROM `l`
2744+
)
2745+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2746+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
2747+
}
2748+
2749+
{
2750+
auto it = client.StreamExecuteQuery(R"(
2751+
SELECT id, num FROM `/Root/table1` ORDER BY id;
2752+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2753+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
2754+
TString output = StreamResultToYson(it);
2755+
CompareYson(output, R"([[[1u];[1u]];[[10u];[10u]];[[100u];[100u]]])");
2756+
}
2757+
2758+
{
2759+
auto prepareResult = client.ExecuteQuery(R"(
2760+
CREATE TABLE `table2`
2761+
(PRIMARY KEY (id2))
2762+
AS (
2763+
SELECT
2764+
r.id2 AS id2,
2765+
sum(l.num) AS num
2766+
FROM `l` AS l
2767+
LEFT JOIN `r` AS r ON l.id = r.id
2768+
GROUP BY r.id2
2769+
)
2770+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2771+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
2772+
}
2773+
2774+
{
2775+
auto it = client.StreamExecuteQuery(R"(
2776+
SELECT id2, num FROM `/Root/table2` ORDER BY id2;
2777+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2778+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
2779+
TString output = StreamResultToYson(it);
2780+
CompareYson(output, R"([[[1u];[1u]];[[10u];[10u]];[[100u];[100u]]])");
2781+
}
2782+
}
26672783
}
26682784

26692785
} // namespace NKqp

0 commit comments

Comments
 (0)