Skip to content

Commit d98c32b

Browse files
authored
Allow single CTAS without perstatement (#8241)
1 parent 3be09f1 commit d98c32b

File tree

8 files changed

+126
-32
lines changed

8 files changed

+126
-32
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
172172
}
173173

174174
void StartSplitting(const TActorContext &ctx) {
175-
YQL_ENSURE(PerStatementResult);
176-
177175
const auto prepareSettings = PrepareCompilationSettings(ctx);
178176
auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);
179177

@@ -280,7 +278,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
280278
IKqpHost::TPrepareSettings prepareSettings;
281279
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
282280
prepareSettings.IsInternalCall = QueryId.Settings.IsInternalCall;
283-
prepareSettings.PerStatementResult = PerStatementResult;
284281

285282
switch (QueryId.Settings.Syntax) {
286283
case Ydb::Query::Syntax::SYNTAX_YQL_V1:

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,8 +1259,13 @@ class TKqpHost : public IKqpHost {
12591259
YQL_CLOG(INFO, ProviderKqp) << "Compiled query:\n" << KqpExprToPrettyString(*queryExpr, ctx);
12601260

12611261
if (Config->EnableCreateTableAs) {
1262-
result.QueryExprs = RewriteExpression(queryExpr, ctx, *TypesCtx, SessionCtx, Cluster);
1262+
auto [rewriteResults, rewriteIssues] = RewriteExpression(queryExpr, ctx, *TypesCtx, SessionCtx, Cluster);
1263+
ctx.IssueManager.AddIssues(rewriteIssues);
1264+
if (!rewriteIssues.Empty()) {
1265+
return result;
1266+
}
12631267

1268+
result.QueryExprs = rewriteResults;
12641269
for (const auto& resultPart : result.QueryExprs) {
12651270
YQL_CLOG(INFO, ProviderKqp) << "Splitted Compiled query part:\n" << KqpExprToPrettyString(*resultPart, ctx);
12661271
}
@@ -1280,7 +1285,7 @@ class TKqpHost : public IKqpHost {
12801285
settingsBuilder
12811286
.SetSqlAutoCommit(false)
12821287
.SetUsePgParser(settings.UsePgParser);
1283-
auto compileResult = CompileYqlQuery(query, /* isSql */ true, *ExprCtx, sqlVersion, settingsBuilder, settings.PerStatementResult);
1288+
auto compileResult = CompileYqlQuery(query, /* isSql */ true, *ExprCtx, sqlVersion, settingsBuilder);
12841289

12851290
return TSplitResult{
12861291
.Ctx = std::move(ExprCtxStorage),
@@ -1290,7 +1295,7 @@ class TKqpHost : public IKqpHost {
12901295
}
12911296

12921297
TCompileExprResult CompileYqlQuery(const TKqpQueryRef& query, bool isSql, TExprContext& ctx, TMaybe<TSqlVersion>& sqlVersion,
1293-
TKqpTranslationSettingsBuilder& settingsBuilder, bool perStatementResult) const
1298+
TKqpTranslationSettingsBuilder& settingsBuilder) const
12941299
{
12951300
auto compileResult = CompileQuery(query, isSql, ctx, sqlVersion, settingsBuilder);
12961301
if (!compileResult.QueryExprs) {
@@ -1302,12 +1307,7 @@ class TKqpHost : public IKqpHost {
13021307
}
13031308

13041309
// Currently used only for create table as
1305-
if (!perStatementResult && compileResult.QueryExprs.size() > 1) {
1306-
ctx.AddError(YqlIssue(TPosition(), TIssuesIds::KIKIMR_BAD_REQUEST,
1307-
"Query can be executed only in per-statement mode (NoTx)"));
1308-
compileResult.QueryExprs = {};
1309-
return compileResult;
1310-
} else if (compileResult.QueryExprs.size() > 1) {
1310+
if (compileResult.QueryExprs.size() > 1) {
13111311
return compileResult;
13121312
}
13131313

@@ -1379,7 +1379,7 @@ class TKqpHost : public IKqpHost {
13791379
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
13801380
settingsBuilder.SetSqlAutoCommit(false)
13811381
.SetUsePgParser(settings.UsePgParser);
1382-
auto compileResult = CompileYqlQuery(query, isSql, ctx, sqlVersion, settingsBuilder, false);
1382+
auto compileResult = CompileYqlQuery(query, isSql, ctx, sqlVersion, settingsBuilder);
13831383
if (compileResult.QueryExprs.empty()) {
13841384
return nullptr;
13851385
}
@@ -1439,7 +1439,7 @@ class TKqpHost : public IKqpHost {
14391439
TMaybe<TSqlVersion> sqlVersion;
14401440
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
14411441
settingsBuilder.SetSqlAutoCommit(false);
1442-
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder, false);
1442+
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder);
14431443
if (compileResult.QueryExprs.empty()) {
14441444
return nullptr;
14451445
}
@@ -1467,7 +1467,7 @@ class TKqpHost : public IKqpHost {
14671467
TMaybe<TSqlVersion> sqlVersion;
14681468
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, queryAst.Text, SessionCtx->Config().BindingsMode, GUCSettings);
14691469
settingsBuilder.SetSqlAutoCommit(false);
1470-
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder, false);
1470+
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder);
14711471
if (compileResult.QueryExprs.empty()) {
14721472
return nullptr;
14731473
}
@@ -1513,7 +1513,7 @@ class TKqpHost : public IKqpHost {
15131513
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
15141514
settingsBuilder.SetSqlAutoCommit(false)
15151515
.SetUsePgParser(settings.UsePgParser);
1516-
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder, settings.PerStatementResult);
1516+
auto compileResult = CompileYqlQuery(query, /* isSql */ true, ctx, sqlVersion, settingsBuilder);
15171517
if (compileResult.QueryExprs.empty()) {
15181518
return nullptr;
15191519
}
@@ -1550,7 +1550,7 @@ class TKqpHost : public IKqpHost {
15501550
TMaybe<TSqlVersion> sqlVersion = 1;
15511551
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, query.Text, SessionCtx->Config().BindingsMode, GUCSettings);
15521552
settingsBuilder.SetSqlAutoCommit(false);
1553-
auto compileResult = CompileYqlQuery(query, true, ctx, sqlVersion, settingsBuilder, false);
1553+
auto compileResult = CompileYqlQuery(query, true, ctx, sqlVersion, settingsBuilder);
15541554
if (compileResult.QueryExprs.empty()) {
15551555
return nullptr;
15561556
}
@@ -1571,7 +1571,7 @@ class TKqpHost : public IKqpHost {
15711571
TMaybe<TSqlVersion> sqlVersion;
15721572
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, queryAst.Text, SessionCtx->Config().BindingsMode, GUCSettings);
15731573
settingsBuilder.SetSqlAutoCommit(false);
1574-
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder, false);
1574+
auto compileResult = CompileYqlQuery(queryAst, false, ctx, sqlVersion, settingsBuilder);
15751575
if (compileResult.QueryExprs.empty()) {
15761576
return nullptr;
15771577
}
@@ -1598,7 +1598,7 @@ class TKqpHost : public IKqpHost {
15981598
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
15991599
settingsBuilder.SetSqlAutoCommit(true)
16001600
.SetUsePgParser(settings.UsePgParser);
1601-
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
1601+
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
16021602
if (compileResult.QueryExprs.empty()) {
16031603
return nullptr;
16041604
}
@@ -1627,7 +1627,7 @@ class TKqpHost : public IKqpHost {
16271627
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
16281628
settingsBuilder.SetSqlAutoCommit(true)
16291629
.SetUsePgParser(settings.UsePgParser);
1630-
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
1630+
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
16311631
if (compileResult.QueryExprs.empty()) {
16321632
return nullptr;
16331633
}
@@ -1651,7 +1651,7 @@ class TKqpHost : public IKqpHost {
16511651
TMaybe<TSqlVersion> sqlVersion;
16521652
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
16531653
settingsBuilder.SetSqlAutoCommit(true);
1654-
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
1654+
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
16551655
if (compileResult.QueryExprs.empty()) {
16561656
return nullptr;
16571657
}
@@ -1679,7 +1679,7 @@ class TKqpHost : public IKqpHost {
16791679
TMaybe<TSqlVersion> sqlVersion;
16801680
TKqpTranslationSettingsBuilder settingsBuilder(SessionCtx->Query().Type, SessionCtx->Config()._KqpYqlSyntaxVersion.Get().GetRef(), Cluster, script.Text, SessionCtx->Config().BindingsMode, GUCSettings);
16811681
settingsBuilder.SetSqlAutoCommit(true);
1682-
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder, false);
1682+
auto compileResult = CompileYqlQuery(script, true, ctx, sqlVersion, settingsBuilder);
16831683
if (compileResult.QueryExprs.empty()) {
16841684
return nullptr;
16851685
}

ydb/core/kqp/host/kqp_host.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ class IKqpHost : public TThrRefBase {
4545
struct TPrepareSettings: public TExecSettings {
4646
TMaybe<bool> IsInternalCall;
4747
TMaybe<bool> ConcurrentResults;
48-
bool PerStatementResult;
4948

5049
TString ToString() const {
5150
return TStringBuilder() << "TPrepareSettings{"

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,20 +292,25 @@ namespace {
292292
}
293293
}
294294

295-
TVector<NYql::TExprNode::TPtr> RewriteExpression(
295+
std::pair<TVector<NYql::TExprNode::TPtr>, NYql::TIssues> RewriteExpression(
296296
const NYql::TExprNode::TPtr& root,
297297
NYql::TExprContext& exprCtx,
298298
NYql::TTypeAnnotationContext& typeCtx,
299299
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
300300
const TString& cluster) {
301+
NYql::TIssues issues;
301302
// CREATE TABLE AS statement can be used only with perstatement execution.
302303
// Thus we assume that there is only one such statement.
304+
ui64 actionsCount = 0;
303305
TVector<NYql::TExprNode::TPtr> result;
304306
VisitExpr(root, [&](const NYql::TExprNode::TPtr& node) {
305307
if (NYql::NNodes::TCoWrite::Match(node.Get())) {
308+
++actionsCount;
306309
const auto rewriteResult = RewriteCreateTableAs(node, exprCtx, typeCtx, sessionCtx, cluster);
307310
if (rewriteResult) {
308-
YQL_ENSURE(result.empty());
311+
if (!result.empty()) {
312+
issues.AddIssue("Several CTAS statement can't be used without per-statement mode.");
313+
}
309314
result.push_back(rewriteResult->CreateTable);
310315
result.push_back(rewriteResult->ReplaceInto);
311316
if (rewriteResult->MoveTable) {
@@ -316,10 +321,14 @@ TVector<NYql::TExprNode::TPtr> RewriteExpression(
316321
return true;
317322
});
318323

324+
if (!result.empty() && actionsCount > 1) {
325+
issues.AddIssue("CTAS statement can't be used with other statements without per-statement mode.");
326+
}
327+
319328
if (result.empty()) {
320329
result.push_back(root);
321330
}
322-
return result;
331+
return {result, issues};
323332
}
324333

325334
}

ydb/core/kqp/host/kqp_statement_rewrite.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
namespace NKikimr {
88
namespace NKqp {
99

10-
TVector<NYql::TExprNode::TPtr> RewriteExpression(
10+
std::pair<TVector<NYql::TExprNode::TPtr>, NYql::TIssues> RewriteExpression(
1111
const NYql::TExprNode::TPtr& root,
1212
NYql::TExprContext& ctx,
1313
NYql::TTypeAnnotationContext& typeCtx,

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -581,10 +581,18 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
581581
LWTRACK(KqpSessionQueryCompiled, QueryState->Orbit, TStringBuilder() << QueryState->CompileResult->Status);
582582

583583
if (QueryState->CompileResult->NeedToSplit) {
584-
YQL_ENSURE(!QueryState->HasTxControl() && QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE);
585-
auto ev = QueryState->BuildSplitRequest(CompilationCookie, GUCSettings);
586-
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId,
587-
QueryState->KqpSessionSpan.GetTraceId());
584+
if (!QueryState->HasTxControl()) {
585+
YQL_ENSURE(QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE);
586+
auto ev = QueryState->BuildSplitRequest(CompilationCookie, GUCSettings);
587+
Send(MakeKqpCompileServiceID(SelfId().NodeId()), ev.release(), 0, QueryState->QueryId,
588+
QueryState->KqpSessionSpan.GetTraceId());
589+
} else {
590+
NYql::TIssues issues;
591+
ReplyQueryError(
592+
::Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST,
593+
"CTAS statement can be executed only in NoTx mode.",
594+
MessageFromIssues(issues));
595+
}
588596
} else {
589597
ReplyQueryCompileError();
590598
}

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1483,7 +1483,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
14831483
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
14841484
UNIT_ASSERT(!prepareResult.IsSuccess());
14851485
UNIT_ASSERT_C(
1486-
prepareResult.GetIssues().ToString().Contains("Query can be executed only in per-statement mode (NoTx)"),
1486+
prepareResult.GetIssues().ToString().Contains("CTAS statement can be executed only in NoTx mode."),
14871487
prepareResult.GetIssues().ToString());
14881488
}
14891489

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2620,6 +2620,87 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
26202620
}
26212621
}
26222622

2623+
Y_UNIT_TEST(CTASWithoutPerStatement) {
2624+
NKikimrConfig::TAppConfig appConfig;
2625+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
2626+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
2627+
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
2628+
appConfig.MutableTableServiceConfig()->SetEnableAstCache(false);
2629+
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(false);
2630+
auto setting = NKikimrKqp::TKqpSetting();
2631+
auto serverSettings = TKikimrSettings()
2632+
.SetAppConfig(appConfig)
2633+
.SetKqpSettings({setting})
2634+
.SetWithSampleTables(false)
2635+
.SetEnableTempTables(true);
2636+
2637+
TKikimrRunner kikimr(serverSettings);
2638+
auto db = kikimr.GetQueryClient();
2639+
2640+
{
2641+
auto result = db.ExecuteQuery(R"(
2642+
CREATE TABLE Table1 (
2643+
PRIMARY KEY (Key)
2644+
) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2;
2645+
CREATE TABLE Table2 (
2646+
PRIMARY KEY (Key)
2647+
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
2648+
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();
2649+
2650+
UNIT_ASSERT(!result.IsSuccess());
2651+
UNIT_ASSERT_C(
2652+
result.GetIssues().ToString().Contains("Several CTAS statement can't be used without per-statement mode."),
2653+
result.GetIssues().ToString());
2654+
}
2655+
2656+
{
2657+
auto result = db.ExecuteQuery(R"(
2658+
CREATE TABLE Table2 (
2659+
PRIMARY KEY (Key)
2660+
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
2661+
SELECT * FROM Table1 ORDER BY Key;
2662+
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();
2663+
2664+
UNIT_ASSERT(!result.IsSuccess());
2665+
UNIT_ASSERT_C(
2666+
result.GetIssues().ToString().Contains("CTAS statement can't be used with other statements without per-statement mode."),
2667+
result.GetIssues().ToString());
2668+
}
2669+
2670+
{
2671+
auto result = db.ExecuteQuery(R"(
2672+
SELECT * FROM Table1 ORDER BY Key;
2673+
CREATE TABLE Table2 (
2674+
PRIMARY KEY (Key)
2675+
) AS SELECT 2u AS Key, "2" AS Value1, "2" AS Value2;
2676+
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();
2677+
2678+
UNIT_ASSERT(!result.IsSuccess());
2679+
UNIT_ASSERT_C(
2680+
result.GetIssues().ToString().Contains("CTAS statement can't be used with other statements without per-statement mode."),
2681+
result.GetIssues().ToString());
2682+
}
2683+
2684+
{
2685+
auto result = db.ExecuteQuery(R"(
2686+
CREATE TABLE Table1 (
2687+
PRIMARY KEY (Key)
2688+
) AS SELECT 1u AS Key, "1" AS Value1, "1" AS Value2;
2689+
)", TTxControl::NoTx(), TExecuteQuerySettings()).ExtractValueSync();
2690+
2691+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2692+
}
2693+
2694+
{
2695+
auto result = db.ExecuteQuery(R"(
2696+
SELECT * FROM Table1 ORDER BY Key;
2697+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
2698+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2699+
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
2700+
CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
2701+
}
2702+
}
2703+
26232704
Y_UNIT_TEST(SeveralCTAS) {
26242705
NKikimrConfig::TAppConfig appConfig;
26252706
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);

0 commit comments

Comments
 (0)