Skip to content

Commit 900a13d

Browse files
committed
Fix CTAS with view (#19365)
1 parent cfefaab commit 900a13d

File tree

4 files changed

+129
-3
lines changed

4 files changed

+129
-3
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1635,7 +1635,13 @@ class TKqpHost : public IKqpHost {
16351635

16361636
YQL_ENSURE(ExprCtxStorage);
16371637

1638-
auto prepareData = PrepareRewrite(compileResult.QueryExpr, *ExprCtxStorage, *TypesCtx, SessionCtx, Cluster);
1638+
auto prepareData = PrepareRewrite(
1639+
compileResult.QueryExpr,
1640+
*ExprCtxStorage,
1641+
*TypesCtx,
1642+
SessionCtx,
1643+
*FuncRegistry,
1644+
Cluster);
16391645

16401646
return MakeIntrusive<TAsyncSplitQueryResult>(
16411647
compileResult.QueryExpr,

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ namespace {
9898
NYql::TExprContext& exprCtx,
9999
NYql::TTypeAnnotationContext& typeCtx,
100100
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
101+
const NMiniKQL::IFunctionRegistry& funcRegistry,
101102
const TString& cluster) {
102103
NYql::NNodes::TExprBase expr(root);
103104
auto maybeWrite = expr.Maybe<NYql::NNodes::TCoWrite>();
@@ -135,6 +136,7 @@ namespace {
135136

136137
auto typeTransformer = NYql::TTransformationPipeline(&typeCtx)
137138
.AddServiceTransformers()
139+
.AddExpressionEvaluation(funcRegistry)
138140
.AddPreTypeAnnotation()
139141
.AddIOAnnotation()
140142
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(cluster, sessionCtx->TablesPtr(), typeCtx, sessionCtx->ConfigPtr()))
@@ -428,6 +430,7 @@ TPrepareRewriteInfo PrepareRewrite(
428430
NYql::TExprContext& exprCtx,
429431
NYql::TTypeAnnotationContext& typeCtx,
430432
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
433+
const NMiniKQL::IFunctionRegistry& funcRegistry,
431434
const TString& cluster) {
432435
// CREATE TABLE AS statement can be used only with perstatement execution.
433436
// Thus we assume that there is only one such statement. (it was checked in CheckRewrite)
@@ -440,7 +443,7 @@ TPrepareRewriteInfo PrepareRewrite(
440443
});
441444
YQL_ENSURE(createTableAsNode);
442445

443-
return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, cluster);
446+
return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, funcRegistry, cluster);
444447
}
445448

446449
TVector<NYql::TExprNode::TPtr> RewriteExpression(

ydb/core/kqp/host/kqp_statement_rewrite.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ TPrepareRewriteInfo PrepareRewrite(
2525
NYql::TExprContext& exprCtx,
2626
NYql::TTypeAnnotationContext& typeCtx,
2727
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
28+
const NMiniKQL::IFunctionRegistry& funcRegistry,
2829
const TString& cluster);
2930

3031
TVector<NYql::TExprNode::TPtr> RewriteExpression(

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2674,7 +2674,123 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
26742674
UPDATE test_table SET data = "a"
26752675
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
26762676
UNIT_ASSERT_VALUES_EQUAL_C(hangingResult.GetStatus(), EStatus::SUCCESS, hangingResult.GetIssues().ToString());
2677-
}
2677+
}
2678+
2679+
Y_UNIT_TEST(CreateAsSelectView) {
2680+
NKikimrConfig::TAppConfig appConfig;
2681+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
2682+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
2683+
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
2684+
auto settings = TKikimrSettings()
2685+
.SetAppConfig(appConfig)
2686+
.SetWithSampleTables(false);
2687+
TKikimrRunner kikimr(settings);
2688+
2689+
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize();
2690+
2691+
auto client = kikimr.GetQueryClient();
2692+
2693+
{
2694+
auto result = client.ExecuteQuery( R"(
2695+
CREATE TABLE `l_source` (
2696+
id Uint64,
2697+
num Uint64,
2698+
unused String,
2699+
PRIMARY KEY (id)
2700+
);
2701+
2702+
CREATE TABLE `r_source` (
2703+
id Uint64,
2704+
id2 Uint64,
2705+
unused String,
2706+
PRIMARY KEY (id)
2707+
);
2708+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2709+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2710+
}
2711+
2712+
{
2713+
auto result = client.ExecuteQuery( R"(
2714+
CREATE VIEW `l`
2715+
with (security_invoker = TRUE)
2716+
AS (
2717+
SELECT
2718+
id,
2719+
num
2720+
FROM
2721+
`l_source`
2722+
);
2723+
2724+
CREATE VIEW `r`
2725+
with (security_invoker = TRUE)
2726+
AS (
2727+
SELECT
2728+
id,
2729+
id2
2730+
FROM
2731+
`r_source`
2732+
);
2733+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2734+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
2735+
}
2736+
2737+
{
2738+
auto prepareResult = client.ExecuteQuery(R"(
2739+
INSERT INTO `/Root/l_source` (id, num) VALUES
2740+
(1u, 1u), (100u, 100u), (10u, 10u);
2741+
INSERT INTO `/Root/r_source` (id, id2) VALUES
2742+
(1u, 1u), (100u, 100u), (10u, 10u);
2743+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2744+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
2745+
}
2746+
2747+
{
2748+
auto prepareResult = client.ExecuteQuery(R"(
2749+
CREATE TABLE `table1`
2750+
(PRIMARY KEY (id))
2751+
AS (
2752+
SELECT
2753+
id, num
2754+
FROM `l`
2755+
)
2756+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2757+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
2758+
}
2759+
2760+
{
2761+
auto it = client.StreamExecuteQuery(R"(
2762+
SELECT id, num FROM `/Root/table1` ORDER BY id;
2763+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2764+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
2765+
TString output = StreamResultToYson(it);
2766+
CompareYson(output, R"([[[1u];[1u]];[[10u];[10u]];[[100u];[100u]]])");
2767+
}
2768+
2769+
{
2770+
auto prepareResult = client.ExecuteQuery(R"(
2771+
CREATE TABLE `table2`
2772+
(PRIMARY KEY (id2))
2773+
AS (
2774+
SELECT
2775+
r.id2 AS id2,
2776+
sum(l.num) AS num
2777+
FROM `l` AS l
2778+
LEFT JOIN `r` AS r ON l.id = r.id
2779+
GROUP BY r.id2
2780+
)
2781+
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
2782+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
2783+
}
2784+
2785+
{
2786+
auto it = client.StreamExecuteQuery(R"(
2787+
SELECT id2, num FROM `/Root/table2` ORDER BY id2;
2788+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2789+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
2790+
TString output = StreamResultToYson(it);
2791+
CompareYson(output, R"([[[1u];[1u]];[[10u];[10u]];[[100u];[100u]]])");
2792+
}
2793+
}
26782794
}
26792795

26802796
} // namespace NKqp

0 commit comments

Comments
 (0)