Skip to content

Commit fd935ba

Browse files
authored
CTAS & Temporary for Column Tables (#20292)
1 parent 5d5c752 commit fd935ba

File tree

8 files changed

+115
-70
lines changed

8 files changed

+115
-70
lines changed

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -197,23 +197,31 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
197197
case NKqpProto::TKqpSchemeOperation::kCreateTable: {
198198
auto modifyScheme = schemeOp.GetCreateTable();
199199
if (Temporary) {
200-
NKikimrSchemeOp::TTableDescription* tableDesc = nullptr;
200+
auto changePath = [this](NKikimrSchemeOp::TTableDescription* tableDesc) {
201+
const auto fullPath = JoinPath({tableDesc->GetPath(), tableDesc->GetName()});
202+
YQL_ENSURE(fullPath.size() > 1);
203+
tableDesc->SetName(GetCreateTempTablePath(Database, SessionId, fullPath));
204+
tableDesc->SetPath(Database);
205+
};
206+
201207
switch (modifyScheme.GetOperationType()) {
202208
case NKikimrSchemeOp::ESchemeOpCreateTable: {
203-
tableDesc = modifyScheme.MutableCreateTable();
209+
changePath(modifyScheme.MutableCreateTable());
204210
break;
205211
}
206212
case NKikimrSchemeOp::ESchemeOpCreateIndexedTable: {
207-
tableDesc = modifyScheme.MutableCreateIndexedTable()->MutableTableDescription();
213+
changePath(modifyScheme.MutableCreateIndexedTable()->MutableTableDescription());
214+
break;
215+
}
216+
case NKikimrSchemeOp::ESchemeOpCreateColumnTable: {
217+
modifyScheme.MutableCreateColumnTable()->SetName(
218+
GetCreateTempTablePath(Database, SessionId, modifyScheme.GetCreateColumnTable().GetName()));
208219
break;
209220
}
210221
default:
211222
YQL_ENSURE(false, "Unexpected operation type");
212223
}
213-
const auto fullPath = JoinPath({tableDesc->GetPath(), tableDesc->GetName()});
214-
YQL_ENSURE(fullPath.size() > 1);
215-
tableDesc->SetName(GetCreateTempTablePath(Database, SessionId, fullPath));
216-
tableDesc->SetPath(Database);
224+
217225
modifyScheme.SetAllowCreateInTempDir(true);
218226
}
219227
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,8 @@ bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
543543
}
544544
}
545545

546+
tableDesc.SetTemporary(metadata->Temporary);
547+
546548
return true;
547549
}
548550

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 31 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,6 @@ namespace {
2020
NYql::TExprNode::TPtr MoveTable = nullptr;
2121
};
2222

23-
bool IsOlap(const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& tableSettings) {
24-
if (!tableSettings) {
25-
return false;
26-
}
27-
for (const auto& field : tableSettings.Cast()) {
28-
if (field.Name().Value() == "storeType") {
29-
YQL_ENSURE(field.Value().Maybe<NYql::NNodes::TCoAtom>());
30-
if (field.Value().Cast<NYql::NNodes::TCoAtom>().StringValue() == "COLUMN") {
31-
return true;
32-
}
33-
}
34-
}
35-
return false;
36-
}
37-
3823
bool IsCreateTableAs(
3924
NYql::TExprNode::TPtr root,
4025
NYql::TExprContext& exprCtx) {
@@ -187,8 +172,6 @@ namespace {
187172
const auto& insertData = writeArgs.Get(3);
188173
YQL_ENSURE(insertData.Ptr()->Content() != "Void");
189174

190-
const bool isOlap = IsOlap(settings.TableSettings);
191-
192175
const auto pos = insertData.Ref().Pos();
193176

194177
auto type = insertDataPtr->GetTypeAnn();
@@ -243,33 +226,27 @@ namespace {
243226
return std::nullopt;
244227
}
245228

246-
const bool isAtomicOperation = !isOlap;
247-
248229
const TString tmpTableName = TStringBuilder()
249230
<< tableName
250231
<< "_cas_"
251232
<< TAppData::RandomProvider->GenRand();
252233

253-
const TString createTableName = !isAtomicOperation
254-
? tableName
255-
: (TStringBuilder()
234+
const TString createTableName = (TStringBuilder()
256235
<< CanonizePath(AppData()->TenantName)
257236
<< "/.tmp/sessions/"
258237
<< sessionCtx->GetSessionId()
259238
<< CanonizePath(tmpTableName));
260239

261240
create = exprCtx.ReplaceNode(std::move(create), *columns, exprCtx.NewList(pos, std::move(columnNodes)));
262241

263-
if (isAtomicOperation) {
264-
std::vector<NYql::TExprNodePtr> settingsNodes;
265-
for (size_t index = 0; index < create->Child(4)->ChildrenSize(); ++index) {
266-
settingsNodes.push_back(create->Child(4)->ChildPtr(index));
267-
}
268-
settingsNodes.push_back(
269-
exprCtx.NewList(pos, {exprCtx.NewAtom(pos, "temporary")}));
270-
create = exprCtx.ReplaceNode(std::move(create), *create->Child(4), exprCtx.NewList(pos, std::move(settingsNodes)));
271-
create = exprCtx.ReplaceNode(std::move(create), *tableNameNode, exprCtx.NewAtom(pos, tmpTableName));
242+
std::vector<NYql::TExprNodePtr> settingsNodes;
243+
for (size_t index = 0; index < create->Child(4)->ChildrenSize(); ++index) {
244+
settingsNodes.push_back(create->Child(4)->ChildPtr(index));
272245
}
246+
settingsNodes.push_back(
247+
exprCtx.NewList(pos, {exprCtx.NewAtom(pos, "temporary")}));
248+
create = exprCtx.ReplaceNode(std::move(create), *create->Child(4), exprCtx.NewList(pos, std::move(settingsNodes)));
249+
create = exprCtx.ReplaceNode(std::move(create), *tableNameNode, exprCtx.NewAtom(pos, tmpTableName));
273250

274251
NYql::TNodeOnNodeOwnedMap deepClones;
275252
auto insertDataCopy = exprCtx.DeepCopy(insertData.Ref(), exprCtx, deepClones, false, false);
@@ -325,39 +302,37 @@ namespace {
325302
}),
326303
});
327304

328-
if (isAtomicOperation) {
329-
result.MoveTable = exprCtx.NewCallable(pos, "Write!", {
330-
exprCtx.NewWorld(pos),
331-
exprCtx.NewCallable(pos, "DataSink", {
332-
exprCtx.NewAtom(pos, "kikimr"),
333-
exprCtx.NewAtom(pos, "db"),
334-
}),
335-
exprCtx.NewCallable(pos, "Key", {
336-
exprCtx.NewList(pos, {
337-
exprCtx.NewAtom(pos, "tablescheme"),
338-
exprCtx.NewCallable(pos, "String", {
339-
exprCtx.NewAtom(pos, createTableName),
340-
}),
305+
result.MoveTable = exprCtx.NewCallable(pos, "Write!", {
306+
exprCtx.NewWorld(pos),
307+
exprCtx.NewCallable(pos, "DataSink", {
308+
exprCtx.NewAtom(pos, "kikimr"),
309+
exprCtx.NewAtom(pos, "db"),
310+
}),
311+
exprCtx.NewCallable(pos, "Key", {
312+
exprCtx.NewList(pos, {
313+
exprCtx.NewAtom(pos, "tablescheme"),
314+
exprCtx.NewCallable(pos, "String", {
315+
exprCtx.NewAtom(pos, createTableName),
341316
}),
342317
}),
343-
exprCtx.NewCallable(pos, "Void", {}),
318+
}),
319+
exprCtx.NewCallable(pos, "Void", {}),
320+
exprCtx.NewList(pos, {
344321
exprCtx.NewList(pos, {
322+
exprCtx.NewAtom(pos, "mode"),
323+
exprCtx.NewAtom(pos, "alter"),
324+
}),
325+
exprCtx.NewList(pos, {
326+
exprCtx.NewAtom(pos, "actions"),
345327
exprCtx.NewList(pos, {
346-
exprCtx.NewAtom(pos, "mode"),
347-
exprCtx.NewAtom(pos, "alter"),
348-
}),
349-
exprCtx.NewList(pos, {
350-
exprCtx.NewAtom(pos, "actions"),
351328
exprCtx.NewList(pos, {
352-
exprCtx.NewList(pos, {
353-
exprCtx.NewAtom(pos, "renameTo"),
354-
exprCtx.NewAtom(pos, tableName),
355-
}),
329+
exprCtx.NewAtom(pos, "renameTo"),
330+
exprCtx.NewAtom(pos, tableName),
356331
}),
357332
}),
358333
}),
359-
});
360-
}
334+
}),
335+
});
361336

362337
return result;
363338
}

ydb/core/kqp/query_data/kqp_prepared_query.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,16 @@ TKqpPhyTxHolder::GetSchemeOpTempTablePath() const {
131131
tableDesc = &modifyScheme.GetCreateIndexedTable().GetTableDescription();
132132
break;
133133
}
134+
case NKikimrSchemeOp::ESchemeOpCreateColumnTable: {
135+
if (modifyScheme.GetCreateColumnTable().HasTemporary() && modifyScheme.GetCreateColumnTable().GetTemporary()) {
136+
return {{true, {modifyScheme.GetWorkingDir(), modifyScheme.GetCreateColumnTable().GetName()}}};
137+
}
138+
break;
139+
}
134140
default:
135141
return std::nullopt;
136142
}
137-
if (tableDesc->HasTemporary()) {
143+
if (tableDesc && tableDesc->HasTemporary()) {
138144
if (tableDesc->GetTemporary()) {
139145
return {{true, {modifyScheme.GetWorkingDir(), tableDesc->GetName()}}};
140146
}

ydb/core/kqp/ut/query/kqp_query_ut.cpp

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,6 +1437,54 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
14371437
}
14381438
}
14391439

1440+
Y_UNIT_TEST(OlapTemporary) {
1441+
NKikimrConfig::TAppConfig appConfig;
1442+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
1443+
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
1444+
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
1445+
auto settings = TKikimrSettings()
1446+
.SetAppConfig(appConfig)
1447+
.SetEnableTempTables(true)
1448+
.SetWithSampleTables(false);
1449+
TKikimrRunner kikimr(settings);
1450+
1451+
auto client = kikimr.GetQueryClient();
1452+
auto session1 = client.GetSession().GetValueSync().GetSession();
1453+
{
1454+
auto result = session1.ExecuteQuery(R"(
1455+
CREATE TEMP TABLE `/Root/test/TestTable` (
1456+
Col1 Uint64 NOT NULL,
1457+
Col2 Int32,
1458+
PRIMARY KEY (Col1)
1459+
)
1460+
WITH (STORE = COLUMN);)",
1461+
NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1462+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1463+
}
1464+
1465+
auto session2 = client.GetSession().GetValueSync().GetSession();
1466+
{
1467+
// Session2 can't use tmp table
1468+
auto result = session2.ExecuteQuery(R"(
1469+
SELECT * FROM `/Root/test/TestTable`;
1470+
)",
1471+
NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1472+
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
1473+
UNIT_ASSERT_C(
1474+
result.GetIssues().ToString().contains("does not exist or you do not have access permissions."),
1475+
result.GetIssues().ToString());
1476+
}
1477+
1478+
{
1479+
// Session1 can use tmp table
1480+
auto result = session1.ExecuteQuery(R"(
1481+
SELECT * FROM `/Root/test/TestTable`;
1482+
)",
1483+
NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1484+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
1485+
}
1486+
}
1487+
14401488
Y_UNIT_TEST(OlapCreateAsSelect_Simple) {
14411489
NKikimrConfig::TAppConfig appConfig;
14421490
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
@@ -1457,11 +1505,11 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
14571505
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
14581506
)";
14591507

1460-
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize();
1461-
14621508
auto client = kikimr.GetQueryClient();
1463-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1464-
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1509+
{
1510+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1511+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
1512+
}
14651513

14661514
{
14671515
auto prepareResult = client.ExecuteQuery(R"(

ydb/core/protos/flat_scheme_op.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ message TTableDescription {
330330
TTableIncrementalBackupConfig IncrementalBackupConfig = 43;
331331
}
332332

333+
// For kqp internal use only
333334
optional bool Temporary = 41;
334335

335336
// This flag is create-only, and has to be set up
@@ -914,6 +915,9 @@ message TColumnTableDescription {
914915

915916
// Channels for standalone column table
916917
optional TColumnStorageConfig StorageConfig = 13;
918+
919+
// For kqp internal use only
920+
optional bool Temporary = 14;
917921
}
918922

919923
message TAlterColumnTable {

ydb/tests/fq/tools/kqprun.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def add_query(self, sql: str):
4747
self.queries.append(query_path)
4848

4949
def yql_exec(self, verbose: bool = False, check_error: bool = True, var_templates: Optional[List[str]] = None,
50-
yql_program: Optional[str] = None, yql_tables: List[yql_utils.Table] = []) -> yql_utils.YQLExecResult:
50+
yql_program: Optional[str] = None, yql_tables: List[yql_utils.Table] = [], user: Optional[str] = None) -> yql_utils.YQLExecResult:
5151
udfs_dir = self.udfs_dir
5252

5353
config_file = self.config_file
@@ -75,6 +75,8 @@ def yql_exec(self, verbose: bool = False, check_error: bool = True, var_template
7575
'--plan-format json '
7676
'--result-rows-limit 0 '
7777
)
78+
if user is not None:
79+
cmd += f'-U {user} '
7880

7981
if var_templates is not None:
8082
for var_template in var_templates:

ydb/tests/fq/yt/kqp_yt_import/test_ctas.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ def test_simple_ctast(self, kqp_run: KqpRun):
2323
ORDER BY subkey
2424
""")
2525

26-
result = kqp_run.yql_exec(verbose=True)
26+
result = kqp_run.yql_exec(verbose=True, user='root@system')
2727
validate_sample_result(result.results)

0 commit comments

Comments
 (0)