Skip to content

Commit 49e5dc5

Browse files
committed
YQ CTAS fix for script executions (ydb-platform#9228)
1 parent 0aecd9b commit 49e5dc5

File tree

4 files changed

+51
-25
lines changed

4 files changed

+51
-25
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
236236
break;
237237

238238
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
239-
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings);
239+
AsyncCompileResult = KqpHost->PrepareGenericScript(QueryRef, prepareSettings, SplitExpr);
240240
break;
241241

242242
default:

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,10 +1139,10 @@ class TKqpHost : public IKqpHost {
11391139
});
11401140
}
11411141

1142-
IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) override {
1142+
IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) override {
11431143
return CheckedProcessQuery(*ExprCtx,
1144-
[this, &query, settings] (TExprContext& ctx) mutable {
1145-
return PrepareQueryInternal(query, nullptr, EKikimrQueryType::Script, settings, ctx);
1144+
[this, &query, settings, expr] (TExprContext& ctx) mutable {
1145+
return PrepareQueryInternal(query, expr, EKikimrQueryType::Script, settings, ctx);
11461146
});
11471147
}
11481148

ydb/core/kqp/host/kqp_host.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class IKqpHost : public TThrRefBase {
9090
virtual IAsyncQueryResultPtr PrepareGenericQuery(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;
9191

9292
/* Federated queries */
93-
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings) = 0;
93+
virtual IAsyncQueryResultPtr PrepareGenericScript(const TKqpQueryRef& query, const TPrepareSettings& settings, NYql::TExprNode::TPtr expr = nullptr) = 0;
9494

9595
/* Scripting */
9696
virtual IAsyncQueryResultPtr ValidateYqlScript(const TKqpQueryRef& script) = 0;

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1793,13 +1793,13 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
17931793
ExecuteSelectQuery("test_bucket_execute_script_with_large_file", 5_MB, 500000);
17941794
}
17951795

1796-
std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName) {
1796+
std::shared_ptr<TKikimrRunner> CreateSampleDataSource(const TString& externalDataSourceName, const TString& externalTableName, bool enableOltp) {
17971797
const TString bucket = "test_bucket3";
17981798
const TString object = "test_object";
17991799

18001800
NKikimrConfig::TAppConfig appConfig;
18011801
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
1802-
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
1802+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(enableOltp);
18031803
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
18041804
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
18051805
appConfig.MutableFeatureFlags()->SetEnableTempTables(true);
@@ -1852,8 +1852,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
18521852

18531853
}
18541854

1855-
void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable) {
1856-
{
1855+
void ValidateTables(TQueryClient& client, const TString& oltpTable, const TString& olapTable, bool enableOltp) {
1856+
if (enableOltp) {
18571857
const TString query = TStringBuilder() << "SELECT Unwrap(key), Unwrap(value) FROM `" << oltpTable << "`;";
18581858
ValidateResult(client.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync());
18591859
}
@@ -1864,15 +1864,15 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
18641864
}
18651865
}
18661866

1867-
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSource) {
1867+
void DoCreateTableAsSelectFromExternalDataSource(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
18681868
const TString externalDataSourceName = "external_data_source";
18691869
const TString externalTableName = "test_binding_resolve";
18701870

1871-
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
1871+
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
18721872
auto client = kikimr->GetQueryClient();
18731873

18741874
const TString oltpTable = "DestinationOltp";
1875-
{
1875+
if (enableOltp) {
18761876
const TString query = fmt::format(R"(
18771877
PRAGMA TablePathPrefix = "TestDomain";
18781878
CREATE TABLE `{destination}` (
@@ -1889,8 +1889,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
18891889
"destination"_a = oltpTable,
18901890
"external_source"_a = externalDataSourceName
18911891
);
1892-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1893-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1892+
requestRunner(query, client, kikimr->GetDriver());
18941893
}
18951894

18961895
const TString olapTable = "DestinationOlap";
@@ -1912,22 +1911,43 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19121911
"destination"_a = olapTable,
19131912
"external_source"_a = externalDataSourceName
19141913
);
1915-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1916-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1914+
requestRunner(query, client, kikimr->GetDriver());
19171915
}
19181916

1919-
ValidateTables(client, oltpTable, olapTable);
1917+
ValidateTables(client, oltpTable, olapTable, enableOltp);
1918+
}
1919+
1920+
void RunGenericQuery(const TString& query, TQueryClient& client, const TDriver&) {
1921+
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1922+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1923+
}
1924+
1925+
void RunGenericScript(const TString& script, TQueryClient& client, const TDriver& driver) {
1926+
auto scriptExecutionOperation = client.ExecuteScript(script).ExtractValueSync();
1927+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
1928+
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
1929+
1930+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), driver);
1931+
UNIT_ASSERT_VALUES_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToOneLineString());
19201932
}
19211933

1922-
Y_UNIT_TEST(CreateTableAsSelectFromExternalTable) {
1934+
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericQuery) {
1935+
DoCreateTableAsSelectFromExternalDataSource(&RunGenericQuery, true);
1936+
}
1937+
1938+
Y_UNIT_TEST(CreateTableAsSelectFromExternalDataSourceGenericScript) {
1939+
DoCreateTableAsSelectFromExternalDataSource(&RunGenericScript, false);
1940+
}
1941+
1942+
void DoCreateTableAsSelectFromExternalTable(std::function<void(const TString&, TQueryClient&, const TDriver&)> requestRunner, bool enableOltp) {
19231943
const TString externalDataSourceName = "external_data_source";
19241944
const TString externalTableName = "test_binding_resolve";
19251945

1926-
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName);
1946+
auto kikimr = CreateSampleDataSource(externalDataSourceName, externalTableName, enableOltp);
19271947
auto client = kikimr->GetQueryClient();
19281948

19291949
const TString oltpTable = "DestinationOltp";
1930-
{
1950+
if (enableOltp) {
19311951
const TString query = fmt::format(R"(
19321952
PRAGMA TablePathPrefix = "TestDomain";
19331953
CREATE TABLE `{destination}` (
@@ -1938,8 +1958,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19381958
"destination"_a = oltpTable,
19391959
"external_table"_a = externalTableName
19401960
);
1941-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1942-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1961+
requestRunner(query, client, kikimr->GetDriver());
19431962
}
19441963

19451964
const TString olapTable = "DestinationOlap";
@@ -1955,11 +1974,18 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
19551974
"destination"_a = olapTable,
19561975
"external_table"_a = externalTableName
19571976
);
1958-
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
1959-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
1977+
requestRunner(query, client, kikimr->GetDriver());
19601978
}
19611979

1962-
ValidateTables(client, oltpTable, olapTable);
1980+
ValidateTables(client, oltpTable, olapTable, enableOltp);
1981+
}
1982+
1983+
Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericQuery) {
1984+
DoCreateTableAsSelectFromExternalTable(&RunGenericQuery, true);
1985+
}
1986+
1987+
Y_UNIT_TEST(CreateTableAsSelectFromExternalTableGenericScript) {
1988+
DoCreateTableAsSelectFromExternalTable(&RunGenericScript, false);
19631989
}
19641990
}
19651991

0 commit comments

Comments
 (0)